Author: Alexey Milovidov, 2018-05-29.
You have a clickstream
and you store it in unaggregated form.
You need to generate reports for clients on the fly.
This is a typical example of ClickHouse usage.
Most clients are small,
but there are some really large ones.
You need to get reports "instantly" even for the largest clients.
Solution: declare a SAMPLE key in MergeTree family tables.
CREATE TABLE ... ENGINE = MergeTree
ORDER BY (CounterID, Date, intHash32(UserID))
PARTITION BY toYYYYMM(Date)
SAMPLE BY intHash32(UserID)
SELECT uniq(UserID) FROM hits_all
WHERE CounterID = 76543
AND EventDate BETWEEN '2018-03-25' AND '2018-04-25'
┌─uniq(UserID)─┐
│     47362335 │
└──────────────┘
1 rows in set. Elapsed: 4.571 sec.
Processed 1.17 billion rows, 16.37 GB
(255.88 million rows/s., 3.58 GB/s.)
SELECT uniq(UserID) FROM hits_all
SAMPLE 1/10
WHERE CounterID = 76543
AND EventDate BETWEEN '2018-03-25' AND '2018-04-25'
┌─uniq(UserID)─┐
│      4742578 │
└──────────────┘
1 rows in set. Elapsed: 0.638 sec.
Processed 117.73 million rows, 1.65 GB
(184.50 million rows/s., 2.58 GB/s.)
Requirements:
— must be part of the primary key;
— must be uniformly distributed across the range of its data type:
Bad: Timestamp;
Good: intHash32(UserID);
— must be lightweight to read and compute:
Bad: cityHash64(URL);
Good: intHash32(UserID);
— should not be placed after a fine-grained part of the primary key:
Bad: ORDER BY (Timestamp, sample_key);
Good: ORDER BY (CounterID, Date, sample_key).
Sampling properties:
— sampling is deterministic;
— works consistently across different tables;
— allows reading less data from disk;
SAMPLE 1/10
— select a subset of 1/10 of all possible sample keys;
SAMPLE 1000000
— select a subset of (at least) 1,000,000 rows, within each shard;
— you can use the virtual column _sample_factor to determine the relative sampling coefficient (see the sketch after this list);
SAMPLE 1/10 OFFSET 1/10
— select a subset from the second 1/10 of all possible sample keys;
SET max_parallel_replicas = 3
— parallelize the query across multiple replicas of each shard;
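A minimal sketch of scaling sampled results back to full totals with _sample_factor (table, columns and filter values as in the example above):
SELECT
    sum(_sample_factor)                AS approx_hits,  -- each sampled row stands for _sample_factor rows
    uniq(UserID) * any(_sample_factor) AS approx_users  -- scales roughly linearly here, because the sample key is derived from UserID
FROM hits_all
SAMPLE 1/10
WHERE CounterID = 76543
AND EventDate BETWEEN '2018-03-25' AND '2018-04-25'
Compare with the numbers above: 4,742,578 × 10 = 47,425,780, close to the exact 47,362,335; scaling uniq by the factor is reasonable here because whole users are sampled, but it does not hold for arbitrary sample keys.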
Aggregate function combinators:
-If
-Array
-ForEach
-Merge
-State
Example: sumIf(x, cond)
SELECT
uniqIf(UserID, RefererDomain = 'yandex.ru')
AS users_yandex,
uniqIf(UserID, RefererDomain = 'google.ru')
AS users_google
FROM test.hits
┌─users_yandex─┬─users_google─┐
│        19731 │         8149 │
└──────────────┴──────────────┘
SELECT
uniq(arr),
uniqArray(arr),
groupArray(arr),
groupUniqArray(arr),
groupArrayArray(arr),
groupUniqArrayArray(arr)
FROM
(
SELECT ['hello', 'world'] AS arr
UNION ALL
SELECT ['goodbye', 'world']
)
FORMAT Vertical
Row 1:
──────
uniq(arr): 2
uniqArray(arr): 3
groupArray(arr): [['hello','world'],['goodbye','world']]
groupUniqArray(arr): [['hello','world'],['goodbye','world']]
groupArrayArray(arr): ['hello','world','goodbye','world']
groupUniqArrayArray(arr): ['goodbye','world','hello']
... can be combined with each other
Example: sumArrayIf, sumIfArray.
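A sketch of how the order of combinators changes the meaning (my reading of the composition; the data values are illustrative):
SELECT
    sumArrayIf(arr, flag)  AS all_elems_of_flagged_rows,  -- sum every element of arr, but only for rows where flag = 1
    sumIfArray(arr, conds) AS per_element_filter          -- sum arr[i] only where the matching conds[i] = 1, over all rows
FROM
(
    SELECT [1, 2, 3] AS arr, 1 AS flag, [1, 0, 1] AS conds
    UNION ALL
    SELECT [10, 20, 30], 0, [1, 1, 1]
)
Under this reading the first expression gives 6 (only the first row is flagged), and the second gives 64 (1 + 3 from the first row plus the whole second row).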
... can be combined with each other
Example: sumForEachStateForEachIfArrayIfState.
The -State combinator — get the computation state
of an aggregate function;
Example: uniqState(user_id) AS state;
— returns a value of type AggregateFunction(...);
— such values can be saved in tables;
— they can be combined together and the final result obtained using -Merge;
Example: uniqMerge(state) AS result;
SELECT
avg(x),
uniq(x)
FROM
(
SELECT 123 AS x
UNION ALL
SELECT 456
)
┌─avg(x)─┬─uniq(x)─┐
│  289.5 │       2 │
└────────┴─────────┘
SELECT
avgState(x),
uniqState(x)
FROM
(
SELECT 123 AS x
UNION ALL
SELECT 456
)
┌─avgState(x)─────┬─uniqState(x)─┐
│ C\0\0\0\0\0\0 │ \0▒�P���a� │
└─────────────────┴──────────────┘
SELECT
toTypeName(avgState(x)),
toTypeName(uniqState(x))
FROM
(
SELECT 123 AS x
UNION ALL
SELECT 456
)
FORMAT Vertical
Row 1:
──────
toTypeName(avgState(x)): AggregateFunction(avg, UInt16)
toTypeName(uniqState(x)): AggregateFunction(uniq, UInt16)
CREATE TABLE t
(
users_state AggregateFunction(uniq, UInt64),
...
) ENGINE = AggregatingMergeTree ORDER BY ...
SELECT uniqMerge(users_state)
FROM t GROUP BY ...
Main use case:
incremental data aggregation
using the AggregatingMergeTree table engine
as the target of a MATERIALIZED VIEW.
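A minimal sketch of that pattern, with hypothetical table and column names: every INSERT into the source table incrementally updates per-day unique-user states, and queries merge those states.
CREATE MATERIALIZED VIEW hits_daily_mv
ENGINE = AggregatingMergeTree ORDER BY (CounterID, Date)
AS SELECT
    CounterID,
    Date,
    uniqState(UserID) AS users_state
FROM hits
GROUP BY CounterID, Date
SELECT
    Date,
    uniqMerge(users_state) AS users
FROM hits_daily_mv
WHERE CounterID = 76543
GROUP BY Date
ORDER BY Date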
How could we improve this feature?
— versioning of aggregate function states;
— a way to determine when different aggregate functions have the same state (sumState and sumIfState should be compatible);
— add the ability to create an aggregate function state using a regular function (currently you can use the arrayReduce function for this);
— add the ability to insert values of AggregateFunction type into tables by passing a tuple of aggregate function arguments;
— adaptive index_granularity depending on row size;
By default, ClickHouse implements:
asynchronous, conflict-free, multi-master replication.
Asynchronous:
The client receives confirmation of an INSERT after the data is written to just one replica; replication itself happens asynchronously.
Replicas can lag and may be missing part of the data;
at any given moment, different replicas may each be missing different parts of the data.
By default, you only get eventual consistency.
You can enable strict consistency (linearizability).
SET insert_quorum = 2;
— each INSERT is confirmed after data is written to a quorum of replicas;
— all replicas in the quorum are consistent: they contain the data from all previous INSERTs (the sequence of INSERTs is linearized);
SET select_sequential_consistency = 1;
— SELECT queries are served only by consistent replicas,
i.e. those that contain the data from all acknowledged quorum INSERTs.
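A sketch of the two settings working together (hits_all as in the earlier examples; new_hits is a hypothetical staging table; the writer and the reader are typically different sessions):
-- Writer session: the INSERT is acknowledged only once two replicas have the data.
SET insert_quorum = 2;
INSERT INTO hits_all SELECT * FROM new_hits;
-- Reader session: only replicas containing all quorum-acknowledged INSERTs serve the query.
SET select_sequential_consistency = 1;
SELECT count() FROM hits_all WHERE EventDate = today();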
What if GROUP BY doesn't fit in memory?
First, you can simply increase max_memory_usage.
If that's not enough, enable external aggregation:
max_bytes_before_external_group_by
distributed_aggregation_memory_efficient
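A sketch of the relevant settings (the numbers are illustrative, not recommendations; the usual advice is to set the spill threshold to about half of max_memory_usage):
SET max_memory_usage = 20000000000;                    -- per-query memory limit, in bytes
SET max_bytes_before_external_group_by = 10000000000;  -- spill GROUP BY state to disk beyond this
SET distributed_aggregation_memory_efficient = 1;      -- merge per-shard results in a memory-efficient, streaming way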
— pointInPolygon;
— pointInEllipses;
— greatCircleDistance;
SELECT pointInPolygon((lat, lon),
[(6, 0), (8, 4), (5, 8), (0, 2), ...])
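A self-contained sketch with illustrative coordinates: pointInPolygon takes the point first and the polygon vertices second; greatCircleDistance takes two (longitude, latitude) pairs in degrees and returns the distance in meters.
SELECT
    pointInPolygon((3.0, 3.0), [(6.0, 0.0), (8.0, 4.0), (5.0, 8.0), (0.0, 2.0)]) AS inside,  -- 1: the point lies inside the polygon
    greatCircleDistance(37.62, 55.75, 30.31, 59.94) AS meters                                -- roughly the Moscow to Saint Petersburg distance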
SELECT modelEvaluate('name', f1, ... fn)
AS ctr_prediction
How could we improve this feature?
— add simpler regression models;
— training models directly in ClickHouse;
— online model training;
— parameterized models (dictionaries of many models);
The clickhouse-local utility
$ clickhouse-local \
--input-format=CSV --output-format=PrettyCompact \
--structure="SearchPhrase String, UserID UInt64" \
--query="SELECT SearchPhrase, count(), uniq(UserID)
FROM table \
WHERE SearchPhrase != '' GROUP BY SearchPhrase \
ORDER BY count() DESC LIMIT 20" < hits.csv
┌─SearchPhrase────────────┬─count()─┬─uniq(UserID)─┐
│ интерьер ванной комнаты │    2166 │            1 │
│ яндекс                  │    1655 │          478 │
│ весна 2014 мода         │    1549 │            1 │
│ фриформ фото            │    1480 │            1 │
│ анджелина джоли         │    1245 │            1 │
Bonus: ability to process data from a stopped clickhouse-server.
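A sketch of that bonus (the path is illustrative): point clickhouse-local at the data directory of a stopped server and query its tables directly.
$ clickhouse-local --path /var/lib/clickhouse \
    --query "SELECT count() FROM default.hits"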
How could we make this feature better?
— add more supported formats for Date and DateTime in text form;
— add Avro, Parquet formats;
— flexible settings for CSV format;
— "template" and "regexp" formats for trash data;
clickhouse-copier
— runs on arbitrary servers in any number of instances;
— tasks are coordinated through ZooKeeper;
— the unit of work is one partition on one shard of the destination table;
— copying is performed reliably and fault-tolerantly;
— configurable limit on network bandwidth usage and number of concurrent tasks;
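A sketch of launching one instance (file and ZooKeeper paths are illustrative): the ZooKeeper connection is taken from the config file, and the description of the copy task is read from the given ZooKeeper node.
$ clickhouse-copier --daemon \
    --config zookeeper.xml \
    --task-path /clickhouse/copier/task1 \
    --base-dir /path/to/copier-workdir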
Example in Yandex.Metrica
— 538 -> 240 servers;
— sharding by CounterID -> sharding by UserID;
— lz4 -> zstd;
Web site: https://clickhouse.com/
Google groups: https://groups.google.com/forum/#!forum/clickhouse
Mailing list: [email protected]
Telegram chat: https://telegram.me/clickhouse_ru (more than 1500 participants) and https://telegram.me/clickhouse_en
GitHub: https://github.com/ClickHouse/ClickHouse/
Twitter: https://twitter.com/ClickHouseDB
+ meetups. Moscow, Saint Petersburg, Novosibirsk, Yekaterinburg, Minsk, Nizhny Novgorod, Berlin, Palo Alto, Beijing, Sunnyvale, San Francisco...