Creating Data Frames and Pre-Defined Functions

As part of this session we will understand what Data Frames are and how they can be created from (text) files, Hive tables, relational databases using JDBC etc. We will also understand how a data frame can be registered as an in-memory table/view so that we can run SQL on top of it, as well as some of the important functions that can be used to manipulate data as part of data frame operations.

  • Data Frames – Overview
  • Reading text data from files
  • Reading data from hive
  • Reading data from MySQL over JDBC
  • Data Frame Operations – Overview
  • Spark SQL – Overview
  • Functions to manipulate data

Data Frames – Overview

A Data Frame is nothing but an RDD with structure.

  • A Data Frame can be created on any data set which has structure associated with it.
  • Attributes/columns in a data frame can be referred to using names.
  • One can create a data frame using data from files, Hive tables, or relational tables over JDBC.
  • Common functions on Data Frames
    • printSchema – to print the column names and data types of the data frame
    • show – to preview data (default 20 records)
    • describe – to understand the characteristics of the data
    • count – to get the number of records
    • collect – to retrieve the contents of the data frame to the driver as a list of Row objects
  • Once a data frame is created, we can process the data using 2 approaches.
    • Native Data Frame APIs
    • Register as temp table and run queries using spark.sql
  • To work with Data Frames as well as Spark SQL, we need to create an object of type SparkSession

  • Once the SparkSession object is created, we can use the APIs under spark.read to create data frames or use spark.sql to run queries on Hive tables or temp tables, as sketched below.
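
Here is a minimal sketch of creating a SparkSession and exercising the common functions above; the app name, master, and the tiny in-memory sample data are illustrative assumptions.

    # Minimal sketch: create a SparkSession and try the common data frame functions.
    # App name, master and the sample data are illustrative assumptions.
    from pyspark.sql import SparkSession

    spark = SparkSession. \
        builder. \
        master('local[*]'). \
        appName('Creating Data Frames'). \
        getOrCreate()

    # Small in-memory data frame just to exercise the functions
    df = spark.createDataFrame(
        [(1, 'Scott'), (2, 'Donald')],
        schema=['id', 'name']
    )

    df.printSchema()        # column names and data types
    df.show()               # preview data (default 20 records)
    df.describe().show()    # basic statistics of the data
    print(df.count())       # number of records
    print(df.collect())     # all records on the driver as a list of Row objects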

Reading text data from files

Let us see how we can read text data from files into a data frame. spark.read also has APIs for other file formats, but we will get into those details later.

  • We can use spark.read.csv or spark.read.text to read text data.
  • spark.read.csv can be used for comma separated data. Default field names will be in the form of _c0, _c1 etc
  • spark.read.text can be used to read fixed length data where there is no delimiter. Default field name is value.
  • We can define attribute names using toDF function
  • In either case, the data will be represented as strings
  • We can convert data types by using the cast function – df.select(df.field.cast(IntegerType()))
  • We will see all the other functions soon, but let us first perform the task of reading the data into a data frame and representing it in its original format, as sketched below.
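
A minimal sketch of reading comma separated data, naming the columns with toDF and casting fields to their original types; the input path and the full column list are assumptions based on a typical retail order_items data set.

    # Minimal sketch: read comma separated text data, name the columns and cast
    # them to their original types. Path and column names are assumptions.
    from pyspark.sql.functions import col
    from pyspark.sql.types import IntegerType, FloatType

    order_items = spark.read. \
        csv('/public/retail_db/order_items'). \
        toDF('order_item_id', 'order_item_order_id', 'order_item_product_id',
             'order_item_quantity', 'order_item_subtotal', 'order_item_product_price')

    # Everything is read as string; convert the fields we care about
    order_items = order_items. \
        withColumn('order_item_id', col('order_item_id').cast(IntegerType())). \
        withColumn('order_item_order_id', col('order_item_order_id').cast(IntegerType())). \
        withColumn('order_item_quantity', col('order_item_quantity').cast(IntegerType())). \
        withColumn('order_item_subtotal', col('order_item_subtotal').cast(FloatType()))

    order_items.printSchema()
    order_items.show()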

Reading data from hive

If Hive and Spark are integrated, we can create data frames from data in Hive tables or run Spark SQL queries against it.

  • We can use spark.read.table to read data from Hive tables into Data Frame
  • We can prefix database name to table name while reading Hive tables into Data Frame
  • We can also run Hive queries directly using spark.sql
  • Both spark.read.table and spark.sql return a Data Frame (see the sketch after this list)
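
A minimal sketch, assuming Hive integration is in place and that a database named retail_db with an orders table exists (illustrative names).

    # Minimal sketch: read a Hive table into a data frame and run a Hive query.
    # Assumes Hive integration and a retail_db.orders table (illustrative names).
    orders = spark.read.table('retail_db.orders')   # database name prefixed to table name
    orders.printSchema()

    # spark.sql also returns a Data Frame
    order_count = spark.sql('SELECT count(*) AS order_count FROM retail_db.orders')
    order_count.show()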

Reading data from MySQL over JDBC

Spark also facilitates reading data from relational databases over JDBC.

  • We need to make sure the JDBC jar file is registered using --packages or --jars and --driver-class-path while launching pyspark
  • In PyCharm, we need to copy the relevant JDBC jar file to SPARK_HOME/jars
  • We can either use spark.read.format('jdbc') with options or spark.read.jdbc with the JDBC URL, table name and other properties as a dict to read data from remote relational databases (see the sketch after this list)
  • We can pass a table name or a query to read data over JDBC into a Data Frame
  • While reading data, we can define the number of partitions (using numPartitions) and the criteria to divide the data into partitions (partitionColumn, lowerBound, upperBound)
  • Partitioning can be done only on numeric fields
  • If lowerBound and upperBound are specified, Spark will generate strides depending on the number of partitions and then process the entire data. Here is an example
    • We are trying to read order_items data with 4 as numPartitions
    • partitionColumn – order_item_order_id
    • lowerBound – 10000
    • upperBound – 20000
    • order_item_order_id is in the range of 1 and 68883
    • But as we define lowerBound as 10000 and upperBound as 20000, the strides will be 1 to 12499, 12500 to 14999, 15000 to 17499, and 17500 to the maximum value of order_item_order_id
    • You can check the data in the output path mentioned
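
A minimal sketch of the order_items read described above using spark.read.jdbc; the host name, database, credentials and driver class are assumptions.

    # Minimal sketch: read order_items over JDBC with 4 partitions, using the
    # bounds discussed above. URL, credentials and driver class are assumptions.
    order_items_jdbc = spark.read.jdbc(
        url='jdbc:mysql://ms.example.com:3306/retail_db',
        table='order_items',
        column='order_item_order_id',   # partitionColumn, must be numeric
        lowerBound=10000,
        upperBound=20000,
        numPartitions=4,
        properties={
            'user': 'retail_user',
            'password': 'retail_password',
            'driver': 'com.mysql.cj.jdbc.Driver'
        }
    )

    print(order_items_jdbc.rdd.getNumPartitions())   # 4
    order_items_jdbc.show()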

Data Frame Operations – Overview

Let us get an overview of Data Frame Operations, one of the 2 ways in which we can process Data Frames. A sketch of these operations follows the list below.

  • Selection or Projection – select
  • Filtering data – filter or where
  • Joins – join (supports outer join as well)
  • Aggregations – groupBy and agg with support of functions such as sum, avg, min, max etc
  • Sorting – sort or orderBy
  • Analytics Functions – aggregations, ranking and windowing functions
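
A minimal sketch of these operations, assuming the orders and order_items data frames from the earlier examples (the orders column names are assumptions).

    # Minimal sketch of the core Data Frame operations. Assumes the orders and
    # order_items data frames from the earlier examples; orders column names
    # are assumptions.
    from pyspark.sql.functions import sum, round

    # Selection or Projection
    orders.select('order_id', 'order_date', 'order_status').show()

    # Filtering – filter and where are interchangeable
    orders.filter(orders.order_status == 'COMPLETE').show()
    orders.where("order_status = 'COMPLETE'").show()

    # Join, aggregation and sorting – daily revenue from completed/closed orders
    daily_revenue = orders. \
        filter("order_status in ('COMPLETE', 'CLOSED')"). \
        join(order_items, orders.order_id == order_items.order_item_order_id). \
        groupBy('order_date'). \
        agg(round(sum('order_item_subtotal'), 2).alias('revenue')). \
        orderBy('order_date')

    daily_revenue.show()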

Spark SQL – Overview

We can also use Spark SQL to process data in data frames.

  • We can get the list of tables by using spark.sql('show tables')
  • We can register a data frame as a temporary view using df.createTempView("view_name")
  • The output of show tables lists the temporary views as well
  • Once the temp view is created, we can use SQL style syntax and run queries against the tables/views (see the sketch after this list)
  • Most of the Hive queries will work out of the box
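
A minimal sketch of registering a temp view and querying it with Spark SQL; the orders data frame and its columns carry over from the earlier examples as assumptions.

    # Minimal sketch: register a data frame as a temporary view and query it
    # with Spark SQL. The orders data frame and its columns are assumptions
    # carried over from the earlier examples.
    orders.createTempView('orders_v')

    spark.sql('show tables').show()    # the temporary view is listed here as well

    spark.sql('''
        SELECT order_status, count(*) AS status_count
        FROM orders_v
        GROUP BY order_status
        ORDER BY status_count DESC
    ''').show()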

Functions to manipulate data

Let us quickly look into some of the functions available in Data Frames.

  • The main package for these functions is pyspark.sql.functions
  • We can import it by saying from pyspark.sql import functions as sf
  • You will see many functions which are similar to the functions in traditional databases.
  • These can be categorized into
    • String manipulation
    • Date manipulation
    • Type casting
    • Expressions such as case when
  • We will see some of the functions in action (in the sketch after this list)
    • substring
    • lower, upper
    • trim
    • date_format
    • trunc
    • Type Casting
    • case when
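
A minimal sketch of the functions listed above, using a small illustrative employees data frame (column names and values are assumptions).

    # Minimal sketch of the functions listed above, applied to an illustrative
    # employees data frame. Column names and values are assumptions.
    from pyspark.sql import functions as sf
    from pyspark.sql.types import IntegerType

    employees = spark.createDataFrame(
        [('1', ' Scott ', 'Tiger', '2018-01-10', '1000.60'),
         ('2', ' Donald ', 'Duck', '2018-03-20', '1500.75')],
        schema=['employee_id', 'first_name', 'last_name', 'joined_date', 'salary']
    )

    employees. \
        select(
            employees.employee_id.cast(IntegerType()).alias('employee_id'),         # type casting
            sf.upper(sf.trim(employees.first_name)).alias('first_name'),            # trim, upper
            sf.lower(employees.last_name).alias('last_name'),                       # lower
            sf.substring(employees.last_name, 1, 1).alias('last_initial'),          # substring
            sf.date_format(employees.joined_date, 'yyyyMM').alias('joined_month'),  # date_format
            sf.trunc(employees.joined_date, 'MM').alias('month_begin_date'),        # trunc
            sf.when(employees.salary.cast('float') >= 1200.0, 'HIGH').otherwise('LOW').alias('salary_category')  # case when
        ). \
        show()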