Now let us understand Accumulators and Broadcast Variables, which are also known as shared variables. Accumulators are primarily used as counters for sanity checks, while broadcast variables are used for lookups. As part of this topic, we will also look into repartition and coalesce.
- Accessing HDFS APIs using sc in Python
- Validating input paths and output paths leveraging HDFS APIs
- Perform a join between orders and order_items read from HDFS
- Convert data from a local file into an RDD and then join to get the product name
- Use accumulators to get the number of orders and the number of order items processed
- Run on the cluster and check the accumulators
- Develop alternative solution using Broadcast variables
- Read products from a local file and create a dict
- Replace the join on products with a lookup on the broadcast variable of type dict
- Run on the cluster and check the behavior
- Understand the relevance and syntax of repartition as well as coalesce
In this session, we will develop a program using HDFS APIs and then add accumulators to it.
You will not be able to see accumulator details as part of Spark UI for pyspark applications. However, you can read the variables after performing the action within the program. We have struggled a bit in the video, but the code is updated.
Before going into shared variables such as Accumulators and Broadcast Variables, let us define a new problem statement and come up with a solution. With this, we will understand how we can use HDFS APIs as part of applications built using PySpark.
- We have to use the orders, order_items and products data sets to compute revenue per product for a given month
- orders have order_id and order_date
- order_items have order_item_subtotal, order_item_order_id and order_item_product_id
- products have product_id and product_name
- orders and order_items are in HDFS and products is in the local file system
- High-level design
- Accept year and month as program arguments (along with input path and output path)
- Filter for orders which fall in the month passed as argument
- Join filtered orders and order_items to get order_item details for a given month
- Get revenue for each product_id
- We need to read products from the local file system
- Convert them into an RDD and extract product_id and product_name
- Join it with aggregated order_items (product_id, revenue)
- Get product_name and revenue for each product
- We will also check whether the input path is valid and, if the output path already exists, delete it using HDFS APIs.
- Create new package by name retail
- Create Python program with name RevenuePerProductForMonth
- Run the spark job using spark-submit
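The design above can be sketched as follows. This is a minimal sketch under stated assumptions, not the course's program: it assumes comma-delimited retail_db-style files with the column positions given in the comments, an input base directory containing orders and order_items subdirectories, a year_month argument formatted as YYYY-MM, and a local products file path passed in as a parameter. All function names are hypothetical. The HDFS checks go through sc._jvm and sc._jsc, which are internal PySpark attributes that expose the Hadoop FileSystem API via py4j.

```python
# Assumed comma-delimited layouts (retail_db style); adjust the indexes to your data:
#   orders:      order_id,order_date,order_customer_id,order_status
#   order_items: order_item_id,order_item_order_id,order_item_product_id,
#                order_item_quantity,order_item_subtotal,order_item_product_price
#   products:    product_id,product_category_id,product_name,...


def is_in_month(order_line, year_month):
    """True if order_date (e.g. '2013-07-25 00:00:00.0') starts with 'YYYY-MM'."""
    return order_line.split(",")[1].startswith(year_month)


def parse_order_item(line):
    """Return (order_id, (product_id, subtotal)) for one order_items record."""
    fields = line.split(",")
    return int(fields[1]), (int(fields[2]), float(fields[4]))


def parse_product(line):
    """Return (product_id, product_name) for one products record."""
    fields = line.rstrip("\n").split(",")
    return int(fields[0]), fields[2]


def prepare_paths(sc, input_base, output_path):
    """Fail fast on a missing input path and delete an existing output path.

    sc._jvm and sc._jsc are internal PySpark attributes (not a public API)
    that expose the Hadoop FileSystem API through py4j.
    """
    hadoop_fs = sc._jvm.org.apache.hadoop.fs
    fs = hadoop_fs.FileSystem.get(sc._jsc.hadoopConfiguration())
    if not fs.exists(hadoop_fs.Path(input_base)):
        raise ValueError("Input path does not exist: " + input_base)
    output = hadoop_fs.Path(output_path)
    if fs.exists(output):
        fs.delete(output, True)  # True = delete recursively


def revenue_per_product(sc, input_base, output_path, year_month, products_file):
    prepare_paths(sc, input_base, output_path)
    # (order_id, 1) for orders placed in the requested month
    orders = sc.textFile(input_base + "/orders") \
        .filter(lambda line: is_in_month(line, year_month)) \
        .map(lambda line: (int(line.split(",")[0]), 1))
    order_items = sc.textFile(input_base + "/order_items").map(parse_order_item)
    # join -> (order_id, (1, (product_id, subtotal))); keep (product_id, subtotal)
    revenue = orders.join(order_items) \
        .map(lambda rec: rec[1][1]) \
        .reduceByKey(lambda a, b: a + b)
    # products lives on the local file system, so it is read on the driver
    with open(products_file) as f:
        products = sc.parallelize(f.read().splitlines()).map(parse_product)
    # join -> (product_id, (product_name, revenue))
    products.join(revenue) \
        .map(lambda rec: rec[1][0] + "\t" + str(rec[1][1])) \
        .saveAsTextFile(output_path)
```

In a driver script you would create sc with SparkContext(conf=SparkConf().setAppName("RevenuePerProductForMonth")), call revenue_per_product with values taken from sys.argv, and launch the script with spark-submit. Because the products file is opened on the driver, it only needs to exist on the machine where the driver runs.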
An accumulator is a shared variable that can be used to implement counters within a Spark application.
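A minimal sketch of the accumulator API, assuming an existing SparkContext sc (as in the pyspark shell) passed in as an argument; the helper names are hypothetical. The counter is created on the driver with sc.accumulator, incremented inside tasks with add, and read through value on the driver only after an action has run.

```python
def make_counting_parser(counter):
    """Build a function for rdd.map that increments counter and returns the parsed record.

    A lambda cannot contain a statement such as counter += 1, while a named
    function can both update the counter and return the record.
    """
    def parse_and_count(line):
        counter.add(1)  # runs inside the task, on an executor
        return line.split(",")
    return parse_and_count


def count_records(sc, lines):
    """Parse lines with a counting map function and return the accumulator value."""
    records_count = sc.accumulator(0)  # created on the driver, starts at 0
    records = sc.parallelize(lines).map(make_counting_parser(records_count))
    records.count()  # action: tasks run now and their updates are merged on the driver
    return records_count.value  # only the driver can read value
```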
- It is important to perform some counts as part of the application for
- unit testing
- data quality
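- These counters cannot be ordinary global variables in the program: each task works on its own copy of the variables captured in its closure, so increments made on the executors never reach the driver
- Instead we need to use accumulators, which are managed by Spark
- Accumulators are passed to the executors; tasks can only add to them, and Spark merges the updates from all the executor tasks into the value on the driver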
- Accumulators can be used in any Spark API
- sc.accumulator() is the API to create an accumulator
- In any Spark API, we can increment the accumulator
- Python lambda functions cannot contain statements such as acc += 1, so we create a regular function that increments the accumulator and returns the record, and invoke it as part of a Spark API such as map, filter etc.
- Take Revenue per product for given month program and add below accumulators
- To get number of orders for the month – ordersCount
- To get number of orderItems for the month – orderItemsCount
- Increment ordersCount in the map function applied after filtering on the month
- Increment orderItemsCount in the map function applied after the join
- Print the results only after an action is performed. Because transformations are lazy, no processing (and hence no update to the accumulators) happens until then.
- Typically we preserve this information in database or log files so that we can keep track of this important information.
- Here are some of the known issues with accumulators
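- Unless tasks are executed, you will not see any values in the counters
- For accumulator updates performed inside actions, Spark guarantees that each task's update is applied only once, even if the task is restarted
- For updates performed inside transformations (such as the map functions above), a task's update may be applied more than once if the task or stage is re-executed, so the results can be inconsistent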
- You will not be able to see accumulator details as part of the Spark UI for PySpark applications. However, you can read the values after performing the action within the program.
- We can still run the job on the cluster and review the accumulator values that the driver prints in its logs
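Applied to the revenue program, the two counters could look like the following minimal sketch. It assumes comma-delimited files where order_date is the second field of orders, and order_item_order_id, order_item_product_id and order_item_subtotal are the second, third and fifth fields of order_items. It skips the products join for brevity, and all function names are hypothetical.

```python
def get_order_id_counting(orders_count):
    """map function for filtered orders: counts each order in the month."""
    def get_order_id(order_line):
        orders_count.add(1)
        return int(order_line.split(",")[0]), 1
    return get_order_id


def get_product_revenue_counting(order_items_count):
    """map function after the join: counts each order item in the month."""
    def get_product_revenue(joined):
        order_items_count.add(1)
        return joined[1][1]  # (product_id, subtotal)
    return get_product_revenue


def parse_order_item(line):
    """Return (order_id, (product_id, subtotal)) for one order_items record."""
    fields = line.split(",")
    return int(fields[1]), (int(fields[2]), float(fields[4]))


def revenue_with_counters(sc, orders_path, order_items_path, year_month, output_path):
    orders_count = sc.accumulator(0)       # ordersCount
    order_items_count = sc.accumulator(0)  # orderItemsCount
    orders = sc.textFile(orders_path) \
        .filter(lambda line: line.split(",")[1].startswith(year_month)) \
        .map(get_order_id_counting(orders_count))
    order_items = sc.textFile(order_items_path).map(parse_order_item)
    revenue = orders.join(order_items) \
        .map(get_product_revenue_counting(order_items_count)) \
        .reduceByKey(lambda a, b: a + b)
    # Both accumulators are still 0 here: transformations are lazy.
    revenue.saveAsTextFile(output_path)  # the action runs the tasks
    print("ordersCount: %d" % orders_count.value)
    print("orderItemsCount: %d" % order_items_count.value)
    return orders_count.value, order_items_count.value
```

Both counters are updated inside transformations, so a re-executed task can inflate them; they work well as sanity checks to log alongside the job output.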
As part of this session, we will talk about Broadcast Variables along with repartition and coalesce.
Broadcast Variable is another type of shared variable, which is broadcast to all the executors and can be accessed at runtime by tasks while processing data. It is typically used to replace joins with lookups when a very large data set is joined with a small data set that can fit into the memory of each executor JVM.
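A minimal sketch of the mechanics, using the single-value case (a common discount percent for all the products). It assumes an existing SparkContext sc passed in as an argument; the function names are hypothetical. The value is broadcast once from the driver and read inside tasks through value.

```python
def discounted(price, discount_percent):
    """Apply a percentage discount and round to cents."""
    return round(price * (1 - discount_percent / 100.0), 2)


def apply_common_discount(sc, prices, discount_percent):
    # The value is cached on each executor and is read-only there.
    discount_bv = sc.broadcast(discount_percent)
    return sc.parallelize(prices) \
        .map(lambda price: discounted(price, discount_bv.value)) \
        .collect()
```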
- At times we need to pass (broadcast) some information to all the executors
- It can be done by using broadcast variables
- A broadcast variable can be of a primitive type, or it could be a hash map (a dict in Python)
- Here are few examples
- Single value – Common discount percent for all the products
- Hash map – look up or map side join
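- When a very large data set (fact) is joined with a smaller data set (dimension), broadcasting the dimension can improve performance considerably, because the large data set no longer has to be shuffled for the join.
- Broadcast variables are immutable (read-only)
- The data to broadcast can be read from HDFS or the local file system, or even passed in as configuration parameters
- Create the variable using the broadcast method of SparkContext (sc.broadcast) and read it inside tasks through its value attribute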
- Let us take the example of Revenue per product for a given month
- Earlier we read products from the local file system, converted them into an RDD and then joined it with the other RDD to get the product name and revenue generated. Here is the DAG when the data is joined without using broadcast variables
- Here is how the DAG looks after broadcasting products from the local file system. If we run this against a considerable amount of data, the performance difference becomes noticeable, because the join (and its shuffle) is replaced by a lookup
- Here is the code for broadcast variables
- Here is the code for submitting the job
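A minimal sketch of replacing the products join with a broadcast lookup. It assumes a comma-delimited products file with product_id in the first field and product_name in the third, an RDD of (product_id, revenue) pairs already computed from orders and order_items, an "UNKNOWN" fallback for missing ids, and hypothetical function names. The dict is built on the driver, broadcast once, and read through value inside the map function, so the lookup runs on the executors without a shuffle.

```python
def parse_product(line):
    """Return (product_id, product_name) for one products record."""
    fields = line.rstrip("\n").split(",")
    return int(fields[0]), fields[2]


def load_products(products_file):
    """Read the local products file on the driver into {product_id: product_name}."""
    with open(products_file) as f:
        return dict(parse_product(line) for line in f if line.strip())


def name_with_revenue(products_bv):
    """map function: look up product_name in the broadcast dict instead of joining."""
    def lookup(rec):
        product_id, revenue = rec
        return products_bv.value.get(product_id, "UNKNOWN") + "\t" + str(revenue)
    return lookup


def save_revenue_with_names(sc, revenue_per_product_id, products_file, output_path):
    # Build the dict on the driver and broadcast it once
    products_bv = sc.broadcast(load_products(products_file))
    # Map-side lookup: no shuffle is needed to attach product names
    revenue_per_product_id.map(name_with_revenue(products_bv)) \
        .saveAsTextFile(output_path)
```

Reading products_bv.value inside the function, rather than capturing the dict itself in the closure, is what lets Spark cache the dict on each executor instead of serializing it with every task.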
repartition and coalesce
Now let us understand how we can control the number of tasks used to process data after the first stage.
- Each of the APIs that result in shuffling (such as reduceByKey or join) accepts an additional argument, numPartitions, which sets the number of partitions (tasks) in the next stage
- We can use repartition to control the number of tasks or partitions in subsequent stages
- repartition results in shuffling
- We can increase or decrease the number of partitions of an RDD using repartition
- coalesce (with its default shuffle=False) can only be used to reduce the number of partitions
- coalesce does not result in shuffling by default; it merges existing partitions
- Here is the example which covers coalesce and repartition (repartition will be slower in this case, as it generates an additional stage to shuffle the data into new partitions)
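A minimal sketch of the three options, assuming an existing SparkContext sc, an orders file whose fourth comma-separated field is order_status, and hypothetical function names. It prints partition counts rather than asserting them, because the input partition count depends on the file size and cluster defaults.

```python
def order_status(order_line):
    """Key function; assumes order_status is the 4th comma-separated field."""
    return order_line.split(",")[3]


def partition_counts(sc, orders_path):
    orders = sc.textFile(orders_path)
    print("input partitions:", orders.getNumPartitions())
    # Shuffling APIs such as reduceByKey take numPartitions for the next stage
    by_status = orders.map(lambda line: (order_status(line), 1)) \
        .reduceByKey(lambda a, b: a + b, numPartitions=4)
    print("reduceByKey partitions:", by_status.getNumPartitions())
    # repartition always shuffles; it can increase or decrease the count
    print("repartition(8):", orders.repartition(8).getNumPartitions())
    # coalesce merges existing partitions without a shuffle (shuffle=False by default),
    # so it can only reduce the count
    print("coalesce(1):", orders.coalesce(1).getNumPartitions())
```

In PySpark, rdd.repartition(n) is implemented as rdd.coalesce(n, shuffle=True), so coalesce with shuffle=True can also increase the number of partitions.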