Spark Development and Execution Life Cycle

Do you want to understand the Spark execution life cycle and terms such as executor, executor tasks, driver program and more?

  • Develop Spark Application – Get Monthly Product Revenue
  • Build and Deploy
  • Local mode vs. YARN mode
  • Quick walk through of Spark UI
  • YARN deployment modes
  • Spark Execution Cycle

Develop Spark Application – Get Monthly Product Revenue

Let us start with the problem statement, then go through the design and the implementation.

Problem Statement

Using the retail_db dataset, we need to compute Monthly Product Revenue for a given month.

  • We need to consider only COMPLETE and CLOSED orders to compute revenue.
  • Also, we need to consider only those transactions that belong to the month passed as an argument.
  • We need to sort the data in descending order by revenue while saving the output to an HDFS path.

Design

Let us see the design for the given Problem Statement.

  • Filter for orders which fall in the month passed as the argument
  • Join filtered orders and order_items to get order_item details for a given month
  • Get revenue for each product_id
  • We need to read products from the local file system
  • Convert into RDD and extract product_id and product_name
  • Join it with aggregated order_items (product_id, revenue)
  • Get product_name and revenue for each product

Development

Let us create a new project and develop the logic.

  • Setup Project
    • Scala Version: 2.11.8 (on Windows; the latest 2.11.x in other environments)
    • sbt Version: 0.13.x
    • JDK: 1.8.x
    • Project Name: SparkDemo
  • Update build.sbt (see the sketch after this list)
    • typesafe config
    • Spark Core
  • Update application.properties
  • Develop the logic to compute revenue per product for a given month.
  • Once the project is set up, we can launch the Scala REPL with Spark as well as typesafe config dependencies using sbt console.
  • Once we get the logic working, we can add it to a program called GetMonthlyProductRevenue.
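
For reference, here is a minimal build.sbt sketch. The Spark and typesafe config versions shown (2.3.x and 1.3.x) are assumptions and should be aligned with your environment.

```scala
name := "SparkDemo"

version := "1.0"

scalaVersion := "2.11.8"

// typesafe config to externalize properties
libraryDependencies += "com.typesafe" % "config" % "1.3.2"

// Spark Core (assumed version; align with the cluster)
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
```

application.properties (loaded via the typesafe config APIs) would then hold environment-specific values, for example hypothetical keys such as dev.input.base.dir and dev.execution.mode.
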
Using sbt Console

As part of this topic, we will see how to access sbt console and use it to explore Spark APIs.

  • Go to the working directory of the project.
  • Run sbt console
  • We should be able to use typesafe config APIs as well as Spark APIs.
  • Create Spark Conf and Spark Context objects
[gist]b663b57eb56d3eb79237060f4fefc192[/gist]
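
If the embedded gist is not visible, a minimal sketch of the kind of code we run here could look like the following (the app name and the local master are placeholders for exploration from sbt console):

```scala
import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}

// typesafe config APIs are available on the sbt console classpath
val props = ConfigFactory.load()

// Create Spark Conf and Spark Context objects
val conf = new SparkConf().
  setMaster("local[*]").
  setAppName("GetMonthlyProductRevenue")
val sc = new SparkContext(conf)
```
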
Hadoop Configuration

Let us see how we can access Hadoop Configuration.

  • Spark uses HDFS APIs to read files from supported file systems.
  • As part of Spark dependencies, we get HDFS APIs as well.
  • We can get Hadoop Configuration using sc.hadoopConfiguration
  • Using it, we will be able to create a FileSystem object. It will expose APIs such as exists, delete, etc.
  • We can use those to validate as well as manage input and/or output directories.
[gist]93322e0415e32a50b70377c55205db8b[/gist]
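
A minimal sketch of validating and cleaning up an output directory (the path below is just a placeholder):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Build the FileSystem object from the SparkContext's Hadoop Configuration
val fs = FileSystem.get(sc.hadoopConfiguration)

// Placeholder output path; delete it recursively if it already exists
val outputPath = new Path("/user/training/monthly_product_revenue")
if (fs.exists(outputPath)) fs.delete(outputPath, true)
```
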
Read and Filter Orders

Now that we are able to create a Spark Context, let us actually read and manipulate the orders data.

  • Read data from orders
  • Use filter to keep only the orders with status COMPLETE or CLOSED that belong to the passed month.
  • Use map to extract order_id along with a hard coded value 1 so that we can use it in the join later.
[gist]b8db54b5d4275e7ea02ac47b04012afa[/gist]
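
A minimal sketch of this step, assuming the standard retail_db orders layout (order_id,order_date,order_customer_id,order_status) and placeholder values for the input directory and the month:

```scala
// Placeholders; in the actual program these come from application.properties
// and the program arguments
val inputBaseDir = "/public/retail_db"
val month = "2014-01"

val orders = sc.textFile(inputBaseDir + "/orders")

// Keep COMPLETE or CLOSED orders for the given month, then emit (order_id, 1)
val ordersFiltered = orders.
  filter(order => {
    val a = order.split(",")
    (a(3) == "COMPLETE" || a(3) == "CLOSED") && a(1).startsWith(month)
  }).
  map(order => (order.split(",")(0).toInt, 1))
```
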
Join Orders and Order Items

Now let us join order_items with orders and get product_id and order_item_subtotal.

  • Read data from order_items
  • Extract order_id, product_id and order_item_subtotal as a tuple.
  • The first element is order_id and the second element is a nested tuple containing product_id and order_item_subtotal.
  • Join this data set with the filtered orders using order_id as the key.
  • It will generate RDD of tuples – (order_id, ((product_id, order_item_subtotal), 1))
[gist]bc811e368c72fe6f6df02a744c4a705b[/gist]
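
A minimal sketch, assuming the standard retail_db order_items layout where the 2nd field is order_item_order_id, the 3rd is order_item_product_id and the 5th is order_item_subtotal:

```scala
val orderItems = sc.textFile(inputBaseDir + "/order_items")

// (order_id, (product_id, order_item_subtotal))
val orderItemsMap = orderItems.
  map(oi => {
    val a = oi.split(",")
    (a(1).toInt, (a(2).toInt, a(4).toFloat))
  })

// (order_id, ((product_id, order_item_subtotal), 1))
val ordersJoin = orderItemsMap.join(ordersFiltered)
```
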
Compute Revenue per Product id

Now we can extract product_id and order_item_subtotal and compute revenue for each product_id.

  • We can discard order_id and 1 from the join output.
  • We can use map and get the required information – product_id and order_item_subtotal.
  • Using reduceByKey, we should be able to compute revenue for each product_id.
[gist]0317a8e18be6da94bdd261123da8a977[/gist]
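
A minimal sketch continuing from the join above:

```scala
// Drop order_id and the hard coded 1, keeping (product_id, order_item_subtotal),
// then aggregate the subtotals per product_id
val revenuePerProductId = ordersJoin.
  map(rec => rec._2._1).
  reduceByKey(_ + _)
```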

Join Products and Get Product Name
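
As per the design, we read products from the local file system, build an RDD of (product_id, product_name), join it with revenue per product_id and then sort by revenue in descending order. A minimal sketch, assuming the standard retail_db products layout (product_id,product_category_id,product_name,...) and a placeholder local path:

```scala
import scala.io.Source

// Products are read from the local file system (placeholder path)
val products = Source.fromFile("/data/retail_db/products/part-00000").
  getLines.toList

// (product_id, product_name)
val productsMap = sc.parallelize(products).
  map(product => {
    val a = product.split(",")
    (a(0).toInt, a(2))
  })

// Join on product_id, then sort by revenue in descending order
val revenuePerProduct = productsMap.
  join(revenuePerProductId).
  map(rec => (rec._2._2, rec._2._1)).
  sortByKey(ascending = false).
  map(rec => (rec._2, rec._1))

// revenuePerProduct can then be saved to the HDFS output path
// (see the complete application below)
```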

Update Application

Here is the complete working code. We can copy-paste the code into IntelliJ and validate it locally before building the jar file.

[gist]42e33baacbdc32bfaa09ebe6412a76a3[/gist]

Build and Deploy

Once the development is done, we can build the jar file and run it on the cluster.

  • Go to the project working directory and run sbt package to build the jar file.
  • We can validate the jar file in the local environment using spark-submit.
  • As we have used a 3rd party library to externalize properties, we need to pass it using --jars or --packages.
  • --jars can be used to point to local jar files using fully qualified paths; we can pass multiple jars using comma as the separator.
  • --packages can be used to download the jar file from repositories over the internet. We need to pass the group id, artifact id and version using colon as the delimiter.
  • We can ship the jar to the gateway node on the cluster and run it using the spark-submit command.
  • In typical environments, these commands are scheduled using enterprise scheduling tools such as Azkaban, Airflow, Control-M, cron, etc.

Validate Locally

Run on Cluster

[gist]5dcfb59d35b79a6d291112d56ab04a3a[/gist]

Local Mode vs. YARN Mode

Let us understand the different modes in which we can deploy applications.

  • Local Mode
  • Stand Alone Mode (Spark Native)
  • Mesos Mode
  • YARN Mode

Local and Stand Alone modes are used for development and exploratory purposes. In production, we use either Mesos or YARN.

  • Most Big Data distributions such as Cloudera, Hortonworks, etc. have an ecosystem of tools such as Kafka, Hadoop, etc. along with Spark.
  • They are typically deployed using YARN, while Spark-exclusive clusters typically use Mesos.
  • As our clusters are either built using Hortonworks or Cloudera Distribution, we tend to run our Spark Jobs using YARN mode.
  • We can make YARN the default while setting up the cluster. However, if the default happens to be local mode, we can override it by passing --master as part of spark-submit or spark-shell (see the sketch below).
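
One common way to keep the application itself mode-agnostic is to set the master explicitly only for local runs and let --master decide it everywhere else. A minimal sketch of that pattern (dev.execution.mode is a hypothetical application.properties key, not something Spark defines):

```scala
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf

// Hard code the master only for local development; on the cluster,
// leave it to --master passed to spark-submit (e.g. yarn)
val props = ConfigFactory.load()
val conf = new SparkConf().setAppName("GetMonthlyProductRevenue")
if (props.getString("dev.execution.mode") == "local")
  conf.setMaster("local[*]")
```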

Quick walkthrough of Spark UI

Let us walk through the details about Spark UI.

  • While the job is running, the Spark UI is served by the Application Master, with the Resource Manager web interface acting as a proxy.
  • There are several important tabs which we should be familiar with
    • Executors
    • Jobs
    • Stages
    • Environment
    • Storage
  • It is very important to spend some time and understand all these different components of Spark UI.

YARN Deployment Modes

We can submit a Spark application in 2 deployment modes.

  • Client Mode – Driver Program will be running in the node on which job is submitted – i.e., Gateway Node.
  • Cluster Mode – Driver Program will be running as part of the application master – i.e., one of the worker nodes in the cluster.
  • We can control the mode by using the --deploy-mode argument as part of spark-submit or spark-shell.

Spark Execution Cycle

Let us understand the details of the Spark execution cycle. You can review this in the official Spark documentation.

  • We submit the job from the client. The JVM launched for the application typically acts as the Driver Program.
  • It will talk to the Resource Manager to get the Application Master created.
  • The Application Master will talk to the worker nodes on which Node Managers are running and provision resources based on the allocation settings. Allocation can be either static or dynamic.
  • These resources are nothing but Executors. From the YARN perspective, they are Containers.
  • An Executor is nothing but a JVM which can run multiple concurrent threads until the job is complete.
  • Here are the different components that are used as part of execution.
    • Driver Program
    • Spark Context
    • Executors
    • Executor Tasks
    • Job
    • Stages
    • Tasks
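
To connect these terms to code, here is a minimal sketch (the path is a placeholder): the action triggers one Job, the shuffle introduced by reduceByKey splits it into two Stages, and each Stage runs as Executor Tasks, one task per partition.

```scala
val orderItems = sc.textFile("/public/retail_db/order_items")    // Stage 1 starts here

val revenuePerProductId = orderItems.
  map(oi => (oi.split(",")(2).toInt, oi.split(",")(4).toFloat)).  // still Stage 1 (no shuffle yet)
  reduceByKey(_ + _)                                              // shuffle boundary -> Stage 2

revenuePerProductId.collect()                                     // action -> triggers one Job
```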