As part of this session we will understand what Data Frames are, how Data Frames can be created from (text) files, Hive tables and relational databases using JDBC, etc. We will also understand how a Data Frame can be registered as an in-memory table/view so that SQL can be run on top of it, as well as some of the important functions that can be used to manipulate data as part of Data Frame operations.
Data Frames – Overview
Reading text data from files
Reading data from hive
Reading data from MySQL over JDBC
Data Frame Operations – Overview
Spark SQL – Overview
Functions to manipulate data
Data Frames – Overview
Data Frames are nothing but RDDs with structure.
A Data Frame can be created on any data set which has structure associated with it.
Attributes/columns in a Data Frame can be referred to using names.
One can create a Data Frame using data from files, Hive tables or relational tables over JDBC.
Common functions on Data Frames
printSchema – to print the column names and data types of the Data Frame
show – to preview data (20 records by default)
describe – to understand the characteristics of the data
count – to get the number of records
collect – to convert the Data Frame into a local collection (a list of Row objects in Python)
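For example, assuming a Data Frame called orders has already been created (creation is covered in the following topics), these functions can be invoked as shown below.

# assuming `orders` is an existing Data Frame (creation is covered below)
orders.printSchema()      # column names and data types
orders.show(10)           # preview the first 10 records (20 by default)
orders.describe().show()  # basic statistics about the data
orders.count()            # number of records
orders.collect()          # all records as a local list of Row objects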
Once a Data Frame is created, we can process its data using two approaches.
Native Data Frame APIs
Register as temp table and run queries using spark.sql
To work with Data Frames as well as Spark SQL, we need to create an object of type SparkSession
Once the SparkSession object is created, we can use APIs under spark.read to create Data Frames or use spark.sql to run queries against Hive tables or temp tables.
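In the pyspark shell the spark object is created automatically; in a standalone application it can be created along the following lines (the master and application name here are illustrative).

from pyspark.sql import SparkSession

# master and appName are illustrative
spark = SparkSession. \
    builder. \
    master('local'). \
    appName('Data Frames Overview'). \
    getOrCreate()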
Reading text data from files
Let us see how we can read text data from files into a Data Frame. spark.read also has APIs for other types of file formats, but we will get into those details later.
We can use spark.read.csv or spark.read.text to read text data.
spark.read.csv can be used for comma separated data. Default field names will be in the form of _c0, _c1 etc
spark.read.text can be used to read fixed length data where there is no delimiter. Default field name is value.
We can define attribute names using the toDF function
In either case the data will be represented as strings
We can convert data types by using the cast function – df.select(df.field.cast(IntegerType()))
We will see all the other functions soon, but let us perform the task of reading the data into a Data Frame and representing it in its original format.
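Here is a sketch of reading comma-separated data, assigning column names with toDF and casting data types with cast; the file path and column names are illustrative assumptions (a typical orders data set).

from pyspark.sql.types import IntegerType

# path and column names are illustrative assumptions
orders = spark.read. \
    csv('/public/retail_db/orders'). \
    toDF('order_id', 'order_date', 'order_customer_id', 'order_status')

# every column is a string by default; cast the numeric columns
orders = orders. \
    withColumn('order_id', orders.order_id.cast(IntegerType())). \
    withColumn('order_customer_id', orders.order_customer_id.cast(IntegerType()))

orders.printSchema()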
Reading data from hive
If Hive and Spark are integrated, we can create data frames from data in Hive tables or run Spark SQL queries against it.
We can use spark.read.table to read data from Hive tables into Data Frame
We can prefix the database name to the table name while reading Hive tables into a Data Frame
We can also run Hive queries directly using spark.sql
Both spark.read.table and spark.sql return a Data Frame
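A minimal sketch, assuming Hive and Spark are integrated and a database and table such as retail_db.orders exist (the names are illustrative).

# database and table names are illustrative
orders = spark.read.table('retail_db.orders')

# a Hive query run through spark.sql also returns a Data Frame
orders_sql = spark.sql('SELECT * FROM retail_db.orders')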
Reading data from MySQL over JDBC
Spark also facilitates reading data from relational databases over JDBC.
We need to make sure the JDBC jar file is registered using --packages or --jars and --driver-class-path while launching pyspark
In PyCharm, we need to copy the relevant JDBC jar file to SPARK_HOME/jars
We can either use spark.read.format('jdbc') with options, or spark.read.jdbc with the JDBC URL, table name and other properties as a dict, to read data from remote relational databases.
We can pass a table name or a query to read data over JDBC into a Data Frame
While reading data, we can define the number of partitions (using numPartitions) and the criteria to divide the data into partitions (partitionColumn, lowerBound, upperBound)
Partitioning can be done only on numeric fields
If lowerBound and upperBound are specified, strides will be generated depending upon the number of partitions and then the entire data will be processed. Here is an example
We are trying to read order_items data with numPartitions as 4
partitionColumn – order_item_order_id
lowerBound – 10000
upperBound – 20000
order_item_order_id is in the range of 1 and 68883
But as we define lowerBound as 10000 and upperBound as 20000, the strides will be – 1 to 12499, 12500 to 14999, 15000 to 17499, 17500 to the maximum of order_item_order_id
You can check the data in the output path mentioned
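Here is a sketch of such a partitioned read using spark.read.format('jdbc'); the host name, credentials and database below are placeholders, not actual values.

# connection details below are placeholders
order_items = spark.read. \
    format('jdbc'). \
    option('url', 'jdbc:mysql://<mysql_host>:3306/retail_db'). \
    option('dbtable', 'order_items'). \
    option('user', '<username>'). \
    option('password', '<password>'). \
    option('numPartitions', 4). \
    option('partitionColumn', 'order_item_order_id'). \
    option('lowerBound', 10000). \
    option('upperBound', 20000). \
    load()

order_items.rdd.getNumPartitions()  # 4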
Data Frame Operations – Overview
Let us get an overview of Data Frame Operations, one of the two ways in which we can process Data Frames. The commonly used categories are listed below, followed by a short sketch.
Selection or Projection – select
Filtering data – filter or where
Joins – join (supports outer join as well)
Aggregations – groupBy and agg with support of functions such as sum, avg, min, max etc
Sorting – sort or orderBy
Analytics Functions – aggregations, ranking and windowing functions
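A minimal sketch of a few of these operations on the illustrative orders Data Frame created earlier.

from pyspark.sql import functions as sf

# selection and filtering
orders. \
    select('order_id', 'order_date', 'order_status'). \
    filter(orders.order_status == 'COMPLETE'). \
    show()

# aggregation and sorting
orders. \
    groupBy('order_status'). \
    agg(sf.count('order_id').alias('order_count')). \
    orderBy(sf.col('order_count').desc()). \
    show()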
Spark SQL – Overview
We can also use Spark SQL to process data in data frames.
We can get list of tables by using spark.sql('show tables')
We can register a Data Frame as a temporary view using df.createTempView("view_name")
The output of show tables shows the temporary views as well
Once a temp view is created, we can use SQL-style syntax and run queries against the tables/views
Most of the Hive queries will work out of the box
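A short sketch, again using the illustrative orders Data Frame.

# register the Data Frame as a temporary view and query it using SQL
orders.createTempView('orders_v')

spark.sql('show tables').show()

spark.sql('''
  SELECT order_status, count(1) AS order_count
  FROM orders_v
  GROUP BY order_status
  ORDER BY order_count DESC
''').show()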
Functions to manipulate data
Let us quickly look into some of the functions available in Data Frames.
The main package for these functions is pyspark.sql.functions
We can import it by saying from pyspark.sql import functions as sf
You will see many functions which are similar to the functions in traditional databases.
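A few illustrative calls, using the same assumed orders Data Frame and columns.

from pyspark.sql import functions as sf

# string manipulation, substring extraction and conditional logic
orders. \
    select(
        sf.lower(orders.order_status).alias('order_status_lower'),
        sf.substring(orders.order_date, 1, 7).alias('order_month'),
        sf.when(orders.order_status == 'COMPLETE', 1).otherwise(0).alias('is_complete')
    ). \
    show()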