Now let us understand Accumulators and Broadcast Variables, also known as Shared Variables. Accumulators are primarily used as counters for sanity checks, while broadcast variables are used for lookups. As part of this topic, we will also look into repartition and coalesce.
Access HDFS APIs using sc in Python
Validate input and output paths leveraging HDFS APIs
Perform a join between orders and order_items from HDFS
Convert data from a local file into an RDD and then join to get the product name
Use accumulators to get the number of orders and order items processed
Run on the cluster and check the accumulators as part of the UI
Develop an alternative solution using Broadcast Variables
Read products from a local file and create a dict
Change the join on products to a lookup using the broadcast variable of type dict
Run on the cluster and check the behavior
Understand the relevance and syntax of repartition as well as coalesce
In this session, we will develop a program using HDFS APIs and then add accumulators to it.
You will not be able to see accumulator details as part of the Spark UI for pyspark applications. However, you can read the variables after performing the action within the program. We struggled a bit in the video, but the code has been updated.
Problem Statement
Before going into shared variables such as Accumulators and Broadcast Variables, let us define a new problem statement and come up with a solution. With this, we will understand how we can use HDFS APIs as part of applications built using Pyspark.
https://www.youtube.com/watch?v=9YdxYQwqxvs
We have to use the orders, order_items and products data sets to compute revenue per product for a given month
orders have order_id and order_date
order_items have order_item_subtotal, order_item_order_id and order_item_product_id
products have product_id and product_name
orders and order_items are in HDFS and products is in the local file system
High level design
Accept year and month as a program argument (along with the input path and output path)
Filter for orders which fall in the month passed as argument
Join filtered orders and order_items to get order_item details for a given month
Get revenue for each product_id
We need to read products from the local file system
Convert into RDD and extract product_id and product_name
Join it with aggregated order_items (product_id, revenue)
Get product_name and revenue for each product
We will also check whether the input path is valid, and if the output path exists we will delete it using HDFS APIs.
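A minimal sketch of how that validation might look, using the Hadoop FileSystem API exposed through the JVM gateway of the SparkContext (this is not the exact code from the video, and the argument handling is an assumption):

import sys
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("Revenue per product for month")
sc = SparkContext(conf=conf)

input_path = sys.argv[1]     # e.g. /public/retail_db
output_path = sys.argv[2]    # e.g. /user/training/revenue_per_product

# Access the Hadoop FileSystem APIs through the JVM gateway of SparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
Path = sc._jvm.org.apache.hadoop.fs.Path

# Exit if the input path does not exist
if not fs.exists(Path(input_path)):
    print("Input path does not exist")
    sc.stop()
    sys.exit(1)

# Delete the output path recursively if it already exists
if fs.exists(Path(output_path)):
    fs.delete(Path(output_path), True)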
application.properties
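The exact contents of application.properties are not reproduced here; a plausible layout and one way to read it with ConfigParser are sketched below (the section name, key names and file path are assumptions):

# Hypothetical application.properties layout (section and key names are assumptions):
#   [dev]
#   input.base.dir = /public/retail_db
#   output.base.dir = /user/training/output
import configparser as cp

props = cp.ConfigParser()
props.read("src/main/resources/application.properties")  # path is an assumption
env = "dev"
input_base_dir = props.get(env, "input.base.dir")
output_base_dir = props.get(env, "output.base.dir")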
Create a new package by the name retail
Create a Python program with the name RevenuePerProductForMonth
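A condensed sketch of the core logic is given below. It assumes the retail_db column layout described above; the local path for products is a placeholder, and the HDFS path validation shown earlier is omitted for brevity:

import sys
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("Revenue per product for month")
sc = SparkContext(conf=conf)

input_path = sys.argv[1]    # e.g. /public/retail_db
output_path = sys.argv[2]   # e.g. /user/training/revenue_per_product
month = sys.argv[3]         # e.g. 2014-01

orders = sc.textFile(input_path + "/orders")
order_items = sc.textFile(input_path + "/order_items")

# Filter orders for the month and build (order_id, 1) pairs
orders_filtered = orders. \
    filter(lambda o: o.split(",")[1][:7] == month). \
    map(lambda o: (int(o.split(",")[0]), 1))

# (order_item_order_id, (order_item_product_id, order_item_subtotal))
order_items_map = order_items. \
    map(lambda oi: (int(oi.split(",")[1]), (int(oi.split(",")[2]), float(oi.split(",")[4]))))

# Revenue per product_id for the given month
revenue_per_product_id = orders_filtered. \
    join(order_items_map). \
    map(lambda rec: rec[1][1]). \
    reduceByKey(lambda total, ele: total + ele)

# products is in the local file system, so read it with plain Python and parallelize
products_raw = open("/data/retail_db/products/part-00000").read().splitlines()  # placeholder path
products = sc.parallelize(products_raw). \
    map(lambda p: (int(p.split(",")[0]), p.split(",")[2]))

# (product_name, revenue)
revenue_per_product = products. \
    join(revenue_per_product_id). \
    map(lambda rec: rec[1][0] + "\t" + str(rec[1][1]))

revenue_per_product.saveAsTextFile(output_path)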
An accumulator is a shared variable which can be used to implement counters within a Spark application.
https://www.youtube.com/watch?v=WB1WyPeoW54
It is important to perform some counts as part of the application for
unit testing
data quality
These counters cannot be plain global variables in the program
Instead we need to use accumulators, which are managed by Spark
Accumulators are passed to executors and their scope is managed across all the executors or executor tasks
Accumulators can be used in any Spark API
sc.accumulator() is the API to create an accumulator
In any Spark API, we can increment the accumulator
As Python lambda functions cannot directly increment an accumulator, we will create a function which will be invoked as part of Spark APIs such as map, filter etc.
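A minimal illustration of the idea, with names chosen purely for illustration:

# recordsProcessed is a name chosen for illustration
recordsProcessed = sc.accumulator(0)

def parse_and_count(record, acc):
    # The increment happens inside a named function, since a lambda cannot contain a statement such as acc += 1
    acc.add(1)
    return record.split(",")

parsed = sc.textFile("/public/retail_db/orders"). \
    map(lambda rec: parse_and_count(rec, recordsProcessed))
parsed.count()                      # the accumulator is updated only after an action runs
print(recordsProcessed.value)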
Take the Revenue per product for a given month program and add the below accumulators
To get the number of orders for the month – ordersCount
To get the number of order items for the month – orderItemsCount
Increment ordersCount as part of the map function after filtering on the month
Increment orderItemsCount as part of the map function after the join
Print the results after the action is performed. Until then, no processing will be done on the accumulators.
Typically we preserve this information in a database or log files so that we can keep track of it.
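In the context of the sketch above, wiring in ordersCount and orderItemsCount might look like this (function names are for illustration):

ordersCount = sc.accumulator(0)
orderItemsCount = sc.accumulator(0)

def get_order_tuple(order, acc):
    # Count each order that survived the month filter
    acc.add(1)
    return (int(order.split(",")[0]), 1)

def get_product_subtotal(rec, acc):
    # Count each order item that came through the join
    acc.add(1)
    return rec[1][1]

orders_filtered = orders. \
    filter(lambda o: o.split(",")[1][:7] == month). \
    map(lambda o: get_order_tuple(o, ordersCount))

revenue_per_product_id = orders_filtered. \
    join(order_items_map). \
    map(lambda rec: get_product_subtotal(rec, orderItemsCount)). \
    reduceByKey(lambda total, ele: total + ele)

revenue_per_product = products. \
    join(revenue_per_product_id). \
    map(lambda rec: rec[1][0] + "\t" + str(rec[1][1]))

# Values are populated only after an action such as saveAsTextFile runs
revenue_per_product.saveAsTextFile(output_path)
print("Orders processed: " + str(ordersCount.value))
print("Order items processed: " + str(orderItemsCount.value))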
Here are some of the known issues with accumulators
Unless tasks are executed, you will not see details about the counters
Spark guarantees that accumulator updates performed inside actions are applied only once, even if a task is re-executed
For updates performed inside transformations there is no such guarantee, so if any task is re-executed the results can be inconsistent
You will not be able to see accumulator details as part of the Spark UI for pyspark applications. However, you can read the variables after performing the action within the program.
We can run the job and see the details about accumulators as part of the logs in the Spark UI/History UI
As part of this session, we will talk about Broadcast Variables along with repartition and coalesce.
Broadcast Variables
A broadcast variable is another type of shared variable which can be broadcast to all the executors and accessed at runtime by tasks while processing data. It is typically used to replace joins with lookups when a very large data set is joined with a small data set which can fit into the memory of the executor JVM.
At times we need to pass (broadcast) some information to all the executors
It can be done by using broadcast variables
A broadcast variable can be of a primitive type or it could be a hash map
Here are a few examples
Single value – Common discount percent for all the products
Hash map – look up or map side join
When a very large data set (fact) is joined with a smaller data set (dimension), broadcasting the dimension can give a considerable performance improvement.
Broadcast variables are immutable
We can read the data from HDFS, from the local file system or even from configuration parameters
Broadcast it using the broadcast method of SparkContext
Let us take the example of Revenue per product for a given month
Earlier we read products from the local file system, converted it into an RDD and then joined it with the other RDD to get the product name and revenue generated. Here is the DAG when the data is joined without using broadcast variables
Here is how the DAG looks after broadcasting products from the local file system. If we run this against a considerable amount of data, one can see the difference in performance because of broadcast variables
Here is the code for broadcast variables
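The original code is not reproduced here; a sketch of the approach, building on the earlier program, could look like this (the local products path is a placeholder):

# Build a dict of product_id -> product_name from the local file and broadcast it
products_raw = open("/data/retail_db/products/part-00000").read().splitlines()  # placeholder path
product_map = dict((int(p.split(",")[0]), p.split(",")[2]) for p in products_raw)
products_bv = sc.broadcast(product_map)

# Replace the join on products with a lookup into the broadcast dict
revenue_per_product = revenue_per_product_id. \
    map(lambda rec: products_bv.value[rec[0]] + "\t" + str(rec[1]))

revenue_per_product.saveAsTextFile(output_path)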
Here is the command for submitting the job
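The exact command used in the video is not shown here; a typical invocation (the script name, paths and resource settings are placeholders) might look like:

spark-submit --master yarn \
  --num-executors 2 \
  --executor-memory 512M \
  src/main/python/RevenuePerProductForMonth.py \
  /public/retail_db /user/training/revenue_per_product 2014-01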
repartition and coalesce
Now let us understand how we can control the number of tasks used to process data after the first stage.
Each of the APIs which results in shuffling takes an additional argument, numPartitions
We can use repartition to control the number of tasks or partitions in subsequent stages
repartition results in a shuffle
We can increase or decrease the number of partitions in an RDD using repartition
coalesce can only be used to reduce the number of partitions
coalesce does not result in a shuffle
Here is an example which covers coalesce and repartition (repartition will be slower in this case as it generates an additional stage for shuffling data into the new partitions)
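The original example is not reproduced here; a sketch along these lines (output paths are placeholders) illustrates the difference:

orders = sc.textFile("/public/retail_db/orders")
print(orders.getNumPartitions())

# coalesce can only reduce the number of partitions and avoids a full shuffle
orders_coalesced = orders.coalesce(2)
print(orders_coalesced.getNumPartitions())

# repartition can increase or decrease the number of partitions but triggers a shuffle,
# which adds an extra stage and makes it slower here
orders_repartitioned = orders.repartition(8)
print(orders_repartitioned.getNumPartitions())

orders_coalesced.saveAsTextFile("/user/training/orders_coalesced")            # placeholder path
orders_repartitioned.saveAsTextFile("/user/training/orders_repartitioned")    # placeholder path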