We have mentioned ClickHouse in some recent posts (ClickHouse: New Open Source Columnar Database, Column Store Database Benchmarks: MariaDB ColumnStore vs. Clickhouse vs. Apache Spark), where it showed excellent results. ClickHouse by itself appears to be an event-oriented RDBMS, as its name suggests (clicks). Its primary use case, powering Yandex Metrica (a system similar to Google Analytics), also points to an event-based nature. We can also see that it requires date-stamped columns.
It is possible, however, to use ClickHouse in a general analytical workload. This blog post shares my findings. For these tests, I used the Star Schema Benchmark (SSB), slightly modified to handle ClickHouse specifics.
First, let’s talk about schemas. We need to adjust to ClickHouse data types. For example, the biggest fact table in SSB is “lineorder”. Below is how it is defined for Amazon RedShift (as taken from https://docs.aws.amazon.com/redshift/latest/dg/tutorial-tuning-tables-create-test-data.html):
CREATE TABLE lineorder ( lo_orderkey INTEGER NOT NULL, lo_linenumber INTEGER NOT NULL, lo_custkey INTEGER NOT NULL, lo_partkey INTEGER NOT NULL, lo_suppkey INTEGER NOT NULL, lo_orderdate INTEGER NOT NULL, lo_orderpriority VARCHAR(15) NOT NULL, lo_shippriority VARCHAR(1) NOT NULL, lo_quantity INTEGER NOT NULL, lo_extendedprice INTEGER NOT NULL, lo_ordertotalprice INTEGER NOT NULL, lo_discount INTEGER NOT NULL, lo_revenue INTEGER NOT NULL, lo_supplycost INTEGER NOT NULL, lo_tax INTEGER NOT NULL, lo_commitdate INTEGER NOT NULL, lo_shipmode VARCHAR(10) NOT NULL );
For ClickHouse, the table definition looks like this:
CREATE TABLE lineorderfull ( LO_ORDERKEY UInt32, LO_LINENUMBER UInt8, LO_CUSTKEY UInt32, LO_PARTKEY UInt32, LO_SUPPKEY UInt32, LO_ORDERDATE Date, LO_ORDERPRIORITY String, LO_SHIPPRIORITY UInt8, LO_QUANTITY UInt8, LO_EXTENDEDPRICE UInt32, LO_ORDTOTALPRICE UInt32, LO_DISCOUNT UInt8, LO_REVENUE UInt32, LO_SUPPLYCOST UInt32, LO_TAX UInt8, LO_COMMITDATE Date, LO_SHIPMODE String )Engine=MergeTree(LO_ORDERDATE,(LO_ORDERKEY,LO_LINENUMBER),8192);
From this we can see that we need to use datatypes like UInt8 and UInt32, which are somewhat unusual in the database world.
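To make the choice of unsigned types concrete, here is a minimal Python sketch that picks the narrowest ClickHouse unsigned integer type for a given maximum value. The helper name `smallest_unsigned_type` and the per-column maxima are assumptions made for illustration; they are not part of the benchmark schema or tooling.

```python
# ClickHouse unsigned integer types and their widths in bits.
UNSIGNED_TYPES = [("UInt8", 8), ("UInt16", 16), ("UInt32", 32), ("UInt64", 64)]


def smallest_unsigned_type(max_value: int) -> str:
    """Return the narrowest unsigned type that can hold values 0..max_value."""
    if max_value < 0:
        raise ValueError("unsigned types cannot store negative values")
    for name, bits in UNSIGNED_TYPES:
        if max_value <= 2**bits - 1:
            return name
    raise ValueError("value does not fit in 64 bits")


# Hypothetical per-column maxima, chosen only for illustration.
assumed_maxima = {"LO_QUANTITY": 50, "LO_DISCOUNT": 10, "LO_CUSTKEY": 75_000_000}
for column, max_value in assumed_maxima.items():
    print(column, smallest_unsigned_type(max_value))
```

Narrower types mean fewer bytes per value to read and compress, which is one reason to prefer UInt8 over a generic INTEGER for small-range columns.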
The second table (RedShift definition):
CREATE TABLE customer ( c_custkey INTEGER NOT NULL, c_name VARCHAR(25) NOT NULL, c_address VARCHAR(25) NOT NULL, c_city VARCHAR(10) NOT NULL, c_nation VARCHAR(15) NOT NULL, c_region VARCHAR(12) NOT NULL, c_phone VARCHAR(15) NOT NULL, c_mktsegment VARCHAR(10) NOT NULL );
For ClickHouse, I defined it as:
CREATE TABLE customerfull ( C_CUSTKEY UInt32, C_NAME String, C_ADDRESS String, C_CITY String, C_NATION String, C_REGION String, C_PHONE String, C_MKTSEGMENT String, C_FAKEDATE Date )Engine=MergeTree(C_FAKEDATE,(C_CUSTKEY),8192);
For reference, the full schema for the benchmark is here: https://github.com/vadimtk/ssb-clickhouse/blob/master/create.sql.
For this table, we need to define a rudimentary column, C_FAKEDATE Date, in order to use ClickHouse’s most advanced engine (MergeTree). I was told by the ClickHouse team that they plan to remove this limitation in the future.
To generate data acceptable by ClickHouse, I made modifications to ssb-dbgen. You can find my version here: https://github.com/vadimtk/ssb-dbgen. The most notable change is that ClickHouse can’t accept dates in CSV files formatted as “19971125”. It has to be “1997-11-25”. This is something to keep in mind when loading data into ClickHouse.
It is possible to do some preformatting on load, but I don’t have experience with that. A common approach is to create a staging table with datatypes that match the loaded data, and then convert them using SQL functions when inserting into the main table.
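If you would rather fix the dates before loading, a small preprocessing step is one option. The sketch below is not the modified ssb-dbgen; it is a hypothetical Python filter that rewrites "19971125"-style values as "1997-11-25" in chosen columns of a delimited file. The function names, the column positions and the comma delimiter are all assumptions.

```python
import csv
import io
from datetime import datetime


def to_clickhouse_date(compact: str) -> str:
    """Convert a date such as '19971125' to '1997-11-25'; raises ValueError if invalid."""
    return datetime.strptime(compact, "%Y%m%d").strftime("%Y-%m-%d")


def reformat_dates(src, dst, date_columns, delimiter=","):
    """Copy delimited rows from src to dst, rewriting the given zero-based columns."""
    writer = csv.writer(dst, delimiter=delimiter, lineterminator="\n")
    for row in csv.reader(src, delimiter=delimiter):
        for index in date_columns:
            row[index] = to_clickhouse_date(row[index])
        writer.writerow(row)


# Tiny in-memory example; the column layout here is made up.
source = io.StringIO("1,19971125,5-LOW\n2,19930102,1-URGENT\n")
target = io.StringIO()
reformat_dates(source, target, date_columns=[1])
print(target.getvalue())
```

For real files, `src` and `dst` would be opened file objects, so the rows are streamed rather than held in memory.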
One of the goals of this benchmark is to see how ClickHouse scales across multiple nodes. I used a one-node setup, and then compared it to a three-node setup. Each node has 24 cores (“Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz” CPUs), and the data is located on very fast PCIe Flash storage.
For the SSB benchmark I use a scale factor of 2500, which provides (in raw data):
- Table lineorder: 15 billion rows, raw size 1.7TB
- Table customer: 75 million rows
When loaded into ClickHouse, the table lineorder takes 464GB, which corresponds to a 3.7x compression ratio.
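The compression figure can be checked with a few lines of arithmetic. This sketch assumes the sizes quoted above are in decimal units (1 TB = 10^12 bytes), which the post does not state.

```python
RAW_BYTES = 1.7e12     # 1.7 TB of raw lineorder data (decimal units assumed)
STORED_BYTES = 464e9   # 464 GB stored in ClickHouse (decimal units assumed)
ROWS = 15e9            # 15 billion rows

compression_ratio = RAW_BYTES / STORED_BYTES
raw_bytes_per_row = RAW_BYTES / ROWS
stored_bytes_per_row = STORED_BYTES / ROWS

print(f"compression ratio:    {compression_ratio:.1f}x")
print(f"raw bytes per row:    {raw_bytes_per_row:.0f}")
print(f"stored bytes per row: {stored_bytes_per_row:.0f}")
```

In other words, each lineorder row shrinks from roughly 113 bytes of raw text to roughly 31 bytes on disk.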
We compare a one-node setup (table names lineorderfull, customerfull) vs. a three-node setup (table names lineorderd, customerd).
Single Table Operations
SELECT toYear(LO_ORDERDATE) AS yod, sum(LO_REVENUE) FROM lineorderfull GROUP BY yod
One node:
7 rows in set. Elapsed: 9.741 sec. Processed 15.00 billion rows, 90.00 GB (1.54 billion rows/s., 9.24 GB/s.)
Three nodes:
7 rows in set. Elapsed: 3.258 sec. Processed 15.00 billion rows, 90.00 GB (4.60 billion rows/s., 27.63 GB/s.)
We see a speedup of practically three times (9.741 sec vs. 3.258 sec). Handling 4.6 billion rows/s is blazingly fast!
One Table with Filtering
SELECT sum(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue FROM lineorderfull WHERE (toYear(LO_ORDERDATE) = 1993) AND ((LO_DISCOUNT >= 1) AND (LO_DISCOUNT <= 3)) AND (LO_QUANTITY < 25)
One node:
1 rows in set. Elapsed: 3.175 sec. Processed 2.28 billion rows, 18.20 GB (716.60 million rows/s., 5.73 GB/s.)
Three nodes:
1 rows in set. Elapsed: 1.295 sec. Processed 2.28 billion rows, 18.20 GB (1.76 billion rows/s., 14.06 GB/s.)
It’s worth mentioning that during the execution of this query, ClickHouse was able to use ALL 24 cores on each box. This confirms that ClickHouse is a massively parallel processing system.
Two Tables (Independent Subquery)
In this case, I want to show how ClickHouse handles independent subqueries:
SELECT sum(LO_REVENUE) FROM lineorderfull WHERE LO_CUSTKEY IN ( SELECT C_CUSTKEY AS LO_CUSTKEY FROM customerfull WHERE C_REGION = 'ASIA' )
One node:
1 rows in set. Elapsed: 28.934 sec. Processed 15.00 billion rows, 120.00 GB (518.43 million rows/s., 4.15 GB/s.)
Three nodes:
1 rows in set. Elapsed: 14.189 sec. Processed 15.12 billion rows, 121.67 GB (1.07 billion rows/s., 8.57 GB/s.)
Here, however, we do not see a speedup close to 3x on three nodes (it is about 2x), because of the data transfer required to perform the match.
Two Tables JOIN
When the query needs columns from the second table in its results, or in GROUP BY, things get more complicated. In this case we want to GROUP BY a column from the second table.
First, ClickHouse doesn’t support traditional subquery syntax, so we need to use JOIN. For JOINs, ClickHouse also strictly prescribes how they must be written (a limitation that will also be changed in the future). Our JOIN should look like:
SELECT C_REGION, sum(LO_EXTENDEDPRICE * LO_DISCOUNT) FROM lineorderfull ANY INNER JOIN ( SELECT C_REGION, C_CUSTKEY AS LO_CUSTKEY FROM customerfull ) USING (LO_CUSTKEY) WHERE (toYear(LO_ORDERDATE) = 1993) AND ((LO_DISCOUNT >= 1) AND (LO_DISCOUNT <= 3)) AND (LO_QUANTITY < 25) GROUP BY C_REGION
One node:
5 rows in set. Elapsed: 31.443 sec. Processed 2.35 billion rows, 28.79 GB (74.75 million rows/s., 915.65 MB/s.)
Three nodes:
5 rows in set. Elapsed: 25.160 sec. Processed 2.58 billion rows, 33.25 GB (102.36 million rows/s., 1.32 GB/s.)
In this case the speedup is not even two times (it is about 1.25x). This is a consequence of the random data distribution for the tables lineorderd and customerd. Both tables were defined as:
CREATE TABLE lineorderd AS lineorder ENGINE = Distributed(3shards, default, lineorder, rand()); CREATE TABLE customerd AS customer ENGINE = Distributed(3shards, default, customer, rand());
The rand() sharding key means that records are distributed randomly across the three nodes. When we perform a JOIN on LO_CUSTKEY=C_CUSTKEY, matching records might be located on different nodes. One way to deal with this is to keep matching data local by sharding both tables on the join key. For example:
CREATE TABLE lineorderLD AS lineorderL ENGINE = Distributed(3shards, default, lineorderL, LO_CUSTKEY); CREATE TABLE customerLD AS customerL ENGINE = Distributed(3shards, default, customerL, C_CUSTKEY);
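The effect of the sharding key on data locality can be illustrated with a toy simulation. This is only a sketch: it assumes a row goes to shard `key % 3` when sharded by key and to an arbitrary shard when sharded by rand(), and the table contents are invented. It measures what fraction of order rows end up on the same shard as their matching customer row.

```python
import random

SHARDS = 3
rng = random.Random(42)  # fixed seed so the run is repeatable

customers = range(1, 1001)                               # toy C_CUSTKEY values
orders = [rng.choice(customers) for _ in range(30_000)]  # toy LO_CUSTKEY per order row


def colocated_fraction(order_shards, customer_shard):
    """Fraction of order rows stored on the same shard as the matching customer row."""
    hits = sum(1 for key, shard in zip(orders, order_shards) if customer_shard[key] == shard)
    return hits / len(orders)


# Random placement, as with a rand() sharding key: each row lands on an arbitrary shard.
random_customer_shard = {key: rng.randrange(SHARDS) for key in customers}
random_order_shards = [rng.randrange(SHARDS) for _ in orders]

# Key-based placement: both tables are sharded by the customer key modulo the shard count.
keyed_customer_shard = {key: key % SHARDS for key in customers}
keyed_order_shards = [key % SHARDS for key in orders]

random_fraction = colocated_fraction(random_order_shards, random_customer_shard)
keyed_fraction = colocated_fraction(keyed_order_shards, keyed_customer_shard)
print(f"random sharding:    {random_fraction:.2f} of rows co-located")
print(f"key-based sharding: {keyed_fraction:.2f} of rows co-located")
```

With random placement only about one row in three finds its customer on the same node, so most matches require data transfer; with key-based placement every match is local.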
Three Tables JOIN
This is where it becomes very complicated. Let’s consider the query that you would normally write:
SELECT sum(LO_REVENUE),P_MFGR, toYear(LO_ORDERDATE) yod FROM lineorderfull ,customerfull,partfull WHERE C_REGION = 'ASIA' and LO_CUSTKEY=C_CUSTKEY and P_PARTKEY=LO_PARTKEY GROUP BY P_MFGR,yod ORDER BY P_MFGR,yod;
With ClickHouse’s limitations on JOIN syntax, the query becomes:
SELECT sum(LO_REVENUE), P_MFGR, toYear(LO_ORDERDATE) AS yod FROM ( SELECT LO_PARTKEY, LO_ORDERDATE, LO_REVENUE FROM lineorderfull ALL INNER JOIN ( SELECT C_REGION, C_CUSTKEY AS LO_CUSTKEY FROM customerfull ) USING (LO_CUSTKEY) WHERE C_REGION = 'ASIA' ) ALL INNER JOIN ( SELECT P_MFGR, P_PARTKEY AS LO_PARTKEY FROM partfull ) USING (LO_PARTKEY) GROUP BY P_MFGR, yod ORDER BY P_MFGR ASC, yod ASC
By writing queries this way, we force ClickHouse to use the prescribed JOIN order — at this moment there is no optimizer in ClickHouse and it is totally unaware of data distribution.
There is also not much speedup when we compare one node vs. three nodes:
One node execution time:
35 rows in set. Elapsed: 697.806 sec. Processed 15.08 billion rows, 211.53 GB (21.61 million rows/s., 303.14 MB/s.)
Three nodes execution time:
35 rows in set. Elapsed: 622.536 sec. Processed 15.12 billion rows, 211.71 GB (24.29 million rows/s., 340.08 MB/s.)
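To compare how the queries so far scale, the elapsed times reported above can be turned into speedup and parallel efficiency figures. This is a small sketch; "efficiency" here means speedup divided by the number of nodes, which is my own framing rather than something ClickHouse reports.

```python
# (one-node seconds, three-node seconds), copied from the results above.
timings = {
    "single table": (9.741, 3.258),
    "one table with filtering": (3.175, 1.295),
    "independent subquery": (28.934, 14.189),
    "two-table JOIN": (31.443, 25.160),
    "three-table JOIN": (697.806, 622.536),
}
NODES = 3


def speedup(one_node: float, three_nodes: float) -> float:
    """How many times faster the three-node run is than the one-node run."""
    return one_node / three_nodes


for name, (one, three) in timings.items():
    s = speedup(one, three)
    print(f"{name:26s} speedup {s:.2f}x  efficiency {s / NODES:.0%}")
```

The pattern is clear: the less data a query has to move between nodes, the closer it gets to a 3x speedup.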
There is a way to make the query faster for this 3-way JOIN, however. (Thanks to Alexander Zaytsev from https://www.altinity.com/ for help!)
SELECT sum(revenue), P_MFGR, yod FROM ( SELECT LO_PARTKEY AS P_PARTKEY, toYear(LO_ORDERDATE) AS yod, SUM(LO_REVENUE) AS revenue FROM lineorderfull WHERE LO_CUSTKEY IN ( SELECT C_CUSTKEY FROM customerfull WHERE C_REGION = 'ASIA' ) GROUP BY P_PARTKEY, yod ) ANY INNER JOIN partfull USING (P_PARTKEY) GROUP BY P_MFGR, yod ORDER BY P_MFGR ASC, yod ASC
Optimized query time:
One node:
35 rows in set. Elapsed: 106.732 sec. Processed 15.00 billion rows, 210.05 GB (140.56 million rows/s., 1.97 GB/s.)
Three nodes:
35 rows in set. Elapsed: 75.854 sec. Processed 15.12 billion rows, 211.71 GB (199.36 million rows/s., 2.79 GB/s.)
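That’s an improvement of about 6.5 times over the original query on one node (697.806 sec vs. 106.732 sec), and about 8.2 times on three nodes (622.536 sec vs. 75.854 sec). This shows the importance of understanding data distribution, and writing the optimal query to process the data.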
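The idea behind the rewritten query is to filter by a set of keys and aggregate the fact table first, so that only a small, pre-aggregated result reaches the JOIN. The toy Python sketch below mimics both strategies on invented data to show they return the same totals; it is an illustration of the logic, not of how ClickHouse executes the query.

```python
from collections import defaultdict

# Toy stand-ins for the three tables (all values invented for illustration).
customers = {1: "ASIA", 2: "EUROPE", 3: "ASIA"}      # C_CUSTKEY -> C_REGION
parts = {10: "MFGR#1", 11: "MFGR#2", 12: "MFGR#1"}   # P_PARTKEY -> P_MFGR
lineorder = [                                        # (custkey, partkey, year, revenue)
    (1, 10, 1993, 100), (1, 10, 1993, 50), (2, 11, 1993, 70),
    (3, 12, 1994, 30), (3, 11, 1993, 20), (1, 12, 1993, 5),
]


def join_then_aggregate():
    """Join every fact row to both dimensions, then filter and group."""
    totals = defaultdict(int)
    for custkey, partkey, year, revenue in lineorder:
        if customers[custkey] == "ASIA":
            totals[(parts[partkey], year)] += revenue
    return dict(totals)


def aggregate_then_join():
    """Filter by a key set, pre-aggregate per (partkey, year), then join to parts."""
    asia_keys = {key for key, region in customers.items() if region == "ASIA"}
    per_part = defaultdict(int)
    for custkey, partkey, year, revenue in lineorder:
        if custkey in asia_keys:
            per_part[(partkey, year)] += revenue
    totals = defaultdict(int)
    for (partkey, year), revenue in per_part.items():  # only grouped rows reach the join
        totals[(parts[partkey], year)] += revenue
    return dict(totals)


print(join_then_aggregate() == aggregate_then_join())
```

In the second version the join against the part table touches one row per (part key, year) group instead of one row per matching fact row, which is where the saving comes from.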
Another option for dealing with JOIN complexity, and to improve performance, is to use ClickHouse’s dictionaries. These dictionaries are described here: https://www.altinity.com/blog/2017/4/12/dictionaries-explained.
I will review dictionary performance in future posts.
Another traditional way to deal with JOIN complexity in an analytics workload is to use denormalization. We can move some columns (for example, P_MFGR from the last query) to the fact table.
- ClickHouse can handle general analytical queries (it requires special schema design and considerations, however)
- Linear speedup is possible, but it depends on query design and requires advanced planning — proper speedup depends on data locality
- ClickHouse is blazingly fast (beyond what I’ve seen before) because it can use all available CPU cores for a query, as shown above using 24 cores for a single server and 72 cores for three nodes
- Multi-table JOINs are cumbersome and require manual work to achieve better performance, so consider using dictionaries or denormalization