Search
Carol McDonald

How Spark Runs Your Applications

November 18, 2020

Editor’s Note: MapR products and solutions sold prior to the acquisition of such assets by Hewlett Packard Enterprise Company in 2019, may have older product names and model numbers that differ from current solutions. For information about current offerings, which are now part of HPE Ezmeral Data Fabric, please visit https://www.hpe.com/us/en/software/data-fabric.html

Original Post Information:

"authorDisplayName": "Carol McDonald",
"publish": "2018-10-31T07:00:00.000Z",
"tags": "apache-spark"

Recall that your Spark application runs as a set of parallel tasks. In this blog post, we will go over how Spark translates Dataset transformations and actions into an execution model.

With Spark 2.0 and later versions, big improvements were implemented to make Spark easier to program and execute faster.

In order to understand how your application runs on a cluster, an important thing to know about Dataset transformations is that they fall into two types, narrow and wide, which we will discuss first, before explaining the execution model.

Narrow and Wide Transformations  

As a review, transformations create a new Dataset from an existing one.  Narrow transformations do not have to move data between partitions when creating a new dataset from an existing one. Some example narrow transformations are filter and select, which are used in the example below to retrieve flight information for the carrier "AA":

// select and filter are narrow transformations
df.select($"carrier",$"origin",  $"dest", $"depdelay", $"crsdephour").filter($"carrier" === "AA" ).show(2)

result:
+-------+------+----+--------+----------+
|carrier|origin|dest|depdelay|crsdephour|
+-------+------+----+--------+----------+
|     AA|   ATL| LGA|     0.0|        17|
|     AA|   LGA| ATL|     0.0|        13|
+-------+------+----+--------+----------+

Multiple narrow transformations can be performed on a Dataset in memory, in a process called pipelining, making narrow transformations very efficient.

Wide transformations cause data to be moved between partitions when creating a new Dataset, in a process called the shuffle.  With wide transformation shuffles, data is sent across the network to other nodes and written to disk, causing network and disk I/O, and making the shuffle a costly operation. Some example wide transformations are groupBy, agg, sortBy, and orderBy. Below is a wide transformation to count the number of flights by carrier.

df.groupBy("carrier").count.show
result:

+-------+-----+
|carrier|count|
+-------+-----+
|     UA|18873|
|     AA|10031|
|     DL|10055|
|     WN| 2389|
+-------+-----+

The Spark Execution Model

The Spark execution model can be defined in three phases: creating the logical plan, translating that into a physical plan, and then executing the tasks on a cluster.

You can view useful information about your Spark jobs in real time in a web browser with this URL: http://<driver-node>:4040. For Spark applications that have finished, you can use the Spark history server to see this information in a web browser at this URL: http://<server-url>:18080.  Let’s walk through the three phases and the Spark UI information about the phases, with some example code.

The Logical Plan

In the first phase, the logical plan is created. This is the plan that shows which steps will be executed when an action gets applied. Recall that when you apply a transformation on a Dataset, a new Dataset is created. When this happens, that new Dataset points back to the parent, resulting in a lineage or directed acyclic graph (DAG) for how Spark will execute these transformations.

The Physical Plan

Actions trigger the translation of the logical DAG into a physical execution plan. The Spark Catalyst query optimizer creates the physical execution plan for DataFrames, as shown in the diagram below:

(Image reference: Databricks)

The physical plan identifies resources, such as memory partitions and compute tasks, that will execute the plan.

Viewing the Logical and Physical Plan

You can see the logical and physical plan for a Dataset by calling the explain(true) method. In the code below, we see that the DAG for df2 consists of a FileScan, a Filter on depdelay, and a Project (selecting columns).

import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

var file = "maprfs:///data/flights20170102.json"

case class Flight(_id: String, dofW: Long, carrier: String, origin: String, dest: String, crsdephour: Long, crsdeptime: Double, depdelay: Double,crsarrtime: Double, arrdelay: Double, crselapsedtime: Double, dist: Double) extends Serializable

val df = spark.read.format("json").option("inferSchema", "true").load(file).as[Flight]

val df2 = df.filter($"depdelay" > 40)

df2.take(1)

result:
Array[Flight] = Array(Flight(MIA_IAH_2017-01-01_AA_2315, 7,AA,MIA,IAH,20,2045.0,80.0,2238.0,63.0,173.0,964.0))

df2.explain(true)

result:
== Parsed Logical Plan ==
'Filter ('depdelay > 40)
+- Relation[_id#8,arrdelay#9,] json

== Analyzed Logical Plan ==
_id: string, arrdelay: double…
Filter (depdelay#15 > cast(40 as double))
+- Relation[_id#8,arrdelay#9] json

== Optimized Logical Plan ==
Filter (isnotnull(depdelay#15) && (depdelay#15 > 40.0))
+- Relation[_id#8,arrdelay#9,] json

== Physical Plan ==
*Project [_id#8, arrdelay#9,]
+- *Filter (isnotnull(depdelay#15) && (depdelay#15 > 40.0))
   +- *FileScan json [_id#8,arrdelay#9,] Batched: false, Format: JSON, Location: InMemoryFileIndex[maprfs:///..],

You can see more details about the plan produced by Catalyst on the web UI SQL tab (http://<driver-node>:4040/SQL/).  Clicking on the query description link displays the DAG and details for the query.

In the code below, after the explain, we see that the physical plan for df3 consists of a FileScan, Filter, Project, HashAggregate, Exchange, and HashAggregate. The Exchange is the shuffle caused by the groupBy transformation. Spark performs a hash aggregation for each partition before shuffling the data in the Exchange. After the exchange, there is a hash aggregation of the previous sub-aggregations. Note that we would have an in-memory scan instead of a file scan in this DAG, if df2 were cached.

val df3 = df2.groupBy("carrier").count

df3.collect

result:
Array[Row] = Array([UA,2420], [AA,757], [DL,1043], [WN,244])

df3.explain

result:
== Physical Plan ==
*HashAggregate(keys=[carrier#124], functions=[count(1)])
+- Exchange hashpartitioning(carrier#124, 200)
+- *HashAggregate(keys=[carrier#124], functions=[partial_count(1)])
      +- *Project [carrier#124]
         +- *Filter (isnotnull(depdelay#129) && (depdelay#129 > 40.0))
+- *FileScan json [carrier#124,depdelay#129]

Clicking on the SQL tab link for this query displays the DAG below.

Executing the Tasks on a Cluster

In the third phase, the tasks are scheduled and executed on the cluster. The scheduler splits the graph into stages, based on the transformations. The narrow transformations (transformations without data movement) will be grouped (pipe-lined) together into a single stage. The physical plan for this example has two stages, with everything before the exchange in the first stage.

Each stage is comprised of tasks, based on partitions of the Dataset, which will perform the same computation in parallel.

The scheduler submits the stage task set to the task scheduler, which launches tasks via a cluster manager. These phases are executed in order, and the action is considered complete when the final phase in a job completes. This sequence can occur many times when new Datasets are created.

Here is a summary of the components of execution:

  • Task: a unit of execution that runs on a single machine
  • Stage: a group of tasks, based on partitions of the input data, which will perform the same computation in parallel
  • Job: has one or more stages
  • Pipelining: collapsing of Datasets into a single stage, when Dataset transformations can be computed without data movement
  • DAG: Logical graph of Dataset operations

Exploring the Task Execution on the Web UI

Here is a screenshot of the web UI Jobs tab, after running the code above. The Jobs page gives you detailed execution information for active and recently completed Spark jobs. It gives you the performance of a job and also the progress of running jobs, stages, and tasks. In this example, Job Id 2 is the job that was triggered by the collect action on df3.

Clicking the link in the Description column on the Jobs page takes you to the Job Details page. This page gives you details on the progress of the job, stages, and tasks.  We see this job consists of 2 stages, with 2 tasks in the stage before the shuffle and 200 in the stage after the shuffle.

The number of tasks correspond to the partitions: after reading the file in the first stage, there are 2 partitions; after a shuffle, the default number of partitions is 200. You can see the number of partitions on a Dataset with the rdd.partitions.size method shown below.

df3.rdd.partitions.size
result: Int = 200

df2.rdd.partitions.size
result: Int = 2

Under the Stages tab, you can see the details for a stage by clicking on its link in the description column.

Here we have summary metrics and aggregated metrics for tasks and aggregated metrics by executor.  You can use these metrics to identify problems with an executor or task distribution. If your task process time is not balanced, then resources could be wasted.

The Storage tab provides information about persisted Datasets. The Dataset is persisted if you called Persist or Cache on the Dataset, followed by an action to compute on that Dataset. This page tells you which fraction of the Dataset’s underlying RDD is cached and the quantity of data cached in various storage media. Look at this page to see if important Datasets are fitting into memory. You can also click on the link to view more details about the persisted Dataset. If you no longer need a cached Dataset, you can call Unpersist to uncache it.

Try caching df2, performing an action, then seeing how this gets persisted on the storage tab and how it changes the plan and execution time for df3 on the job details page. Notice how the execution time is faster after caching.

df2.cache
df2.count
df3.collect

Notice how the first stage is skipped in job4, when df2 is cached and df3 collect is executed again.

The Environment tab lists all the active properties of your Spark applicationenvironment. Use this page when you want to see which configuration flags are enabled.  Only values specified through spark-defaults.conf, SparkSession, or the command line will be displayed here. For all other configuration properties, the default value is used.

Under the Executors tab, you can see processing and storage for each executor:

  • Shuffle Read Write Columns: shows size of data transferred between stages
  • Storage Memory Column: shows the current used/available memory
  • Task Time Column: shows task time/garbage collection time

Use this page to confirm that your application has the amount of resources you were expecting.  You can look at the thread call stack by clicking on the thread dump link.

Summary

In this post, we discussed the Spark execution model, and we explored task execution on the Spark Web UI. This understanding of how Spark runs your applications is important when debugging, analyzing, and tuning the performance of your applications.

Related

Ted Dunning & Ellen Friedman

3 ways a data fabric enables a data-first approach

Mar 15, 2022
Nicolas Perez

A Functional Approach to Logging in Apache Spark

Feb 5, 2021
Cenz Wong

Getting Started with DataTaps in Kubernetes Pods

Jul 6, 2021
Kiran Kumar Mavatoor

Accessing HPE Ezmeral Data Fabric Object Storage from Spring Boot S3 Micro Service deployed in K3s cluster

Sep 13, 2021
Carol McDonald

An Inside Look at the Components of a Recommendation Engine

Jan 22, 2021
Carol McDonald

Analyzing Flight Delays with Apache Spark GraphFrames and MapR Database

Dec 16, 2020
Nicolas Perez

Apache Spark as a Distributed SQL Engine

Jan 7, 2021
Carol McDonald

Apache Spark Machine Learning Tutorial

Nov 25, 2020

HPE Developer Newsletter

Stay in the loop.

Sign up for the HPE Developer Newsletter or visit the Newsletter Archive to see past content.

By clicking on “Subscribe Now”, I agree to HPE sending me personalized email communication about HPE and select HPE-Partner products, services, offers and events. I understand that my email address will be used in accordance with HPE Privacy Statement. You may unsubscribe from receiving HPE and HPE-Partner news and offers at any time by clicking on the Unsubscribe button at the bottom of the newsletter.

For more information on how HPE manages, uses, and protects your personal data please refer to HPE Privacy Statement.