
On my journey to learn CUDA, I decided to tackle the One Billion Row Challenge with it.

The challenge is simple, but implementing it in CUDA was not. Here I will share my solution that runs in 16.8 seconds on a V100. It’s certainly not the fastest solution, but it is the first one of its kind (no cudf, hand-written kernels only). I challenge other CUDA enthusiasts to make it faster.

Baseline in pure C++

You can’t improve what you don’t measure. Since I’m going to be writing C++ anyway for CUDA, let’s use a pure C++ baseline. My CUDA code should be faster than this.

The approach is straightforward: read the file line by line, parse each line to get the city and temperature, and accumulate the statistics in an STL map.

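The Stat struct and map declaration aren’t shown in the snippet below, but something along these lines is assumed:

// Assumed declarations (not shown in the original snippet): per-station
// running statistics and the map that accumulates them.
struct Stat {
    float min, max, sum;
    int count;
};
map<string, Stat> stationStats;  // assumes `using namespace std;`
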
while (getline(file, line)) {
    istringstream iss(line);
    string station;
    float temp;
    getline(iss, station, ';');
    iss >> temp;

    auto it = stationStats.find(station);
    if (it == stationStats.end()) {
        stationStats[station] = {temp, temp, temp, 1};
    } else {
        Stat& s = it->second;
        s.min = min(s.min, temp);
        s.max = max(s.max, temp);
        s.sum += temp;
        s.count++;
    }
}

ofstream measurements("measurements.out");
for (auto& pair : stationStats) {
    const Stat& s = pair.second;
    float mean = s.sum / s.count;
    measurements << pair.first << "=" << s.min << "/";
    measurements << fixed << setprecision(1) << mean << "/";
    measurements << s.max << endl;
}

This runs in 16.5 minutes. Let’s improve this with CUDA.

Work Partitioning Approach

One Billion Threads?

The whole promise of CUDA and other parallel programming APIs is that you can parallelize your workload across many threads. For CUDA, it’s a SIMT model - a single instruction is executed across multiple threads in parallel.

Great, so let’s just use one billion threads to process one billion lines concurrently!

Unfortunately, we can’t just launch one billion threads. We would first need to prepare a line buffer for each thread to process, and preparing those one billion line buffers requires reading the entire file line by line (unless the lines were already split into one billion files, but that would make this the One Billion Files Challenge).

The effort involved in setting up these buffers would essentially replicate the baseline workload, making this approach counterproductive.

Use Byte Offsets

The solution is to prepare file offsets instead of line buffers. These offsets are found iteratively by stepping through the file in increments of the desired split size (= total file size / desired number of parts) and marking the position of the nearest newline character:

long long split_size = size / num_parts;
long long offset = 0;
std::vector<Part> parts;
while (offset < size) {
    // Look at the MAX_CITY_BYTE bytes ending at the nominal split boundary;
    // the last newline in that window becomes the part boundary.
    long long seek_offset = std::max(offset + split_size - MAX_CITY_BYTE, 0LL);
    if (seek_offset > size) {
        // Last part: absorb whatever is left of the file.
        parts.back().length += size - offset;
        break;
    }
    file.seekg(seek_offset, std::ios::beg);
    char buf[MAX_CITY_BYTE];
    file.read(buf, MAX_CITY_BYTE);

    // Find the newline character in the vicinity. 
    // That will be the boundary between this part and the next.
    std::streamsize n = file.gcount();
    std::streamsize newline = -1;
    for (int i = n - 1; i >= 0; --i) {
        if (buf[i] == '\n') {
            newline = i;
            break;
        }
    }
    int remaining = n - newline - 1;
    long long next_offset = seek_offset + n - remaining;  // first byte after the newline
    parts.push_back({offset, next_offset - offset});
    offset = next_offset;
}
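
Part here is just the byte offset and length of one chunk (field names inferred from the push_back above):

// Inferred from its usage above: where a chunk starts and how many bytes it spans.
struct Part {
    long long offset;
    long long length;
};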

This is much faster than reading the entire file because we jump directly to byte offsets instead of scanning every line. For example, say we want to partition the 14GB input file for two threads. The while loop iterates just twice (offset = 0GB -> 7GB -> 14GB). Contrast this with a line-based approach, where we would need to iterate 500M (= 1B / 2) times just to load the first partition’s 500M lines.

In reality, we need more than just two threads. But one billion is too many: not because our GPU can’t handle one billion threads, but because finding one billion offsets becomes the bottleneck in the overall runtime, before we’ve even launched our CUDA kernels. We need to keep this preparation time as small as possible.

For my solution, I create 1M partitions, which takes 2.6 seconds of the total 16.8 seconds. (In comparison, creating 100M partitions alone takes over 3 minutes.)

CUDA Kernel

The rest of the time is spent in the CUDA kernel (finally!). The idea behind it is simple. Each thread indexes into a different part of the file buffer, parses it to get the cities and temperatures, and updates the min/max/avg statistics.

The implementation, however, is not trivial due to the following reasons (in order of annoyance):

  • CUDA’s atomicMin and atomicMax only work with integer types. We’ll make our own float-accepting variants.
  • No std::string in CUDA. Time to write our own atof, strcmp, and getline. Get ready for null terminators '\0'.
  • No std::map either. How can we pass a city-string-to-array-index lookup table into our CUDA kernel?

Let’s go through these one by one.

AtomicMin & AtomicMax for Floats

Any atomic operation in CUDA can be written in terms of atomicCAS. Let’s adapt the example from the official programming guide to write atomicMin and atomicMax for float values. Here’s atomicMin:

__device__ static float atomicMin(float* address, float val) {
    int* address_as_i = (int*) address;
    int old = *address_as_i, assumed;
    do {
        assumed = old;
        // Use `fmaxf` instead of `fminf` for atomicMax.
        old = ::atomicCAS(address_as_i, assumed,
                          __float_as_int(::fminf(val, __int_as_float(assumed))));
    } while (assumed != old);
    return __int_as_float(old);
}

Now we can atomically update min and max temperature floats.
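
And the max counterpart, which only swaps fminf for fmaxf as noted in the comment above:

// Same CAS loop as atomicMin; only the comparison function changes.
__device__ static float atomicMax(float* address, float val) {
    int* address_as_i = (int*) address;
    int old = *address_as_i, assumed;
    do {
        assumed = old;
        old = ::atomicCAS(address_as_i, assumed,
                          __float_as_int(::fmaxf(val, __int_as_float(assumed))));
    } while (assumed != old);
    return __int_as_float(old);
}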

C Strings

While we don’t have std::string, we have char*. A string is just an array of 8-bit characters, and the raw file buffer (e.g. "Hamburg;12.0\nBulawayo;8.9\nPalembang;38.8...") is no different.

Each thread iterates through this char array, at different offsets and for different lengths (computed at our work partitioning step):

char city[MAX_CITY_BYTE];
char floatstr[6];  // longest temperature string is "-99.9" (5 chars) + '\0'

int index = 0;            // write position within city / floatstr
bool parsing_city = true; // each line starts with the city name

for (int i = 0; i < parts[bx].length; i++) {   // bx is the global thread index
    char c = buffer[parts[bx].offset-buffer_offset + i];
    if (parsing_city) {  // City characters
        if (c == ';') {
            city[index] = '\0';
            index = 0;
            parsing_city = false;
        } else {
            city[index] = c;
            index++;
        }
    } else {  // Float characters
        if (c == '\n') {  // Reached end of line
            floatstr[index] = '\0';

            int stat_index = get_index(cities, city, n_city);
            float temp = cuda_atof(floatstr);

            // The heart of the CUDA kernel.
            // Update (atomically) the temperature statistics.
            // Identical in spirit to the simple C++ version.
            atomicMin(&stats[stat_index].min, temp);
            atomicMax(&stats[stat_index].max, temp);
            atomicAdd(&stats[stat_index].sum, temp);
            atomicAdd(&stats[stat_index].count, 1);

            // reset for next line read
            parsing_city = true;
            index = 0;
            floatstr[0] = '\0'; city[0] = '\0';
        } else {
            floatstr[index] = c;
            index++;
        }
    }
}

It’s not pretty, but it’s necessary since we don’t have the luxury of a getline. After parsing each line, we have a pair of strings: a city string char city[] and a temperature string char floatstr[]. (The latter still needs a string-to-float conversion, and since we don’t have atof in CUDA, I wrote my own again.)
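
A minimal sketch of such a conversion, assuming the 1BRC temperature format (optional '-', one or two integer digits, exactly one fractional digit), might look like this - not necessarily identical to the cuda_atof used in the kernel above:

// Sketch of a device-side atof, assuming the 1BRC temperature format:
// optional '-', one or two integer digits, '.', exactly one fractional digit.
__device__ float cuda_atof(const char* s) {
    float sign = 1.0f;
    if (*s == '-') { sign = -1.0f; ++s; }
    int value = 0;                     // "12.3" accumulates to 123
    for (; *s != '\0'; ++s) {
        if (*s == '.') continue;       // skip the decimal point
        value = value * 10 + (*s - '0');
    }
    return sign * value / 10.0f;       // one fractional digit -> divide by 10
}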

City String to Index Lookup

GPU Hash Table?

How do we store temperature statistics for each city? In C++ land, we relied on a hash table - using the city string as key, and temperature statistic floats as value. In CUDA, we don’t have such a convenient std::map.

Okay, let’s write our own. How hard can it be? Turns out, damn near impossible, because I have 100-byte city strings as keys.

While I found some implementations online, those approaches are limited to 32-bit keys, since the underlying atomic operations only support small fixed-width words (even on the CPU).

To be clear, you don’t need atomic operations to handle hash table collisions in general, but you do need them when collisions can happen across multiple threads, i.e., in a parallel setting with concurrent inserts into the hash table.

Ask for forgiveness, not permission

So this is where I bend the rules of the original challenge a bit. I assume that a list of all cities is given along with the input file. This list is the data/weather_stations.csv file, which is actually used to generate the one billion rows for the official challenge.

Knowing the list of all possible cities, I can avoid a hash table entirely. I sort the list of cities and pass it to my CUDA kernel, using each city’s position in the sorted order as its index into the lookup table. E.g., if the sorted city list is ["A","B","C"], then city "B" maps to its position, index 1.
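
Concretely, the host-side preparation can be sketched like this (the load_station_names helper and variable names are mine; the flat MAX_CITY_BYTE-strided layout matches what get_index below expects):

// Build a flat, sorted city table to copy to the device. Each city occupies
// a fixed MAX_CITY_BYTE slot so the kernel can index it as
// cities + i * MAX_CITY_BYTE. (load_station_names is an assumed helper that
// reads data/weather_stations.csv.)
std::vector<std::string> names = load_station_names("data/weather_stations.csv");
std::sort(names.begin(), names.end());

std::vector<char> cities(names.size() * MAX_CITY_BYTE, '\0');
for (size_t i = 0; i < names.size(); ++i)
    std::copy(names[i].begin(), names[i].end(), cities.data() + i * MAX_CITY_BYTE);

// cudaMemcpy cities.data() to device memory and pass it to the kernel,
// along with n_city = names.size().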

Sorting is important because it lets us binary-search for the index in logarithmic time. While still slower than a hash table’s constant-time lookup, it’s much faster than linearly searching 40k+ city entries.

// Never thought that I would be writing binary search in CUDA, but here we are.
// Thanks LeetCode!
__device__ int get_index(char* cities, char* city_target, int n_city) {
    int left = 0;
    int right = n_city - 1;
    while (left <= right) {
        int mid = left + (right - left) / 2;
        const char* city_query = cities + mid * MAX_CITY_BYTE;

        int cmp = cuda_strcmp(city_query, city_target);
        if (cmp == 0)
            return mid;
        else if (cmp < 0)
            left = mid + 1;
        else
            right = mid - 1;
    }
    return -1;
}
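
The cuda_strcmp it relies on isn’t shown above; it just needs to follow the usual strcmp contract (negative / zero / positive). A minimal sketch:

// Minimal device-side strcmp: <0 if a < b, 0 if equal, >0 if a > b.
// (A sketch; the kernel's actual cuda_strcmp may differ in details.)
__device__ int cuda_strcmp(const char* a, const char* b) {
    while (*a != '\0' && *a == *b) { ++a; ++b; }
    return (int)(unsigned char)*a - (int)(unsigned char)*b;
}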


Now we’re finally done! The heart of the kernel boils down to:

int stat_index = get_index(cities, city, n_city);
float temp = cuda_atof(floatstr);

atomicMin(&stats[stat_index].min, temp);
atomicMax(&stats[stat_index].max, temp);
atomicAdd(&stats[stat_index].sum, temp);
atomicAdd(&stats[stat_index].count, 1);

All this just to do 4 atomic operations.

Profiling

On a V100, the CUDA solution runs in 16.8 seconds. It’s a 60X improvement compared to the 16.5 minutes for our C++ baseline. Here is the script to reproduce this.

Interestingly, the kernel is ~1.5X slower on a T4, and so I used ncu to profile on both devices. One thing that caught my eye was the difference in control divergence.

(v100) ~ ncu --section SourceCounters fast data/measurements.txt 1000000 600000
...
Section: Source Counters
    ------------------------- ----------- --------------
    Metric Name               Metric Unit   Metric Value
    ------------------------- ----------- --------------
    Branch Instructions Ratio           %           0.18
    Branch Instructions              inst 53,736,337,800
    Branch Efficiency                   %          92.73
    Avg. Divergent Branches                10,041,990.22
    ------------------------- ----------- --------------

(t4) ~ ncu --section SourceCounters fast data/measurements.txt 1000000 600000
...
Section: Source Counters
    ------------------------- ----------- --------------
    Metric Name               Metric Unit   Metric Value
    ------------------------- ----------- --------------
    Branch Instructions Ratio           %           0.17
    Branch Instructions              inst 53,870,435,921
    Branch Efficiency                   %          92.73
    Avg. Divergent Branches                20,156,806.42
    ------------------------- ----------- --------------

I expected there to be a lot of control divergence (since I’m using ifs and loops everywhere in my kernel), but I did not expect the divergence to be worse on the T4. Compared to the V100, it has twice as many avg. divergent branches. Could this be a reason why the kernel runs more slowly on a T4? Also, why is the branch efficiency so high if there are so many divergent branches?

Clearly, I’m still learning about the ncu profiler. I welcome any guidance on this area.

Possible Optimization - Privatization using Shared Memory

As I finish writing this blog post, I realize that my struct Stat doesn’t need to hold the city char array. In that case, each struct is 16 bytes (min, max, sum, count), and the entire array of statistics is 16N bytes, where N is the # of unique cities.

Here, N=41k (based on data/weather_stations.csv), so the entire array is roughly 656KB. That is larger than Volta’s 96KB of shared memory per SM, so each block could only hold a private copy of a slice of the table. Even so, having blocks update a private version of stats in shared memory would reduce contention on the atomic operations, and should lead to a faster solution.
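
A rough sketch of the privatization pattern under these assumptions (the slice size, routing, and names are mine, not taken from the actual kernel):

// Sketch only: block-level privatization of part of the stats table.
// Assumes Stat is {float min, max, sum; int count} and that
// N_PRIV * sizeof(Stat) fits comfortably in shared memory.
__global__ void kernel_privatized(/* parsing args as before, */ Stat* stats) {
    const int N_PRIV = 2048;            // 2048 * 16B = 32KB of shared memory
    __shared__ Stat local[N_PRIV];

    // Initialize the block-private slice. Temperatures lie in [-99.9, 99.9],
    // so +/-1000 are safe sentinels for min/max.
    for (int i = threadIdx.x; i < N_PRIV; i += blockDim.x) {
        local[i] = {1000.0f, -1000.0f, 0.0f, 0};
    }
    __syncthreads();

    // ... parse lines exactly as before, but route each update:
    //   if (stat_index < N_PRIV)  -> atomics on local[stat_index]
    //   else                      -> atomics on stats[stat_index]

    __syncthreads();
    // Merge the private slice back into the global table, once per block.
    for (int i = threadIdx.x; i < N_PRIV; i += blockDim.x) {
        if (local[i].count > 0) {
            atomicMin(&stats[i].min, local[i].min);
            atomicMax(&stats[i].max, local[i].max);
            atomicAdd(&stats[i].sum, local[i].sum);
            atomicAdd(&stats[i].count, local[i].count);
        }
    }
}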

Takeaway

The more I worked on this challenge, the more I realized that not all parallel workloads are meant for CUDA, especially those involving strings and dynamic hash tables. I eventually had to cheat a bit and use a static lookup table. I am genuinely curious: is there a way to avoid requiring a list of all cities (and still beat the baseline, of course)?

Still, no regrets taking on the challenge. I got to experience CUDA’s limitations first-hand, and that was worthwhile. Now I know I should just stick to tensor matmuls like I usually do.