Do you want to understand the Spark execution life cycle and terms such as executor, executor tasks, driver program and more?
Develop Spark Application – Get Monthly Product Revenue
Build and Deploy
Local Mode vs. YARN Mode
Quick walkthrough of Spark UI
YARN Deployment Modes
Spark Execution Cycle
Develop Spark Application – Get Monthly Product Revenue
Let us start with the problem statement, then move on to the design and the implementation.
Problem Statement
Using the retail_db dataset, we need to compute monthly product revenue for a given month.
We need to consider only completed and closed orders to compute revenue.
We also need to consider only the transactions for the month passed as an argument.
We need to sort the data in descending order by revenue while saving the output to an HDFS path.
Design
Let us see the design for the given Problem Statement.
Filter for orders which fall in the month passed as the argument
Join filtered orders and order_items to get order_item details for a given month
Get revenue for each product_id
We need to read products from the local file system
Convert into RDD and extract product_id and product_name
Join it with aggregated order_items (product_id, revenue)
Get product_name and revenue for each product
Development
Let us create a new project and develop the logic.
Setup Project
Scala Version: 2.11.8 (on Windows; latest 2.11.x release in other environments)
sbt Version: 0.13.x
JDK: 1.8.x
Project Name: SparkDemo
Update build.sbt
typesafe config
Spark Core
Update application.properties
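The build.sbt update described above might look like the following minimal sketch. The Spark and typesafe config version numbers are assumptions (the post only fixes the Scala, sbt and JDK versions), so align them with whatever your cluster runs.

```scala
// build.sbt (sketch; the dependency versions below are assumptions)
name := "SparkDemo"

version := "0.1"

scalaVersion := "2.11.8"

// typesafe config, used to externalize properties such as the execution mode
libraryDependencies += "com.typesafe" % "config" % "1.3.2"

// Spark Core; %% appends the Scala binary version (_2.11) to the artifact id
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
```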
Develop the logic to compute revenue per product for a given month.
Once the project is set up, we can launch the Scala REPL with Spark as well as typesafe config dependencies using sbt console.
Once the logic works, we can move it into a program called GetMonthlyProductRevenue.
Using sbt Console
As part of this topic, we will see how to access sbt console and use it for exploring Spark based APIs.
Go to the working directory of the project.
Run sbt console
We should be able to use typesafe config APIs as well as Spark APIs.
Create Spark Conf and Spark Context objects
[gist]b663b57eb56d3eb79237060f4fefc192[/gist]
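As a rough sketch of what this step can look like in the sbt console (not necessarily identical to the gist above): the property key dev.executionmode is a hypothetical name, and the inline fallback exists only so the snippet still runs when application.properties does not define it.

```scala
import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: src/main/resources/application.properties has a line such as
//   dev.executionmode = local
// The key name is hypothetical; the fallback supplies it when the file does not.
val props = ConfigFactory.load().
  withFallback(ConfigFactory.parseString("dev.executionmode = local"))
val executionMode = props.getConfig("dev").getString("executionmode")

// The master comes from the externalized property rather than being hard coded
val conf = new SparkConf().
  setAppName("Get Monthly Product Revenue").
  setMaster(executionMode)
val sc = new SparkContext(conf)
```

Keeping the master in a properties file means the same jar can run locally or on YARN by switching the environment prefix (dev, prod and so on).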
Hadoop Configuration
Let us see how we can access Hadoop Configuration.
Spark uses HDFS APIs to read files from supported file systems.
As part of Spark dependencies, we get HDFS APIs as well.
We can get Hadoop Configuration using sc.hadoopConfiguration
Using it, we will be able to create a FileSystem object. It exposes APIs such as exists, delete etc.
We can use those to validate as well as manage input and/or output directories.
[gist]93322e0415e32a50b70377c55205db8b[/gist]
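A minimal sketch of such validation follows, assuming a local master and using the JVM temp directory as a stand-in for the real input and output paths. The helper validatePaths and both paths are hypothetical names introduced for illustration.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the active context if one exists (for example in sbt console)
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("Validate Paths").setMaster("local"))
val fs = FileSystem.get(sc.hadoopConfiguration)

// Hypothetical helper: returns false when the input is missing; otherwise
// removes a stale output directory (recursively) and returns true.
def validatePaths(fs: FileSystem, input: Path, output: Path): Boolean = {
  if (!fs.exists(input)) false
  else {
    if (fs.exists(output)) fs.delete(output, true)
    true
  }
}

// Stand-in paths; a real job would take these from the program arguments
val inputPath = new Path(System.getProperty("java.io.tmpdir"))
val outputPath = new Path(inputPath, "monthly_product_revenue_demo")
val ready = validatePaths(fs, inputPath, outputPath)
```

Deleting the output directory up front is a design choice, not a requirement: saveAsTextFile fails if the target already exists, so a job either clears it or refuses to run.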
Read and Filter Orders
Now that we can create the Spark Context, let us read and manipulate data from orders.
Read data from orders
Use filter to keep only orders whose status is COMPLETE or CLOSED and whose date falls in the month passed
Use map to extract order_id and hard coded value 1 so that we can use it to join later.
[gist]b8db54b5d4275e7ea02ac47b04012afa[/gist]
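The steps above can be sketched as follows. This assumes each orders record is a comma-separated line of order_id, order_date, order_customer_id, order_status with the date starting in yyyy-MM form. The sample rows are made up for illustration and stand in for sc.textFile on the real orders directory; isEligible is a hypothetical helper name.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("Filter Orders").setMaster("local"))

// Assumed layout: order_id,order_date,order_customer_id,order_status
def isEligible(order: String, month: String): Boolean = {
  val fields = order.split(",")
  val status = fields(3)
  (status == "COMPLETE" || status == "CLOSED") && fields(1).startsWith(month)
}

// Illustrative rows only; in the real job use sc.textFile(inputPath + "/orders")
val orders = sc.parallelize(Seq(
  "1,2013-07-25 00:00:00.0,11599,CLOSED",
  "2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT",
  "3,2013-07-25 00:00:00.0,12111,COMPLETE",
  "4,2013-08-01 00:00:00.0,8827,CLOSED"
))

val month = "2013-07" // would be passed as a program argument

// Keep eligible orders, then pair each order_id with 1 for the later join
val ordersFiltered = orders.
  filter(order => isEligible(order, month)).
  map(order => (order.split(",")(0).toInt, 1))
```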
Join Orders and Order Items
Now let us join order_items with orders and get product_id and order_item_subtotal.
Read data from order_items
Extract order_id, product_id and order_item_subtotal as a tuple.
The first element is order_id and the second element is a nested tuple which contains product_id and order_item_subtotal.
Join this data set with the filtered orders, using order_id as the key.
It will generate RDD of tuples – (order_id, ((product_id, order_item_subtotal), 1))
[gist]bc811e368c72fe6f6df02a744c4a705b[/gist]
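A small sketch of this join, assuming order_items records are laid out as order_item_id, order_item_order_id, order_item_product_id, order_item_quantity, order_item_subtotal, order_item_product_price. The rows are made up, and the subtotal is parsed as Double here, which may differ from the type used in the gist.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("Join Orders").setMaster("local"))

// Stand-in for the filtered orders from the previous step: (order_id, 1)
val ordersFiltered = sc.parallelize(Seq((1, 1), (3, 1)))

// Illustrative rows only; in the real job use sc.textFile on order_items
val orderItems = sc.parallelize(Seq(
  "1,1,957,1,299.98,299.98",
  "2,2,1073,1,199.99,199.99",
  "3,3,502,5,250.0,50.0"
))

// (order_id, (product_id, order_item_subtotal))
val orderItemsMap = orderItems.map { item =>
  val fields = item.split(",")
  (fields(1).toInt, (fields(2).toInt, fields(4).toDouble))
}

// Inner join on order_id: (order_id, ((product_id, order_item_subtotal), 1))
val orderItemsJoin = orderItemsMap.join(ordersFiltered)
```

Because join is an inner join, items belonging to orders that were filtered out (order 2 in this sample) drop away without a separate filter step.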
Compute Revenue per Product id
Now we can extract product_id and order_item_subtotal and compute revenue for each product_id.
We can discard order_id and 1 from the join output.
We can use map and get the required information – product_id and order_item_subtotal.
Using reduceByKey, we should be able to compute revenue for each product_id.
[gist]0317a8e18be6da94bdd261123da8a977[/gist]
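As an illustration of the aggregation, here is a minimal sketch that starts from a few made-up join results of the shape (order_id, ((product_id, order_item_subtotal), 1)).

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("Revenue Per Product").setMaster("local"))

// Stand-in for the join output; values are illustrative only
val orderItemsJoin = sc.parallelize(Seq(
  (1, ((957, 299.98), 1)),
  (3, ((502, 250.0), 1)),
  (5, ((957, 299.98), 1))
))

// Drop order_id and the placeholder 1, keep (product_id, subtotal),
// then add up the subtotals for each product_id
val revenuePerProduct = orderItemsJoin.
  map { case (_, ((productId, subtotal), _)) => (productId, subtotal) }.
  reduceByKey(_ + _)
```

reduceByKey combines values within each partition before shuffling, which is why it is usually preferred over groupByKey followed by a sum for this kind of aggregation.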
Join Products and Get Product Name
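Following the design steps listed earlier, this part can be sketched as below. It assumes products records are laid out as product_id, product_category_id, product_name and so on, with no commas inside product_name. The rows and product names are made up, and the in-memory list stands in for reading the products file from the local file system.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("Join Products").setMaster("local"))

// Illustrative rows; the real job would read the local file, for example
//   scala.io.Source.fromFile(localPath).getLines.toList
val productsRaw = List(
  "957,43,Sample Product A,,299.98,http://example.com/a",
  "502,24,Sample Product B,,50.0,http://example.com/b"
)

// Convert to an RDD of (product_id, product_name)
val products = sc.parallelize(productsRaw).map { product =>
  val fields = product.split(",")
  (fields(0).toInt, fields(2))
}

// Stand-in for the aggregated (product_id, revenue) from the previous step
val revenuePerProduct = sc.parallelize(Seq((957, 599.96), (502, 250.0)))

// Join on product_id, sort by revenue in descending order, format as text
val revenueByName = products.join(revenuePerProduct).
  map { case (_, (name, revenue)) => (revenue, name) }.
  sortByKey(ascending = false).
  map { case (revenue, name) => name + "\t" + revenue }

// The real job would finish with revenueByName.saveAsTextFile(outputPath)
```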
Update Application
Here is the complete working code. We can copy paste the code to IntelliJ and then validate locally before building the jar file.
[gist]42e33baacbdc32bfaa09ebe6412a76a3[/gist]
Build and Deploy
Once the development is done, we can build the jar file and run it on the cluster.
Go to the project working directory and run sbt package to build the jar file.
We can validate the jar file in the local environment using spark-submit.
As we have used a third-party library (typesafe config) to externalize properties, we need to pass it using --jars or --packages.
--jars points to a local jar file using its fully qualified path; we can pass multiple jars using comma as the separator.
--packages downloads the jar file from repositories over the internet. We need to pass group id, artifact id and version using colon as the delimiter.
We can ship the jar to the gateway node on the cluster and run it using the spark-submit command.
In typical environments these commands are scheduled using enterprise scheduling tools such as Azkaban, Airflow, Control-M, cron jobs etc.
Validate Locally
Run on Cluster
[gist]5dcfb59d35b79a6d291112d56ab04a3a[/gist]
Local Mode vs. YARN Mode
Let us understand the different modes in which we can deploy applications.
Local Mode
Stand Alone Mode (Spark Native)
Mesos Mode
YARN Mode
Local and Stand Alone modes are used for development and exploratory purposes. In production we use either Mesos or YARN.
Most of the Big Data distributions such as Cloudera, Hortonworks etc. have an ecosystem of tools such as Kafka, Hadoop etc. along with Spark.
They are typically deployed using YARN, while Spark exclusive clusters typically use Mesos.
As our clusters are built using either the Hortonworks or the Cloudera distribution, we tend to run our Spark jobs in YARN mode.
We can make YARN the default while setting up clusters. However, if the cluster is configured to use local mode, we can override it by passing --master to spark-submit or spark-shell.
Quick walkthrough of Spark UI
Let us walk through the details about Spark UI.
While the job is running, the Spark UI is served by the Application Master, with the Resource Manager web interface acting as a proxy.
There are several important tabs which we should be familiar with:
Executors
Jobs
Stages
Environment
Storage
It is very important to spend some time and understand all these different components of Spark UI.
YARN Deployment Modes
We can submit Spark application in 2 deployment modes.
Client Mode – Driver Program will be running in the node on which job is submitted – i.e., Gateway Node.
Cluster Mode – Driver Program will be running as part of the application master – i.e., one of the worker nodes in the cluster.
We can control the mode by using the --deploy-mode argument as part of spark-submit or spark-shell.
We submit the job from the client. The JVM launched there typically acts as the Driver Program.
It will talk to the Resource Manager and create the Application Master.
Application Master will talk to Worker Nodes on which Node Managers are running and provision resources based on Allocation Settings. Allocation can be either static or dynamic.
These resources are nothing but Executors. From YARN's perspective they are Containers.
An Executor is nothing but a JVM which can run multiple concurrent threads until the job is complete.
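To make the static versus dynamic allocation settings mentioned above a little more concrete, here is a hedged sketch of how they can be expressed on a SparkConf. The property keys are standard Spark configuration names, but every value shown is an arbitrary illustration, not a recommendation. In practice these are often passed to spark-submit instead of being set in code.

```scala
import org.apache.spark.SparkConf

// Static allocation: a fixed number of executors (YARN containers) is
// requested up front. SparkConf(false) skips loading system properties.
val staticConf = new SparkConf(false).
  setAppName("Get Monthly Product Revenue").
  set("spark.executor.instances", "4").
  set("spark.executor.cores", "2").
  set("spark.executor.memory", "2g")

// Dynamic allocation: executors are requested and released based on load.
// On YARN this generally also needs the external shuffle service enabled.
val dynamicConf = new SparkConf(false).
  setAppName("Get Monthly Product Revenue").
  set("spark.dynamicAllocation.enabled", "true").
  set("spark.shuffle.service.enabled", "true").
  set("spark.dynamicAllocation.minExecutors", "1").
  set("spark.dynamicAllocation.maxExecutors", "10")
```

With spark.executor.cores set to 2, each executor JVM can run up to two tasks concurrently, which is the "multiple concurrent threads" behaviour described above.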
Here are the different components that are used as part of execution.