As part of this session, we will look at the basic transformations we can perform on Data Frames, such as filtering, aggregations and joins. We will then build an end-to-end application around a simple problem statement.
Data Frame Operations – APIs
Problem Statement – Get daily product revenue
Projecting data using select, withColumn and selectExpr
Filtering data using where or filter
Joining Data Sets
Grouping data and performing aggregations
Sorting data
Development Life Cycle
Data Frame Operations – APIs
Let us recap Data Frame Operations. They are one of the 2 ways we can process Data Frames.
Selection or Projection – select
Filtering data – filter or where
Joins – join (supports outer join as well)
Aggregations – groupBy and agg with support of functions such as sum, avg, min, max etc
Sorting – sort or orderBy
Analytics Functions – aggregations, ranking and windowing functions
Problem Statement – Get daily product revenue
Here is the problem statement we will use to explore the Data Frame APIs and arrive at a final solution.
Apply type cast functions to convert fields to their original types wherever applicable.
Projecting data using select, withColumn and selectExpr
Now let us see how we can project data the way we want using select.
Python objects are dynamic, which means their attributes can be determined at run time.
In this case both orders and orderItems are of type DataFrame, but each one exposes its own columns as attributes (e.g.: orders.order_id and orderItems.order_item_id)
We can use select and fetch data from the fields we are looking for.
We can refer to a column either as DataFrame.ColumnName or directly as 'ColumnName' in the select clause – e.g.: orders.select(orders.order_id, orders.order_date) and orders.select('order_id', 'order_date')
We can apply necessary functions to manipulate data while it is being projected – orders.select(substring('order_date', 1, 7)).show()
We can give aliases to the derived fields using alias function – orders.select(substring('order_date', 1, 7).alias('order_month')).show()
If we want to add a new field derived from existing fields, we can use the withColumn function. The first argument is the name of the new column and the second argument is the expression that computes it, so no separate alias is needed – orders.withColumn('order_month', substring('order_date', 1, 7)).show()
Filtering data using where or filter
Data Frames have 2 APIs to filter data, where and filter. They are synonyms, so you can use either of them.
You can use filter or where in 2 ways
One is by referring to the column as DataFrame.columnName and comparing it with a value – e.g.: orders.where(orders.order_status == 'COMPLETE').show()
The other is by passing the condition as a SQL-style string – e.g.: orders.where('order_status = "COMPLETE"').show()
Make sure both orders and orderItems data frames are created
Let us see a few more examples
Get orders which are either COMPLETE or CLOSED
Get orders which are either COMPLETE or CLOSED and placed in month of 2013 August
Get order items where order_item_subtotal is not equal to product of order_item_quantity and order_item_product_price
Get all the orders which are placed on first of every month
Joining Data Sets
Quite often we need to deal with multiple data sets which are related with each other.
We need to first understand the relationship with respect to data sets
All our data sets have relationships defined between them.
orders and order_items are transaction tables. orders is parent and order_items is child. Relationship is established between the two using order_id (in order_items, it is represented as order_item_order_id)
We also have product catalog normalized into 3 tables – products, categories and departments (with relationships established in that order)
We also have customers table
There is a relationship between customers and orders – customers is the parent data set, as one customer can place multiple orders.
There is a relationship between the product catalog and order_items via products – products is the parent data set, as one product can be ordered as part of multiple order_items.
Determine the type of join – inner or outer (left or right or full)
Data Frames have an API called join to perform joins
We can make the join outer by passing the join type as an additional argument
Let us see a few examples
Get all the order items corresponding to COMPLETE or CLOSED orders
Get all the orders where there are no corresponding order_items
Check if there are any order_items where there is no corresponding order in orders data set
Grouping data and performing aggregations
Many times we want to perform aggregations such as sum, average, minimum and maximum within each group. We first need to group the data and then perform the aggregation.
groupBy is the function which can be used to group the data on one or more columns
Once data is grouped we can perform all supported aggregations – sum, avg, min, max etc
We can invoke the functions directly or as part of agg
agg gives us more flexibility to give aliases to the derived fields
Let us see a few examples
Get count by status from orders
Get revenue for each order id from order items
Get daily product revenue (order_date and order_item_product_id are part of keys, order_item_subtotal is used for aggregation)
Sorting data
Now let us see how we can sort the data using sort or orderBy.
sort or orderBy can be used to sort the data
We can perform composite sorting by using multiple fields
By default data will be sorted in ascending order
We can change the order by using desc function
Let us see a few examples
Sort orders by status
Sort orders by date and then by status
Sort order items by order_item_order_id and order_item_subtotal descending
Take daily product revenue data and sort in ascending order by date and then descending order by revenue.
Development Life Cycle
Let us develop the application using PyCharm and run it on the cluster.
Make sure application.properties has the required input path and output path along with the execution mode
Read orders and order_items data into data frames
Filter for complete and closed orders
Join with order_items
Aggregate to get revenue for each order_date and order_item_product_id
Sort in ascending order by date and then descending order by revenue
Save the output in CSV format
Validate using PyCharm
Ship it to the cluster, run it on the cluster and validate.
Exercises
Try to develop programs for these exercises
Get number of closed or complete orders placed by each customer
Get revenue generated by each customer for the month of 2014 January (consider only closed or complete orders)
Get revenue generated by each product on monthly basis – get product name, month and revenue generated by each product (round off revenue to 2 decimals)
Get revenue generated by each product category on daily basis – get category name, date and revenue generated by each category (round off revenue to 2 decimals)
Get the details of the customers who never placed orders