Recently, I was planning a data extraction strategy from an API and the goal was to schedule the frequency of data extraction to avoid cached responses, but also to be within rate limits. In order to have data for analysis, I have collected API response every minute for 3 hours. It resulted in 185 files (3 full hours plus several minutes more) with a total size of 6.15 MB saved in JSON New Lines format (and compressed with gzip) and 55746 records. As an analysis tool, I used clickhouse-local. This utility helps us to run SQL against local files without setting up a database, creating tables, and loading data.

Data Quality

Source API has a dynamic nature, meaning that stream start/stop and their number of viewers changes, which affects their order in the API response. Based on how the API works, there are two major notes about data:

  1. Data collection was started and stopped at some random times. We will consider data before the first change in viewer count and after the last change in viewer count to be incomplete and we should filter them.
  2. Source data was collected from an API that has pagination and only first the 2 pages were saved. Because the number of viewers changes, streams move position in API responses and might go out of the scope of our script. This influences our calculation because changes to the number of viewers might happen when the stream moved to later pages.

Docker command

To run SQL queries on saved files, we will use clickhouse-local executed in a Docker container:

docker run -v /mnt/f/projects/api_minute_cache:/q -v /mnt/f/projects/api_minute_cache/raw/api/streams/2022_03_14:/data clickhouse/clickhouse-server:22.5 clickhouse-local --input-format "JSONAsString" --output-format "Pretty" --queries-file "/q/query_3.sql" --file "/data/**/*"

Using Window functions

First approach to calculate is to use SQL Window functions. This option is compatible with other databases that support Window functions. We begin by parsing JSON into ClickHouse data types and using LAG function (lagInFrame in ClickHouse).

WITH
  source AS (
    SELECT 
      toUInt64(JSONExtractString(json, 'id'))                         AS id,
      parseDateTimeBestEffort(JSONExtractString(json, 'captured_at')) AS captured_at,
      JSONExtract(json, 'viewer_count', 'UInt32')                     AS viewer_count,
      lagInFrame(viewer_count) OVER (
        PARTITION BY id ORDER BY captured_at ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
      )                                                               AS prev_count,
      lagInFrame(captured_at)  OVER (
        PARTITION BY id ORDER BY captured_at ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
      )                                                               AS prev_captured_at
    FROM _local.table
)

By default, clickhouse-local uses database name _local and table name table. In the same query we use window function lagInFrame to have current viewer count, previous viewer count, current data extraction time (column captured_at), and previous data extraction time for each stream event. Our query will return result like in the following table:

id captured_at viewer_count prev_count prev_captured_at
44960163196 2022-03-14 11:09:10 867 0 1970-01-01 00:00:00
44960163196 2022-03-14 11:10:08 867 867 2022-03-14 11:09:10
44960163196 2022-03-14 11:11:08 867 867 2022-03-14 11:10:08
44960163196 2022-03-14 11:12:07 867 867 2022-03-14 11:11:08
44960163196 2022-03-14 11:13:08 920 867 2022-03-14 11:12:07
44960163196 2022-03-14 11:14:11 920 920 2022-03-14 11:13:08
WITH
  source AS (...)
, streams_with_changes AS (
    SELECT 
      id,
      captured_at,
      (viewer_count - prev_count) != 0 AS is_changed -- CH converts this to 0 and 1
    FROM source
    WHERE toYear(prev_captured_at) != 1970 -- filter first row which is marked as change
)

In CTE streams_with_changes we define the new column is_changed that defines whether at this event number of viewers changed. Instead of saving the value as boolean, ClickHouse converts it to 0 and 1. Where value 1 means that change happened at this event. Besides that, we filter the first event for each stream. Otherwise, they would look like changes, but those first sequences are incomplete. Now CTE streams_with_changes returns smaller tables with 0s and 1s:

id captured_at is_changed
44960163196 2022-03-14 11:10:08 0
44960163196 2022-03-14 11:11:08 0
44960163196 2022-03-14 11:12:07 0
44960163196 2022-03-14 11:13:08 1
44960163196 2022-03-14 11:14:11 0
44960163196 2022-03-14 11:15:05 0
44960163196 2022-03-14 11:16:07 0
44960163196 2022-03-14 11:17:10 1

The next step, coded in CTE only_changed, is to select only events when the number of viewers changed and bring on one row timestamp of the current change event and timestamp of the previous change event. As the result, we have a table that comprises two columns that contain timestamps only for change events. The code:

WITH
  source AS (...)
, streams_with_changes AS (...)
, only_changed AS (
  SELECT
    captured_at,
    lagInFrame(captured_at) OVER (PARTITION BY id ORDER BY captured_at ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS prev_change_ts
  FROM streams_with_changes
  WHERE is_changed
)

returns:

captured_at prev_change_ts
2022-03-14 11:13:08 1970-01-01 00:00:00
2022-03-14 11:17:10 2022-03-14 11:13:08

The final step in data transformation is to calculate the time difference between the previous change event and the current change event. To achieve this, we use function dateDiff and calculate the difference between two timestamps in minutes.

WITH
  source AS (...)
, streams_with_changes AS (...)
, only_changed AS (...)
, diffs AS (
  SELECT
    dateDiff('minute',   prev_change_ts, captured_at) AS minutes_diff
  from only_changed
  WHERE toYear(prev_change_ts) != 1970 -- if we use LAG then we should start from second row
)

CTE diffs returns only one column that contains a difference in minutes between two change events:

minutes_diff
4

Now having data with minutes differences between change events, we can calculate statistics like average time between change events.

WITH
  source AS (...)
, streams_with_changes AS (...)
, only_changed AS (...)
, diffs AS (...)
SELECT 
  count(*)             AS num_of_changes,
  median(minutes_diff) AS median_minutes,
  min(minutes_diff)    AS min_minutes,
  max(minutes_diff)    AS max_minutes,
  avg(minutes_diff)    AS avg_minutes
FROM diffs

Calculating statistics on the whole dataset gives us the result that there were 11514 change events and the average time between change events is a big longer than 4.5 minutes. In addition, I calculate minimum and maximum that show that the minimum time between events was 1 minute and maximum time was 153 minutes. Such a long time is probably related to how the API works and streams going out of scope of our data extraction process. Considering these outliers, I have also calculated median time, which is less susceptible to outliers than average.

num_of_changes median_minutes min_minutes max_minutes avg_minutes
11514 4 1 153 4.647993746743095

Both median and average return result of about 4 minutes.

Note: there are multiple versions of the function median in ClickHouse - link to CH docs

Using ClickHouse Arrays

The second option is leveraging ClickHouse arrays. We start again with parsing source data and use window function lagInFrame to have the current viewer count and previous viewer count for each stream in the same row.

WITH
  source AS (
    SELECT
      toUInt64(JSONExtractString(json, 'id')) AS id,
      parseDateTimeBestEffort(JSONExtractString(json, 'captured_at')) as captured_at,
      JSONExtract(json, 'viewer_count', 'UInt32') as viewer_count,
      lagInFrame(viewer_count) OVER (PARTITION BY id ORDER BY captured_at rows between unbounded preceding and unbounded following) as prev_count
    FROM _local.table
)
id captured_at viewer_count prev_count
44960163196 2022-03-14 11:09:10 867 0
44960163196 2022-03-14 11:10:08 867 867
44960163196 2022-03-14 11:11:08 867 867
44960163196 2022-03-14 11:12:07 867 867
44960163196 2022-03-14 11:13:08 920 867

Next, we will add a column with event timestamp only for events where current and previous viewer counts are not equal. At the same time, we convert timestamp to Unix timestamp that is stored as an integer. We need this conversion for function arrayDifference that does not work on timestamp.

WITH
  source AS (...)
, diffs AS (
  SELECT 
    id,
    toUnixTimestamp(captured_at)             AS diff_unix
  FROM source
  WHERE viewer_count != prev_count
  ORDER BY id, captured_at -- this is for groupArray to maintain order
)

Now we have a table of two columns - streams id and change event timestamp represented as Unix timestamp:

id diff_unix
44960163196 1647256150
44960163196 1647256388
44960163196 1647256630

Next step is to aggregate timestamps into arrays and apply some transformations:

WITH
  source AS (...)
, diffs AS (...)
, id_arr AS (
  SELECT
    id,
    groupArray(diff_unix)                      AS arr_diffs_unix,
    arrayPopFront(arr_diffs_unix)              AS full_seq_arr, -- remove first change because it is incomplete
    arrayDifference(full_seq_arr)              AS seconds_diffs,
    arrayPopFront(seconds_diffs)               AS clean_seconds_diffs, -- remove first element because it is introduced by function arrayDifference
    arrayMap((x) -> x/60, clean_seconds_diffs) AS arr_minutes,
    arrayMap((x) -> round(x), arr_minutes)     AS arr_minutes_rounded
  FROM diffs
  GROUP BY id
)

In this code, we create an array of Unix timestamp using the function groupArray. Then we remove the first change event (because those starting sequences are incomplete) using the function arrayPopFront. Then duration between change events timestamps is calculated with function arrayDifference. Because function arrayDifference adds value 0 as the first array element, we remove the first 0 with previously used function arrayPopFront. Now we have an array of durations in seconds between change events. Function arrayMap is used to convert seconds to minutes and then to round result to integer. Result of this CTE will look like this:

id arr_diffs_unix full_seq_arr seconds_diffs clean_seconds_diffs arr_minutes arr_minutes_rounded
44960163196 [1647256150,1647256388,1647256630] [1647256388,1647256630] [0,242] [242] [4.033333333333333] [4]

CTE id_arr calculates duration between change events for each stream were calculated, next CTE all_diffs_arr combines all durations into array of arrays and then flattens this array of array into array of integers (with no nested arrays).

WITH
  source AS (...)
, diffs AS (...)
, id_arr AS (...)
, all_diffs_arr AS (
  SELECT
    groupArray(arr_minutes_rounded) AS arr_diffs, -- array of arrays
    arrayFlatten(arr_diffs)         AS change_arr
  FROM id_arr
)

Result:

arr_diffs change_arr
[[4]] [4]

Now that all durations are collected together, we can again calculate statistics over the entire dataset. Running SQL query that calculate average, median, minimum, and maximum of durations returns almost the same result (average value is slightly different) as previous version.

WITH
  source AS (...)
, diffs AS (...)
, id_arr AS (...)
, all_diffs_arr AS (...)
SELECT
  length(change_arr)                AS num_changes,
  arrayReduce('median', change_arr) AS median_minutes,
  arrayMin(change_arr)              AS min_minutes,
  arrayMax(change_arr)              AS max_minutes,
  arrayAvg(change_arr)              AS avg_minutes
FROM all_diffs_arr
num_changes median_minutes min_minutes max_minutes avg_minutes
11514 4 1 153 4.64816744832378

And again median and average duration between change events is about 4 minutes.

Calculating frequencies

One more interesting thing to calculate about this dataset is frequency distribution of durations between change events. ClickHouse has function topK that computes approximately most frequent values. It is easy to extend our last query to have one more column with the result of the function topK:

WITH
  source AS (...)
, diffs AS (...)
, id_arr AS (...)
, all_diffs_arr AS (...)
SELECT
  length(change_arr)                 AS num_changes,
  arrayReduce('median', change_arr)  AS median_minutes,
  arrayMin(change_arr)               AS min_minutes,
  arrayMax(change_arr)               AS max_minutes,
  arrayAvg(change_arr)               AS avg_minutes,
  arrayReduce('topK(5)', change_arr) AS most_freq_durations
FROM all_diffs_arr

The function topK returns most frequent values, but not their frequencies:

num_changes median_minutes min_minutes max_minutes avg_minutes most_freq_durations
11514 4 1 153 4.64816744832378 [4,5,3,6,7]

Now let’s adjust our query to calculate frequencies of values. This time it is more than one line change. We will unnest the array of duration with ARRAY JOIN to use SQL agggregate functions and GROUP BY:

WITH
  source AS (...)
, diffs AS (...)
, id_arr AS (...)
, all_diffs_arr AS (...)
SELECT
  carr     AS duration,
  COUNT(*) AS frequency
FROM all_diffs_arr AS ada
ARRAY JOIN ada.change_arr AS carr
GROUP BY duration
ORDER BY frequency DESC
LIMIT 10

It returns result that is the same as topK, but with the count of frequencies for each duration value:

duration frequency
4 4138
5 3011
3 2597
6 764
7 379
8 227
9 113
10 82
11 43
12 40

It looks like most often durations between change events are 3,4, and 5 minutes, with 4 minutes occuring most often.

Conclusion

In this post, we discussed a data analysis use case to determine how long data is cached in an API. Our source data was stored in files and from the beginning, the decision was made to not load this data into a database, but to analyse data in files directly. This goal was accomplished by using ClickHouse utility called clickhouse-local that uses ClickHouse query engine to process local files. In terms of code, we devised two solutions: one using SQL with Window functions that should be compatible or easily portable to other query engines, and the other one using SQL and ClickHouse arrays. Both options returned the same result and were not far apart in lines of code. In addition, we calculated frequencies of durations between change events using approximate count and using function COUNT.

Bonus: CSV to Markdown tables conversion

I used Markdown to format this article and Markdown has support for tables, but ClickHouse does not have any output format that would be suitable for Markdown tables. To avoid manually reformatting ClickHouse output for each table, I have used script csv2md in combination with ClickHouse output format CSVWithNames:

docker run -v /mnt/f/projects/api_minute_cache:/q -v /mnt/f/projects/api_minute_cache/raw/api/streams/2022_03_14:/data clickhouse/clickhouse-server:22.5 clickhouse-local --input-format "JSONAsString" --output-format "CSVWithNames" --queries-file "/q/query_4.sql" --file "/data/**/*" | csv2md

The code

Source code is available here - link to code