In our journey with data about streams, we did ad hoc analysis with Linux command-line tools, PySpark, and PostgreSQL (powered by TimescaleDB). Those are capable tools that enable analytics in various scenarios: when only Linux command line is available or when PostgreSQL compatibility is a requirement (then TimescaleDB is a good choice) or when queries should scale easily to hundreds of machines, then PySpark shines. But these tools come with their drawbacks. Since source data is stored in quite many GZIP compressed JSON files, it brings some challenges. In case of PySpark, initial read (and schema inference) of these files takes some time (and will take more when the number of files increases).

ClickHouse was briefly used earlier, but at that time I have loaded Parquet files that were produced by PySpark, instead of original GZIP compressed JSON files. Since ClickHouse performance really helped to shorten time between raising a question about data and getting query results back, I decided to explore more how to load data into ClickHouse.

I used Raspberry Pi 4 as a server that runs Python scripts (and has source files on the local disk). Old Asus laptop was used to host ClickHouse database server. Both Raspberry Pi 4 and Asus laptop were connected to the same network switch (Zyxel GS1200-8).

Used hardware:

  • Raspberry Pi 4 8GB with SSD Samsung 860 EVO 250 GB;
  • Asus K52Jc laptop (Intel Core i5-M450, 8 GB DDR3-1066, SSD Samsung 840 250 GB).

Source data (only June 2021):

  • Total files size: 5.4 GB;
  • Number of files: 2054.

Software:

  • ClickHouse 21.7.6;
  • Python 3.9.5.

Throwing files at database

I used HTTP Interface to load data into ClickHouse. Since HTTP Interface supports the same gzip compression as was already used to compress the source files, my first idea was to let ClickHouse handle everything and just send files as-is (with no decompression or json parsing) to the server.

First version of the table is quite simple and has only two columns: payload to store JSON string, and ingest_time that materializes timestamp at insertion time. No ordering on a particular column or partitioning was used.

CREATE TABLE IF NOT EXISTS rill.rp_json_helix_streams (
  payload     String,
  ingest_time DateTime DEFAULT NOW()
) ENGINE = MergeTree()
ORDER BY tuple()
;

In order to load JSON data into a single String column, we take advantage of ClickHouse input format called JSONAsString.

Before using Python, I went with loading data with command-line tools:

$ find /twitch/raw/helix/streams -type f -name '*2021_06*' |
parallel -j4 'curl -X POST "http://192.168.178.41:8123/?query=INSERT%20INTO%20rill.rp_json_helix_streams%20(payload)%20FORMAT%20JSONAsString" --data-binary @{} -H "Content-Encoding: gzip"'

It took 5 minutes (300 seconds) to load all those files in 4 threads in parallel from Raspberry Pi 4 to the Asus laptop. When the table was fully loaded (and optimized) it had 47_345_450 rows and had the size of 7.55 GiB.

I used command-line tools to load data to establish some baseline for loading performance so that I could compare performance of Python scripts.

Then I have written simple Python script that loads all files with extension .json.gz from a given directory (including subdirectories) into given table in ClickHouse - direct loading code

This was inspired by Altinity presentation about handling log data in Clickhouse.

Python code was executed on command-line in the following way (other scripts were also executed in the same way with parallel -j4 unless noted otherwise):

$ find /twitch/raw/helix/streams -type d -name '2021_06*' |
parallel -j4 'python3 ch_loader.py {} "rill.rp_json_helix_streams"'

It took 4 minutes (252 seconds) for this command to complete.

Current table uses default LZ4 compression for columns. I have changed compression code to be ZSTD(1) to see what impact that has on the table size and ingestion time.

New table definition is:

CREATE TABLE IF NOT EXISTS rill.rp_json_helix_streams_zstd (
  payload     String CODEC(ZSTD(1)),
  ingest_time DateTime DEFAULT NOW() CODEC(ZSTD(1))
) ENGINE = MergeTree()
ORDER BY tuple()
;

For the same Python script it took 5 minutes (344 seconds) to load the data. Final table size was 4.94 GiB. Using ZSTD compression definitely helped to reduce table size. Besides looking at ingestion time and table size, I have also looked at the amount of network traffic that was generated while ingesting data.

Measuring network traffic

Command-line utility vnStat was used to monitor traffic for a specific network interface.

First, we need to find which interface belongs to our docker container with ClickHouse:

$ docker exec -it <container_hash> cat /sys/class/net/eth0/iflink
14
$ ip ad | grep 14
14: veth2ca980d@if13: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue master br-47892c8ad0ef state UP group default
-- interface is veth2.... until @

After we got Docker network interface, vnStat is used like this:

vnstat --live -i veth2ca980d

Python script, that loaded JSON files directly into ClickHouse, generated about 5.59 GiB of traffic:

 traffic statistics

                           rx         |       tx
--------------------------------------+------------------
  bytes                    85.16 MiB  |        5.59 GiB
--------------------------------------+------------------
          max            4.97 Mbit/s  |   368.23 Mbit/s
      average            2.22 Mbit/s  |   149.18 Mbit/s
          min               0 kbit/s  |        0 kbit/s
--------------------------------------+------------------
  packets                    1341932  |         398326

Amount of data transferred is similar to the size of files on disk.

Extra columns for auditing

Data that was loaded into ClickHouse tables does not have any connection to source files. This means it is difficult to trace back the origin of records, so I decided to add filename and file last modified timestamp to JSON payload in Python. This means that the Python script now will decompress files, parse JSON lines, add new keys, serialize data back to JSON strings, and finally compress JSON to send it to ClickHouse - code.

In Python module gzip we can control compression level using parameter compresslevel. Compression level goes from 0 (no compression) to 9 (maximum compression).

Running this Python script with compression level = 1 (and extra columns) took 28 minutes (1686 seconds). Final table size was 7.63 GiB.

vnStat reported that 6.13 GiB of traffic was generated.

traffic statistics

                           rx         |       tx
--------------------------------------+------------------
  bytes                    64.23 MiB  |        6.13 GiB
--------------------------------------+------------------
          max            1.11 Mbit/s  |   128.43 Mbit/s
      average          301.18 kbit/s  |    29.45 Mbit/s
          min               0 kbit/s  |        0 kbit/s
--------------------------------------+------------------
  packets                    1009395  |         4367111

Then I was curious to test what happens if compression is not enabled and what happens when some level of compression is applied.

When compression level was set to 0 (no compression) then it took 27 minutes (1596 seconds) to load data. vnStat reported that about 25 GiB of network traffic was generated.

traffic statistics

                           rx         |       tx
--------------------------------------+------------------
  bytes                   130.20 MiB  |       25.81 GiB
--------------------------------------+------------------
          max            2.92 Mbit/s  |   539.57 Mbit/s
      average          679.24 kbit/s  |   137.89 Mbit/s
          min               0 kbit/s  |        0 kbit/s
--------------------------------------+------------------
  packets                    2050044  |        18324237
--------------------------------------+------------------

When compression level was set to 6 it took 32 minutes (1947 seconds) to load data. vnStat reported that about 5.30 GiB of network traffic was generated.

traffic statistics

                           rx         |       tx
--------------------------------------+------------------
  bytes                    67.73 MiB  |        5.30 GiB
--------------------------------------+------------------
          max            1.18 Mbit/s  |   129.45 Mbit/s
      average          203.27 kbit/s  |    16.30 Mbit/s
          min               0 kbit/s  |        0 kbit/s
--------------------------------------+------------------
  packets                    1064953  |         377854

In this first part we have only changed column compression in ClickHouse from default LZ4 to ZSTD and observed reduction of table size from 7.55 GiB to 4.94 GiB.

Ordering within JSON

Available streams data is a time-series data and some columns have the same values in unique events. For example, if we collected 20 events about a particular stream and the stream was only in one category, then there would be only one distinct value for column game_id for all 20 events. This property applies to multiple columns of this data. Since we are using columnar database and our data has repeated values, we should be able to compress the data even better.

Since ORDER BY clause accepts expressions, my first idea was to sort ClickHouse table using JSON functions. I have created a new table with the same ZSTD compression and ORDER BY clause:

CREATE TABLE IF NOT EXISTS rill.rp_json_helix_streams_zstd_ord (
  payload     String CODEC(ZSTD(1)),
  ingest_time DateTime DEFAULT NOW() CODEC(ZSTD(1))
) ENGINE = MergeTree()
ORDER BY (
  toUInt64(JSONExtractString(payload, 'user_id')),
  parseDateTimeBestEffort(JSONExtractString(payload, 'captured_at'))
)
;

Sorting data by columns user_id and captured_at gave the dataset in which the maximum number of repeated values was stored together.

I loaded data using Python (version that directly loads compressed files into ClickHouse without audit columns) and it took 12 minutes (704 seconds). Final table size was 639.10 MiB. That is a massive reduction in size compared to previous unordered table that took 4.94 GiB. It was achieved at the cost of time to load data increasing from 4 minutes to 12 minutes.

Materialized order

From the beginning, we chose to store JSON as string in a single column and use JSONAsString input format to load data. This input format has a limitation that only single (insertable) column of type String can be present in a table. But calculated columns can be present in such tables. Calculated columns are created using keyword MATERIALIZED.

With this in mind, I created another table in which columns that were previously parsed in the ORDER BY clause were materialized in separate columns. Then these materialized columns are used in ORDER BY:

CREATE TABLE IF NOT EXISTS rill.json_helix_streams_mat_ord (
  payload     String CODEC(ZSTD(1)),
  user_id     UInt64 MATERIALIZED toUInt64(JSONExtractString(payload, 'user_id')) CODEC(ZSTD(1)),
  captured_at DateTime MATERIALIZED parseDateTimeBestEffort(JSONExtractString(payload, 'captured_at')) CODEC(Delta, ZSTD(3)),
  ingest_time DateTime DEFAULT NOW() CODEC(Delta, ZSTD(1))
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(captured_at)
ORDER BY (
  user_id, captured_at
)
;

Compression codec is still ZSTD, with addition of specialised DELTA encoding being applied to DateTime columns.

There is also PARTITION BY clause applied to the table, but since all source data is from the same month, there is only one partition.

First, I have used command-line utilities to load files into this table and it took 6 minutes (389 seconds). Loading data using Python script (directly passing compressed files without addition of audit columns) took 7 minutes (390 seconds).

Final table size was 711.46 MiB. Which is slightly bigger than ordering using JSON functions, but also understandable because now there are two extra columns. Load time noticeably decreased from 12 minutes to 7 minutes.

Then I have loaded the same data with audit columns using various compression levels.

Compression level Load time
6 35 minutes (2047 seconds)
1 30 minutes (1766 seconds)
0 28 minutes (1697 seconds)

Final table size was 814.69 MiB, which is expected, because we added extra keys to JSON payload. And as we have previously seen that parsing JSON in Python and higher compression levels contribute to higher load times.

JSON libraries

Until now we used JSON library from Python standard library, but there are other options that might be faster.

I have tested my script with orjson, python-rapidjson, and pysimdjson libraries to see how much speed up I can get from using non-standard JSON library.

For pysimdjson I have used Drop-in API to minimize changes to code. Since function dumps is aliased to standard library JSON, I have also used a combination of rapidjson and pysimdjson as one of the options. To use orjson library I had to slightly change my Python script to take into account that orjson serializes to bytes rather than str - code

All runs were made with compression level 1 and with addition of audit columns.

JSON library Load time
std JSON 30 minutes (1766 seconds)
rapidjson with simdjson 4.0.2 21 minutes (1248 seconds)
rapidjson 1.4 21 minutes (1252 seconds)
ujson 4.2.0 19 minutes (1130 seconds)
orjson 3.6.4 14 minutes (830 seconds)

All the external JSON libraries showed an increase in performance (and decrease in loading times) in comparison to standard JSON module. orjson was the fastest library that halved loading times for our data.

Pandas

Another idea that I had is to try using pandas to parse JSON and load data into ClickHouse.

I had to adjust Python script to use pandas - code

And then change JSON functions in ClickHouse table definition to match new data schema generated by pandas:

CREATE TABLE IF NOT EXISTS rill.pd_json_helix_streams_mat_ord (
  payload     String CODEC(ZSTD(1)),
  user_id     UInt64 MATERIALIZED JSONExtract(payload, 'user_id', 'UInt64') CODEC(ZSTD(1)),
  captured_at DateTime MATERIALIZED FROM_UNIXTIME(intDiv(JSONExtract(payload, 'captured_at', 'UInt64'), 1000)) CODEC(Delta, ZSTD(3)),
  ingest_time DateTime DEFAULT NOW() CODEC(Delta, ZSTD(1))
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(captured_at)
ORDER BY (
  user_id, captured_at
)
;

It took 42 minutes (2504 seconds) to load all the source data with compression level 1 and audit columns.

Materialize them all

Eventually, I have created a table where all original JSON fields were materialized into separate columns.

CREATE TABLE IF NOT EXISTS rill.json_helix_streams_all_mat (
  payload      String CODEC(ZSTD(1)),
  user_id      UInt64 MATERIALIZED toUInt64(JSONExtractString(payload, 'user_id')) CODEC(ZSTD(1)),
  user_name    String MATERIALIZED JSONExtractString(payload, 'user_name') CODEC(ZSTD(1)),
  language     LowCardinality(String) MATERIALIZED JSONExtractString(payload, 'language') CODEC(ZSTD(1)),
  type         LowCardinality(String) MATERIALIZED JSONExtractString(payload, 'type') CODEC(ZSTD(1)),
  game_name    String MATERIALIZED TRIM(JSONExtractString(payload, 'game_name')) CODEC(ZSTD(1)),
  game_id      UInt32 MATERIALIZED toUInt32OrZero(JSONExtractString(payload, 'game_id')) CODEC(ZSTD(3)),
  tag_ids      Array(String) MATERIALIZED JSONExtract(payload, 'tag_ids', 'Array(String)') CODEC(ZSTD(3)),
  title        String MATERIALIZED TRIM(JSONExtractString(payload, 'title')) CODEC(ZSTD(3)),
  viewer_count UInt32 MATERIALIZED JSONExtract(payload, 'viewer_count', 'UInt32') CODEC(T64, ZSTD(1)),
  started_at   DateTime MATERIALIZED parseDateTimeBestEffort(JSONExtractString(payload, 'started_at')) CODEC(ZSTD(1)),
  captured_at  DateTime MATERIALIZED parseDateTimeBestEffort(JSONExtractString(payload, 'captured_at')) CODEC(Delta, ZSTD(3)),
  ingest_time  DateTime DEFAULT NOW() CODEC(Delta, ZSTD(1))
) ENGINE = MergeTree()
ORDER BY (
  user_id, captured_at
)
;

Now even more special data types (LowCardinality) and compression codes (‘T64’) were applied. Compression in ClickHouse is a fun topic and deserves a separate investigation.

It took 18 minutes (1080 seconds) for a Python script to directly load files (without addition of audit columns) into this all materialized table. The final table size was 910.81 MiB. Python script that adds audit columns (with compression level = 1 and standard JSON module) took 35 minutes (2129 seconds) and the final table size was 1019.83 MiB.

Conclusion

In this post we explored loading of data from GZIP compressed files into ClickHouse using command-line, but focused more on using Python to load data. Python received more attention because modern workflow management systems (see Apache Airflow, Prefect, Dagster) treat Python as first class citizen. We discovered that loading data from files into ClickHouse could be is as easy as sending files to ClickHouse server using HTTP interface. When it comes to table size ClickHouse offers several compression codecs and we have seen that switching from LZ4 to ZSTD already decreased table size. Further decrease in table size was achieved by sorting data in such way that repeated values are stored together. We also explored how much penalty we would encounter if we are to make Python to process source files (decompress, parse JSON, compress). From the tests we could see that it took 30 minutes to do this with standard Python JSON module, while orjson was able to speed up files processing in Python and decrease total loading time to 14 minutes. In this article I focused on using ClickHouse HTTP interface to load data, because my assumption was that ClickHouse server does not have access to source files. If ClickHouse server can access data stored in files on the same server or in a remote location (for example, NFS, hdfs) then ClickHouse offers multiple table functions that can help in querying data in remote locations.

Code

SQL

Measuring load times:

SELECT
    dateDiff('minute', MIN(ingest_time), MAX(ingest_time)) AS minutes,
    dateDiff('second', MIN(ingest_time), MAX(ingest_time)) AS seconds
FROM rill.json_helix_streams_mat_ord

Querying size of tables in ClickHouse:

SELECT
    CONCAT(database, '.', table) AS table,
    formatReadableSize(SUM(bytes)) AS size,
    SUM(rows) AS rows,
    MAX(modification_time) AS latest_modification,
    SUM(bytes) AS bytes_size,
    any(engine) AS engine,
    formatReadableSize(SUM(primary_key_bytes_in_memory)) AS primary_keys_size
FROM system.parts
WHERE active AND (database = 'rill')
GROUP BY
    database,
    table
ORDER BY bytes_size DESC

Python

Direct loading

Script to directly (without parsing JSON in Python) load JSON lines GZIP compressed files into ClickHouse:

# how to use:
# python3 ch_loader.py /path/to/directory "database_name.table_name"'
import sys
import requests
import glob
from pathlib import Path

SOURCE_PATH = sys.argv[1]
DEST_TABLE = sys.argv[2]

ch_url = "http://192.168.178.41:8123" # CH host

query = f"INSERT INTO {DEST_TABLE} (payload) FORMAT JSONAsString"

files_path = Path(SOURCE_PATH, '**', '*.json.gz')
for filepath in glob.iglob(str(files_path), recursive=True):
    print(filepath)
    with open(filepath, 'rb') as f:
        body = f.read()
        r = requests.post(
            ch_url,
            params={"query": query},
            data=body,
            headers={"Content-Encoding": 'gzip'}
        )
        if not r.ok:
            print(r.text)
            raise ValueError(r.status_code)

JSON with extra columns

import sys
import gzip
import json
import requests
import glob
from pathlib import Path

SOURCE_PATH = sys.argv[1]
DEST_TABLE = sys.argv[2]

ch_url = "http://192.168.178.41:8123" # asus

query = f"INSERT INTO {DEST_TABLE} (payload) FORMAT JSONAsString"

files_path = Path(SOURCE_PATH, '**', '*.json.gz')
for filepath in glob.iglob(str(files_path), recursive=True):
    filename = Path(filepath).name
    file_unix_ts = Path(filepath).stat().st_mtime
    print(filepath)
    with gzip.open(filepath, 'rt', encoding='utf-8') as f:
        items = []
        for line in f:
            d = json.loads(line)
            d['_source_filename'] = filename
            d['_file_ts'] = int(file_unix_ts)
            items.append(json.dumps(d, ensure_ascii=False))
        body = gzip.compress(('\n'.join(items)).encode('utf-8'), compresslevel=1)
        r = requests.post(
            ch_url,
            params={"query": query},
            data=body,
            headers={"Content-Encoding": 'gzip'}
        )
        if not r.ok:
            print(r.text)
            raise ValueError(r.status_code)

JSON handling with orjson

import sys
import gzip
import requests
import glob
from pathlib import Path

import orjson as ojson

SOURCE_PATH = sys.argv[1]
DEST_TABLE = sys.argv[2]

ch_url = "http://192.168.178.41:8123" # CH server

query = f"INSERT INTO {DEST_TABLE} (payload) FORMAT JSONAsString"

files_path = Path(SOURCE_PATH, '**', '*.json.gz')
for filepath in glob.iglob(str(files_path), recursive=True):
    filename = Path(filepath).name
    file_unix_ts = Path(filepath).stat().st_mtime
    print(filepath)
    with gzip.open(filepath, 'rb') as f:
        items = []
        for line in f:
            d = ojson.loads(line)
            d['_source_filename'] = filename
            d['_file_ts'] = int(file_unix_ts)
            items.append(ojson.dumps(d))
        body = gzip.compress(b'\n'.join(items), compresslevel=1)
        r = requests.post(
            ch_url,
            params={"query": query},
            data=body,
            headers={"Content-Encoding": 'gzip'}
        )
        if not r.ok:
            print(r.text)
            raise ValueError(r.status_code)

Handling JSON with pandas

import sys
import gzip
import requests
import glob
from pathlib import Path

import pandas as pd

SOURCE_PATH = sys.argv[1]
DEST_TABLE = sys.argv[2]

ch_url = "http://192.168.178.41:8123" # CH server

query = f"INSERT INTO {DEST_TABLE} (payload) FORMAT JSONAsString"

files_path = Path(SOURCE_PATH, '**', '*.json.gz')
for filepath in glob.iglob(str(files_path), recursive=True):
    filename = Path(filepath).name
    file_unix_ts = Path(filepath).stat().st_mtime
    print(filepath)
    df = pd.read_json(filepath, lines=True)
    df['_source_filename'] = filename
    df['_file_ts'] = int(file_unix_ts)
    body = df.to_json(orient='records', force_ascii=False, lines=True)
    body = gzip.compress(body.encode('utf-8'), compresslevel=1)
    r = requests.post(
        ch_url,
        params={"query": query},
        data=body,
        headers={"Content-Encoding": 'gzip'}
    )
    if not r.ok:
        print(r.text)
        raise ValueError(r.status_code)