Parallel and Distributed GROUP BY

Author: Alexey Milovidov, 2016-12-10.

About Me

Alexey, ClickHouse developer.

Since 2008, I worked on the data processing engine for Yandex.Metrica.

 

ClickHouse is an analytical DBMS.

One query — a lot of data on input, little on output.

Data needs to be aggregated on the fly.

Metrica 2.0

Example Query

SELECT MobilePhoneModel, COUNT(DISTINCT UserID) AS u
FROM hits
WHERE MobilePhoneModel != ''
GROUP BY MobilePhoneModel
ORDER BY u DESC

 

To process queries quickly, data needs to be:

 

Query execution pipeline:

— filtering, JOIN, aggregation, sorting...

How to Test Performance?

Benchmarks should be:

Benchmark Example (not the best)

/** Run like this:
for file in MobilePhoneModel PageCharset Params URLDomain UTMSource Referer URL Title; do
 for size in 30000 100000 300000 1000000 5000000; do
  echo
  BEST_METHOD=0
  BEST_RESULT=0
  for method in {1..10}; do
   echo -ne $file $size $method '';
   TOTAL_ELEMS=0
   for i in {0..1000}; do
    TOTAL_ELEMS=$(( $TOTAL_ELEMS + $size ))
    if [[ $TOTAL_ELEMS -gt 25000000 ]]; then break; fi
    ./hash_map_string_3 $size $method < ${file}.bin 2>&1 |
     grep HashMap | grep -oE '[0-9\.]+ elem';
   done | awk -W interactive '{ if ($1 > x) { x = $1 }; printf(".") } END { print x }' |
    tee /tmp/hash_map_string_3_res;
   CUR_RESULT=$(cat /tmp/hash_map_string_3_res | tr -d '.')
   if [[ $CUR_RESULT -gt $BEST_RESULT ]]; then
    BEST_METHOD=$method
    BEST_RESULT=$CUR_RESULT
   fi;
  done;
  echo Best: $BEST_METHOD - $BEST_RESULT
 done;
done
*/

Aggregation

Single Machine, Single Core

Bad Approach

Read the data into an array; sort it by key; iterate through the key groups and calculate the aggregate functions.

Advantages:

+ simple aggregate function interface;

+ possibility of more efficient aggregate function implementation;

+ can run arbitrary reduce scripts in streaming mode.

Disadvantages:

Let N be the total data size and M the number of keys.

− works terribly when N > M, which is the typical case;

− wastes O(N) RAM on intermediate data instead of O(M).

Good Approach

Read the data and put it into an associative array

key tuple -> states of aggregate functions

updating the states of the aggregate functions as each row arrives.
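A minimal sketch of this approach in Python, shaped after the example query (count distinct users per phone model). The names `aggregate` and `finalize` are made up for illustration, a Python `dict` stands in for the hash table, and a `set` stands in for the state of COUNT(DISTINCT ...).

```python
from collections import defaultdict


def aggregate(rows):
    """Build key -> aggregate state. Each row is (key, user_id).

    The state here is a set of user ids, a stand-in for COUNT(DISTINCT UserID).
    Memory use is proportional to the number of keys and their states,
    not to the number of input rows.
    """
    states = defaultdict(set)
    for key, user_id in rows:
        states[key].add(user_id)  # update the state for this key
    return states


def finalize(states):
    """Turn states into final values, ordered by count descending, then key."""
    result = [(key, len(users)) for key, users in states.items()]
    return sorted(result, key=lambda item: (-item[1], item[0]))


rows = [("iPhone", 1), ("iPhone", 2), ("Galaxy", 1), ("iPhone", 1)]
top = finalize(aggregate(rows))
```

The rows are consumed in a single pass and never stored, unlike in the sort-based approach.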

Which Associative Array?

Lookup table. Hash table.

Binary tree. Skip-list. B-tree.

Trie. Trie+hash table...

Binary Tree

− too much overhead per element;

− terrible cache locality;

− generally slow.

Skip-list. Trie. B-tree...

− designed for a different problem;

Lookup Table

+ perfect for aggregation by numeric keys of no more than ~16 bits;

− doesn't work for slightly more complex cases.

Hash Table

+ my favorite data structure;

− many details.

Trie+Hash Table

+ sometimes there's something to it, see below;

Single Machine, Multiple Cores

1. Trivial Approach

Different threads read different data, whichever they can get. They aggregate independently into their local hash tables. When all data is read, merge all hash tables into one. For example, iterate through all local hash tables except the first and move everything into the first one.

The read and preliminary aggregation phase is parallelized. The merge phase is sequential.

Let N be the total data size and M the number of keys. O(M) work is done sequentially, so with large M (high GROUP BY cardinality) the work doesn't parallelize well.

Advantages: trivial.

Disadvantages: doesn't scale with high cardinality.
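The structure of the trivial approach can be sketched in Python as follows. This is only an illustration of the two phases: the function names are hypothetical, `Counter` stands in for a hash table with a count() state, and CPython threads will not give a real speedup for this kind of CPU-bound work.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def aggregate_chunk(chunk):
    """Phase 1 (parallel): aggregate one chunk into a thread-local table."""
    local = Counter()
    for key in chunk:
        local[key] += 1
    return local


def trivial_parallel_group_by(chunks, num_threads=4):
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        local_tables = list(pool.map(aggregate_chunk, chunks))

    # Phase 2 (sequential): move everything into the first table.
    # This loop does O(M) work per local table on a single thread.
    result = local_tables[0] if local_tables else Counter()
    for table in local_tables[1:]:
        for key, count in table.items():
            result[key] += count
    return result


counts = trivial_parallel_group_by([["a", "b", "a"], ["b", "c"], ["a"]])
```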

2. Partitioning Approach

For each data block, perform aggregation in two stages:

Stage 1. Different threads process different chunks of the block, whichever they can get. In each thread, using a separate hash function, hash the key to a thread number and remember it.

hash: key -> bucket_num

Stage 2. Each thread iterates through the entire data block and takes for aggregation only the rows with the needed bucket number.

Modification: everything can be done in one stage. Then each thread calculates the hash function for all rows again, which works if the hash function is cheap.

Advantages:

+ scales well with high cardinality and uniform key distribution;

+ conceptual simplicity.

Disadvantages:

− if data volume is distributed unevenly across keys, stage 2 doesn't scale well. This is the typical case: data volume by key is almost always distributed according to a power law.

More disadvantages:

− if the block size is small, the multithreading is too fine-grained: high synchronization overhead;

− if the block size is large, poor cache locality;

− on the second stage, part of the memory bandwidth use is multiplied by the number of threads;

− another hash function has to be computed, and it should be independent of the one used in the hash table.
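A rough Python sketch of the two stages for a single block. The names are hypothetical, `zlib.crc32` is an arbitrary stand-in for the "separate hash function", and keys are assumed to be strings. Each thread ends up with a table of disjoint keys, so no merge is needed.

```python
import zlib
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def bucket_of(key, num_threads):
    # Separate hash function: key -> bucket_num (the owning thread).
    return zlib.crc32(key.encode()) % num_threads


def partitioned_group_by(block, num_threads=4):
    chunk_size = max(1, -(-len(block) // num_threads))  # ceiling division
    chunks = [block[i:i + chunk_size] for i in range(0, len(block), chunk_size)]

    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        # Stage 1: each thread hashes its own chunk and remembers bucket numbers.
        hashed = pool.map(lambda c: [bucket_of(k, num_threads) for k in c], chunks)
        buckets = [b for chunk in hashed for b in chunk]

        # Stage 2: each thread scans the whole block, taking only its own rows.
        def aggregate_bucket(my_bucket):
            table = Counter()
            for key, bucket in zip(block, buckets):
                if bucket == my_bucket:
                    table[key] += 1
            return table

        return list(pool.map(aggregate_bucket, range(num_threads)))


tables = partitioned_group_by(["a", "b", "a", "c", "b", "a"], num_threads=3)
```

Stage 2 reads the entire block once per thread, which is the memory bandwidth multiplication listed among the disadvantages.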

3. Parallel Hash Table Merge

Resize the hash tables obtained in different threads to the same size. Split them implicitly into different subsets of keys. In different threads, merge the corresponding subsets of hash table keys.

Picture on the board.

Disadvantage:

− very complex code.

4. Ordered Hash Table Merge

For open addressing linear probing hash tables, or for chaining hash tables, the data in the hash table is almost ordered by the remainder of dividing the hash function by the hash table size, up to collision resolution chains.

Resize the hash tables obtained in different threads to the same size. Make an ordered iterator, which enumerates the data in a hash table in a fixed order.

Iteration work volume: number of collision resolution chains * average square of chain lengths.

Make a merging iterator, which uses a heap (priority queue) to enumerate all hash tables at once.

Advantages:

+ no need to move elements anywhere: the merge is done in place;

+ bonus: suitable for external memory.

Disadvantages:

− terribly complex code;

− for open addressing linear probing hash tables, the average square of collision resolution chain lengths is too large;

− priority queue is slow;

− merge stage doesn't parallelize*

* can be combined with the previous approach.
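The idea can be sketched in Python for the chaining case, under simplifying assumptions: all tables already have the same fixed size `NUM_CELLS`, each chain is a small dict, and chains are sorted by key on the fly to get a fixed order inside a cell. All names are hypothetical; `heapq.merge` plays the role of the heap-based merging iterator.

```python
import heapq
import zlib

NUM_CELLS = 8  # assumption: every table has been resized to this size


def cell_of(key):
    return zlib.crc32(key.encode()) % NUM_CELLS


def build_chained_table(keys):
    """Chaining hash table: a list of cells, each cell a chain key -> count."""
    cells = [dict() for _ in range(NUM_CELLS)]
    for key in keys:
        chain = cells[cell_of(key)]
        chain[key] = chain.get(key, 0) + 1
    return cells


def ordered_items(cells):
    """Ordered iterator: by cell number, then by key within a chain."""
    for cell_index, chain in enumerate(cells):
        for key in sorted(chain):  # cost grows with chain length
            yield (cell_index, key), chain[key]


def ordered_merge(tables):
    """Merging iterator: walks all tables at once, combining equal keys."""
    streams = (ordered_items(t) for t in tables)
    current, total = None, 0
    for position, count in heapq.merge(*streams, key=lambda item: item[0]):
        if position != current:
            if current is not None:
                yield current[1], total
            current, total = position, 0
        total += count
    if current is not None:
        yield current[1], total


t1 = build_chained_table(["a", "b", "a", "d"])
t2 = build_chained_table(["b", "c"])
merged = dict(ordered_merge([t1, t2]))
```

The merge keeps only one pending element per table plus one running total, which is why this family of algorithms is described as suitable for external memory.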

5. Robin Hood Ordered Hash Table Merge

If using a Robin Hood hash table, the data (except for O(1) boundary collision resolution chains) will be completely ordered by the remainder of dividing the hash function by the hash table size.

Advantages:

+ seems like a beautiful algorithm;

+ bonus: suitable for external memory.

Disadvantages:

− forces use of Robin Hood probing;

− priority queue is slow;

− merge stage doesn't parallelize*

6. Shared Hash Table Under Mutex

Advantages: very simple.

Disadvantages: negative scalability.

7. Multiple Small Hash Tables Under Different Mutexes

Which one to use is chosen using a separate hash function.

Disadvantages:

− in the typical case data is distributed very unevenly, and threads will compete on one hot bucket;

− in the case of a small hash table, too slow.

Advantages: if data is somehow distributed uniformly, it scales somewhat.

8. Shared Hash Table with Spin-lock in Each Cell

Disadvantages:

− a spin-lock is very dangerous; its performance is very hard to test; you will definitely end up with garbage;

− in the typical case data is distributed very unevenly, and threads will compete on one hot cell.

9. Lock Free Shared Hash Table

Disadvantages:

− lock-free hash tables either cannot be resized, or they are very complex;

− in the typical case data is distributed very unevenly, and threads will compete on one hot cell: false sharing, slow;

− complex code, many instructions, everything is slow;

− I generally don't like lock-free algorithms.

10. Shared Hash Table + Thread Local Hash Tables

Try to put the key into the shared hash table by locking the cell; if the cell is already locked, put it into the local hash table. Then hot keys will go into local hash tables. Local hash tables will be small. At the end, merge all local hash tables into the global one.

Additions: can first check for key presence in the local hash table.

Advantages:

+ scales excellently;

+ relatively simple implementation.

Disadvantages:

− many lookups, many instructions: overall quite slow, even though the thread-local hash table is often also cache-local.
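A loose Python sketch of the control flow, with several simplifications that are assumptions rather than part of the talk: a "cell" here is a small `Counter` guarded by its own `threading.Lock` (not a real hash table cell), Python's built-in `hash` picks the cell, and the final result is assembled from the shared and local tables after all threads finish. The names are hypothetical.

```python
import threading
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

NUM_CELLS = 64  # hypothetical number of lockable cells


class SharedTable:
    def __init__(self):
        self.locks = [threading.Lock() for _ in range(NUM_CELLS)]
        self.cells = [Counter() for _ in range(NUM_CELLS)]

    def try_add(self, key):
        """Try to lock the cell without waiting; report whether it worked."""
        i = hash(key) % NUM_CELLS
        if not self.locks[i].acquire(blocking=False):
            return False  # cell is busy: the caller falls back to its local table
        try:
            self.cells[i][key] += 1
            return True
        finally:
            self.locks[i].release()


def worker(shared, chunk):
    local = Counter()
    for key in chunk:
        if not shared.try_add(key):
            local[key] += 1  # contended (hot) keys end up here
    return local


def shared_plus_local_group_by(chunks):
    shared = SharedTable()
    with ThreadPoolExecutor(max_workers=max(1, len(chunks))) as pool:
        local_tables = list(pool.map(lambda c: worker(shared, c), chunks))
    # At the end, combine the shared table with all local tables.
    result = Counter()
    for cell in shared.cells:
        result.update(cell)
    for local in local_tables:
        result.update(local)
    return result


chunks = [["hot"] * 1000 + ["a"], ["hot"] * 1000 + ["b"], ["hot"] * 1000]
totals = shared_plus_local_group_by(chunks)
```

How many rows land in local tables depends on thread timing, but the final totals do not.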

11. Two-level Hash Table

On the first stage, each thread independently puts data into its own num_buckets = 256 hash tables, which store different keys.

Which hash table to put a key into (the bucket number) is determined by another hash function, or by a separate byte of the hash function.

This gives num_threads * num_buckets hash tables.

On the second stage, merge the states from the num_threads * num_buckets hash tables into num_buckets hash tables, parallelizing the merge by buckets.

Advantages:

+ scales excellently;

+ simple implementation;

+ bonus: hash table resizes are amortized;

+ bonus: get partitioning for free, which is useful for other pipeline stages;

+ bonus: suitable for external memory.

Disadvantages:

− with high cardinality, the merge does the same amount of work as the first stage;

− with low cardinality, too many separate hash tables;

− with low cardinality, works somewhat slower than the trivial approach.
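A short Python sketch of both stages. The function names are hypothetical, keys are assumed to be strings, and the top byte of `zlib.crc32` is used as the "separate byte of the hash function"; `Counter` stands in for each small hash table.

```python
import zlib
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

NUM_BUCKETS = 256


def bucket_of(key):
    # Top byte of a 32-bit hash: a value in 0..255.
    return zlib.crc32(key.encode()) >> 24


def aggregate_two_level(chunk):
    """Stage 1: one thread fills its own NUM_BUCKETS small tables."""
    buckets = [Counter() for _ in range(NUM_BUCKETS)]
    for key in chunk:
        buckets[bucket_of(key)][key] += 1
    return buckets


def merge_bucket(bucket_num, per_thread):
    """Stage 2: merge one bucket across all threads; buckets are independent."""
    merged = Counter()
    for buckets in per_thread:
        merged.update(buckets[bucket_num])
    return merged


def two_level_group_by(chunks, num_threads=4):
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        per_thread = list(pool.map(aggregate_two_level, chunks))
        return list(pool.map(lambda b: merge_bucket(b, per_thread),
                             range(NUM_BUCKETS)))


result = two_level_group_by([["a", "b", "a"], ["b", "c"]])
```

Because a key always maps to the same bucket number in every thread, merging bucket i never touches bucket j, so the second stage needs no locking.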

12. Trivial + Two-level Hash Table

Use the trivial approach. When there are many different keys, convert to two-level.

This is exactly the approach used in ClickHouse :)
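The conversion step can be illustrated with a small Python sketch. This is not ClickHouse's code: the class name `AdaptiveTable`, the `threshold` parameter, and the use of `zlib.crc32` are assumptions made for the illustration.

```python
import zlib
from collections import Counter

NUM_BUCKETS = 256


def bucket_of(key):
    return zlib.crc32(key.encode()) >> 24  # top byte of a 32-bit hash


class AdaptiveTable:
    """Starts as a single table; converts to two-level past a key threshold."""

    def __init__(self, threshold):
        self.threshold = threshold  # hypothetical tuning parameter
        self.single = Counter()
        self.buckets = None  # filled in after conversion

    @property
    def is_two_level(self):
        return self.buckets is not None

    def add(self, key):
        if self.is_two_level:
            self.buckets[bucket_of(key)][key] += 1
            return
        self.single[key] += 1
        if len(self.single) > self.threshold:
            self._convert()

    def _convert(self):
        # Redistribute the existing states into NUM_BUCKETS small tables.
        self.buckets = [Counter() for _ in range(NUM_BUCKETS)]
        for key, count in self.single.items():
            self.buckets[bucket_of(key)][key] = count
        self.single = None

    def totals(self):
        if not self.is_two_level:
            return dict(self.single)
        merged = {}
        for table in self.buckets:
            merged.update(table)  # buckets hold disjoint keys
        return merged


table = AdaptiveTable(threshold=2)
for key in ["a", "b", "a"]:
    table.add(key)
was_two_level_before = table.is_two_level
for key in ["c", "a"]:
    table.add(key)
```

With few keys the table stays single-level and behaves like the trivial approach; only high-cardinality inputs pay for the 256 buckets.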

Multiple Machines, Multiple Cores

Parts of the data that need to be processed are located on different machines.

Differences from shared memory: almost no possibility of work stealing; data must be transferred explicitly over the network.

1. Trivial Approach

Transfer intermediate results to the initiator server. Sequentially put everything into one hash table.

Advantages:

+ trivial;

+ scales well with low cardinality.

Disadvantages:

− doesn't scale with high cardinality;

− requires RAM for the entire result.

2. Ordered Merge

Transfer intermediate results to the initiator server in a specified order. Merge.

Advantages:

+ uses O(1) RAM.

Disadvantages:

− doesn't scale with high cardinality;

− merging sorted streams (heap) is a slow operation;

− requires either sorting results on remote servers, or using one of those fancy algorithms above.

3. Partitioned Merge

Transfer intermediate results to the initiator server, split into separate coordinated bucket-partitions, in a specified order of buckets. Merge one or several buckets at a time.

Advantages:

+ uses up to num_buckets times less RAM than the result size;

+ can easily parallelize, merging several buckets at once: scales excellently by cores.

Disadvantages:

− the merge is done on one server, the query initiator: this stage doesn't scale by servers.

This is exactly the approach used in ClickHouse :)
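A simplified Python model of the initiator side, not ClickHouse's actual protocol. Remote servers are modeled as generators that send their partial results bucket by bucket in the same order; the names, the tiny `NUM_BUCKETS`, and the `zlib.crc32` hash are assumptions for the illustration.

```python
import zlib
from collections import Counter

NUM_BUCKETS = 4  # kept small for the example; must be the same on all servers


def bucket_of(key):
    return zlib.crc32(key.encode()) % NUM_BUCKETS


def remote_stream(keys):
    """One remote server: aggregate locally, then send buckets in order."""
    buckets = [Counter() for _ in range(NUM_BUCKETS)]
    for key in keys:
        buckets[bucket_of(key)][key] += 1
    for bucket_num, table in enumerate(buckets):
        yield bucket_num, table


def partitioned_merge(streams):
    """Initiator: take the same bucket from every server and merge it."""
    for parts in zip(*streams):
        bucket_num = parts[0][0]
        merged = Counter()
        for num, table in parts:
            assert num == bucket_num  # servers must agree on bucket order
            merged.update(table)
        yield bucket_num, merged  # can be passed downstream and then dropped


streams = [remote_stream(["a", "b", "a"]), remote_stream(["b", "c"])]
final = Counter()
for bucket_num, table in partitioned_merge(streams):
    final.update(table)
```

The initiator holds only one bucket per server at a time, and since different buckets are independent, several of them could be merged on different cores.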

4. Reshuffle + Partitioned Merge

On the remote servers, get intermediate results split into coordinated partitions. Then transfer partitions between servers so that different servers hold different partitions, and all data of one partition ends up on one server. Merge on all servers in parallel, also using multiple cores.

Advantages:

+ scales beautifully;

+ for INSERT SELECT, the result need not be transferred to the initiator server at all; it can be saved immediately into a distributed table on the cluster.

Disadvantages:

− complex server coordination.
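The data movement can be modeled in a single Python process. Everything here is an assumption made for illustration: servers are list indices, the "network transfer" is appending to an inbox, and bucket b is assigned to server b % num_servers.

```python
import zlib
from collections import Counter


def bucket_of(key, num_buckets):
    return zlib.crc32(key.encode()) % num_buckets


def local_partitions(keys, num_buckets):
    """On each server: aggregate local data into coordinated partitions."""
    parts = [Counter() for _ in range(num_buckets)]
    for key in keys:
        parts[bucket_of(key, num_buckets)][key] += 1
    return parts


def reshuffle_and_merge(data_per_server, num_buckets):
    num_servers = len(data_per_server)
    partials = [local_partitions(keys, num_buckets) for keys in data_per_server]

    # Reshuffle: every server sends bucket b to its owner, server b % num_servers.
    inbox = [dict() for _ in range(num_servers)]
    for parts in partials:
        for b, table in enumerate(parts):
            inbox[b % num_servers].setdefault(b, []).append(table)

    # Merge: each server merges only the buckets it owns (parallel in reality).
    results = []
    for received in inbox:
        merged = {}
        for b, tables in received.items():
            total = Counter()
            for table in tables:
                total.update(table)
            merged[b] = total
        results.append(merged)
    return results


data = [["a", "b", "a"], ["b", "c"], ["a", "d"]]
results = reshuffle_and_merge(data, num_buckets=8)
```

After the reshuffle every key lives on exactly one server, so the final result is simply the union of the per-server results.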

The End

Questions welcome.