Should You Use ClickHouse as a Main Operational Database?

ClickHouse as a main operational database

ClickHouse as a main operational databaseFirst of all, this post is not a recommendation but more like a “what if” story. What if we use ClickHouse (which is a columnar analytical database) as our main datastore? Well, typically, an analytical database is not a replacement for a transactional or key/value datastore. However, ClickHouse is super efficient for timeseries and provides “sharding” out of the box (scalability beyond one node).  So can we use it as our main datastore?

Let’s imagine we are running a webservice and provide a public API. Public API as -a-service has become a good business model: examples include social networks like Facebook/Twitter, messaging as a service like Twilio, and even credit card authorization platforms like Marqeta. Let’s also imagine we need to store all messages (SMS messages, email messages, etc) we are sending and allow our customers to get various information about the message. This information can be a mix of analytical (OLAP) queries (i.e. how many messages was send for some time period and how much it cost) and a typical key/value queries like: “return 1 message by the message id”.

Using a columnar analytical database can be a big challenge here. Although such databases can be very efficient with counts and averages, some queries will be slow or simply non existent. Analytical databases are optimized for a low number of slow queries. The most important limitations of the analytical databases are:

  1. Deletes and updates are non-existent or slow
  2. Inserts are efficient for bulk inserts only
  3. No secondary indexes means that point selects (select by ID) tend to be very slow

This is all true for ClickHouse, however, we may be able to live with it for our task.

To simulate text messages I have used ~3 billion of reddit comments (10 years from 2007 to 2017), downloaded from pushshift.io . Vadim published a blog post about analyzing reddit comments with ClickHouse. In my case, I’m using this data as a simulation of text messages, and will show how we can use ClickHouse as a backend for an API.

Loading the JSON data to Clickhouse

I used the following table in Clickhouse to load all data:

CREATE TABLE reddit.rc(
body String,
score_hidden Nullable(UInt8),
archived Nullable(UInt8),
name String,
author String,
author_flair_text Nullable(String),
downs Nullable(Int32),
created_utc UInt32,
subreddit_id String,
link_id Nullable(String),
parent_id Nullable(String),
score Nullable(Int16),
retrieved_on Nullable(UInt32),
controversiality Nullable(Int8),
gilded Nullable(Int8),
id String,
subreddit String,
ups Nullable(Int16),
distinguished Nullable(String),
author_flair_css_class Nullable(String),
stickied Nullable(UInt8),
edited Nullable(UInt8)
) ENGINE = MergeTree() PARTITION BY toYYYYMM(toDate(created_utc)) ORDER BY created_utc ;

Then I used the following command to load the JSON data (downloaded from pushshift.io) to ClickHouse:

$ bzip2 -d -c RC_20*.bz2 | clickhouse-client --input_format_skip_unknown_fields 1 --input_format_allow_errors_num 1000000 -d reddit -n --query="INSERT INTO rc FORMAT JSONEachRow"

The data on disk in ClickHouse is not significantly larger than compressed files, which is great:

#  du -sh /data/clickhouse/data/reddit/rc/
638G    /data/clickhouse/data/reddit/rc/
# du -sh /data/reddit/
404G    /data/reddit/

We have ~4 billion rows:

SELECT
    toDate(min(created_utc)),
    toDate(max(created_utc)),
    count(*)
FROM rc
┌─toDate(min(created_utc))─┬─toDate(max(created_utc))─┬────count()─┐
│               2006-01-01 │               2018-05-31 │ 4148248585 │
└──────────────────────────┴──────────────────────────┴────────────┘
1 rows in set. Elapsed: 11.554 sec. Processed 4.15 billion rows, 16.59 GB (359.02 million rows/s., 1.44 GB/s.)

The data is partitioned and sorted by created_utc so queries which include created_utc will be able to using partition pruning: therefore skip the not-needed partitions. However, let’s say our API needs to support the following features, which are not common for analytical databases:

  1. Selecting a single comment/message by ID
  2. Retrieving the last 10 or 100 of the messages/comments
  3. Updating a single message in the past (e.g. in the case of messages, we may need to update the final price; in the case of comments, we may need to upvote or downvote a comment)
  4. Deleting messages
  5. Text search

With the latest ClickHouse version, all of these features are available, but some of them may not perform fast enough.

Retrieving a single row in ClickHouse

Again, this is not a typical operation in any analytical database, those databases are simply not optimized for it. ClickHouse does not have secondary indexes, and we are using created_utc as a primary key (sort by). So, selecting a message by just ID will require a full table scan:

SELECT
    id,
    created_utc
FROM rc
WHERE id = 'dbumnpz'
┌─id──────┬─created_utc─┐
│ dbumnpz │  1483228800 │
└─────────┴─────────────┘
1 rows in set. Elapsed: 18.070 sec. Processed 4.15 billion rows, 66.37 GB (229.57 million rows/s., 3.67 GB/s.)

Only if we know the timestamp (created_utc)… Then it will be lighting fast: ClickHouse will use the primary key:

SELECT *
FROM rc
WHERE (id = 'dbumnpz') AND (created_utc = 1483228800)
...
1 rows in set. Elapsed: 0.010 sec. Processed 8.19 thousand rows, 131.32 KB (840.27 thousand rows/s., 13.47 MB/s.)

Actually, we can simulate an additional index set by creating a materialized view in ClickHouse:

create materialized view rc_id_v
ENGINE MergeTree() PARTITION BY toYYYYMM(toDate(created_utc)) ORDER BY (id)
POPULATE AS SELECT id, created_utc from rc;

Here I’m creating a materialized view and populating it initially from the main (rc) table. The view will be updated automatically when there are any inserts into table reddit.rc. The view is actually another MergeTree table sorted by id. Now we can use this query:

SELECT *
FROM rc
WHERE (id = 'dbumnpz') AND (created_utc =
(
    SELECT created_utc
    FROM rc_id_v
    WHERE id = 'dbumnpz'
))
...
1 rows in set. Elapsed: 0.053 sec. Processed 8.19 thousand rows, 131.32 KB (153.41 thousand rows/s., 2.46 MB/s.)

This is a single query which will join our materialized view to pass the created_utc (timestamp) to the original table. It is a little bit slower but still less than 100ms response time.

Using this trick (materialized views) we can potentially simulate other indexes.

Retrieving the last 10 messages

This is where ClickHouse is not very efficient. Let’s say we want to retrieve the last 10 comments:

SELECT
    id,
    created_utc
FROM rc
ORDER BY created_utc DESC
LIMIT 10
┌─id──────┬─created_utc─┐
│ dzwso7l │  1527811199 │
│ dzwso7j │  1527811199 │
│ dzwso7k │  1527811199 │
│ dzwso7m │  1527811199 │
│ dzwso7h │  1527811199 │
│ dzwso7n │  1527811199 │
│ dzwso7o │  1527811199 │
│ dzwso7p │  1527811199 │
│ dzwso7i │  1527811199 │
│ dzwso7g │  1527811199 │
└─────────┴─────────────┘
10 rows in set. Elapsed: 24.281 sec. Processed 4.15 billion rows, 82.96 GB (170.84 million rows/s., 3.42 GB/s.)

In a conventional relational database (like MySQL) this can be done by reading a btree index sequentially from the end, as the index is sorted (like “tail” command on linux). In a partitioned massively parallel database system, the storage format and sorting algorithm may not be optimized for that operation as we are reading multiple partitions in parallel. Currently, an issue has been opened to make the “tailing” based on the primary key much faster: slow order by primary key with small limit on big data. As a temporary workaround we can do something like this:

SELECT count()
FROM rc
WHERE (created_utc > (
(
    SELECT max(created_utc)
    FROM rc
) - ((60 * 60) * 24))) AND (subreddit = 'programming')
┌─count()─┐
│    1248 │
└─────────┘
1 rows in set. Elapsed: 4.510 sec. Processed 3.05 million rows, 56.83 MB (675.38 thousand rows/s., 12.60 MB/s.) ```

It is still a five seconds query. Hopefully, this type of query will become faster in ClickHouse.

Updating / deleting data in ClickHouse

The latest ClickHouse version allows running update/delete in the form of “ALTER TABLE .. UPDATE / DELETE” (it is called mutations in ClickHouse terms). For example, we may want to upvote a specific comment.

SELECT score
FROM rc_2017
WHERE (id = 'dbumnpz') AND (created_utc =
(
    SELECT created_utc
    FROM rc_id_v
    WHERE id = 'dbumnpz'
))
┌─score─┐
│     2 │
└───────┘
1 rows in set. Elapsed: 0.048 sec. Processed 8.19 thousand rows, 131.08 KB (168.93 thousand rows/s., 2.70 MB/s.)
:) alter table rc_2017 update score = score +1 where id =  'dbumnpz' and created_utc = (select created_utc from rc_id_v where id =  'dbumnpz');
ALTER TABLE rc_2017
    UPDATE score = score + 1 WHERE (id = 'dbumnpz') AND (created_utc =
    (
        SELECT created_utc
        FROM rc_id_v
        WHERE id = 'dbumnpz'
    ))
Ok.
0 rows in set. Elapsed: 0.052 sec.

“Mutation” queries will return immediately and will be executed asynchronously. We can see the progress by reading from the system.mutations table:

select * from system.mutations\G
SELECT *
FROM system.mutations
Row 1:
──────
database:                   reddit
table:                      rc_2017
mutation_id:                mutation_857.txt
command:                    UPDATE score = score + 1 WHERE (id = 'dbumnpz') AND (created_utc = (SELECT created_utc FROM reddit.rc_id_v  WHERE id = 'dbumnpz'))
create_time:                2018-12-27 22:22:05
block_numbers.partition_id: ['']
block_numbers.number:       [857]
parts_to_do:                0
is_done:                    1
1 rows in set. Elapsed: 0.002 sec.

Now we can try deleting comments that have been marked for deletion (body showing “[deleted]”):

ALTER TABLE rc_2017
    DELETE WHERE body = '[deleted]'
Ok.
0 rows in set. Elapsed: 0.002 sec.
:) select * from system.mutations\G
SELECT *
FROM system.mutations
...
Row 2:
──────
database:                   reddit
table:                      rc_2017
mutation_id:                mutation_858.txt
command:                    DELETE WHERE body = '[deleted]'
create_time:                2018-12-27 22:41:01
block_numbers.partition_id: ['']
block_numbers.number:       [858]
parts_to_do:                64
is_done:                    0
2 rows in set. Elapsed: 0.017 sec.

After a while, we can do the count again:

:) select * from system.mutations\G
SELECT *
FROM system.mutations
...
Row 2:
──────
database:                   reddit
table:                      rc_2017
mutation_id:                mutation_858.txt
command:                    DELETE WHERE body = '[deleted]'
create_time:                2018-12-27 22:41:01
block_numbers.partition_id: ['']
block_numbers.number:       [858]
parts_to_do:                0
is_done:                    1

As we can see our “mutation” is done.

Text analysis

ClickHouse does not offer full text search, however we can use some text functions. In my previous blog post about ClickHouse I used it to find the most popular wikipedia page of the month. This time I’m trying to find the news keywords of the year using all reddit comments: basically I’m calculating the most frequently used new words for the specific year (algorithm based on an article about finding trending topics using Google Books n-grams data). To do that I’m using the ClickHouse function alphaTokens(body) which will split the “body” field into words. From there, I can count the words or use arrayJoin to create a list (similar to MySQL’s group_concat function). Here is the example:

First I created a table word_by_year_news:

create table word_by_year_news ENGINE MergeTree() PARTITION BY y ORDER BY (y) as
select a.w as w, b.y as y, sum(a.occurrences)/b.total as ratio from
(
select
 lower(arrayJoin(alphaTokens(body))) as w,
 toYear(toDate(created_utc)) as y,
 count() as occurrences
from rc
where body <> '[deleted]'
and created_utc < toUnixTimestamp('2018-01-01 00:00:00')
and created_utc >= toUnixTimestamp('2007-01-01 00:00:00')
and subreddit in ('news', 'politics', 'worldnews')
group by w, y
having length(w) > 4
) as a
ANY INNER JOIN
(
select
 toYear(toDate(created_utc)) as y,
 sum(length(alphaTokens(body))) as total
from rc
where body <> '[deleted]'
and subreddit in ('news', 'politics', 'worldnews')
and created_utc < toUnixTimestamp('2018-01-01 00:00:00')
and created_utc >= toUnixTimestamp('2007-01-01 00:00:00')
group by y
) AS b
ON a.y = b.y
group by
  a.w,
  b.y,
  b.total;
0 rows in set. Elapsed: 787.032 sec. Processed 7.35 billion rows, 194.32 GB (9.34 million rows/s., 246.90 MB/s.)

This will store all frequent words (I’m filtering by subreddits; the examples are: “news, politics and worldnews” or “programming”) as well as its occurrence this year; actually I want to store “relative” occurrence which is called “ratio” above: for each word I divide its occurrence by the number of total words this year (this is needed as the number of comments grows significantly year by year).

Now we can actually calculate the words of the year:

SELECT
    groupArray(w) as words,
    y + 1 as year
FROM
(
    SELECT
        w,
        CAST((y - 1) AS UInt16) AS y,
        ratio AS a_ratio
    FROM word_by_year_news
    WHERE ratio > 0.00001
) AS a
ALL INNER JOIN
(
    SELECT
        w,
        y,
        ratio AS b_ratio
    FROM word_by_year_news
    WHERE ratio > 0.00001
) AS b USING (w, y)
WHERE (y > 0) AND (a_ratio / b_ratio > 3)
GROUP BY y
ORDER BY
    y
LIMIT 100;
10 rows in set. Elapsed: 0.232 sec. Processed 14.61 million rows, 118.82 MB (63.01 million rows/s., 512.29 MB/s.)

And the results are (here I’m grouping words for each year):

For “programming” subreddit:

┌─year─┬─words─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ 2007 │ ['audio','patents','swing','phones','gmail','opera','devices','phone','adobe','vista','backup','mercurial','mobile','passwords','scala','license','copyright','licenses','photoshop'] │
│ 2008 │ ['webkit','twitter','teacher','android','itunes']                                                                                                                                     │
│ 2009 │ ['downvotes','upvote','drupal','android','upvoted']                                                                                                                                   │
│ 2010 │ ['codecs','imgur','floppy','codec','adobe','android']                                                                                                                                 │
│ 2011 │ ['scala','currency','println']                                                                                                                                                        │
│ 2013 │ ['voting','maven']                                                                                                                                                                    │
│ 2014 │ ['compose','xamarin','markdown','scrum','comic']                                                                                                                                      │
│ 2015 │ ['china','sourceforge','subscription','chinese','kotlin']                                                                                                                             │
│ 2016 │ ['systemd','gitlab','autotldr']                                                                                                                                                       │
│ 2017 │ ['offices','electron','vscode','blockchain','flash','collision']                                                                                                                      │
└──────┴───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

For news subreddit:

┌─year─┬─words──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ 2008 │ ['michigan','delegates','obama','alaska','georgia','russians','hamas','biden','hussein','barack','elitist','mccain']                                                                                                                                       │
│ 2009 │ ['stimulus','reform','medicare','franken','healthcare','payer','insurance','downvotes','hospitals','patients','option','health']                                                                                                                           │
│ 2010 │ ['blockade','arizona']                                                                                                                                                                                                                                     │
│ 2011 │ ['protests','occupy','romney','weiner','protesters']                                                                                                                                                                                                       │
│ 2012 │ ['santorum','returns','martin','obamacare','romney']                                                                                                                                                                                                       │
│ 2013 │ ['boston','chemical','surveillance']                                                                                                                                                                                                                       │
│ 2014 │ ['plane','poland','radar','subreddits','palestinians','putin','submission','russia','automoderator','compose','rockets','palestinian','hamas','virus','removal','russians','russian']                                                                      │
│ 2015 │ ['refugees','refugee','sanders','debates','hillary','removal','participating','removed','greece','clinton']                                                                                                                                                │
│ 2016 │ ['morons','emails','opponent','establishment','trump','reply','speeches','presidency','clintons','electoral','donald','trumps','downvote','november','subreddit','shill','domain','johnson','classified','bernie','nominee','users','returns','primaries','foundation','voters','autotldr','clinton','email','supporter','election','feedback','clever','leaks','accuse','candidate','upvote','rulesandregs','convention','conduct','uncommon','server','trolls','supporters','hillary'] │
│ 2017 │ ['impeached','downvotes','monitored','accusations','alabama','violation','treason','nazis','index','submit','impeachment','troll','collusion','bannon','neutrality','permanent','insults','violations']                                                    │
└──────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

Conclusion

ClickHouse is a great massively parallel analytical system. It is extremely efficient and can potentially (with some hacks) be used as a main backend database powering a public API gateway serving both realtime and analytical queries. At the same time, it was not originally designed that way. Let me know in the comments if you are using ClickHouse for this or similar projects.


Photo by John Baker on Unsplash

 

 

关注dbDao.com的新浪微博

扫码加入微信Oracle小密圈,了解Oracle最新技术下载分享资源

TEL/電話+86 13764045638
Email service@parnassusdata.com
QQ 47079569