First of all, this post is not a recommendation but more like a “what if” story. What if we use ClickHouse (which is a columnar analytical database) as our main datastore? Well, typically, an analytical database is not a replacement for a transactional or key/value datastore. However, ClickHouse is super efficient for timeseries and provides “sharding” out of the box (scalability beyond one node). So can we use it as our main datastore?
Let’s imagine we are running a webservice and provide a public API. Public API as -a-service has become a good business model: examples include social networks like Facebook/Twitter, messaging as a service like Twilio, and even credit card authorization platforms like Marqeta. Let’s also imagine we need to store all messages (SMS messages, email messages, etc) we are sending and allow our customers to get various information about the message. This information can be a mix of analytical (OLAP) queries (i.e. how many messages was send for some time period and how much it cost) and a typical key/value queries like: “return 1 message by the message id”.
Using a columnar analytical database can be a big challenge here. Although such databases can be very efficient with counts and averages, some queries will be slow or simply non existent. Analytical databases are optimized for a low number of slow queries. The most important limitations of the analytical databases are:
- Deletes and updates are non-existent or slow
- Inserts are efficient for bulk inserts only
- No secondary indexes means that point selects (select by ID) tend to be very slow
This is all true for ClickHouse, however, we may be able to live with it for our task.
To simulate text messages I have used ~3 billion of reddit comments (10 years from 2007 to 2017), downloaded from pushshift.io . Vadim published a blog post about analyzing reddit comments with ClickHouse. In my case, I’m using this data as a simulation of text messages, and will show how we can use ClickHouse as a backend for an API.
Loading the JSON data to Clickhouse
I used the following table in Clickhouse to load all data:
CREATE TABLE reddit.rc( body String, score_hidden Nullable(UInt8), archived Nullable(UInt8), name String, author String, author_flair_text Nullable(String), downs Nullable(Int32), created_utc UInt32, subreddit_id String, link_id Nullable(String), parent_id Nullable(String), score Nullable(Int16), retrieved_on Nullable(UInt32), controversiality Nullable(Int8), gilded Nullable(Int8), id String, subreddit String, ups Nullable(Int16), distinguished Nullable(String), author_flair_css_class Nullable(String), stickied Nullable(UInt8), edited Nullable(UInt8) ) ENGINE = MergeTree() PARTITION BY toYYYYMM(toDate(created_utc)) ORDER BY created_utc ;
Then I used the following command to load the JSON data (downloaded from pushshift.io) to ClickHouse:
$ bzip2 -d -c RC_20*.bz2 | clickhouse-client --input_format_skip_unknown_fields 1 --input_format_allow_errors_num 1000000 -d reddit -n --query="INSERT INTO rc FORMAT JSONEachRow"
The data on disk in ClickHouse is not significantly larger than compressed files, which is great:
# du -sh /data/clickhouse/data/reddit/rc/ 638G /data/clickhouse/data/reddit/rc/ # du -sh /data/reddit/ 404G /data/reddit/
We have ~4 billion rows:
SELECT toDate(min(created_utc)), toDate(max(created_utc)), count(*) FROM rc ┌─toDate(min(created_utc))─┬─toDate(max(created_utc))─┬────count()─┐ │ 2006-01-01 │ 2018-05-31 │ 4148248585 │ └──────────────────────────┴──────────────────────────┴────────────┘ 1 rows in set. Elapsed: 11.554 sec. Processed 4.15 billion rows, 16.59 GB (359.02 million rows/s., 1.44 GB/s.)
The data is partitioned and sorted by created_utc so queries which include created_utc will be able to using partition pruning: therefore skip the not-needed partitions. However, let’s say our API needs to support the following features, which are not common for analytical databases:
- Selecting a single comment/message by ID
- Retrieving the last 10 or 100 of the messages/comments
- Updating a single message in the past (e.g. in the case of messages, we may need to update the final price; in the case of comments, we may need to upvote or downvote a comment)
- Deleting messages
- Text search
With the latest ClickHouse version, all of these features are available, but some of them may not perform fast enough.
Retrieving a single row in ClickHouse
Again, this is not a typical operation in any analytical database, those databases are simply not optimized for it. ClickHouse does not have secondary indexes, and we are using created_utc as a primary key (sort by). So, selecting a message by just ID will require a full table scan:
SELECT id, created_utc FROM rc WHERE id = 'dbumnpz' ┌─id──────┬─created_utc─┐ │ dbumnpz │ 1483228800 │ └─────────┴─────────────┘ 1 rows in set. Elapsed: 18.070 sec. Processed 4.15 billion rows, 66.37 GB (229.57 million rows/s., 3.67 GB/s.)
Only if we know the timestamp (created_utc)… Then it will be lighting fast: ClickHouse will use the primary key:
SELECT * FROM rc WHERE (id = 'dbumnpz') AND (created_utc = 1483228800) ... 1 rows in set. Elapsed: 0.010 sec. Processed 8.19 thousand rows, 131.32 KB (840.27 thousand rows/s., 13.47 MB/s.)
Actually, we can simulate an additional index set by creating a materialized view in ClickHouse:
create materialized view rc_id_v ENGINE MergeTree() PARTITION BY toYYYYMM(toDate(created_utc)) ORDER BY (id) POPULATE AS SELECT id, created_utc from rc;
Here I’m creating a materialized view and populating it initially from the main (rc) table. The view will be updated automatically when there are any inserts into table reddit.rc. The view is actually another MergeTree table sorted by id. Now we can use this query:
SELECT * FROM rc WHERE (id = 'dbumnpz') AND (created_utc = ( SELECT created_utc FROM rc_id_v WHERE id = 'dbumnpz' )) ... 1 rows in set. Elapsed: 0.053 sec. Processed 8.19 thousand rows, 131.32 KB (153.41 thousand rows/s., 2.46 MB/s.)
This is a single query which will join our materialized view to pass the created_utc (timestamp) to the original table. It is a little bit slower but still less than 100ms response time.
Using this trick (materialized views) we can potentially simulate other indexes.
Retrieving the last 10 messages
This is where ClickHouse is not very efficient. Let’s say we want to retrieve the last 10 comments:
SELECT id, created_utc FROM rc ORDER BY created_utc DESC LIMIT 10 ┌─id──────┬─created_utc─┐ │ dzwso7l │ 1527811199 │ │ dzwso7j │ 1527811199 │ │ dzwso7k │ 1527811199 │ │ dzwso7m │ 1527811199 │ │ dzwso7h │ 1527811199 │ │ dzwso7n │ 1527811199 │ │ dzwso7o │ 1527811199 │ │ dzwso7p │ 1527811199 │ │ dzwso7i │ 1527811199 │ │ dzwso7g │ 1527811199 │ └─────────┴─────────────┘ 10 rows in set. Elapsed: 24.281 sec. Processed 4.15 billion rows, 82.96 GB (170.84 million rows/s., 3.42 GB/s.)
In a conventional relational database (like MySQL) this can be done by reading a btree index sequentially from the end, as the index is sorted (like “tail” command on linux). In a partitioned massively parallel database system, the storage format and sorting algorithm may not be optimized for that operation as we are reading multiple partitions in parallel. Currently, an issue has been opened to make the “tailing” based on the primary key much faster: slow order by primary key with small limit on big data. As a temporary workaround we can do something like this:
SELECT count() FROM rc WHERE (created_utc > ( ( SELECT max(created_utc) FROM rc ) - ((60 * 60) * 24))) AND (subreddit = 'programming') ┌─count()─┐ │ 1248 │ └─────────┘ 1 rows in set. Elapsed: 4.510 sec. Processed 3.05 million rows, 56.83 MB (675.38 thousand rows/s., 12.60 MB/s.) ```
It is still a five seconds query. Hopefully, this type of query will become faster in ClickHouse.
Updating / deleting data in ClickHouse
The latest ClickHouse version allows running update/delete in the form of “ALTER TABLE .. UPDATE / DELETE” (it is called mutations in ClickHouse terms). For example, we may want to upvote a specific comment.
SELECT score FROM rc_2017 WHERE (id = 'dbumnpz') AND (created_utc = ( SELECT created_utc FROM rc_id_v WHERE id = 'dbumnpz' )) ┌─score─┐ │ 2 │ └───────┘ 1 rows in set. Elapsed: 0.048 sec. Processed 8.19 thousand rows, 131.08 KB (168.93 thousand rows/s., 2.70 MB/s.) :) alter table rc_2017 update score = score +1 where id = 'dbumnpz' and created_utc = (select created_utc from rc_id_v where id = 'dbumnpz'); ALTER TABLE rc_2017 UPDATE score = score + 1 WHERE (id = 'dbumnpz') AND (created_utc = ( SELECT created_utc FROM rc_id_v WHERE id = 'dbumnpz' )) Ok. 0 rows in set. Elapsed: 0.052 sec.
“Mutation” queries will return immediately and will be executed asynchronously. We can see the progress by reading from the system.mutations table:
select * from system.mutations\G SELECT * FROM system.mutations Row 1: ────── database: reddit table: rc_2017 mutation_id: mutation_857.txt command: UPDATE score = score + 1 WHERE (id = 'dbumnpz') AND (created_utc = (SELECT created_utc FROM reddit.rc_id_v WHERE id = 'dbumnpz')) create_time: 2018-12-27 22:22:05 block_numbers.partition_id: [''] block_numbers.number:  parts_to_do: 0 is_done: 1 1 rows in set. Elapsed: 0.002 sec.
Now we can try deleting comments that have been marked for deletion (body showing “[deleted]”):
ALTER TABLE rc_2017 DELETE WHERE body = '[deleted]' Ok. 0 rows in set. Elapsed: 0.002 sec. :) select * from system.mutations\G SELECT * FROM system.mutations ... Row 2: ────── database: reddit table: rc_2017 mutation_id: mutation_858.txt command: DELETE WHERE body = '[deleted]' create_time: 2018-12-27 22:41:01 block_numbers.partition_id: [''] block_numbers.number:  parts_to_do: 64 is_done: 0 2 rows in set. Elapsed: 0.017 sec.
After a while, we can do the count again:
:) select * from system.mutations\G SELECT * FROM system.mutations ... Row 2: ────── database: reddit table: rc_2017 mutation_id: mutation_858.txt command: DELETE WHERE body = '[deleted]' create_time: 2018-12-27 22:41:01 block_numbers.partition_id: [''] block_numbers.number:  parts_to_do: 0 is_done: 1
As we can see our “mutation” is done.
ClickHouse does not offer full text search, however we can use some text functions. In my previous blog post about ClickHouse I used it to find the most popular wikipedia page of the month. This time I’m trying to find the news keywords of the year using all reddit comments: basically I’m calculating the most frequently used new words for the specific year (algorithm based on an article about finding trending topics using Google Books n-grams data). To do that I’m using the ClickHouse function alphaTokens(body) which will split the “body” field into words. From there, I can count the words or use arrayJoin to create a list (similar to MySQL’s group_concat function). Here is the example:
First I created a table word_by_year_news:
create table word_by_year_news ENGINE MergeTree() PARTITION BY y ORDER BY (y) as select a.w as w, b.y as y, sum(a.occurrences)/b.total as ratio from ( select lower(arrayJoin(alphaTokens(body))) as w, toYear(toDate(created_utc)) as y, count() as occurrences from rc where body <> '[deleted]' and created_utc < toUnixTimestamp('2018-01-01 00:00:00') and created_utc >= toUnixTimestamp('2007-01-01 00:00:00') and subreddit in ('news', 'politics', 'worldnews') group by w, y having length(w) > 4 ) as a ANY INNER JOIN ( select toYear(toDate(created_utc)) as y, sum(length(alphaTokens(body))) as total from rc where body <> '[deleted]' and subreddit in ('news', 'politics', 'worldnews') and created_utc < toUnixTimestamp('2018-01-01 00:00:00') and created_utc >= toUnixTimestamp('2007-01-01 00:00:00') group by y ) AS b ON a.y = b.y group by a.w, b.y, b.total; 0 rows in set. Elapsed: 787.032 sec. Processed 7.35 billion rows, 194.32 GB (9.34 million rows/s., 246.90 MB/s.)
This will store all frequent words (I’m filtering by subreddits; the examples are: “news, politics and worldnews” or “programming”) as well as its occurrence this year; actually I want to store “relative” occurrence which is called “ratio” above: for each word I divide its occurrence by the number of total words this year (this is needed as the number of comments grows significantly year by year).
Now we can actually calculate the words of the year:
SELECT groupArray(w) as words, y + 1 as year FROM ( SELECT w, CAST((y - 1) AS UInt16) AS y, ratio AS a_ratio FROM word_by_year_news WHERE ratio > 0.00001 ) AS a ALL INNER JOIN ( SELECT w, y, ratio AS b_ratio FROM word_by_year_news WHERE ratio > 0.00001 ) AS b USING (w, y) WHERE (y > 0) AND (a_ratio / b_ratio > 3) GROUP BY y ORDER BY y LIMIT 100; 10 rows in set. Elapsed: 0.232 sec. Processed 14.61 million rows, 118.82 MB (63.01 million rows/s., 512.29 MB/s.)
And the results are (here I’m grouping words for each year):
For “programming” subreddit:
┌─year─┬─words─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ │ 2007 │ ['audio','patents','swing','phones','gmail','opera','devices','phone','adobe','vista','backup','mercurial','mobile','passwords','scala','license','copyright','licenses','photoshop'] │ │ 2008 │ ['webkit','twitter','teacher','android','itunes'] │ │ 2009 │ ['downvotes','upvote','drupal','android','upvoted'] │ │ 2010 │ ['codecs','imgur','floppy','codec','adobe','android'] │ │ 2011 │ ['scala','currency','println'] │ │ 2013 │ ['voting','maven'] │ │ 2014 │ ['compose','xamarin','markdown','scrum','comic'] │ │ 2015 │ ['china','sourceforge','subscription','chinese','kotlin'] │ │ 2016 │ ['systemd','gitlab','autotldr'] │ │ 2017 │ ['offices','electron','vscode','blockchain','flash','collision'] │ └──────┴───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
For news subreddit:
┌─year─┬─words──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ │ 2008 │ ['michigan','delegates','obama','alaska','georgia','russians','hamas','biden','hussein','barack','elitist','mccain'] │ │ 2009 │ ['stimulus','reform','medicare','franken','healthcare','payer','insurance','downvotes','hospitals','patients','option','health'] │ │ 2010 │ ['blockade','arizona'] │ │ 2011 │ ['protests','occupy','romney','weiner','protesters'] │ │ 2012 │ ['santorum','returns','martin','obamacare','romney'] │ │ 2013 │ ['boston','chemical','surveillance'] │ │ 2014 │ ['plane','poland','radar','subreddits','palestinians','putin','submission','russia','automoderator','compose','rockets','palestinian','hamas','virus','removal','russians','russian'] │ │ 2015 │ ['refugees','refugee','sanders','debates','hillary','removal','participating','removed','greece','clinton'] │ │ 2016 │ ['morons','emails','opponent','establishment','trump','reply','speeches','presidency','clintons','electoral','donald','trumps','downvote','november','subreddit','shill','domain','johnson','classified','bernie','nominee','users','returns','primaries','foundation','voters','autotldr','clinton','email','supporter','election','feedback','clever','leaks','accuse','candidate','upvote','rulesandregs','convention','conduct','uncommon','server','trolls','supporters','hillary'] │ │ 2017 │ ['impeached','downvotes','monitored','accusations','alabama','violation','treason','nazis','index','submit','impeachment','troll','collusion','bannon','neutrality','permanent','insults','violations'] │ └──────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
ClickHouse is a great massively parallel analytical system. It is extremely efficient and can potentially (with some hacks) be used as a main backend database powering a public API gateway serving both realtime and analytical queries. At the same time, it was not originally designed that way. Let me know in the comments if you are using ClickHouse for this or similar projects.