Recently, I was planning a data extraction strategy for an API: the goal was to schedule the frequency of data extraction so as to avoid cached responses while staying within rate limits. To have data for the analysis, I collected the API response every minute for 3 hours. This resulted in 185 files (3 full hours plus several extra minutes) with a total size of 6.15 MB, saved in newline-delimited JSON (JSON Lines) format, compressed with gzip, and containing 55746 records. As the analysis tool, I used clickhouse-local, a utility that lets us run SQL against local files without setting up a database, creating tables, or loading data.
Data Quality
The source API is dynamic: streams start and stop, and their viewer counts change, which affects their order in the API response. Based on how the API works, there are two major notes about the data:
- Data collection started and stopped at arbitrary times. We consider the data before the first change in viewer count and after the last change in viewer count to be incomplete, and we filter it out.
- The source API is paginated and only the first 2 pages were saved. Because viewer counts change, streams shift position in the API responses and can fall out of the scope of our script. This influences our calculation, because the viewer count may change while a stream sits on a later page.
Docker command
To run SQL queries on the saved files, we will use clickhouse-local executed in a Docker container:
docker run -v /mnt/f/projects/api_minute_cache:/q -v /mnt/f/projects/api_minute_cache/raw/api/streams/2022_03_14:/data clickhouse/clickhouse-server:22.5 clickhouse-local --input-format "JSONAsString" --output-format "Pretty" --queries-file "/q/query_3.sql" --file "/data/**/*"
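With the input format `JSONAsString`, each saved line becomes one row in a single String column, which the queries below reference as `json` and parse with JSON functions. A quick sanity check might look like this (the LIMIT is only for illustration):
-- Each input line is one row in a single String column (referred to as json in the queries below);
-- the article's queries parse it with JSONExtract* functions
SELECT json
FROM _local.table
LIMIT 1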
Using Window functions
The first approach is to use SQL window functions. This option is compatible with other databases that support window functions.
We begin by parsing the JSON into ClickHouse data types and using the LAG function (`lagInFrame` in ClickHouse).
WITH
source AS (
    SELECT
        toUInt64(JSONExtractString(json, 'id')) AS id,
        parseDateTimeBestEffort(JSONExtractString(json, 'captured_at')) AS captured_at,
        JSONExtract(json, 'viewer_count', 'UInt32') AS viewer_count,
        lagInFrame(viewer_count) OVER (
            PARTITION BY id ORDER BY captured_at ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS prev_count,
        lagInFrame(captured_at) OVER (
            PARTITION BY id ORDER BY captured_at ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS prev_captured_at
    FROM _local.table
)
By default, clickhouse-local uses the database name `_local` and the table name `table`.
In the same query, we use the window function `lagInFrame` to get, for each stream event, the current viewer count, the previous viewer count, the current data extraction time (the `captured_at` column), and the previous data extraction time.
Our query returns a result like the following table:
id | captured_at | viewer_count | prev_count | prev_captured_at |
---|---|---|---|---|
44960163196 | 2022-03-14 11:09:10 | 867 | 0 | 1970-01-01 00:00:00 |
44960163196 | 2022-03-14 11:10:08 | 867 | 867 | 2022-03-14 11:09:10 |
44960163196 | 2022-03-14 11:11:08 | 867 | 867 | 2022-03-14 11:10:08 |
44960163196 | 2022-03-14 11:12:07 | 867 | 867 | 2022-03-14 11:11:08 |
44960163196 | 2022-03-14 11:13:08 | 920 | 867 | 2022-03-14 11:12:07 |
44960163196 | 2022-03-14 11:14:11 | 920 | 920 | 2022-03-14 11:13:08 |
WITH
source AS (...)
, streams_with_changes AS (
    SELECT
        id,
        captured_at,
        (viewer_count - prev_count) != 0 AS is_changed -- ClickHouse stores this as 0 or 1
    FROM source
    WHERE toYear(prev_captured_at) != 1970 -- filter the first row per stream, which would otherwise look like a change
)
In the CTE `streams_with_changes`, we define a new column `is_changed` that indicates whether the number of viewers changed at this event. Instead of storing the value as a boolean, ClickHouse represents it as 0 or 1, where 1 means that a change happened at this event. Besides that, we filter out the first event for each stream: otherwise it would look like a change, even though those first sequences are incomplete.
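As a quick illustration of that 0/1 behaviour (the numbers are taken from the sample rows above, and this standalone query is only for demonstration):
-- Comparison operators in ClickHouse return UInt8 (0 or 1), not a separate boolean type
SELECT
    (920 - 867) != 0 AS is_changed,
    toTypeName(is_changed) AS value_type -- UInt8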
Now the CTE `streams_with_changes` returns a smaller table of 0s and 1s:
id | captured_at | is_changed |
---|---|---|
44960163196 | 2022-03-14 11:10:08 | 0 |
44960163196 | 2022-03-14 11:11:08 | 0 |
44960163196 | 2022-03-14 11:12:07 | 0 |
44960163196 | 2022-03-14 11:13:08 | 1 |
44960163196 | 2022-03-14 11:14:11 | 0 |
44960163196 | 2022-03-14 11:15:05 | 0 |
44960163196 | 2022-03-14 11:16:07 | 0 |
44960163196 | 2022-03-14 11:17:10 | 1 |
The next step, coded in the CTE `only_changed`, is to select only the events where the number of viewers changed and to bring the timestamp of the current change event and the timestamp of the previous change event onto one row. As a result, we get a table of two columns containing timestamps only for change events.
The code:
WITH
source AS (...)
, streams_with_changes AS (...)
, only_changed AS (
    SELECT
        captured_at,
        lagInFrame(captured_at) OVER (
            PARTITION BY id ORDER BY captured_at ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS prev_change_ts
    FROM streams_with_changes
    WHERE is_changed
)
returns:
captured_at | prev_change_ts |
---|---|
2022-03-14 11:13:08 | 1970-01-01 00:00:00 |
2022-03-14 11:17:10 | 2022-03-14 11:13:08 |
The final step of the data transformation is to calculate the time difference between the previous change event and the current change event. To achieve this, we use the function `dateDiff` and calculate the difference between the two timestamps in minutes.
WITH
source AS (...)
, streams_with_changes AS (...)
, only_changed AS (...)
, diffs AS (
    SELECT
        dateDiff('minute', prev_change_ts, captured_at) AS minutes_diff
    FROM only_changed
    WHERE toYear(prev_change_ts) != 1970 -- with LAG we should start from the second row
)
The CTE `diffs` returns a single column containing the difference in minutes between two consecutive change events:
minutes_diff |
---|
4 |
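Note that `dateDiff` counts the unit boundaries crossed between the two timestamps rather than the exact elapsed time, which is why the pair above (11:13:08 to 11:17:10, 4 minutes and 2 seconds apart) yields 4. A quick check with the values from the table:
-- 11:13:08 -> 11:17:10 crosses 4 minute boundaries, so the result is 4
SELECT dateDiff(
    'minute',
    toDateTime('2022-03-14 11:13:08'),
    toDateTime('2022-03-14 11:17:10')
) AS minutes_diff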
Now that we have the differences in minutes between change events, we can calculate statistics such as the average time between change events.
WITH
source AS (...)
, streams_with_changes AS (...)
, only_changed AS (...)
, diffs AS (...)
SELECT
    count(*) AS num_of_changes,
    median(minutes_diff) AS median_minutes,
    min(minutes_diff) AS min_minutes,
    max(minutes_diff) AS max_minutes,
    avg(minutes_diff) AS avg_minutes
FROM diffs
Calculating statistics on the whole dataset shows that there were 11514 change events and that the average time between change events is a bit longer than 4.5 minutes. In addition, the minimum and maximum show that the shortest time between events was 1 minute and the longest was 153 minutes. Such a long gap is probably related to how the API works and to streams going out of the scope of our data extraction process. Because of these outliers, I also calculated the median, which is less susceptible to outliers than the average.
num_of_changes | median_minutes | min_minutes | max_minutes | avg_minutes |
---|---|---|---|---|
11514 | 4 | 1 | 153 | 4.647993746743095 |
The median is 4 minutes and the average is about 4.6 minutes.
Note: there are multiple versions of the `median` function in ClickHouse - link to CH docs
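For example, the default `median` is an approximate, sampling-based version (an alias of `quantile`), while `medianExact` computes the exact value. A possible way to compare them on this dataset (this comparison is my addition, not part of the original query) would be:
-- median is an alias of quantile (approximate, reservoir sampling);
-- medianExact is an alias of quantileExact (exact, but uses more memory)
SELECT
    median(minutes_diff) AS median_approx,
    medianExact(minutes_diff) AS median_exact
FROM diffs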
Using ClickHouse Arrays
The second option is leveraging ClickHouse arrays.
We start again by parsing the source data and use the window function `lagInFrame` to get the current viewer count and the previous viewer count for each stream in the same row.
WITH
source AS (
    SELECT
        toUInt64(JSONExtractString(json, 'id')) AS id,
        parseDateTimeBestEffort(JSONExtractString(json, 'captured_at')) AS captured_at,
        JSONExtract(json, 'viewer_count', 'UInt32') AS viewer_count,
        lagInFrame(viewer_count) OVER (
            PARTITION BY id ORDER BY captured_at ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS prev_count
    FROM _local.table
)
id | captured_at | viewer_count | prev_count |
---|---|---|---|
44960163196 | 2022-03-14 11:09:10 | 867 | 0 |
44960163196 | 2022-03-14 11:10:08 | 867 | 867 |
44960163196 | 2022-03-14 11:11:08 | 867 | 867 |
44960163196 | 2022-03-14 11:12:07 | 867 | 867 |
44960163196 | 2022-03-14 11:13:08 | 920 | 867 |
Next, we keep only the events where the current and previous viewer counts are not equal, and for those events we convert the timestamp to a Unix timestamp, which is stored as an integer. We need this conversion for the function `arrayDifference`, which does not work on timestamps.
WITH
source AS (...)
, diffs AS (
    SELECT
        id,
        toUnixTimestamp(captured_at) AS diff_unix
    FROM source
    WHERE viewer_count != prev_count
    ORDER BY id, captured_at -- this is for groupArray to maintain order
)
Now we have a table of two columns - the stream `id` and the change event timestamp represented as a Unix timestamp:
id | diff_unix |
---|---|
44960163196 | 1647256150 |
44960163196 | 1647256388 |
44960163196 | 1647256630 |
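For reference, the first diff_unix value above matches the 2022-03-14 11:09:10 timestamp from the earlier tables, assuming `captured_at` is stored in UTC:
-- Assuming captured_at is in UTC, this reproduces the first diff_unix value above
SELECT toUnixTimestamp(toDateTime('2022-03-14 11:09:10', 'UTC')) AS diff_unix -- 1647256150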
The next step is to aggregate the timestamps into arrays and apply a few transformations:
WITH
source AS (...)
, diffs AS (...)
, id_arr AS (
    SELECT
        id,
        groupArray(diff_unix) AS arr_diffs_unix,
        arrayPopFront(arr_diffs_unix) AS full_seq_arr, -- remove first change because it is incomplete
        arrayDifference(full_seq_arr) AS seconds_diffs,
        arrayPopFront(seconds_diffs) AS clean_seconds_diffs, -- remove first element because it is introduced by arrayDifference
        arrayMap((x) -> x/60, clean_seconds_diffs) AS arr_minutes,
        arrayMap((x) -> round(x), arr_minutes) AS arr_minutes_rounded
    FROM diffs
    GROUP BY id
)
In this code, we create an array of Unix timestamps using the function `groupArray`. Then we remove the first change event (because those starting sequences are incomplete) using the function `arrayPopFront`. The durations between change event timestamps are then calculated with the function `arrayDifference`. Because `arrayDifference` puts 0 as the first array element, we remove that leading 0 with the previously used `arrayPopFront`. Now we have an array of durations in seconds between change events. Finally, `arrayMap` is used to convert seconds to minutes and to round the result to an integer.
The result of this CTE looks like this:
id | arr_diffs_unix | full_seq_arr | seconds_diffs | clean_seconds_diffs | arr_minutes | arr_minutes_rounded |
---|---|---|---|---|---|---|
44960163196 | [1647256150,1647256388,1647256630] | [1647256388,1647256630] | [0,242] | [242] | [4.033333333333333] | [4] |
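The same chain of array functions can be checked on a literal array of Unix timestamps (the values are taken from the row above):
-- Drop the incomplete first change, compute pairwise differences,
-- drop the leading 0 added by arrayDifference, convert to rounded minutes
SELECT
    [1647256150, 1647256388, 1647256630] AS arr_diffs_unix,
    arrayPopFront(arr_diffs_unix) AS full_seq_arr,          -- [1647256388, 1647256630]
    arrayDifference(full_seq_arr) AS seconds_diffs,         -- [0, 242]
    arrayPopFront(seconds_diffs) AS clean_seconds_diffs,    -- [242]
    arrayMap(x -> round(x / 60), clean_seconds_diffs) AS arr_minutes_rounded -- [4]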
The CTE `id_arr` calculates the durations between change events for each stream; the next CTE, `all_diffs_arr`, combines all durations into an array of arrays and then flattens that array of arrays into a flat array of integers (with no nested arrays).
WITH
source AS (...)
, diffs AS (...)
, id_arr AS (...)
, all_diffs_arr AS (
    SELECT
        groupArray(arr_minutes_rounded) AS arr_diffs, -- array of arrays
        arrayFlatten(arr_diffs) AS change_arr
    FROM id_arr
)
Result:
arr_diffs | change_arr |
---|---|
[[4]] | [4] |
Now that all durations are collected together, we can again calculate statistics over the entire dataset. Running a SQL query that calculates the average, median, minimum, and maximum of the durations returns almost the same result as the previous version (only the average value is slightly different).
WITH
source AS (...)
, diffs AS (...)
, id_arr AS (...)
, all_diffs_arr AS (...)
SELECT
    length(change_arr) AS num_changes,
    arrayReduce('median', change_arr) AS median_minutes,
    arrayMin(change_arr) AS min_minutes,
    arrayMax(change_arr) AS max_minutes,
    arrayAvg(change_arr) AS avg_minutes
FROM all_diffs_arr
num_changes | median_minutes | min_minutes | max_minutes | avg_minutes |
---|---|---|---|---|
11514 | 4 | 1 | 153 | 4.64816744832378 |
And again, the median and the average duration between change events are about 4 and 4.6 minutes, respectively.
Calculating frequencies
One more interesting thing to calculate for this dataset is the frequency distribution of durations between change events. ClickHouse has the function `topK` that computes the approximately most frequent values. It is easy to extend our last query with one more column holding the result of `topK`:
WITH
source AS (...)
, diffs AS (...)
, id_arr AS (...)
, all_diffs_arr AS (...)
SELECT
    length(change_arr) AS num_changes,
    arrayReduce('median', change_arr) AS median_minutes,
    arrayMin(change_arr) AS min_minutes,
    arrayMax(change_arr) AS max_minutes,
    arrayAvg(change_arr) AS avg_minutes,
    arrayReduce('topK(5)', change_arr) AS most_freq_durations
FROM all_diffs_arr
The function `topK` returns the most frequent values, but not their frequencies:
num_changes | median_minutes | min_minutes | max_minutes | avg_minutes | most_freq_durations |
---|---|---|---|---|---|
11514 | 4 | 1 | 153 | 4.64816744832378 | [4,5,3,6,7] |
Now let’s adjust our query to calculate the frequencies of the values. This time it is more than a one-line change. We will unnest the array of durations with ARRAY JOIN so that we can use SQL aggregate functions and GROUP BY:
WITH
source AS (...)
, diffs AS (...)
, id_arr AS (...)
, all_diffs_arr AS (...)
SELECT
    carr AS duration,
    COUNT(*) AS frequency
FROM all_diffs_arr AS ada
ARRAY JOIN ada.change_arr AS carr
GROUP BY duration
ORDER BY frequency DESC
LIMIT 10
It returns the same values as `topK`, but with a frequency count for each duration value:
duration | frequency |
---|---|
4 | 4138 |
5 | 3011 |
3 | 2597 |
6 | 764 |
7 | 379 |
8 | 227 |
9 | 113 |
10 | 82 |
11 | 43 |
12 | 40 |
It looks like the most common durations between change events are 3, 4, and 5 minutes, with 4 minutes occurring most often.
Conclusion
In this post, we discussed a data analysis use case: determining how long data is cached in an API. Our source data was stored in files, and from the beginning the decision was made not to load this data into a database but to analyse the files directly. This goal was accomplished with the ClickHouse utility clickhouse-local, which uses the ClickHouse query engine to process local files. In terms of code, we devised two solutions: one using SQL with window functions, which should be compatible with or easily portable to other query engines, and the other using SQL and ClickHouse arrays. Both options returned the same result and were not far apart in lines of code. In addition, we calculated the frequencies of durations between change events, both with the approximate `topK` and with an exact `COUNT`.
Bonus: CSV to Markdown tables conversion
I used Markdown to format this article, and Markdown supports tables, but ClickHouse does not have an output format suitable for Markdown tables. To avoid manually reformatting the ClickHouse output for each table, I used the script csv2md in combination with the ClickHouse output format `CSVWithNames`:
docker run -v /mnt/f/projects/api_minute_cache:/q -v /mnt/f/projects/api_minute_cache/raw/api/streams/2022_03_14:/data clickhouse/clickhouse-server:22.5 clickhouse-local --input-format "JSONAsString" --output-format "CSVWithNames" --queries-file "/q/query_4.sql" --file "/data/**/*" | csv2md
The code
Source code is available here - link to code