Let’s continue our ad hoc data analysis journey with the next tool: Apache Spark, and in particular PySpark. In the previous post we used Linux command-line tools to perform the analysis, which is a hard road for people who do not spend most of their time in a terminal. PySpark should be much easier to understand for people who use SQL and Python for data analysis. We will use the same questions as before: the number of streams per day/month, the number of games per day/month, and the most popular games and genres.
In our setup we will use a Docker container provided by Jupyter (called pyspark-notebook) and run Spark in local mode (and write code in Jupyter notebook).
Used hardware: Core i7-6700HQ, 32GB RAM DDR4 2133, Samsung SSD 860 EVO 1TB.
PySpark initialization
Before starting any analysis, let’s prepare a PySpark session with imports and session configuration.
First, we import SparkSession and PySpark functions:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
And next we should configure our Spark session:
from pyspark import SparkConf
conf=SparkConf()
conf.set('spark.driver.memory', '12g')
conf.set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
conf.set('spark.sql.session.timeZone', 'UTC') # enforce UTC for the session
conf.set('spark.driver.extraJavaOptions', '-Duser.timezone=UTC') # enforce UTC for driver JVM
conf.set('spark.executor.extraJavaOptions', '-Duser.timezone=UTC') # enforce UTC for executor JVM
conf.set('spark.rdd.compress', 'True') # enable compression for cached RDDs
The important parts of our configuration are the amount of memory for the driver process (because we use local mode, everything happens on the driver node) and the enforcement of UTC for as many parts of Spark as we can reach. The settings that enable KryoSerializer and RDD compression are there to squeeze more performance out of PySpark.
Now that the Spark configuration is defined, we can start a new Spark session in local mode:
spark = SparkSession.builder.config(conf=conf).master('local[4]').appName('rill').getOrCreate()
For our Spark session we explicitly set local mode and 4 threads. On Linux, the terminal command lscpu shows how many logical CPUs are available (key CPU(s)). On my Core i7-6700HQ, lscpu returns CPU(s): 8, and we give 4 of them to Spark.
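The thread count can also be derived inside the notebook instead of being read off lscpu. Below is a minimal sketch, assuming that handing half of the logical CPUs to Spark is a reasonable default. The helper name local_master and its fraction parameter are hypothetical and not part of Spark; only the 'local[N]' master format comes from the session setup above.

```python
import os


def local_master(fraction=0.5):
    """Build a Spark master URL such as 'local[4]' from a share of the logical CPUs."""
    logical_cpus = os.cpu_count() or 1  # os.cpu_count() may return None
    threads = max(1, int(logical_cpus * fraction))  # always keep at least one thread
    return f'local[{threads}]'


master = local_master()
print(master)
```

On a machine that reports 8 logical CPUs this produces 'local[4]', which could be passed to .master() in place of the hard-coded string.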
Finally, test that Spark session actually works by running
spark.version
which returns '3.0.1' for me.
Number of streams per day / per month
The first question is how many unique streams happened per day and per month. For this ad hoc analysis, we define a Twitch stream as a unique value in the field “ID”. To start, let’s prepare a Spark dataframe with streams data (it took Spark 58 seconds to read the files):
df_streams = spark.read.json('/data/twitch/raw/helix/streams/2020_07_*/**/*.gz')
Previously we calculated the total number of events for July 2020, so let’s calculate the same with Spark to compare the numbers:
df_streams.count()
Returned count is 12_971_288, which is the same result as with command-line tools.
Since at this point dataframe df_streams contains data only for July 2020, we can also check the number of unique streams for that month.
df_streams.agg(F.countDistinct('id')).show()
+---------+
|count(id)|
+---------+
| 939209|
+---------+
The query returns 939_209, which is the same number as we got before using command-line tools. This calculation took only about 41 seconds according to Spark Web UI (that usually runs on port 4040).
PySpark dataframes have schemas that describe datasets in terms of column names and column data types. While creating dataframe df_streams we did not provide a schema definition explicitly, so Spark inferred the schema for us.
Dataframe schema can be checked with command:
df_streams.printSchema()
that shows schema of our dataframe as:
root
|-- captured_at: string (nullable = true)
|-- game_id: string (nullable = true)
|-- id: string (nullable = true)
|-- language: string (nullable = true)
|-- started_at: string (nullable = true)
|-- tag_ids: array (nullable = true)
| |-- element: string (containsNull = true)
|-- thumbnail_url: string (nullable = true)
|-- title: string (nullable = true)
|-- type: string (nullable = true)
|-- user_id: string (nullable = true)
|-- user_name: string (nullable = true)
|-- viewer_count: long (nullable = true)
Spark set most of the columns to data type String. We know that fields captured_at and started_at contain datetime values and should be of Timestamp type.
In order to get a better overview of our dataframe we will use the PySpark describe command.
df_streams.describe(['captured_at', 'game_id', 'id', 'started_at']).show(truncate=False, n=30)
The output confirms that columns captured_at and started_at can be cast to Timestamp. In addition, we will cast game_id and id to Long.
+-------+-------------------------+------------------+----------------------+--------------------+
|summary|captured_at |game_id |id |started_at |
+-------+-------------------------+------------------+----------------------+--------------------+
|count |12971288 |12971288 |12971288 |12971288 |
|mean |null |303814.2656312978 |5.9693539930552256E17 |null |
|stddev |null |231173.94952696687|1.81902841965517338E18|null |
|min |2020-07-01 00:00:02+00:00| |100000114 |2020-06-29T01:20:47Z|
|max |2020-07-31 23:50:01+00:00|9990 |999989047844975133 |2020-07-31T23:47:08Z|
+-------+-------------------------+------------------+----------------------+--------------------+
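The two datetime formats visible in the min and max rows can be checked outside Spark as well. The sketch below uses only the Python standard library and the sample values from the table above; the helper names parse_captured_at and parse_started_at are hypothetical. It also shows what truncating to day and month means for one value.

```python
from datetime import datetime, timezone


def parse_captured_at(value):
    # Format seen in captured_at, e.g. '2020-07-01 00:00:02+00:00'
    return datetime.strptime(value, '%Y-%m-%d %H:%M:%S%z')


def parse_started_at(value):
    # Format seen in started_at, e.g. '2020-06-29T01:20:47Z' (the trailing Z means UTC)
    parsed = datetime.strptime(value, '%Y-%m-%dT%H:%M:%SZ')
    return parsed.replace(tzinfo=timezone.utc)


captured = parse_captured_at('2020-07-31 23:50:01+00:00')
started = parse_started_at('2020-06-29T01:20:47Z')

captured_day = captured.date()                   # truncate to day
captured_month = captured.date().replace(day=1)  # truncate to the first day of the month

print(captured_day, captured_month)
```

Both values carry an explicit UTC offset, which is one more reason to pin the Spark session time zone to UTC before casting.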
Let’s prepare a new dataframe with columns cast to appropriate data types and add the extra columns captured_at_month and captured_at_day, which will come in handy for grouping by day and month.
df_h_streams = (
    df_streams
    .withColumn('captured_at', F.col('captured_at').cast('Timestamp'))
    .withColumn('id', F.col('id').cast('Long'))  # casting this to INT causes 1st July streams == 10518
    .withColumn('game_id', F.col('game_id').cast('Long'))
    .withColumn('captured_at_month', F.date_trunc('month', F.col('captured_at')).cast('Date'))
    .withColumn('captured_at_day', F.date_trunc('day', F.col('captured_at')).cast('Date'))
    .select(['captured_at', 'id', 'game_id', 'captured_at_month', 'captured_at_day'])
).cache()  # not cached yet
Then test again if we have lost any records (and trigger caching):
df_h_streams.count()
This returns 12_971_288, meaning that all records are still with us. Spark Web UI now shows that we have cached a new RDD in Spark.
Now that we have a dataframe with captured_at_day and captured_at_month, it is easy to calculate the number of streams per day and per month.
It took 6 seconds to get results for the number of unique streams per day:
(
    df_h_streams
    .groupBy('captured_at_day')
    .agg(F.countDistinct('id').alias('num_streams'))
    .orderBy('captured_at_day')
    .show(n=31)
)
The result shows that the number of unique streams per day in July 2020 stays between roughly 33K and 35K. The number of unique streams on 1st July 2020 is 34_898, which is the same as we calculated earlier with command-line tools.
+---------------+-----------+
|captured_at_day|num_streams|
+---------------+-----------+
| 2020-07-01| 34898|
| 2020-07-02| 34798|
| 2020-07-03| 33529|
| 2020-07-04| 34589|
| 2020-07-05| 33580|
| 2020-07-06| 34198|
| 2020-07-07| 34817|
| 2020-07-08| 35402|
| 2020-07-09| 35182|
| 2020-07-10| 34708|
| 2020-07-11| 34496|
| 2020-07-12| 33688|
| 2020-07-13| 33641|
| 2020-07-14| 34415|
| 2020-07-15| 34047|
| 2020-07-16| 34105|
| 2020-07-17| 34966|
| 2020-07-18| 33936|
| 2020-07-19| 33160|
| 2020-07-20| 33430|
| 2020-07-21| 34481|
| 2020-07-22| 34045|
| 2020-07-23| 34000|
| 2020-07-24| 34523|
| 2020-07-25| 33613|
| 2020-07-26| 34369|
| 2020-07-27| 33734|
| 2020-07-28| 32852|
| 2020-07-29| 33583|
| 2020-07-30| 34396|
| 2020-07-31| 32796|
+---------------+-----------+
And it took 3 seconds to get results for the number of unique streams per month.
(
    df_h_streams
    .groupBy('captured_at_month')
    .agg(F.countDistinct('id').alias('num_streams'))
    .orderBy('captured_at_month')
    .show()
)
The query returns the same number as we calculated earlier with PySpark and with command-line tools:
+-----------------+-----------+
|captured_at_month|num_streams|
+-----------------+-----------+
| 2020-07-01| 939209|
+-----------------+-----------+
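What groupBy followed by countDistinct computes can be mirrored on a handful of rows in plain Python. This is only an illustration with hypothetical records, not the Spark implementation; the function name count_distinct_by is made up for this sketch. It also shows why the daily figures do not add up to the monthly 939_209: a stream that runs across midnight is counted once on each day, but only once in the month.

```python
from collections import defaultdict

# Hypothetical (captured_at_day, stream id) pairs; stream 2 runs across midnight
records = [
    ('2020-07-01', 1), ('2020-07-01', 1), ('2020-07-01', 2),
    ('2020-07-02', 2), ('2020-07-02', 3),
]


def count_distinct_by(rows, key_func):
    """Group rows by key_func(day) and count distinct stream ids per group."""
    groups = defaultdict(set)
    for day, stream_id in rows:
        groups[key_func(day)].add(stream_id)
    return {key: len(ids) for key, ids in sorted(groups.items())}


per_day = count_distinct_by(records, lambda day: day)
per_month = count_distinct_by(records, lambda day: day[:7] + '-01')

print(per_day)    # two distinct streams on each day
print(per_month)  # three distinct streams in the month
```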
Silent casting errors
When I first wrote this code I skipped describe and cast the field id to Int. Spark happily executed this:
df_streams.withColumn('id', F.col('id').cast('Int')).agg(F.countDistinct('id')).show()
But the result was 372_455 instead of the expected 939_209. Values in column id that did not fit into Int were silently set to Null by Spark.
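The value ranges explain the nulls. Spark's Int is a 32-bit signed integer and Long is a 64-bit signed integer, so a quick range check on the ids from the describe output shows which type is safe. This sketch is plain Python with a hypothetical helper named fits; it does not call Spark.

```python
INT32_MIN, INT32_MAX = -2**31, 2**31 - 1
INT64_MIN, INT64_MAX = -2**63, 2**63 - 1


def fits(value, low, high):
    """Return True if the numeric string fits into the inclusive range [low, high]."""
    return low <= int(value) <= high


smallest_id = '100000114'           # min row of column id in the describe output
largest_id = '999989047844975133'   # max row of column id in the describe output

print(fits(smallest_id, INT32_MIN, INT32_MAX))  # True: fits into Int
print(fits(largest_id, INT32_MIN, INT32_MAX))   # False: too large for Int
print(fits(largest_id, INT64_MIN, INT64_MAX))   # True: fits into Long
```

Checking describe output (or the min and max of the column) before choosing a numeric type is a cheap way to avoid this kind of silent data loss.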
Number of unique games per day / per month
Answering the question about the number of unique games per day/month becomes very easy after answering similar questions about the number of streams. The only line that we will change is the line with the agg command: .agg(F.countDistinct('game_id').alias('num_games'))
The daily number of unique games in July 2020 is quite stable at around 1600 - 1800 unique games per day:
(
    df_h_streams
    .groupBy('captured_at_day')
    .agg(F.countDistinct('game_id').alias('num_games'))
    .orderBy('captured_at_day')
    .show(n=31)
)
+---------------+---------+
|captured_at_day|num_games|
+---------------+---------+
| 2020-07-01| 1711|
| 2020-07-02| 1704|
| 2020-07-03| 1669|
| 2020-07-04| 1759|
| 2020-07-05| 1706|
| 2020-07-06| 1667|
| 2020-07-07| 1722|
| 2020-07-08| 1758|
| 2020-07-09| 1759|
| 2020-07-10| 1779|
| 2020-07-11| 1764|
| 2020-07-12| 1723|
| 2020-07-13| 1672|
| 2020-07-14| 1717|
| 2020-07-15| 1683|
| 2020-07-16| 1647|
| 2020-07-17| 1617|
| 2020-07-18| 1696|
| 2020-07-19| 1669|
| 2020-07-20| 1584|
| 2020-07-21| 1623|
| 2020-07-22| 1666|
| 2020-07-23| 1653|
| 2020-07-24| 1731|
| 2020-07-25| 1757|
| 2020-07-26| 1713|
| 2020-07-27| 1631|
| 2020-07-28| 1606|
| 2020-07-29| 1632|
| 2020-07-30| 1629|
| 2020-07-31| 1631|
+---------------+---------+
The numbers of unique games for 1st - 5th July 2020 are the same as we calculated before using command-line tools.
The number of unique games per month also matches the result of 6995 unique games that we calculated earlier using command-line tools.
(
    df_h_streams
    .groupBy('captured_at_month')
    .agg(F.countDistinct('game_id').alias('num_games'))
    .orderBy('captured_at_month')
    .show()
)
+-----------------+---------+
|captured_at_month|num_games|
+-----------------+---------+
| 2020-07-01| 6995|
+-----------------+---------+
So far our PySpark queries return the same results as our command-line based scripts, and we are getting results much faster.
That was way too easy. While we are looking at the number of streams and games, let’s see which channels streamed the largest number of distinct categories (gaming and non-gaming). To answer this, we will adjust our initial query that prepared dataframe df_h_streams to include user_name:
(
    df_streams
    .withColumn('captured_at', F.col('captured_at').cast('Timestamp'))
    .withColumn('id', F.col('id').cast('Long'))  # casting this to INT causes 1st July streams == 10518
    .withColumn('game_id', F.col('game_id').cast('Long'))
    .withColumn('captured_at_month', F.date_trunc('month', F.col('captured_at')).cast('Date'))
    .groupBy(['captured_at_month', 'user_name'])
    .agg(F.countDistinct('game_id').alias('num_games'))
    .orderBy('num_games', ascending=False)
    .show(n=10)
)
The query ran for 59 seconds according to Spark Web UI.
The result shows that several speedrunning events are in the Top-10 of our variety rating. I have learned that the European Speedrunner Assembly event ESA Summer Online 2020 and Blast the Process 4 (an all-things-SEGA marathon) both took place in July 2020. Good to see that solo streamers such as NorkDorf, Slayproxx, and Xop0 made it to the Top-10 too.
+-----------------+----------------+---------+
|captured_at_month| user_name|num_games|
+-----------------+----------------+---------+
| 2020-07-01| ESAMarathon| 374|
| 2020-07-01|SpeedrunsEspanol| 132|
| 2020-07-01| NorkDorf| 106|
| 2020-07-01| RocketBeansTV| 104|
| 2020-07-01| GOGcom| 86|
| 2020-07-01| Yogscast| 75|
| 2020-07-01| Slayproxx| 69|
| 2020-07-01| Xop0| 68|
| 2020-07-01| RedragonBrasil| 64|
| 2020-07-01| BlastTheProcess| 63|
+-----------------+----------------+---------+
Most popular games and genres
To find the most popular games we need to prepare another dataframe based on the Twitch Helix Top Games endpoint.
df_h_tg = spark.read.json('/data/twitch/raw/helix/top_games/2020_07_*/**/*.gz')
For our analysis we are only interested in id (which represents the game id) and name (the game name):
df_h_tg = (
    df_h_tg
    .withColumn('id', F.col('id').cast('Long'))
    .select(['id', 'name'])
    .drop_duplicates()
    .cache()
)
Let’s look at top games dataframe and trigger caching:
df_h_tg.describe().show()
Spark shows that the dataframe contains 22_775 distinct id-name pairs.
+-------+------------------+-----------------+
|summary| id| name|
+-------+------------------+-----------------+
| count| 22775| 22775|
| mean|261338.52250274425| Infinity|
| stddev| 237410.1778124698| NaN|
| min| 3| Crystal Story|
| max| 519014|奇迹一刻 Surmount|
+-------+------------------+-----------------+
Now that we have dataframes for streams and for games, we can join them to have game names in addition to game ids.
(
    df_h_streams
    .join(df_h_tg, on=[df_h_streams.game_id == df_h_tg.id])
    .select(['captured_at_day', 'name', df_h_streams.id])
    .groupBy(['captured_at_day', 'name'])
    .agg(F.countDistinct('id').alias('num_streams'))
    .where(F.col('num_streams') >= 350)
    .orderBy(['captured_at_day', 'num_streams'], ascending=[True, False])
    .show(n=30, truncate=False)
)
Since both dataframes contain a column id, our select command mixes plain column names given as strings with an explicit dataframe reference (df_h_streams.id). Otherwise, Spark would raise an exception: AnalysisException: Reference 'id' is ambiguous, could be: id, id.;
Result of the query still shows that Just Chatting is much bigger than other gaming and non-gaming categories on Twitch. Spark query result matches the result that we got before with command-line tools.
+---------------+--------------------------------+-----------+
|captured_at_day|name |num_streams|
+---------------+--------------------------------+-----------+
|2020-07-01 |Just Chatting |5581 |
|2020-07-01 |Fortnite |2652 |
|2020-07-01 |Grand Theft Auto V |2077 |
|2020-07-01 |League of Legends |1764 |
|2020-07-01 |Call of Duty: Modern Warfare |1705 |
|2020-07-01 |VALORANT |1466 |
|2020-07-01 |Counter-Strike: Global Offensive|1022 |
|2020-07-01 |Minecraft |917 |
|2020-07-01 |World of Warcraft |835 |
|2020-07-01 |Apex Legends |732 |
|2020-07-01 |Music & Performing Arts |667 |
|2020-07-01 |Dead by Daylight |603 |
|2020-07-01 |FIFA 20 |467 |
|2020-07-01 |Tom Clancy's Rainbow Six: Siege |460 |
|2020-07-01 |Escape From Tarkov |440 |
|2020-07-01 |Overwatch |437 |
|2020-07-01 |Art |426 |
|2020-07-01 |Dota 2 |420 |
|2020-07-01 |PLAYERUNKNOWN'S BATTLEGROUNDS |358 |
|2020-07-01 |NBA 2K20 |351 |
|2020-07-02 |Just Chatting |5511 |
|2020-07-02 |Fortnite |2470 |
|2020-07-02 |Grand Theft Auto V |2106 |
|2020-07-02 |League of Legends |1768 |
|2020-07-02 |Call of Duty: Modern Warfare |1658 |
|2020-07-02 |VALORANT |1407 |
|2020-07-02 |Counter-Strike: Global Offensive|981 |
|2020-07-02 |Minecraft |922 |
|2020-07-02 |World of Warcraft |762 |
|2020-07-02 |Apex Legends |710 |
+---------------+--------------------------------+-----------+
Most popular genres
To analyse the most popular genres we should do two things. First, prepare a Giantbomb dataframe, because it is the only source of genre information. Second, switch the Top Games dataframe from Helix to V5 of the Twitch API, which contains a mapping between Twitch game IDs and Giantbomb game IDs.
df_gb = spark.read.json('/data/giantbomb/raw/game/2020-07*/**/*.gz')
From the Giantbomb dataset we are only interested in giantbomb_id (to join on Twitch datasets) and genre.
df_gb = df_gb.select(['giantbomb_id', 'genre']).drop_duplicates().cache()
Then running df_gb.count() returns 7083 unique combinations of giantbomb_id and genre (and also caches the dataframe).
Next we prepare the Top Games dataframe based on Twitch V5 API data with giantbomb_id.
df_v5_tg = spark.read.json('/data/twitch/raw/v5/top_games_sdc/2020-07*/**/*.gz')
Looking at the schema of this dataframe we see that Spark correctly assigned datatype Long to columns game__id and game_giantbomb_id:
root
|-- captured_at: long (nullable = true)
|-- channels: long (nullable = true)
|-- game__id: long (nullable = true)
|-- game_giantbomb_id: long (nullable = true)
|-- game_locale: string (nullable = true)
|-- game_localized_name: string (nullable = true)
|-- game_name: string (nullable = true)
|-- viewers: long (nullable = true)
Let’s prepare the Top Games dataframe by renaming some columns to remove extra underscores and unnecessary prefixes, and by filtering out records that do not have a valid giantbomb_id (column game_giantbomb_id should be higher than 0):
df_v5_tg = (
    df_v5_tg
    .withColumnRenamed('game__id', 'game_id')
    .withColumnRenamed('game_giantbomb_id', 'giantbomb_id')
    .select(['game_id', 'giantbomb_id', 'game_name'])
    .where(F.col('giantbomb_id') > 0)
    .drop_duplicates()
    .cache()
)
When both dataframes are ready, we will follow the same logic as before: create a mapping between game ids and genres and then join this mapping with streams.
df_genres = (
    df_v5_tg
    .join(df_gb, 'giantbomb_id')
    .select(['game_id', 'genre'])
    .drop_duplicates()
)
And then join dataframe with genres to the dataframe with streams and calculate number of unique streams by genre.
(
    df_h_streams
    .join(df_genres, on='game_id')
    .select(['captured_at_day', 'genre', 'id'])
    .groupBy(['captured_at_day', 'genre'])
    .agg(F.countDistinct('id').alias('num_streams'))
    .where(F.col('num_streams') > 350)
    .orderBy(['captured_at_day', 'num_streams'], ascending=[True, False])
    .show()
)
The result shows the same genres in the same ranking order, but the numbers are different. At this point I did a bit of debugging to test whether Spark dataframes and parts of the command-line pipeline return the same counts, and they do. Since I spent more time fighting with command-line tools (especially sort) and they are more complicated than Spark queries, my assumption is that something works differently on the command line.
+---------------+--------------------+-----------+
|captured_at_day| genre|num_streams|
+---------------+--------------------+-----------+
| 2020-07-01|First-Person Shooter| 6010|
| 2020-07-01| Action| 3268|
| 2020-07-01| Shooter| 3224|
| 2020-07-01| Driving/Racing| 3083|
| 2020-07-01| MOBA| 2420|
| 2020-07-01| Role-Playing| 2154|
| 2020-07-01| Action-Adventure| 1995|
| 2020-07-01| MMORPG| 1877|
| 2020-07-01| Simulation| 1648|
| 2020-07-01| Strategy| 1061|
| 2020-07-01| Card Game| 855|
| 2020-07-01| Music/Rhythm| 800|
| 2020-07-01| Adventure| 724|
| 2020-07-01| Soccer| 505|
| 2020-07-01| Fighting| 486|
| 2020-07-02|First-Person Shooter| 6359|
| 2020-07-02| Action| 3261|
| 2020-07-02| Driving/Racing| 3087|
| 2020-07-02| Shooter| 3048|
| 2020-07-02| MOBA| 2421|
+---------------+--------------------+-----------+
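The two-step logic behind this query (first map game ids to genres, then join that mapping with streams) can be traced on a few rows in plain Python. All rows below are hypothetical, and the assumption that one game may carry more than one genre is made purely for illustration; I have not verified it against the Giantbomb data. This is a sketch of the join semantics, not an explanation of the difference from the command-line numbers.

```python
from collections import defaultdict

# Hypothetical sample rows standing in for df_v5_tg, df_gb and df_h_streams
v5_top_games = [(10, 500), (11, 501), (12, 0)]                   # (game_id, giantbomb_id)
giantbomb = [(500, 'MOBA'), (501, 'Shooter'), (501, 'Action')]   # (giantbomb_id, genre)
streams = [  # (captured_at_day, stream id, game_id)
    ('2020-07-01', 1, 10),
    ('2020-07-01', 2, 11),
    ('2020-07-01', 2, 11),  # same stream captured twice
    ('2020-07-01', 3, 12),  # game without a valid giantbomb_id
]

# Step 1: map game_id -> genres, keeping only rows with giantbomb_id > 0
genres_by_gb = defaultdict(set)
for gb_id, genre in giantbomb:
    genres_by_gb[gb_id].add(genre)

genres_by_game = defaultdict(set)
for game_id, gb_id in v5_top_games:
    if gb_id > 0:
        genres_by_game[game_id] |= genres_by_gb.get(gb_id, set())

# Step 2: inner join with streams, then count distinct stream ids per (day, genre)
stream_ids = defaultdict(set)
for day, stream_id, game_id in streams:
    for genre in genres_by_game.get(game_id, ()):  # unmapped games drop out
        stream_ids[(day, genre)].add(stream_id)

num_streams = {key: len(ids) for key, ids in sorted(stream_ids.items())}
print(num_streams)
```

In this toy data stream 3 disappears because its game has no valid giantbomb_id, and stream 2 is counted under both Shooter and Action, so the per-genre counts are not guaranteed to add up to the number of streams.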
Summary
In this part we have used PySpark in local mode to perform ad hoc analysis on Twitch and Giantbomb datasets. Even though Spark has support for multiple interfaces (Scala, Python, SQL, R) and it is possible to mix, for example, Python and SQL in the same analysis, we focused on using as much Python as possible.
All datasets used here are stored in JSON format. While on the command line we had to introduce a special utility to work with JSON (called jq), Spark makes it very easy to load data in JSON or other supported formats. Spark's focus on analytical use cases is also reflected in the ease of query authoring: many built-in commands for data transformation and computation, and good code readability. Even though Spark can be used on the command line, the developer experience is much better with a Jupyter notebook. Many people think that to operate Spark they need a distributed filesystem and a Spark cluster, but this is unnecessary for small datasets. We just used Spark with the local filesystem and in local mode (all computations happened on a single machine).
Knowledge base
Jupyter Docker Stacks - Docker Images with Jupyter notebooks and other tools for analytics.