Carol McDonald

Using Apache Spark DataFrames for Processing of Tabular Data

December 13, 2021

Editor’s Note: MapR products and solutions sold prior to the acquisition of such assets by Hewlett Packard Enterprise Company in 2019 may have older product names and model numbers that differ from current solutions. For information about current offerings, which are now part of HPE Ezmeral Data Fabric, please visit https://www.hpe.com/us/en/software/ezmeral-data-fabric.html

This post will help you get started using Apache Spark DataFrames with Scala on the MapR Sandbox (now known as the Development Environment for HPE Ezmeral Data Fabric). The new Spark DataFrames API is designed to make big data processing on tabular data easier.

What is a Spark DataFrame?

A Spark DataFrame is a distributed collection of data organized into named columns that provides operations to filter, group, or compute aggregates, and can be used with Spark SQL. DataFrames can be constructed from structured data files, existing RDDs, tables in Hive, or external databases.
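For example, here is a minimal sketch of constructing a DataFrame directly from a structured data file (people.json is a hypothetical example file, not one of this tutorial's datasets, and the sketch assumes the sqlContext we create later in this post):

// hedged sketch: build a DataFrame from a structured JSON file
// people.json is a hypothetical file used only for illustration
val people = sqlContext.jsonFile("people.json")
people.printSchema()
people.show()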

In this post, you’ll learn how to:

  • Load data into Spark DataFrames
  • Explore data with Spark SQL

This post assumes a basic understanding of Spark concepts. If you have not already read the tutorial on Getting Started with Spark on MapR Sandbox, it would be good to read that first.

Software

This tutorial runs on the MapR Sandbox (now the Development Environment for HPE Ezmeral Data Fabric), described in Getting Started with Spark on MapR Sandbox.

The sample data sets

We will use two example datasets - one from eBay online auctions and one from the SFPD Crime Incident Reporting system.

The eBay online auction dataset has the following data fields:

auctionid - unique identifier of an auction
bid - the proxy bid placed by a bidder
bidtime - the time (in days) that the bid was placed, from the start of the auction
bidder - eBay username of the bidder
bidderrate - eBay feedback rating of the bidder
openbid - the opening bid set by the seller
price - the closing price that the item sold for (equivalent to the second highest bid + an increment)
item - the item being auctioned (for example, xbox, cartier, or palm)
daystolive - the length of the auction, in days

As an example, the first row of ebay.csv looks like this: 8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3

Using Spark DataFrames, we will explore the data with questions like:

  • How many auctions were held?
  • How many bids were made per item?
  • What's the minimum, maximum, and average number of bids per item?
  • Show the bids with price > 100

The SFPD dataset has the following data fields: IncidntNum, Category, Descript, DayOfWeek, Date, Time, PdDistrict, Resolution, Address, X, Y, Location, and PdId.

Using Spark DataFrames, we will explore the SFPD data with questions like:

  • What are the top 10 Resolutions?
  • How many Categories are there?
  • What are the top 10 incident Categories?

Loading data into Spark DataFrames

Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01, password mapr. Copy the sample data files to your sandbox home directory /user/user01 using scp. Start the spark shell with:
$ spark-shell

First, we will import some packages and instantiate a sqlContext, which is the entry point for working with structured data (rows and columns) in Spark and allows the creation of DataFrame objects.
(In the code boxes, comments and shell output are shown on lines beginning with //.)

//  SQLContext entry point for working with structured data
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame
import sqlContext.implicits._

// Import Spark SQL data types and Row
import org.apache.spark.sql._
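As an aside, if you are running a newer Spark release (2.x or later), the SparkSession entry point subsumes SQLContext; a rough equivalent of the setup above would be something like the sketch below (not needed on the Spark 1.x sandbox used in this post):

// Spark 2.x+ sketch of the same entry point (assumes a 2.x+ shell or application)
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("ebay-auctions").getOrCreate()
import spark.implicits._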

Below, we load the data from the ebay.csv file into a Resilient Distributed Dataset (RDD). RDDs can have transformations and actions; the first() action returns the first element in the RDD, which is the String “8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3”

// load the data into a  new RDD

val ebayText = sc.textFile("ebay.csv")

// Return the first element in this RDD

ebayText.first() // res6: String = 8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3

Below, we use a Scala case class to define the Auction schema corresponding to the ebay.csv file. Then, map() transformations are applied to each element of ebayText to create the ebay RDD of Auction objects.

//define the schema using a case class

case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String, bidderrate: Integer, openbid: Float, price: Float, item: String, daystolive: Integer)

// create an RDD of Auction objects

val ebay = ebayText.map(_.split(",")).map(p => Auction(p(0),p(1).toFloat,p(2).toFloat,p(3),p(4).toInt,p(5).toFloat,p(6).toFloat,p(7),p(8).toInt ))

The ebay RDD first() action returns the first element in the RDD: Auction(8213034705,95.0,2.927373,jake7870,0,95.0,117.5,xbox,3).

// Return the first element in this RDD

ebay.first()
// res7: Auction = Auction(8213034705,95.0,2.927373,jake7870,0,95.0,117.5,xbox,3)

// Return the number of elements in the RDD
ebay.count()
// res8: Long = 10654

A DataFrame is a distributed collection of data organized into named columns. Spark SQL supports automatically converting an RDD containing case classes to a DataFrame with the method toDF():

// change ebay RDD of Auction objects to a DataFrame

val auction = ebay.toDF()

The previous RDD transformations can also be written on one line like this:

val auction = sc.textFile("ebay.csv").map(_.split(",")).map(p =>
  Auction(p(0), p(1).toFloat, p(2).toFloat, p(3), p(4).toInt, p(5).toFloat, p(6).toFloat, p(7), p(8).toInt)).toDF()
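Note that this split(",") parsing assumes every line in ebay.csv has exactly nine well-formed fields. If you are unsure about the input, a slightly more defensive sketch (not part of the original example) could drop malformed lines before converting:

// hedged sketch: drop lines that don't have exactly 9 fields before converting to a DataFrame
// (numeric parsing of the remaining fields can still fail if a field is not a number)
val auctionSafe = sc.textFile("ebay.csv")
  .map(_.split(","))
  .filter(_.length == 9)
  .map(p => Auction(p(0), p(1).toFloat, p(2).toFloat, p(3), p(4).toInt, p(5).toFloat, p(6).toFloat, p(7), p(8).toInt))
  .toDF()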

Explore and query the eBay auction data with Spark DataFrames

DataFrames provide a domain-specific language for structured data manipulation in Scala, Java, and Python; below are some examples with the auction DataFrame. The DataFrame show() action displays the top 20 rows in a tabular form.

// Display the top 20 rows of DataFrame

auction.show()
// auctionid  bid   bidtime  bidder        bidderrate openbid price item daystolive
// 8213034705 95.0  2.927373 jake7870      0          95.0    117.5 xbox 3
// 8213034705 115.0 2.943484 davidbresler2 1          95.0    117.5 xbox 3
// ...

The DataFrame printSchema() operation prints the schema to the console in a tree format:

// Return the schema of this DataFrame

auction.printSchema()
// root
//  |-- auctionid: string (nullable = true)
//  |-- bid: float (nullable = false)
//  |-- bidtime: float (nullable = false)
//  |-- bidder: string (nullable = true)
//  |-- bidderrate: integer (nullable = true)
//  |-- openbid: float (nullable = false)
//  |-- price: float (nullable = false)
//  |-- item: string (nullable = true)
//  |-- daystolive: integer (nullable = true)

Once a DataFrame has been instantiated, you can query it with the DataFrame API or with SQL. Here are some example queries using the Scala DataFrame API:

// How many auctions were held?

auction.select("auctionid").distinct.count // Long = 627

// How many bids per item?

auction.groupBy("auctionid", "item").count.show
// auctionid  item    count
// 3016429446 palm    10
// 8211851222 xbox    28
// 3014480955 palm    12
// 8214279576 xbox    4
// 3014844871 palm    18
// 3014012355 palm    35
// 1641457876 cartier 2
// ...

// What's the minimum number of bids per item? What's the average? What's the maximum?
auction.groupBy("item", "auctionid").count.agg(min("count"), avg("count"), max("count")).show
// MIN(count) AVG(count)         MAX(count)
// 1          16.992025518341308 75

// Get the auctions with closing price > 100

val highprice = auction.filter("price > 100")
// highprice: org.apache.spark.sql.DataFrame = [auctionid: string, bid: float, bidtime: float, bidder: string,
// bidderrate: int, openbid: float, price: float, item: string, daystolive: int]

// display dataframe in a tabular format

highprice.show()
// auctionid  bid   bidtime  bidder        bidderrate openbid price item daystolive
// 8213034705 95.0  2.927373 jake7870      0          95.0    117.5 xbox 3
// 8213034705 115.0 2.943484 davidbresler2 1          95.0    117.5 xbox 3
// ...
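The same filter can also be written with a column expression instead of a SQL-style string; here is a small sketch using the $ column syntax imported from sqlContext.implicits._:

// equivalent filter using a column expression (the $ syntax comes from sqlContext.implicits._)
val highprice2 = auction.filter($"price" > 100)
highprice2.select("auctionid", "item", "price").show()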

You can register a DataFrame as a temporary table using a given name, and then run SQL statements using the sql methods provided by sqlContext. Here are some example queries using sqlContext:

// register the DataFrame as a temp table

auction.registerTempTable("auction")

// SQL statements can be run
// How many bids per auction?
val results = sqlContext.sql("SELECT auctionid, item, count(bid) FROM auction GROUP BY auctionid, item")

// display dataframe in a tabular format
results.show()
// auctionid  item count
// 3016429446 palm 10
// 8211851222 xbox 28
// ...

val results = sqlContext.sql("SELECT auctionid, MAX(price) FROM auction GROUP BY item,auctionid")
results.show()
// auctionid  c1
// 3019326300 207.5
// 8213060420 120.0
// ...
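If you prefer a friendlier column name than c1 in the result, you can alias the aggregate; a small variation on the query above:

// same query, with an alias for the MAX(price) column
val maxPrices = sqlContext.sql("SELECT auctionid, MAX(price) as maxprice FROM auction GROUP BY item, auctionid")
maxPrices.show()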

Loading the SFPD data into a Spark DataFrame using a CSV parsing library

Now we will load the SFPD dataset into a Spark DataFrame using the spark-csv parsing library from Databricks. You can use this library in the Spark shell by specifying --packages com.databricks:spark-csv_2.10:1.0.3 when starting the shell, as shown below:

$ spark-shell --packages com.databricks:spark-csv_2.10:1.0.3

The load operation parses the sfpd.csv file and returns a DataFrame, using the first (header) line of the file for the column names.

import sqlContext.implicits._
import org.apache.spark.sql._


//  Return the dataset specified by data source as a DataFrame, use the header for column names
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "sfpd.csv", "header" -> "true"))
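On Spark 1.4 or later, the same load can also be expressed with the DataFrameReader API; a rough equivalent would be:

// Spark 1.4+ sketch of the same CSV load using sqlContext.read
val df2 = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("sfpd.csv")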

The take operation returns the first n rows of the DataFrame as an array.

// Return the first n rows in the DataFrame

df.take(1)

// res4: Array[org.apache.spark.sql.Row] = Array([150467944,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Thursday,05/28/2015,23:59,TENDERLOIN,NONE,TAYLOR ST / OFARRELL ST,-122.411328369311,37.7859963050476,(37.7859963050476, -122.411328369311),15046794406244])

// Print the schema to the console in a tree format

df.printSchema()
// root
//  |-- IncidntNum: string (nullable = true)
//  |-- Category: string (nullable = true)
//  |-- Descript: string (nullable = true)
//  |-- DayOfWeek: string (nullable = true)
//  |-- Date: string (nullable = true)
//  |-- Time: string (nullable = true)
//  |-- PdDistrict: string (nullable = true)
//  |-- Resolution: string (nullable = true)
//  |-- Address: string (nullable = true)
//  |-- X: string (nullable = true)
//  |-- Y: string (nullable = true)
//  |-- Location: string (nullable = true)
//  |-- PdId: string (nullable = true)

// display dataframe in a tabular format

df.show()
// IncidntNum Category      Descript             DayOfWeek Date       Time  PdDistrict Resolution Address              X                 Y                Location             PdId
// 150467944  LARCENY/THEFT GRAND THEFT FROM ... Thursday  05/28/2015 23:59 TENDERLOIN NONE       TAYLOR ST / OFARR... -122.411328369311 37.7859963050476 (37.7859963050476... 15046794406244
// ...
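Because the SFPD table is wide, the full show() output wraps badly; it can be easier to select a handful of columns first. For example:

// show only a few of the SFPD columns (and only 5 rows) to keep the output readable
df.select("IncidntNum", "Category", "DayOfWeek", "Date", "PdDistrict", "Resolution").show(5)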

Here are some example queries using sqlContext:

// how many categories are there?

df.select("Category").distinct.count
// res5: Long = 39

// register as a temp table in order to use SQL
df.registerTempTable("sfpd")

// What are the distinct Categories?

sqlContext.sql("SELECT distinct Category FROM sfpd").collect().foreach(println)

// [ASSAULT]
// [MISSING PERSON]
// [TREA]
// ...

// What are the top 10 Resolutions?

sqlContext.sql("SELECT Resolution, count(Resolution) as rescount FROM sfpd group by Resolution order by rescount desc limit 10").collect().foreach(println)
// [NONE,1063775]
// [ARREST, BOOKED,414219]
// [ARREST, CITED,154033]
// ...
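The same top-10 question can also be answered with the DataFrame API instead of SQL; a small sketch:

// DataFrame API version of the top 10 Resolutions query
df.groupBy("Resolution").count().sort($"count".desc).show(10)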

// What are the top 10 incident Categories?

val t = sqlContext.sql("SELECT Category, count(Category) as catcount FROM sfpd group by Category order by catcount desc limit 10")

t.show()
// Category       catcount
// LARCENY/THEFT  353793
// OTHER OFFENSES 253627
// NON-CRIMINAL   186272
// ...

// The results of SQL queries are DataFrames and support RDD operations.
// The columns of a row in the result can be accessed by ordinal.

t.map(t => "column 0: " + t(0)).collect().foreach(println)
// column 0: LARCENY/THEFT
// column 0: OTHER OFFENSES
// column 0: NON-CRIMINAL
// column 0: ASSAULT
// ...
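Rows also expose typed getters, so the same mapping can be written without the generic apply; a small sketch using getString by position:

// same mapping using Row's typed getter by ordinal
t.map(row => "Category: " + row.getString(0)).collect().foreach(println)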

The physical plan for DataFrames

The Catalyst query optimizer creates the physical execution plan for DataFrames; you can inspect this plan with the explain operation, as shown below.

Print the physical plan to the console

DataFrames are designed to take the SQL queries constructed against them and optimize the execution as sequences of Spark Jobs as required. You can print the physical plan for a DataFrame using the explain operation as shown below:

//  Prints the physical plan to the console for debugging purpose

auction.select("auctionid").distinct.explain()

// == Physical Plan ==
// Distinct false
//  Exchange (HashPartitioning [auctionid#0], 200)
//   Distinct true
//    Project [auctionid#0]
//     PhysicalRDD [auctionid#0,bid#1,bidtime#2,bidder#3,bidderrate#4,openbid#5,price#6,item#7,daystolive#8], MapPartitionsRDD[11] at mapPartitions at ExistingRDD.scala:37

Summary

In this blog post, you've learned how to load data into Spark DataFrames and explore it with Spark SQL.

