Spark Data Frames and Data Sets – Overview of APIs

Now that we have a basic understanding of how to create Data Frames or Data Sets, let us explore some key APIs to create Data Frames dynamically as well as to process the data.

  • List of Important APIs
  • Creating Data Frame Dynamically
  • Data Frame Native Operations – Overview
  • Spark SQL – Overview
  • Saving Data Frames into Files – Overview

List of Important APIs

Let us explore the important packages which provide APIs to create, process, and write Data Frames or Data Sets. A short sketch tying these APIs together follows the list.

  • We have already seen spark.read to read the data
  • We also have write APIs on top of Data Frames which can be used to save a Data Frame to the underlying file system.
  • org.apache.spark.sql has several other APIs for different purposes.
    • org.apache.spark.sql.types for pre-defined types or to create schemas dynamically.
    • org.apache.spark.sql.functions for pre-defined functions
    • createTempView or createOrReplaceTempView on top of a Data Frame to register it as an in-memory view and process the data using SQL-based queries.
    • spark.sql to run queries against Hive tables or temporary views, or even to run Hive commands.
    • org.apache.spark.sql.functions.udf to create User Defined Functions for Data Frame Operations.
    • spark.udf.register to register standard Scala functions as SQL functions.
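
To see how these pieces fit together, here is a minimal Scala sketch. The file paths, the to_upper function name, and column names such as order_status are illustrative assumptions, not part of the APIs themselves.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().appName("APIs Overview").getOrCreate()

// spark.read to create a Data Frame from files (path is a placeholder)
val orders = spark.read.json("/public/retail_db_json/orders")

// Register the Data Frame as an in-memory view and process it with SQL via spark.sql
orders.createOrReplaceTempView("orders")
spark.sql("SELECT order_status, count(1) FROM orders GROUP BY order_status").show()

// Wrap a plain Scala function as a UDF for Data Frame operations
val toUpper = udf((s: String) => s.toUpperCase)
orders.select(toUpper(orders("order_status"))).show()

// Register the same function so it can be used inside SQL queries
spark.udf.register("to_upper", (s: String) => s.toUpperCase)
spark.sql("SELECT to_upper(order_status) FROM orders").show()

// write API on top of the Data Frame to save it to the underlying file system
orders.write.mode("overwrite").json("/user/training/orders_copy")
```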

Creating Data Frame Dynamically

Let us see how we can use StructType to create Data Frames dynamically based on control files.

  • Many times we will get metadata about data in the form of control files.
  • Control files will have information such as column names, Data Types etc.
  • We need to create fields in Data Frame dynamically using column names and Data Types provided as part of control files.
  • The process is divided into two steps
    • Create Schema
    • Create Data Frame using Schema

Creating a Schema

Here are the steps involved in creating a Schema using metadata from control files.

  • Load the metadata from the control file into a Scala collection
  • Build an array of fields using StructField with column name and Data Type
  • Using the array, we can build a StructType
  • Also, we need to read the data from files and then apply map to build an RDD of Row objects, one per record, carrying the record's attributes.
  • Then we can use spark.createDataFrame to create the Data Frame programmatically by passing the RDD of Row records and the StructType.
[gist]2cb6d8e7fe5928c4b15bc1174cf2b409[/gist]
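
Below is a minimal sketch of the schema-building step, assuming a hypothetical control file with one "columnName,dataType" pair per line (the path, file layout, and type names are assumptions):

```scala
import org.apache.spark.sql.types._
import scala.io.Source

// Hypothetical control file: each line looks like "order_id,IntegerType"
val controlFilePath = "/path/to/orders_control_file.txt"

// Map the type names used in the control file to Spark SQL data types
val dataTypes: Map[String, DataType] = Map(
  "IntegerType" -> IntegerType,
  "StringType"  -> StringType,
  "FloatType"   -> FloatType
)

// Load the metadata from the control file into a Scala collection
val metadata = Source.fromFile(controlFilePath).getLines().toList

// Build the fields using StructField with column name and data type
val fields = metadata.map { line =>
  val Array(columnName, dataType) = line.split(",")
  StructField(columnName, dataTypes(dataType))
}

// Build the StructType from the fields
val schema = StructType(fields)
```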

Creating Data Frame Using Schema

Let us see how we can create a Data Frame after defining the Schema using the metadata.

  • Read the data from the file – orders
  • Apply the necessary transformation using map to create an RDD of Row objects with four fields.
  • Convert it into a Data Frame using spark.createDataFrame. It takes the RDD and the schema as arguments.
  • The RDD will be converted to a Data Frame using the Schema defined.
[gist]0f54ed07d9cca49b3cc6f2267991c5b3[/gist]
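
A minimal sketch of this step, reusing the schema built above; the data path and the comma-delimited, four-column layout of orders (order_id, order_date, order_customer_id, order_status) are assumptions:

```scala
import org.apache.spark.sql.Row

// Read the raw orders data (path and delimiter are assumptions)
val ordersRaw = spark.sparkContext.textFile("/public/retail_db/orders")

// Apply map to build an RDD of Row with four typed fields per record
val ordersRows = ordersRaw.map { record =>
  val a = record.split(",")
  Row(a(0).toInt, a(1), a(2).toInt, a(3))
}

// Create the Data Frame by passing the RDD of Row and the StructType
val ordersDF = spark.createDataFrame(ordersRows, schema)
ordersDF.printSchema()
ordersDF.show(5)
```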

Data Frame Native Operations – Overview

As we have seen how to create Data Frames from files, RDDs, etc., let us get into the high-level details of Data Frame Native Operations. We can perform all the standard transformations using Data Frame Operations.

  • Previewing Schema and Data – printSchema and show
  • Row-level transformations – using select, withColumn
  • Filtering the Data – filter or where. We can express the filter condition using either SQL-style syntax or Data Frame Native Syntax.
  • Aggregations – count, sum, avg, min, max, etc.
  • Sorting
  • Ranking using Windowing or Analytical Functions

Let us see a few simple examples.

  • Get orders for the month of January 2014.
  • Get count by status from the filtered orders
  • Get revenue for each order_id from order_items
[gist]6cf9c67b9cc79cee1878e77f37dec5f9[/gist]
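
Here is a sketch of these three examples using Data Frame native operations. The orders and orderItems Data Frames and the column names (order_date, order_status, order_id, order_item_order_id, order_item_subtotal) are assumed from a retail_db-style layout:

```scala
import org.apache.spark.sql.functions.{col, count, sum, round}

// Get orders for the month of January 2014 (SQL-style condition)
val ordersJan2014 = orders.filter("order_date LIKE '2014-01%'")

// The same filter using Data Frame native syntax
val ordersJan2014Native = orders.filter(col("order_date").startsWith("2014-01"))

// Get count by status from the filtered orders
ordersJan2014.
  groupBy("order_status").
  agg(count("order_id").alias("status_count")).
  show()

// Get revenue for each order_id from order_items
orderItems.
  groupBy("order_item_order_id").
  agg(round(sum("order_item_subtotal"), 2).alias("order_revenue")).
  orderBy("order_item_order_id").
  show()
```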

Spark SQL – Overview

Let us get into the details related to Spark SQL. We can submit queries against Hive tables or in-memory temporary views using the spark.sql API.

  • We can directly run queries on Hive tables or tables from remote databases using JDBC and create Data Frame for the results.
  • We can also register a Data Frame as a temporary view and run queries against it.
  • We can perform all the standard transformations using SQL syntax
  • We can also run standard Hive commands using spark.sql, such as show tables, describe table etc.
  • As part of Data Processing, we typically perform these operations.
    • Row Level Transformations (Data Standardization, Cleansing etc)
    • Filtering the data
    • Joining the Data Sets
    • Aggregations such as sum, min, max
    • Sorting and Ranking
    • and more

Let us see a few simple examples.

  • Get orders for the month of January 2014.
  • Get count by status from the filtered orders
  • Get revenue for each order_id from order_items
[gist]5a1c6719562a3ed26d9051c7151b7c50[/gist]
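
A sketch of the same three examples using Spark SQL; the temporary view names and column names are assumptions based on a retail_db-style layout:

```scala
// Register the Data Frames as temporary views (orders and orderItems assumed to exist)
orders.createOrReplaceTempView("orders")
orderItems.createOrReplaceTempView("order_items")

// Get orders for the month of January 2014
spark.sql("""
  SELECT * FROM orders
  WHERE order_date LIKE '2014-01%'
""").show()

// Get count by status from the filtered orders
spark.sql("""
  SELECT order_status, count(1) AS status_count
  FROM orders
  WHERE order_date LIKE '2014-01%'
  GROUP BY order_status
""").show()

// Get revenue for each order_id from order_items
spark.sql("""
  SELECT order_item_order_id,
         round(sum(order_item_subtotal), 2) AS order_revenue
  FROM order_items
  GROUP BY order_item_order_id
  ORDER BY order_item_order_id
""").show()
```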

Saving Data Frames into Files – Overview

Once the data in a Data Frame is processed using either Data Frame Operations or Spark SQL, we can write the Data Frame to target systems.

  • There are APIs to write data into files, Hive tables as well as remote RDBMS table over JDBC.
  • write on top of a Data Frame (the DataFrameWriter, with methods such as save, json, parquet, etc.) is the main API to write data into files in supported file systems.
  • We have APIs for these different file formats.
    • Text File Format – csv and text
    • parquet
    • orc
    • json
    • avro (requires a plugin)
  • We will see the details at a later point in time. For now, we will just validate a Data Frame by writing it out in JSON format.

Let us see a demo.

  • Read JSON data from order_items
  • Compute revenue for each order. We can either use Data Frame Native Operations or Spark SQL for this purpose.
  • Let us set spark.sql.shuffle.partitions to 2, so that the data is aggregated using 2 tasks. By default, Spark SQL and Data Frame Operations use 200 shuffle partitions.
  • As our data set is very small, it does not make sense to use 200 tasks to perform the aggregation.
  • Save data back to File System in the form of JSON
[gist]78706b1861e7bb19601e79c850ecfab5[/gist]
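
A sketch of this demo, assuming the order_items JSON data lives under a retail_db-style path (the input and output paths are placeholders):

```scala
import org.apache.spark.sql.functions.{sum, round}

// Read JSON data from order_items (path is a placeholder)
val orderItems = spark.read.json("/public/retail_db_json/order_items")

// Use 2 shuffle partitions instead of the default 200, since the data set is small
spark.conf.set("spark.sql.shuffle.partitions", "2")

// Compute revenue for each order using Data Frame native operations
val orderRevenue = orderItems.
  groupBy("order_item_order_id").
  agg(round(sum("order_item_subtotal"), 2).alias("order_revenue"))

// Save the result back to the file system in JSON format (target path is a placeholder)
orderRevenue.write.mode("overwrite").json("/user/training/order_revenue_json")
```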