ClickHouse Features for Advanced Developers

Author: Alexey Milovidov, 2018-05-29.


Sampling Key

You have a clickstream
and you store it in unaggregated form.

You need to generate reports for clients on the fly.

This is a typical example of ClickHouse usage.

Sampling Key

Most clients are small,
but there are some really large ones.

You need to get reports "instantly" even for the largest clients.

Solution: declare a SAMPLE key in MergeTree family tables.

Sampling Key

CREATE TABLE ...
ENGINE = MergeTree
ORDER BY (CounterID, Date, intHash32(UserID))
PARTITION BY toYYYYMM(Date)
SAMPLE BY intHash32(UserID)

Sampling Key

SELECT uniq(UserID)
FROM hits_all
WHERE CounterID = 76543
  AND EventDate BETWEEN '2018-03-25' AND '2018-04-25'

┌─uniq(UserID)─┐
│     47362335 │
└──────────────┘

1 rows in set. Elapsed: 4.571 sec.
Processed 1.17 billion rows, 16.37 GB (255.88 million rows/s., 3.58 GB/s.)

Sampling Key

SELECT uniq(UserID)
FROM hits_all
SAMPLE 1/10
WHERE CounterID = 76543
  AND EventDate BETWEEN '2018-03-25' AND '2018-04-25'

┌─uniq(UserID)─┐
│      4742578 │
└──────────────┘

1 rows in set. Elapsed: 0.638 sec.
Processed 117.73 million rows, 1.65 GB (184.50 million rows/s., 2.58 GB/s.)

Sampling Key

Requirements:
— must be part of the primary key;

— must be uniformly distributed in its data type:
Bad: Timestamp;
Good: intHash32(UserID);

— must be lightweight to read and compute:
Bad: cityHash64(URL);
Good: intHash32(UserID);

— should not be placed after fine-grained part of PK:
Bad: ORDER BY (Timestamp, sample_key);
Good: ORDER BY (CounterID, Date, sample_key).

Sampling Key

Sampling properties:

— sampling is deterministic;

— works consistently across different tables;

— allows reading less data from disk;

Sampling Key, Bonus

SAMPLE 1/10

— select a subset of 1/10 of all possible sample keys;

SAMPLE 1000000

— select a subset of (at least) 1,000,000 rows, within each shard;
— you can use the virtual column _sample_factor to determine the relative sampling coefficient;

SAMPLE 1/10 OFFSET 1/10

— select a subset from the second 1/10 of all possible sample keys;

SET max_parallel_replicas = 3

— parallelize the query across multiple replicas of each shard;
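As a sketch of how the _sample_factor virtual column is typically used (reusing hits_all and the counter ID from the examples above; the exact numbers are illustrative): non-uniq aggregates computed over a sample need to be scaled back up, and _sample_factor holds the inverse of the sampling ratio actually applied.

```sql
-- Estimate the total number of hits from a row-count sample.
-- _sample_factor is a virtual column equal to 1 / (applied sampling ratio),
-- so multiplying by it extrapolates counts and sums to the full data set.
SELECT count() * any(_sample_factor) AS estimated_hits
FROM hits_all
SAMPLE 1000000
WHERE CounterID = 76543
```

Note that uniq cannot be extrapolated this way: the number of distinct values in a sample does not scale linearly.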

Aggregate Function Combinators

-If
-Array
-ForEach
-Merge
-State

Example: sumIf(x, cond)

Aggregate Function Combinators: -If

SELECT
    uniqIf(UserID, RefererDomain = 'yandex.ru') AS users_yandex,
    uniqIf(UserID, RefererDomain = 'google.ru') AS users_google
FROM test.hits

┌─users_yandex─┬─users_google─┐
│        19731 │         8149 │
└──────────────┴──────────────┘

Aggregate Function Combinators: -Array

SELECT
    uniq(arr),
    uniqArray(arr),
    groupArray(arr),
    groupUniqArray(arr),
    groupArrayArray(arr),
    groupUniqArrayArray(arr)
FROM
(
    SELECT ['hello', 'world'] AS arr
    UNION ALL
    SELECT ['goodbye', 'world']
)
FORMAT Vertical

Aggregate Function Combinators: -Array

Row 1:
──────
uniq(arr):                2
uniqArray(arr):           3
groupArray(arr):          [['hello','world'],['goodbye','world']]
groupUniqArray(arr):      [['hello','world'],['goodbye','world']]
groupArrayArray(arr):     ['hello','world','goodbye','world']
groupUniqArrayArray(arr): ['goodbye','world','hello']

Aggregate Function Combinators

... can be combined with each other

Example: sumArrayIf, sumIfArray.
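The order of combinators matters. A minimal sketch (the column names are made up): with sumArrayIf the condition is a per-row scalar applied to an array sum, while with sumIfArray the condition is itself an array, checked element by element.

```sql
SELECT
    -- -If applied to sumArray: sum all elements of vals
    -- for rows where the scalar condition id = 1 holds.
    sumArrayIf(vals, id = 1) AS sum_array_if,
    -- -Array applied to sumIf: both arguments are arrays;
    -- each element of vals is added only where the matching
    -- element of conds is non-zero.
    sumIfArray(vals, conds) AS sum_if_array
FROM
(
    SELECT 1 AS id, [10, 20, 30] AS vals, [1, 0, 1] AS conds
    UNION ALL
    SELECT 2 AS id, [1, 2, 3] AS vals, [0, 1, 0] AS conds
)
```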

Aggregate Function Combinators

... can be combined with each other

Example: sumForEachStateForEachIfArrayIfState.

Aggregate Function Computation States Are First-Class Objects

The -State combinator — get the computation state
of an aggregate function;

Example: uniqState(user_id) AS state;

— returns a value of type AggregateFunction(...);

— such values can be saved in tables;

— such states can be combined, with the final result obtained using -Merge;

Example: uniqMerge(state) AS result;

Aggregate Function Computation States

SELECT avg(x), uniq(x)
FROM
(
    SELECT 123 AS x
    UNION ALL
    SELECT 456
)

┌─avg(x)─┬─uniq(x)─┐
│  289.5 │       2 │
└────────┴─────────┘

Aggregate Function Computation States

SELECT avgState(x), uniqState(x)
FROM
(
    SELECT 123 AS x
    UNION ALL
    SELECT 456
)

┌─avgState(x)─────┬─uniqState(x)─┐
│ C\0\0\0\0\0\0 │ \0▒�P���a� │
└─────────────────┴──────────────┘

Aggregate Function Computation States

SELECT
    toTypeName(avgState(x)),
    toTypeName(uniqState(x))
FROM
(
    SELECT 123 AS x
    UNION ALL
    SELECT 456
)
FORMAT Vertical

Row 1:
──────
toTypeName(avgState(x)):  AggregateFunction(avg, UInt16)
toTypeName(uniqState(x)): AggregateFunction(uniq, UInt16)

Aggregate Function Computation States

CREATE TABLE t
(
    users_state AggregateFunction(uniq, UInt64),
    ...
) ENGINE = AggregatingMergeTree ORDER BY ...

SELECT uniqMerge(users_state) FROM t GROUP BY ...

Aggregate Function Computation States

Main use case:

Incremental data aggregation
using the AggregatingMergeTree table engine
as a MATERIALIZED VIEW.
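A minimal sketch of this pattern (the table and view names are hypothetical): the materialized view maintains a uniqState per key as data is inserted, and queries finish the computation with uniqMerge.

```sql
-- Assumed source table: hits(CounterID, Date, UserID).
CREATE MATERIALIZED VIEW daily_users
ENGINE = AggregatingMergeTree
ORDER BY (CounterID, Date)
AS SELECT
    CounterID,
    Date,
    uniqState(UserID) AS users_state  -- partial state, finished on SELECT
FROM hits
GROUP BY CounterID, Date;

-- Exact unique users per day, computed from the pre-aggregated states:
SELECT Date, uniqMerge(users_state) AS users
FROM daily_users
WHERE CounterID = 76543
GROUP BY Date
```

The win is that uniq remains exact: unlike pre-computed counts, states for different days or counters can still be merged in any combination at query time.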

How Could We Improve This Feature?

— versioning of aggregate function states;

— determine when different aggregate functions have the same state (sumState and sumIfState should be compatible);

— add the ability to create an aggregate function state using a regular function (currently you can use the arrayReduce function for this);

— add the ability to insert values of AggregateFunction type into tables by passing a tuple of aggregate function arguments;

— adaptive index_granularity depending on row size;
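For reference, the arrayReduce workaround mentioned above applies a named aggregate function (or its -State variant) to an array of values:

```sql
SELECT
    -- Applies 'uniq' directly to the array elements.
    arrayReduce('uniq', [1, 2, 2, 3]) AS direct_result,       -- 3
    -- With a -State function, the result is an AggregateFunction(...) value
    -- that can be inserted into a table or passed to -Merge.
    toTypeName(arrayReduce('uniqState', [1, 2, 2, 3])) AS state_type
```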

Tunable Consistency

By default, ClickHouse implements:

asynchronous, conflict-free, multi-master replication.

Asynchronous:

Client receives confirmation of INSERT after data is written to one replica; replication itself happens asynchronously.

Replicas can lag behind and be missing some recent data;

At any given moment, different replicas may each be missing different parts of the data.

By default, you only get eventual consistency.

Tunable Consistency

You can enable strict consistency (linearizability).

SET insert_quorum = 2;


— each INSERT is confirmed after data is written to a quorum of replicas;

— all replicas in the quorum are consistent: they contain the data of all previous INSERTs (the sequence of INSERTs is linearized);

SET select_sequential_consistency = 1;


— allows using only confirmed data from consistent replicas for SELECT
(which contain all confirmed INSERTs).
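Put together, a quorum-write / consistent-read session might look like this (the table t and the inserted value are hypothetical):

```sql
-- Writer: the INSERT is confirmed only after 2 replicas have the data.
SET insert_quorum = 2;
INSERT INTO t VALUES (1);

-- Reader: only replicas that contain all quorum-confirmed INSERTs
-- are used, so this SELECT is guaranteed to see the row above.
SET select_sequential_consistency = 1;
SELECT count() FROM t;
```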

GROUP BY in External Memory

GROUP BY in External Memory

First, you can simply increase max_memory_usage

GROUP BY in External Memory

If that's not enough — enable external aggregation:

max_bytes_before_external_group_by

distributed_aggregation_memory_efficient
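A sketch of how these settings fit together (the thresholds are illustrative, not recommendations from this talk):

```sql
-- Spill partial aggregation states to disk after ~10 GB.
-- max_memory_usage is usually set to roughly twice that,
-- since merging the spilled data also needs memory.
SET max_bytes_before_external_group_by = 10000000000;
SET max_memory_usage = 20000000000;

-- For distributed queries: merge states from shards block by block
-- instead of holding all partial states in memory at once.
SET distributed_aggregation_memory_efficient = 1;
```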

Working with Geographic Data

— pointInPolygon;

— pointInEllipses;

— greatCircleDistance;

SELECT pointInPolygon((lat, lon), [(6, 0), (8, 4), (5, 8), (0, 2), ...])
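For illustration, greatCircleDistance takes two (longitude, latitude) pairs in degrees and returns the distance in meters; the coordinates below (roughly Moscow and Saint Petersburg) are made up for the example:

```sql
SELECT greatCircleDistance(37.62, 55.75, 30.31, 59.94) AS distance_m
-- on the order of 630 km
```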

Machine Learning Models

SELECT modelEvaluate('name', f1, ... fn) AS ctr_prediction


https://events.yandex.ru/lib/talks/5330/

Machine Learning Models

How could we improve this feature?

— add simpler regression models;

— training models directly in ClickHouse;

— online model training;

— parameterized models (dictionaries of many models);

Processing Data Without a Server

The clickhouse-local utility

$ clickhouse-local \
    --input-format=CSV --output-format=PrettyCompact \
    --structure="SearchPhrase String, UserID UInt64" \
    --query="SELECT SearchPhrase, count(), uniq(UserID) FROM table \
        WHERE SearchPhrase != '' GROUP BY SearchPhrase \
        ORDER BY count() DESC LIMIT 20" < hits.csv

┌─SearchPhrase────────────┬─count()─┬─uniq(UserID)─┐
│ интерьер ванной комнаты │    2166 │            1 │
│ яндекс                  │    1655 │          478 │
│ весна 2014 мода         │    1549 │            1 │
│ фриформ фото            │    1480 │            1 │
│ анджелина джоли         │    1245 │            1 │

Processing Data Without a Server

Bonus: ability to process data from a stopped clickhouse-server.

Processing Data Without a Server

How could we make this feature better?

— add more supported formats for Date and DateTime in text form;

— add Avro, Parquet formats;

— flexible settings for CSV format;

— "template" and "regexp" formats for messy, semi-structured data;

Cross-Cluster Copying

clickhouse-copier

— runs on arbitrary servers in any number of instances;

— tasks are coordinated through ZooKeeper;

— the unit of work is one partition of one shard of the destination table;

— copying is performed reliably and fault-tolerantly;

— configurable limit on network bandwidth usage and number of concurrent tasks;

Cross-Cluster Copying

Example in Yandex.Metrica

— 538 -> 240 servers;

— sharding by CounterID -> sharding by UserID;

— lz4 -> zstd;


Web site: https://clickhouse.com/

Google groups: https://groups.google.com/forum/#!forum/clickhouse

Maillist: [email protected]

Telegram chat: https://telegram.me/clickhouse_ru (more than 1500 participants) and https://telegram.me/clickhouse_en

GitHub: https://github.com/ClickHouse/ClickHouse/

Twitter: https://twitter.com/ClickHouseDB

+ meetups. Moscow, Saint Petersburg, Novosibirsk, Yekaterinburg, Minsk, Nizhny Novgorod, Berlin, Palo Alto, Beijing, Sunnyvale, San Francisco...