Data Frame Operations – Analytics Functions or Windowing Functions

As part of this session, we will see advanced operations such as aggregations, ranking and windowing functions within each group using APIs such as over, partitionBy, etc. We will also build a solution for a problem and run it on a multi-node cluster.

  • Aggregations, Ranking and Windowing Functions – APIs
  • Problem Statement – Get top N products per day
  • Creating Window Spec
  • Performing aggregations
  • Using windowing functions
  • Ranking within each partition or group
  • Development Life Cycle

Aggregations, Ranking and Windowing Functions – APIs

Let us understand APIs related to aggregations, ranking and windowing functions.

  • The main package is pyspark.sql.window
  • It has classes such as Window and WindowSpec
  • Window has APIs such as partitionBy, orderBy, etc.
  • These return a WindowSpec object. We can pass the WindowSpec object to over on functions such as rank(), dense_rank(), sum(), etc.
  • e.g.: rank().over(spec) where spec = Window.partitionBy('ColumnName') – a runnable sketch follows this list
  • Aggregations – sum, avg, min, max, etc.
  • Ranking – rank, dense_rank, row_number, etc.
  • Windowing – lead, lag, etc.
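
Here is a minimal runnable sketch putting these APIs together. The DataFrame, its column names and the sample values are hypothetical, made up for illustration.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, rank
    from pyspark.sql.window import Window

    spark = SparkSession.builder.appName('Window APIs Demo').getOrCreate()

    # Hypothetical daily product revenue data (names and values are assumptions)
    daily_revenue = spark.createDataFrame(
        [('2014-01-01', 1, 500.0),
         ('2014-01-01', 2, 300.0),
         ('2014-01-02', 1, 400.0)],
        ['order_date', 'product_id', 'revenue']
    )

    # partitionBy/orderBy on Window return a WindowSpec ...
    spec = Window.partitionBy('order_date').orderBy(col('revenue').desc())

    # ... which is passed to over on functions such as rank()
    daily_revenue.withColumn('rnk', rank().over(spec)).show()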

Problem Statement – Get top N products per day

Let us define the problem statement and see the real usage of analytics functions.

  • Problem Statement – Get top N products per day
  • Get the daily product revenue code from the previous topic
  • Use ranking functions to get the rank based on revenue for each day
  • Once we get the rank, filter for the top N products

Creating Window Spec

Let us see how to create a Window Spec.

  • Window has APIs such as partitionBy, orderBy
  • For aggregations, we can define the grouping using partitionBy
  • For ranking or windowing, we need to use partitionBy and then orderBy. partitionBy groups the data and orderBy sorts the data within each group so that ranks can be assigned.
  • partitionBy or orderBy returns a WindowSpec object
  • The WindowSpec object needs to be passed to over along with ranking and aggregate functions, as shown in the sketch below.
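
A minimal sketch of both kinds of specs, assuming the order_date and revenue column names from the daily product revenue example:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    from pyspark.sql.window import Window

    # An active SparkSession is needed before building column expressions
    spark = SparkSession.builder.appName('Window Spec Demo').getOrCreate()

    # For aggregations - partitionBy alone defines the group
    spec_agg = Window.partitionBy('order_date')

    # For ranking or windowing - partitionBy groups, orderBy sorts within each group
    spec_rank = Window.partitionBy('order_date').orderBy(col('revenue').desc())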

Performing aggregations

Let us see how to perform aggregations within each group.

  • We have functions such as sum, avg, min, max, etc. which can be used to aggregate the data.
  • We need to create a WindowSpec object using partitionBy to get aggregations within each group.
  • Some realistic use cases (the first one is sketched after this list)
    • Get the average salary for each department and get details of all employees who earn more than the average salary
    • Get the average revenue for each day and get all the orders whose revenue is more than the average revenue
    • Get the highest order revenue and get all the orders whose revenue is more than 75% of the highest revenue
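
Here is a minimal sketch of the first use case, assuming a hypothetical employees DataFrame with department and salary columns:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import avg, col
    from pyspark.sql.window import Window

    spark = SparkSession.builder.appName('Aggregations Demo').getOrCreate()

    # Hypothetical employee data
    employees = spark.createDataFrame(
        [(1, 'Sales', 5000.0), (2, 'Sales', 7000.0),
         (3, 'HR', 4000.0), (4, 'HR', 6000.0)],
        ['employee_id', 'department', 'salary']
    )

    # partitionBy alone - avg is computed within each department
    spec = Window.partitionBy('department')

    employees. \
        withColumn('avg_salary', avg('salary').over(spec)). \
        filter(col('salary') > col('avg_salary')). \
        show()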

Using windowing functions

Let us see details about windowing functions within each group.

  • We have functions such as lead, lag, etc.
  • We need to create a WindowSpec object using partitionBy and then orderBy for most of the windowing functions
  • Some realistic use cases (sketched after this list)
    • Salary difference between the current and the next/previous employee within each department
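
A minimal sketch of the salary difference use case using lead, again on hypothetical employee data:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, lead
    from pyspark.sql.window import Window

    spark = SparkSession.builder.appName('Windowing Demo').getOrCreate()

    # Hypothetical employee data
    employees = spark.createDataFrame(
        [(1, 'Sales', 5000.0), (2, 'Sales', 7000.0),
         (3, 'HR', 4000.0), (4, 'HR', 6000.0)],
        ['employee_id', 'department', 'salary']
    )

    # partitionBy and then orderBy - lead refers to the next row within the department
    spec = Window.partitionBy('department').orderBy(col('salary').desc())

    employees. \
        withColumn('next_salary', lead('salary').over(spec)). \
        withColumn('salary_diff', col('salary') - col('next_salary')). \
        show()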

Ranking within each partition or group

Let us talk about ranking functions within each group.

  • We have functions like rank, dense_rank, row_number, first, last, etc.
  • We need to create a WindowSpec object using partitionBy and then orderBy for most of the ranking functions
  • Some realistic use cases (the first one is sketched after this list)
    • Assign ranks to employees based on salary within each department
    • Assign ranks to products based on revenue for each day or month
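
A minimal sketch comparing rank, dense_rank and row_number on hypothetical employee data with a salary tie, to show how they differ:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, dense_rank, rank, row_number
    from pyspark.sql.window import Window

    spark = SparkSession.builder.appName('Ranking Demo').getOrCreate()

    # Hypothetical employee data - two Sales employees share the top salary
    employees = spark.createDataFrame(
        [(1, 'Sales', 7000.0), (2, 'Sales', 7000.0), (3, 'Sales', 5000.0),
         (4, 'HR', 6000.0), (5, 'HR', 4000.0)],
        ['employee_id', 'department', 'salary']
    )

    spec = Window.partitionBy('department').orderBy(col('salary').desc())

    # rank leaves gaps after ties, dense_rank does not, row_number is always unique
    employees. \
        withColumn('rnk', rank().over(spec)). \
        withColumn('drnk', dense_rank().over(spec)). \
        withColumn('rn', row_number().over(spec)). \
        show()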

Development Life Cycle

Let us talk about development life cycle.

  • Take the DailyProductRevenue code which gives us order_date, order_item_product_id and revenue
  • Import Window and create a spec to partition by date and order by revenue in descending order
  • Use withColumn to assign the rank
  • Filter the data where the rank is less than or equal to topN, which is passed as an argument to the program
  • Drop the rank field, as we do not want it in the saved data, and then sort in ascending order by date and in descending order by revenue
  • Save the data frame into a file (a sketch of the complete program follows this list)
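
Putting the above steps together, here is a minimal sketch of the program. The input and output paths and the file format are assumptions; the input is expected to have order_date, order_item_product_id and revenue as described above.

    import sys

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, rank
    from pyspark.sql.window import Window

    # top N is passed as an argument to the program
    top_n = int(sys.argv[1])

    spark = SparkSession.builder.appName('Get Top N Products Per Day').getOrCreate()

    # DailyProductRevenue output from the previous topic (path and format are assumptions)
    daily_product_revenue = spark.read.json('/user/training/daily_product_revenue')

    # partition by date and order by revenue in descending order
    spec = Window.partitionBy('order_date').orderBy(col('revenue').desc())

    top_n_products = daily_product_revenue. \
        withColumn('rnk', rank().over(spec)). \
        filter(col('rnk') <= top_n). \
        drop('rnk'). \
        orderBy('order_date', col('revenue').desc())

    # save the data frame into a file (path is an assumption)
    top_n_products.write.json('/user/training/top_n_products_per_day')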