Data storage or what is a MergeTree

Last updated:

This document covers the answers to the following questions:

  • How data is stored on disk for MergeTree engine family tables
  • What are parts, granules and marks
  • How and why choosing the correct ORDER BY and PARTITION BY in table definitions affects query performance
  • How to use EXPLAIN to understand what ClickHouse is doing
  • Difference between PREWHERE and WHERE
  • Data compression

Introduction to MergeTree

Why is ClickHouse so fast? states:

ClickHouse was initially built as a prototype to do just a single task well: to filter and aggregate data as fast as possible.

Rather than force all possible tasks to be solved by singular tools, ClickHouse provides specialized "engines" that each solve specific problems.

MergeTree engine family tables are intended for ingesting large amounts of data, storing that data efficientl, and running analytical queries on it.

How MergeTree stores data

Consider the following (simplified) table for storing sensor events:

SQL
CREATE TABLE sensor_values (
timestamp DateTime,
site_id UInt32,
event VARCHAR,
uuid UUID,
metric_value Int32
)
ENGINE = MergeTree()
ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)
SETTINGS index_granularity = 8192

Data for this table would be stored in parts, each part a separate directory on disk. Data for a given part is always sorted by the order set in ORDER BY statement and compressed.

Parts can be Wide or Compact depending on its size. We'll be mostly dealing with Wide parts as part of day-to-day operations.

Wide parts are large and store each column in a separate binary data file, which are sorted and compressed.

ClickHouse also stores a sparse index for the part. A collection of rows with size equal to the index_granularity setting is called a granule. For every granule, the primary index stores a mark containing the value of the ORDER BY statement as well as a pointer to where that mark is located in each data file.

💡 For better performance when running queries, it is not recommended to set index_granularity too low. The default value for engines in the MergeTree family is 8192. An implication of this is that accessing data by primary key (in this case the ORDER BY clause is equivalent to the primary key) will not read just one row, but rather up to index_granularity number of rows. This is acceptable given ClickHouse is meant to perform well with aggregations, rather than point lookups.

Diving deeper into data-on-disk for a Wide part

This assumes you're using a docker-based ClickHouse installation and have clickhouse-client running

Seeding data
SQL
INSERT INTO sensor_values
SELECT *
FROM generateRandom('timestamp DateTime, site_id UInt8, event VARCHAR, uuid UUID, metric_value Int32', NULL, 10)
LIMIT 200000000
Looking at part data

system.parts table contains a lot of metadata about every part.

To find out what type each part is, its size, and where on disk it's located, you can run the following query:

SQL
SELECT
name,
part_type,
rows,
marks,
formatReadableSize(bytes_on_disk),
formatReadableSize(data_compressed_bytes),
formatReadableSize(data_uncompressed_bytes),
formatReadableSize(marks_bytes),
path
FROM system.parts
WHERE active and table = 'sensor_values'
FORMAT Vertical

The result might look something like this:

Row 1:
──────
name: all_12_17_1
part_type: Wide
rows: 6291270
marks: 769
formatReadableSize(bytes_on_disk): 476.07 MiB
formatReadableSize(data_compressed_bytes): 475.92 MiB
formatReadableSize(data_uncompressed_bytes): 474.00 MiB
formatReadableSize(marks_bytes): 90.12 KiB
path: /var/lib/clickhouse/store/267/267cd730-33ca-4e43-8a84-e4f0786e364b/all_12_17_1/
Inspecting data on disk
Terminal
docker exec -it posthog_clickhouse_1 ls -lhS /var/lib/clickhouse/store/267/267cd730-33ca-4e43-8a84-e4f0786e364b/all_12_17_1/
total 477M
-rw-r----- 1 clickhouse clickhouse 308M Nov 2 07:33 event.bin
-rw-r----- 1 clickhouse clickhouse 97M Nov 2 07:33 uuid.bin
-rw-r----- 1 clickhouse clickhouse 25M Nov 2 07:33 metric_value.bin
-rw-r----- 1 clickhouse clickhouse 25M Nov 2 07:33 timestamp.bin
-rw-r----- 1 clickhouse clickhouse 25M Nov 2 07:33 site_id.bin
-rw-r----- 1 clickhouse clickhouse 58K Nov 2 07:33 primary.idx
-rw-r----- 1 clickhouse clickhouse 19K Nov 2 07:33 event.mrk2
-rw-r----- 1 clickhouse clickhouse 19K Nov 2 07:33 metric_value.mrk2
-rw-r----- 1 clickhouse clickhouse 19K Nov 2 07:33 site_id.mrk2
-rw-r----- 1 clickhouse clickhouse 19K Nov 2 07:33 timestamp.mrk2
-rw-r----- 1 clickhouse clickhouse 19K Nov 2 07:33 uuid.mrk2
-rw-r----- 1 clickhouse clickhouse 494 Nov 2 07:33 checksums.txt
-rw-r----- 1 clickhouse clickhouse 123 Nov 2 07:33 columns.txt
-rw-r----- 1 clickhouse clickhouse 10 Nov 2 07:33 default_compression_codec.txt
-rw-r----- 1 clickhouse clickhouse 7 Nov 2 07:33 count.txt

What are these files?

  • For every column, there's a {column_name}.bin file, containing the compressed (LZ4 compression by default) data for that column. These take up most of the space.
  • For every column, there's a {column_name}.mrk2 file, contains an index with data to locate each granule in {column_name}.bin file
  • primary.idx contains information on ORDER BY column values for each granule. This is loaded into memory during queries.
  • checksums.txt, columns.txt, default_compression_codec.txt and count.txt contain metadata about this part.

You can read more on the exact structure of these files and how they're used in ClickHouse Index Design documentation.

What does the Merge stand for?

In every system, data must be ingested and kept up-to-date somehow. When data is inserted into MergeTree tables, each insert creates one or multiple parts for the data inserted.

As having a lot of small files would be disadvantageous for many reasons from query performance to storage, ClickHouse regularly merges small parts together until they reach a maximum size.

The merge combines the two parts into a new one. This is similar to how merge sort works and atomically replaces the two source parts.

Merges can be monitored using the system.merges table.

Query execution

Aggregation supported by ORDER BY

Our sensor_values table is set up in a way that queries similar to the following are really fast to execute.

SQL
SELECT
toStartOfDay(timestamp),
event,
sum(metric_value) as total_metric_value
FROM sensor_values
WHERE site_id = 233 AND timestamp > '2010-01-01' and timestamp < '2023-01-01'
GROUP BY toStartOfDay(timestamp), event
ORDER BY total_metric_value DESC
LIMIT 20

Executing this reports:

20 rows in set. Elapsed: 0.042 sec. Processed 90.11 thousand rows, 3.54 MB (2.13 million rows/s., 83.60 MB/s.)

Why can it be fast? Because ClickHouse:

  1. leverages the table ORDER BY clause (ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)) to skip reading a lot of data
  2. is fast and efficient about I/O and aggregation

Let's dig into how the primary index for this query is used by using EXPLAIN.

SQL
EXPLAIN indexes=1, header=1 SELECT
toStartOfDay(timestamp),
event,
sum(metric_value) as total_metric_value
FROM sensor_values
WHERE site_id = 233 AND timestamp > '2010-01-01' and timestamp < '2023-01-01'
GROUP BY toStartOfDay(timestamp), event
ORDER BY total_metric_value DESC
LIMIT 20
FORMAT LineAsString
Show full `EXPLAIN` output
Expression (Projection)
Header: toStartOfDay(timestamp) DateTime
event String
total_metric_value Int64
Limit (preliminary LIMIT (without OFFSET))
Header: toStartOfDay(timestamp) DateTime
event String
sum(metric_value) Int64
Sorting (Sorting for ORDER BY)
Header: toStartOfDay(timestamp) DateTime
event String
sum(metric_value) Int64
Expression (Before ORDER BY)
Header: toStartOfDay(timestamp) DateTime
event String
sum(metric_value) Int64
Aggregating
Header: toStartOfDay(timestamp) DateTime
event String
sum(metric_value) Int64
Expression (Before GROUP BY)
Header: event String
metric_value Int32
toStartOfDay(timestamp) DateTime
Filter (WHERE)
Header: timestamp DateTime
event String
metric_value Int32
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
timestamp DateTime
site_id UInt32
event String
metric_value Int32
ReadFromMergeTree
Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
timestamp DateTime
site_id UInt32
event String
metric_value Int32
Indexes:
PrimaryKey
Keys:
site_id
toStartOfDay(timestamp)
Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))
Parts: 2/2
Granules: 11/24415

The full output of explain is obtuse, but the most important part is also the most deeply nested one:

ReadFromMergeTree
Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
timestamp DateTime
site_id UInt32
event String
metric_value Int32
Indexes:
PrimaryKey
Keys:
site_id
toStartOfDay(timestamp)
Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))
Parts: 2/2
Granules: 11/24415

At the start of the query, ClickHouse loaded the primary index of each part into memory. From this output, we know that the query first used the primary key to filter based on site_id and timestamp values stored in the index. This allowed it to know that only 11 out of 24415 granules (0.05%) contained any relevant data.

From there it read those 11 granules (11 * 8192 rows) worth of data from timestamp, side_id, event and metric_value columns and did the rest of filtering and aggregation on that data alone.

See this documentation for a guide on how to choose ORDER BY.

"Point queries" not supported by ORDER BY

Consider this query:

SQL
SELECT * FROM sensor_values WHERE uuid = '69028f26-768f-afef-1816-521b22d281ca'

Executing this query reports:

1 row in set. Elapsed: 0.703 sec. Processed 200.00 million rows, 3.20 GB (304.43 million rows/s., 4.87 GB/s.)

While the overall execution time of this query is not bad thanks to fast I/O, it needed to read 2200x the amount of data from disk. As the dataset size or column sizes increase, this performance would get dramatically worse.

Why is this query slower? Because our ORDER BY does not support fast filtering by uuid and ClickHouse needs to read the whole table to find a single record and read all columns.

ClickHouse provides some ways to make this faster (e.g. Projections) but in general these require extra disk space or have other trade-offs.

Thus, it's important to make sure the ClickHouse schema is aligned with queries that are being executed.

PARTITION BY

Another tool to make queries faster is PARTITION BY. Consider the updated table definition:

SQL
CREATE TABLE sensor_values (
timestamp DateTime,
site_id UInt32,
event VARCHAR,
uuid UUID,
metric_value Int32
)
ENGINE = MergeTree()
PARTITION BY intDiv(toYear(timestamp), 10)
ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)
SETTINGS index_granularity = 8192

Here, ClickHouse would generate one partition per 10 years of data, allowing to skip reading even the primary index in some cases.

In the underlying data, each part would belong to a single partition and only parts within a partition would get merged.

One additional benefit of partitioning by a derivate of timestamp is that if most queries touch recent data, you can also set up rules to automatically move older parts and partitions to cheaper storage or drop them entirely.

Query analysis

Let's use an identical query as before to explain with the new dataset:

SQL
SELECT
toStartOfDay(timestamp),
event,
sum(metric_value) as total_metric_value
FROM sensor_values
WHERE site_id = 233 AND timestamp > '2010-01-01' and timestamp < '2023-01-01'
GROUP BY toStartOfDay(timestamp), event
ORDER BY total_metric_value DESC
LIMIT 20
Show full `EXPLAIN` output
Expression (Projection)
Header: toStartOfDay(timestamp) DateTime
event String
total_metric_value Int64
Limit (preliminary LIMIT (without OFFSET))
Header: toStartOfDay(timestamp) DateTime
event String
sum(metric_value) Int64
Sorting (Sorting for ORDER BY)
Header: toStartOfDay(timestamp) DateTime
event String
sum(metric_value) Int64
Expression (Before ORDER BY)
Header: toStartOfDay(timestamp) DateTime
event String
sum(metric_value) Int64
Aggregating
Header: toStartOfDay(timestamp) DateTime
event String
sum(metric_value) Int64
Expression (Before GROUP BY)
Header: event String
metric_value Int32
toStartOfDay(timestamp) DateTime
Filter (WHERE)
Header: timestamp DateTime
event String
metric_value Int32
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
timestamp DateTime
site_id UInt32
event String
metric_value Int32
ReadFromMergeTree
Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
timestamp DateTime
site_id UInt32
event String
metric_value Int32
Indexes:
MinMax
Keys:
timestamp
Condition: and(and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))), and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))))
Parts: 2/14
Granules: 3589/24421
Partition
Keys:
intDiv(toYear(timestamp), 10)
Condition: and(and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))), and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))))
Parts: 2/2
Granules: 3589/3589
PrimaryKey
Keys:
site_id
toStartOfDay(timestamp)
Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))
Parts: 2/2
Granules: 12/3589

The relevant part of EXPLAIN is again nested deep within:

ReadFromMergeTree
Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
timestamp DateTime
site_id UInt32
event String
metric_value Int32
Indexes:
MinMax
Keys:
timestamp
Condition: and(and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))), and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))))
Parts: 2/14
Granules: 3589/24421
Partition
Keys:
intDiv(toYear(timestamp), 10)
Condition: and(and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))), and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))))
Parts: 2/2
Granules: 3589/3589
PrimaryKey
Keys:
site_id
toStartOfDay(timestamp)
Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))
Parts: 2/2
Granules: 12/3589

What this tells us is that ClickHouse:

  1. First leverages an internal MinMax index on timestamp to whittle down the number of parts to 2/14 and granules to 3589/24421
  2. Then it tries to filter via the partition key but this doesn't narrow things down further
  3. Then, it loads and leverages the Primary key as before to narrow data down to 12 granules.
  4. Lastly reads, filters and aggregates data in those 12 granules

The benefit here is that it could skip reading the primary key index for most of the parts that did not contain relevant data. If and how much this speeds up the query however depends on the size of the dataset.

Choosing a good PARTITION BY

Use partitions wisely - each INSERT should ideally only touch 1-2 partitions and too many partitions will cause issues around replication or prove useless for filtering.

Loading the primary index/marks file might not be the bottleneck you expect, so be sure to benchmark different schemas against each other.

See the following Altinity documentation for more guidance:

Other notes on MergeTree

Data is expensive to update

Updating data in ClickHouse is expensive and analogous to a schema migration.

For example, to update an event's properties, ClickHouse frequently needs to:

  • Scan all the data to find what parts contain the relevant data. This isn't often covered by ORDER BY and thus quite expensive.
  • Rewrite the whole part (including any columns) - this could be potentially up to 150GB of data rewritten for a single update.

This makes things operationally hard. We mitigate this by:

  • Writing duplicated rows for new data, using other table engines (e.g. ReplacingMergeTree) and accounting for this duplication in our queries.
  • Batching up GDPR or other data deletions and doing them on a schedule rather than immediately.

No query planner

ClickHouse doesn't have a query planner in the sense PostgreSQL or other databases do.

On the one hand, you often end up fighting the query planner in other databases. If we know how ClickHouse works internally and can develop that into intuition for how SQL is executed, we're well-equipped to deal with performance issues as they arise.

On the other, this means that we'll need to be careful writing SQL as small changes can have huge performance implications.

Examples:

  • For best performance, ClickHouse requires you "push" predicates in WHERE clauses into sub-queries rather than filtering at the outermost query.
  • In the sensor_values queries above, the execution plan would have been slightly more optimal if the filter condition on toYear(timestamp) rather than timestamp.

One notable exception to "no query planner" is that ClickHouse often pushes predicates from WHERE into PREWHERE. Filters in PREWHERE are executed first and ClickHouse moves columns it thinks are "cheaper" or "more selective" into it. However putting the wrong column (e.g. a fat column containing JSON) in PREWHERE can cause performance to tank.

Read more on PREWHERE in the ClickHouse docs.

Data compression

Compression means that if subsequent column values of a given column are often similar or identical, the data compresses really well. At PostHog we frequently see uncompressed / compressed ratios of 20x-40x for JSON columns and 300x-2000x for sparse small columns.

Compression ratios have direct impact on query performance: I/O is often the bottleneck, meaning that highly compressed data can be read faster from disk at the cost of more CPU work for decompression.

By default columns are compressed by the LZ4 algorithm. We've found good success using ZSTD(3) for storing JSON columns - see benchmarks for more information.

Another tip is to use ClickHouse's LowCardinality data type modifier on schemas where a given column will store values with low cardinality i.e. the total number of values is low. An example of this would be "country name".

Weak JOIN support

ClickHouse excels at aggregating data from a single table at a time. If you however have a query with JOINs or subqueries, the right-hand-side of the JOIN would be loaded into memory first. Thus, you should always have the bigger table on the left side of left-hand-side!

This means that at scale JOINs can kill performance. Read more on the effect of removing JOINs from our events database here:

Suggested reading

Next in the ClickHouse manual: Data replication

Questions?

Was this page useful?

Next article

Data replication and distributed queries

This document provides information on: How data replication and Distributed table engine works in ClickHouse Sharding MergeTree tables How to monitor replication How to reason about distributed query execution Important settings for distributed query execution Doing ad-hoc distributed queries Setting up replicated tables A great guide on setting up replicated tables on a pre-existing cluster can be found in ClickHouse documentation . Some important highlights are: ClickHouse replication works…

Read next article