Basic Transformations and Actions

Let us look at how we can perform basic transformations such as row-level transformations, aggregations, joins, and sorting as part of this topic. We will also see a few details related to mapPartitions.

  • Overview of data processing life cycle – row level transformations -> shuffling -> joins/aggregations
  • Row-level transformations – map, flatMap
  • Filtering – filter
  • Aggregations – reduceByKey and aggregateByKey
  • Joins – performing inner joins and outer joins
  • Sorting data

Sum Of Even Numbers

Let us understand how we can convert a collection to an RDD and perform RDD operations to process it. As part of SparkContext we have an API called parallelize, which can be used to convert a typical Python list into an RDD. Similarly, we can use the collect API on an RDD to convert it back into a list. A consolidated sketch follows the steps below.

  • Create list from 1 to 100000 using range – l = list(range(1, 100001))
  • Convert into RDD – lRDD = sc.parallelize(l)
  • Filter for even numbers – lEven = lRDD.filter(lambda n: n % 2 == 0)
  • Get sum of the even numbers – sumEven = lEven.reduce(lambda x, y: x + y)
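
Putting these steps together, here is a minimal sketch, assuming a SparkContext is already available as sc (as in the pyspark shell):

  # Create a list from 1 to 100000 and convert it into an RDD
  l = list(range(1, 100001))
  lRDD = sc.parallelize(l)

  # Keep only the even numbers
  lEven = lRDD.filter(lambda n: n % 2 == 0)

  # reduce is an action – it triggers the job and returns a single value
  sumEven = lEven.reduce(lambda x, y: x + y)
  print(sumEven)  # 2500050000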

Word Count

Let us develop a word count program using PySpark. As part of this example we will see flatMap, map and reduce. A consolidated sketch is provided after the design notes below.

  • flatMap – convert a single record into multiple records based on the logic passed. The number of records in the output RDD will typically be greater than the number of records in the input RDD.
  • map – apply the transformation on individual records, resulting in changed values. The number of records in the input RDD and the output RDD will be the same.
  • reduce – generate an aggregated result by processing the data in the input RDD. It returns one value irrespective of the number of records in the input RDD. reduce is an action and hence it triggers execution of the DAG in the form of one or more jobs.
  • Problem Statement – for each unique word in the input file, we need to get how many times it is repeated. The input file contains a bunch of lines with words.
  • Design
    • Break each line into words (using flatMap). If you want to convert each record into multiple records based on some logic, we need to use the flatMap API. flatMap takes a lambda function as an argument, to which we pass the logic to break down the input record into a list; flatMap's built-in logic then returns each element of that list as a separate record.
    • As we broke each line into words, we need to convert them into tuples (using map). This facilitates the use of by-key operations such as reduceByKey.
    • The paired RDD (output of the map function) can now be passed to reduceByKey to get the count for each word.
    • The logic passed to reduceByKey executes both on the map output (as a combiner) and on the reduce input.
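
Here is a minimal sketch of the above design, assuming a SparkContext is available as sc; the input and output paths are placeholders to be replaced with your own:

  # Placeholder input path – replace with a file containing lines of words
  lines = sc.textFile("/public/wordcount/input")

  # flatMap: break each line into words (one output record per word)
  words = lines.flatMap(lambda line: line.split(" "))

  # map: convert each word into a (word, 1) tuple so that by-key operations can be used
  wordTuples = words.map(lambda word: (word, 1))

  # reduceByKey: add up the 1s for each word (the same logic also runs as a combiner)
  wordCount = wordTuples.reduceByKey(lambda x, y: x + y)

  # saveAsTextFile is an action – it triggers the execution (placeholder output path)
  wordCount.saveAsTextFile("/user/training/bootcamp/pyspark/wordcount")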

Shuffling

Let us understand the concept of Shuffling.

  • As we have seen, a Spark job runs in multiple stages
  • Stages run in a linear fashion. For example, Stage 1 will run only after Stage 0 is completely done
  • In each stage, data is processed using tasks
  • The output of Stage 0 tasks is passed as input to Stage 1 tasks
  • When the output of tasks in earlier stages is passed as input to tasks in later stages, the following happens
      • Data will be partitioned using a hash mod algorithm
      • Data related to keys will be grouped together
      • This data will be cached in memory and it might be spilled to disk as well.
      • Data related to a particular key from all tasks of earlier stages will be passed to one task in later stages.
      • This entire process is called shuffling
      • When certain APIs such as reduceByKey/aggregateByKey are used, they also perform something called combining, which can improve the performance significantly.
      • APIs such as join, reduceByKey, aggregateByKey, groupByKey etc result in shuffling.
  • The number of tasks in subsequent stages is determined by one of these
    • Number of partitions from earlier stage
    • numTasks or numPartitions argument as part of APIs that result in shuffling
    • repartition or coalesce (covered as part of next topic)
  • The accurate number of tasks can only be determined after understanding the data behavior in detail. Here are some of the criteria
    • Ratio between input data and output data (in case of filtering and aggregations, the output size will be considerably lower)
    • Keys on which data is shuffled (sparse keys vs. dense keys)
    • Joins and potential cartesian products
    • and more

Here are examples of groupByKey, reduceByKey and aggregateByKey to understand the differences.
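
As an illustration (not part of the original data sets), here is a small sketch that computes per-key aggregations in the three ways, assuming a SparkContext is available as sc:

  data = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)])

  # groupByKey: no combiner – all values for a key are shuffled before being summed
  sumsGBK = data.groupByKey().mapValues(lambda vs: sum(vs))

  # reduceByKey: the same lambda computes intermediate and final values (combiner friendly)
  sumsRBK = data.reduceByKey(lambda x, y: x + y)

  # aggregateByKey: intermediate and final logic differ – here we compute (sum, count) per key
  sumCountABK = data.aggregateByKey(
      (0, 0),                                    # zero value – its type matches the output value
      lambda acc, v: (acc[0] + v, acc[1] + 1),   # merge a value into the accumulator (within a partition)
      lambda a, b: (a[0] + b[0], a[1] + b[1])    # merge accumulators (across partitions)
  )

  print(sumsRBK.collect())       # [('a', 9), ('b', 6)] in some order
  print(sumCountABK.collect())   # [('a', (9, 3)), ('b', (6, 2))] in some order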

Get Daily Revenue

Let us compute the revenue for each day, considering only completed orders. As part of this example we will explore distinct, filter, map, join and reduceByKey. We will also understand the shuffling process. A consolidated end-to-end sketch is included at the end of this section.

  • Let us read orders data – orders = sc.textFile("/public/retail_db/orders")
  • Let us read order items data – orderItems = sc.textFile("/public/retail_db/order_items")
  • As part of data analysis we will see what all the different statuses we have in orders. For that, we need to read the orders data into an RDD, extract order_status and apply distinct on top of it – orders.map(lambda o: o.split(",")[3]).distinct().collect()
  • Once we understand the data, we will see how we can filter for completed orders using filter – ordersFiltered = orders.filter(lambda o: o.split(",")[3] in ("COMPLETE", "CLOSED"))
  • filter – It creates a new RDD with the records that satisfy the criteria passed as the argument
  • join – It can be used to join multiple data sets on a common key. It can be performed on RDDs where the elements are in the form (k, v) and (k, w). It results in a new RDD where each element is in the form (k, (v, w)).
  • Typically we use map to transform both our input data sets into key value pairs.
    • orders data – ordersMap = ordersFiltered.map(lambda o: (int(o.split(",")[0]), o.split(",")[1]))
    • order items data – orderItemsMap = orderItems.map(lambda oi: (int(oi.split(",")[1]), float(oi.split(",")[4])))
    • Once the data sets are in the required format, joining is very straightforward
    • The key in both data sets should have the same business context (typically the primary key in one table and a foreign key in the other)
    • joining data – ordersJoin = ordersMap.join(orderItemsMap)
  • outer join – the APIs also support outer joins (leftOuterJoin, rightOuterJoin and fullOuterJoin). Left and right are purely based on the position of the driving data set (in our case it is orders that drives the outer join)
  • Get orders with no corresponding order items – ordersMap.leftOuterJoin(orderItemsMap).filter(lambda o: o[1][1] is None)
  • ordersJoin contains data in the form (order_id, (order_date, order_item_subtotal))
  • We do not need order_id any more, hence we can apply map to eliminate it
  • Discarding order id from join results – ordersJoinMap = ordersJoin.map(lambda o: o[1])
  • As ordersJoinMap has the date as key and the item subtotal as value, we can use reduceByKey to get the daily revenue. Even though we can achieve this using aggregateByKey or groupByKey, reduceByKey is the more appropriate choice. Here are the criteria
    • For aggregations we should avoid groupByKey as there is no combiner involved
    • Combining is the process of computing intermediate values (partial aggregations) before the data is shuffled
    • When the logic to compute intermediate values and the logic to compute final values are the same, we should use reduceByKey (e.g., sum)
    • When the logic to compute intermediate values and the logic to compute final values are similar but not the same, we should use aggregateByKey
    • If you look at the syntax, reduceByKey takes only one lambda function as its argument, whereas aggregateByKey takes three arguments – an initial (zero) value plus two lambda functions, one to compute intermediate values within a partition and the other to combine those intermediate values into the final value
    • The type of the initial value in aggregateByKey is determined by the output value type
    • For our example (sum) – we need to use reduceByKey
  • daily revenue – dailyRevenue = ordersJoinMap.reduceByKey(lambda x, y: x + y)
  • Now we can sort the data, let us sort in ascending order by date – dailyRevenueSorted = dailyRevenue.sortByKey()
  • We can perform an action such as saveAsTextFile to save the output. Typically we transform our data to the way it is supposed to be saved (e.g., delimiters) – dailyRevenueSortedMap = dailyRevenueSorted.map(lambda oi: oi[0] + "," + str(oi[1]))
  • Saving output – dailyRevenueSortedMap.saveAsTextFile("/user/training/bootcamp/pyspark/daily_revenue")
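
Putting all of the above together, here is a consolidated sketch of the daily revenue pipeline, assuming a SparkContext is available as sc and the /public/retail_db data sets shown above:

  orders = sc.textFile("/public/retail_db/orders")
  orderItems = sc.textFile("/public/retail_db/order_items")

  # Keep only completed orders
  ordersFiltered = orders.filter(lambda o: o.split(",")[3] in ("COMPLETE", "CLOSED"))

  # Build (order_id, order_date) and (order_id, order_item_subtotal) pairs
  ordersMap = ordersFiltered.map(lambda o: (int(o.split(",")[0]), o.split(",")[1]))
  orderItemsMap = orderItems.map(lambda oi: (int(oi.split(",")[1]), float(oi.split(",")[4])))

  # Join on order_id, then drop the key – we only need (order_date, order_item_subtotal)
  ordersJoinMap = ordersMap.join(orderItemsMap).map(lambda o: o[1])

  # Aggregate per date, sort by date and format the output records
  dailyRevenue = ordersJoinMap.reduceByKey(lambda x, y: x + y)
  dailyRevenueSorted = dailyRevenue.sortByKey()
  dailyRevenueSortedMap = dailyRevenueSorted.map(lambda oi: oi[0] + "," + str(oi[1]))

  # saveAsTextFile is an action – it triggers the whole pipeline
  dailyRevenueSortedMap.saveAsTextFile("/user/training/bootcamp/pyspark/daily_revenue")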