"authorDisplayName": "Carol McDonald",
"category": "apache-spark",
"publish": "2015-06-24T07:00:00.000Z"
Editor’s Note: MapR products and solutions sold prior to the acquisition of such assets by Hewlett Packard Enterprise Company in 2019 may have older product names and model numbers that differ from current solutions. For information about current offerings, which are now part of HPE Ezmeral Data Fabric, please visit https://www.hpe.com/us/en/software/ezmeral-data-fabric.html
This post will help you get started using Apache Spark DataFrames with Scala on the MapR Sandbox (now known as the Development Environment for HPE Ezmeral Data Fabric). The new Spark DataFrames API is designed to make big data processing on tabular data easier.
What is a Spark DataFrame?
A Spark DataFrame is a distributed collection of data organized into named columns that provides operations to filter, group, or compute aggregates, and can be used with Spark SQL. DataFrames can be constructed from structured data files, existing RDDs, tables in Hive, or external databases.
In this post, you’ll learn how to:
- Load data into Spark DataFrames
- Explore data with Spark SQL
This post assumes a basic understanding of Spark concepts. If you have not already read the tutorial on Getting Started with Spark on MapR Sandbox, it would be good to read that first.
Software
The sample data sets
We will use two example datasets - one from eBay online auctions and one from the SFPD Crime Incident Reporting system.
The eBay online auction dataset has the following data fields:
auctionid - unique identifier of an auction
bid - the proxy bid placed by a bidder
bidtime - the time (in days) that the bid was placed, from the start of the auction
bidder - eBay username of the bidder
bidderrate - eBay feedback rating of the bidder
openbid - the opening bid set by the seller
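price - the closing price that the item sold for (equivalent to the second highest bid + an increment)
item - the type of item being auctioned (for example xbox, palm, or cartier)
daystolive - the length of the auction in days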
The table below shows the data fields with some sample data:
Using Spark DataFrames, we will explore the data with questions like:
- How many auctions were held?
- How many bids were made per item?
- What's the minimum, maximum, and average number of bids per item?
- Show the bids with price > 100
The table below shows the SFPD data fields with some sample data:
Using Spark DataFrames, we will explore the SFPD data with questions like:
- What are the top 10 Resolutions?
- How many Categories are there?
- What are the top 10 incident Categories?
Loading data into Spark DataFrames
Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01, password mapr. Copy the sample data files to your sandbox home directory /user/user01 using scp. Start the Spark shell with:
$ spark-shell
First, we will import some packages and instantiate a sqlContext, which is the entry point for working with structured data (rows and columns) in Spark and allows the creation of DataFrame objects.
(In the code boxes, comments are in Green and output is in Blue)
// SQLContext entry point for working with structured data
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Import Spark SQL data types and Row.
import org.apache.spark.sql._
Below, we load the data from the ebay.csv file into a Resilient Distributed Dataset (RDD). RDDs can have transformations and actions; the first() action returns the first element in the RDD, which is the String “8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3”
// load the data into a new RDD
val ebayText = sc.textFile("ebay.csv")
// Return the first element in this RDD
ebayText.first()
// res6: String = 8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3
Below, we use a Scala case class to define the Auction schema corresponding to the ebay.csv file. Then, map() transformations are applied to each element of ebayText to create the ebay RDD of Auction objects.
// define the schema using a case class
case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String, bidderrate: Integer, openbid: Float, price: Float, item: String, daystolive: Integer)
// create an RDD of Auction objects
val ebay = ebayText.map(_.split(",")).map(p => Auction(p(0), p(1).toFloat, p(2).toFloat, p(3), p(4).toInt, p(5).toFloat, p(6).toFloat, p(7), p(8).toInt))
The ebay RDD first() action returns the first element in the RDD, Auction = Auction( 8213034705, 95.0, 2.927373, jake7870, 0, 95.0, 117.5, xbox,3).
// Return the first element in this RDD
ebay.first()
// res7: Auction = Auction(8213034705,95.0,2.927373,jake7870,0,95.0,117.5,xbox,3)
// Return the number of elements in the RDD
ebay.count()
// res8: Long = 10654
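The map() step above throws an exception if a line has a missing or non-numeric field. As a minimal sketch in plain Scala (no Spark required), a more defensive parser could return an Option instead. The object name AuctionParsing and the helper parseAuction are hypothetical, and Int is used in place of Integer for simplicity; neither is part of the original code.

```scala
import scala.util.Try

object AuctionParsing {
  // Same field layout as the Auction case class in this post.
  // Assumption of this sketch: Int instead of Integer for the two integer fields.
  case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String,
                     bidderrate: Int, openbid: Float, price: Float, item: String,
                     daystolive: Int)

  // Hypothetical helper: returns None instead of throwing when a line
  // has the wrong number of fields or a field that is not numeric.
  def parseAuction(line: String): Option[Auction] = {
    val p = line.split(",")
    if (p.length != 9) None
    else Try(
      Auction(p(0), p(1).toFloat, p(2).toFloat, p(3), p(4).toInt,
              p(5).toFloat, p(6).toFloat, p(7), p(8).toInt)
    ).toOption
  }
}
```

In the Spark shell, something like ebayText.flatMap(line => AuctionParsing.parseAuction(line)) should then skip malformed lines rather than fail the job, assuming silently dropping bad records is acceptable for your data.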
A DataFrame is a distributed collection of data organized into named columns. Spark SQL supports automatically converting an RDD containing case classes to a DataFrame with the method toDF():
// change ebay RDD of Auction objects to a DataFrame
val auction = ebay.toDF()
The previous RDD transformations can also be written on one line like this:
val auction = sc.textFile("ebay.csv").map(_.split(",")).map(p => Auction(p(0),p(1).toFloat,p(2).toFloat,p(3),p(4).toInt,p(5).toFloat,p(6).toFloat,p(7),p(8).toInt )).toDF()
Explore and query the eBay auction data with Spark DataFrames
DataFrames provide a domain-specific language for structured data manipulation in Scala, Java, and Python; below are some examples with the auction DataFrame. The DataFrame show() action displays the top 20 rows in a tabular form.
// Display the top 20 rows of DataFrame
auction.show()
// auctionid  bid   bidtime  bidder        bidderrate openbid price item daystolive
// 8213034705 95.0  2.927373 jake7870      0          95.0    117.5 xbox 3
// 8213034705 115.0 2.943484 davidbresler2 1          95.0    117.5 xbox 3
// …
The DataFrame printSchema() method prints the schema to the console in a tree format:
// Return the schema of this DataFrame
auction.printSchema()
// root
//  |-- auctionid: string (nullable = true)
//  |-- bid: float (nullable = false)
//  |-- bidtime: float (nullable = false)
//  |-- bidder: string (nullable = true)
//  |-- bidderrate: integer (nullable = true)
//  |-- openbid: float (nullable = false)
//  |-- price: float (nullable = false)
//  |-- item: string (nullable = true)
//  |-- daystolive: integer (nullable = true)
After a DataFrame is instantiated, you can query it with DataFrame operations. Here are some example queries using the Scala DataFrame API:
// How many auctions were held?
auction.select("auctionid").distinct.count
// Long = 627
// How many bids per item?
auction.groupBy("auctionid", "item").count.show
// auctionid  item    count
// 3016429446 palm    10
// 8211851222 xbox    28
// 3014480955 palm    12
// 8214279576 xbox    4
// 3014844871 palm    18
// 3014012355 palm    35
// 1641457876 cartier 2
// . . .
// min, avg, and max are defined in org.apache.spark.sql.functions
import org.apache.spark.sql.functions._
// What's the min number of bids per item? what's the average? what's the max?
auction.groupBy("item", "auctionid").count.agg(min("count"), avg("count"), max("count")).show
// MIN(count) AVG(count)         MAX(count)
// 1          16.992025518341308 75
// Get the auctions with closing price > 100
val highprice = auction.filter("price > 100")
// highprice: org.apache.spark.sql.DataFrame = [auctionid: string, bid: float, bidtime: float, bidder: string, bidderrate: int, openbid: float, price: float, item: string, daystolive: int]
// display dataframe in a tabular format
highprice.show()
// auctionid  bid   bidtime  bidder        bidderrate openbid price item daystolive
// 8213034705 95.0  2.927373 jake7870      0          95.0    117.5 xbox 3
// 8213034705 115.0 2.943484 davidbresler2 1          95.0    117.5 xbox 3
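To make the semantics of the groupBy(...).count query followed by agg(min, avg, max) concrete, here is a rough sketch of the same computation on a tiny in-memory sample using plain Scala collections. The sample rows, the object name BidStats, and both helper names are made up for illustration; the Spark version distributes this work across the cluster instead of running it in one process.

```scala
object BidStats {
  // (auctionid, item) pairs, one per bid. Made-up sample values for illustration only.
  val bids: Seq[(String, String)] = Seq(
    ("a1", "xbox"), ("a1", "xbox"), ("a1", "xbox"),
    ("a2", "palm"),
    ("a3", "cartier"), ("a3", "cartier")
  )

  // Local analogue of groupBy("auctionid", "item").count:
  // the number of rows for each distinct (auctionid, item) key.
  def bidsPerAuction(rows: Seq[(String, String)]): Map[(String, String), Int] =
    rows.groupBy(identity).map { case (key, group) => key -> group.size }

  // Local analogue of agg(min("count"), avg("count"), max("count")).
  // Assumes counts is non-empty.
  def minAvgMax(counts: Iterable[Int]): (Int, Double, Int) =
    (counts.min, counts.sum.toDouble / counts.size, counts.max)
}
```

For the sample above, BidStats.minAvgMax(BidStats.bidsPerAuction(BidStats.bids).values) yields (1, 2.0, 3): one auction has a single bid, one has three, and the average is two.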
You can register a DataFrame as a temporary table using a given name, and then run SQL statements using the sql methods provided by sqlContext. Here are some example queries using sqlContext:
// register the DataFrame as a temp table
auction.registerTempTable("auction")
// SQL statements can now be run against the temp table
// How many bids per auction?
val results = sqlContext.sql("SELECT auctionid, item, count(bid) FROM auction GROUP BY auctionid, item")
// display dataframe in a tabular format
results.show()
// auctionid  item count
// 3016429446 palm 10
// 8211851222 xbox 28
// . . .
// What is the maximum price per auction?
val results = sqlContext.sql("SELECT auctionid, MAX(price) FROM auction GROUP BY item,auctionid")
results.show()
// auctionid  c1
// 3019326300 207.5
// 8213060420 120.0
// . . .
Loading the SFPD data into Spark dataframes using a csv parsing library
Now we will load the SFPD dataset into a Spark dataframe using the spark-csv parsing library from Databricks. You can use this library at the Spark shell by specifying --packages com.databricks:spark-csv_2.10:1.0.3 when starting the shell as shown below:
$ spark-shell --packages com.databricks:spark-csv_2.10:1.0.3
The load operation will parse the sfpd.csv file and return a dataframe using the first header line of the file for column names.
import sqlContext.implicits._
import org.apache.spark.sql._
// Return the dataset specified by data source as a DataFrame, use the header for column names
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "sfpd.csv", "header" -> "true"))
The take operation returns the specified number of rows in the DataFrame.
// Return the first n rows in the DataFrame
df.take(1)
// res4: Array[org.apache.spark.sql.Row] = Array([150467944,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Thursday,05/28/2015,23:59,TENDERLOIN,NONE,TAYLOR ST / OFARRELL ST,-122.411328369311,37.7859963050476,(37.7859963050476, -122.411328369311),15046794406244])
// Print the schema to the console in a tree format
df.printSchema()
// root
//  |-- IncidntNum: string (nullable = true)
//  |-- Category: string (nullable = true)
//  |-- Descript: string (nullable = true)
//  |-- DayOfWeek: string (nullable = true)
//  |-- Date: string (nullable = true)
//  |-- Time: string (nullable = true)
//  |-- PdDistrict: string (nullable = true)
//  |-- Resolution: string (nullable = true)
//  |-- Address: string (nullable = true)
//  |-- X: string (nullable = true)
//  |-- Y: string (nullable = true)
//  |-- Location: string (nullable = true)
//  |-- PdId: string (nullable = true)
// display dataframe in a tabular format
df.show()
// IncidntNum Category      Descript             DayOfWeek Date       Time  PdDistrict Resolution Address              X                 Y                Location             PdId
// 150467944  LARCENY/THEFT GRAND THEFT FROM ... Thursday  05/28/2015 23:59 TENDERLOIN NONE       TAYLOR ST / OFARR... -122.411328369311 37.7859963050476 (37.7859963050476... 15046794406244
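Note that the schema printed above reports every SFPD column as a string, so fields such as Date, Time, X, and Y need converting before they can be used as timestamps or numbers. The following is a small sketch in plain Scala of what such conversions might look like. The object name SfpdFields and both helpers are hypothetical, and the date pattern is inferred from the single sample row shown here, so treat it as an assumption about the rest of the file.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object SfpdFields {
  // Pattern inferred from the sample row (05/28/2015 and 23:59);
  // an assumption of this sketch, not a documented file format.
  private val fmt = DateTimeFormatter.ofPattern("MM/dd/yyyy HH:mm")

  // Combine the Date and Time string columns into one timestamp.
  def incidentTime(date: String, time: String): LocalDateTime =
    LocalDateTime.parse(s"$date $time", fmt)

  // In the sample row, X holds the longitude and Y the latitude,
  // both as strings. Returns (longitude, latitude) as doubles.
  def coordinates(x: String, y: String): (Double, Double) =
    (x.toDouble, y.toDouble)
}
```

Both helpers throw on malformed input, so in a real job you would likely wrap them in scala.util.Try or filter out bad rows first.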
Here are some example queries using sqlContext:
// how many categories are there?
df.select("Category").distinct.count
// res5: Long = 39
// register as a temp table in order to use sql
df.registerTempTable("sfpd")
// List the distinct categories
sqlContext.sql("SELECT distinct Category FROM sfpd").collect().foreach(println)
// [ASSAULT]
// [MISSING PERSON]
// [TREA]
// . . .
// What are the top 10 Resolutions?
sqlContext.sql("SELECT Resolution , count(Resolution) as rescount FROM sfpd group by Resolution order by rescount desc limit 10").collect().foreach(println)
// [NONE,1063775]
// [ARREST, BOOKED,414219]
// [ARREST, CITED,154033]
// . . .
// What are the top 10 incident Categories?
val t = sqlContext.sql("SELECT Category , count(Category) as catcount FROM sfpd group by Category order by catcount desc limit 10")
t.show()
// Category       catcount
// LARCENY/THEFT  353793
// OTHER OFFENSES 253627
// NON-CRIMINAL   186272
// . . .
// The results of SQL queries are DataFrames and support RDD operations.
// The columns of a row in the result can be accessed by ordinal
t.map(t => "column 0: " + t(0)).collect().foreach(println)
// column 0: LARCENY/THEFT
// column 0: OTHER OFFENSES
// column 0: NON-CRIMINAL
// column 0: ASSAULT
// …
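The "top 10" queries above combine GROUP BY, ORDER BY ... DESC, and LIMIT. As a rough illustration of what that combination computes, here is the same idea on a local collection in plain Scala. The object name TopN and the helper topN are hypothetical, and breaking ties alphabetically is a choice made for this sketch; the SQL queries above do not specify an order for ties.

```scala
object TopN {
  // Hypothetical helper: the n most frequent values with their counts,
  // most frequent first. Ties are broken alphabetically (a choice of this sketch).
  def topN(values: Seq[String], n: Int): Seq[(String, Int)] =
    values.groupBy(identity)                                // GROUP BY value
      .map { case (value, group) => value -> group.size }   // count(value)
      .toSeq
      .sortBy { case (value, count) => (-count, value) }    // ORDER BY count DESC
      .take(n)                                              // LIMIT n
}
```

For example, with the made-up input Seq("ASSAULT", "LARCENY/THEFT", "LARCENY/THEFT", "NON-CRIMINAL", "LARCENY/THEFT", "ASSAULT"), TopN.topN(input, 2) returns LARCENY/THEFT with a count of 3 followed by ASSAULT with a count of 2.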
The physical plan for DataFrames
The Catalyst query optimizer creates the physical Execution Plan for DataFrames as shown in the diagram below:
Print the physical plan to the console
DataFrames are designed to take the SQL queries constructed against them and optimize the execution as sequences of Spark Jobs as required. You can print the physical plan for a DataFrame using the explain operation as shown below:
// Prints the physical plan to the console for debugging purposes
auction.select("auctionid").distinct.explain()
// == Physical Plan ==
// Distinct false
//  Exchange (HashPartitioning [auctionid#0], 200)
//   Distinct true
//    Project [auctionid#0]
//     PhysicalRDD [auctionid#0,bid#1,bidtime#2,bidder#3,bidderrate#4,openbid#5,price#6,item#7,daystolive#8], MapPartitionsRDD[11] at mapPartitions at ExistingRDD.scala:37
Summary
In this blog post, you’ve learned how to load data into Spark DataFrames, and explore data with Spark SQL.
Want to learn more?