Nicolas Perez

A Functional Approach to Logging in Apache Spark

February 5, 2021

Editor’s Note: MapR products and solutions sold prior to the acquisition of such assets by Hewlett Packard Enterprise Company in 2019 may have older product names and model numbers that differ from current solutions. For information about current offerings, which are now part of HPE Ezmeral Data Fabric, please visit https://www.hpe.com/us/en/software/data-fabric.html

Original Post Information:

"authorDisplayName": "Nicolas A Perez",
"publish": "2016-07-28T07:00:00.000Z",
"tags": "apache-spark"


Logging in Apache Spark is very easy to do, since Spark offers access to a log object out of the box; only some configuration needs to be set up. In a previous post, we looked at how to do this while identifying some problems that may arise. However, the solution presented there might cause some problems when you are ready to collect the logs, since they are distributed across the entire cluster. Even if you use YARN’s log aggregation capabilities, there will be some contention that might affect performance, or you could end up with interleaved log entries that corrupt the nature of the log itself.

In this blog post, I will demonstrate how to solve these problems by taking a different, more functional approach.

The Monad Writer

I do not intend to go over the details of monads or the Monad Writer, so if you would like to learn more, please read “Functors, Applicatives, And Monads In Pictures,” which is a very informative introduction to the topic.

Just to put things in context, let’s say that the monad writer (writer) is a container that holds the current value of a computation along with its history, i.e., a log of the transformations applied to that value.

Because the writer has monadic properties, it allows us to do functional transformations, and we will soon see how everything sticks together.
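As a rough illustration using the Cats Writer that the rest of this post relies on: a value is paired with a log, and flatMap threads the value through while concatenating the logs.

import cats.data.Writer
import cats.implicits._

// Pair a value with a one-entry log.
val step1: Writer[List[String], Int] = Writer(List("started with 10"), 10)

// flatMap carries the value forward and concatenates the logs.
val step2: Writer[List[String], Int] = step1.flatMap(x => Writer(List("doubled it"), x * 2))

val (logEntries, value) = step2.run
// logEntries == List("started with 10", "doubled it")
// value      == 20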

A Simplistic Log

The following code demonstrates a simplistic log.

import org.apache.log4j.{Level, LogManager}
import org.apache.spark.{SparkConf, SparkContext}

object app {
  def main(args: Array[String]): Unit = {
    // All of this runs on the driver, so logging here is unproblematic.
    val log = LogManager.getRootLogger
    log.setLevel(Level.WARN)

    val conf = new SparkConf().setAppName("demo-app")
    val sc = new SparkContext(conf)

    log.warn("Hello demo")

    val data = sc.parallelize(1 to 100000)

    log.warn("I am done")
  }
}

The only thing to note is that logging is actually happening on the Spark driver, so we don’t have synchronization or contention problems. However, everything starts to get complicated once we start distributing our computations.

The following code, however, won’t work (see the previous post for why):

val log = LogManager.getRootLogger
val data = sc.parallelize(1 to 100000)

data.map { value =>
  // This closure captures the driver's logger, which is not serializable,
  // so the task fails when Spark tries to ship it to the executors.
  log.info(value)
  value.toString
}

A solution to this was also presented in the previous post, but it requires extra work to manage the logs.
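That earlier solution is not reproduced here, but a common variant of it looks roughly like the following: obtain the logger inside the closure (here, once per partition) so that nothing non-serializable is captured from the driver, at the cost of one log per executor.

import org.apache.log4j.LogManager

// A sketch of the per-executor workaround (the previous post's exact code may differ):
// the logger is looked up on the worker, so it is never serialized from the driver.
val processed = data.mapPartitions { iter =>
  val workerLog = LogManager.getLogger("demo-app")
  iter.map { value =>
    workerLog.info(value)
    value.toString
  }
}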

Once we start logging on each node of the cluster, we need to go to each node and collect each log file in order to make sense of what is in the logs. Hopefully, you are using some kind of tool to help you with this task, such as Splunk, Datadog, etc. However, you still need to know how to get those logs into your system.

Our Data Set

Our data set is a collection of instances of the class “User” that is going to be transformed while keeping a unified log of the operations on our data set.

Let’s suppose we want to load our data set, keep only the people who are less than 20 years old, and finally extract their names. This is a very silly example, but it will demonstrate how the logs are produced. You could replace these computations with your own, but the idea of building a unified log remains the same.
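The snippets below assume a simple case class shaped roughly like this (the name User and the comma-separated name,age input format are implied by the parsing code that follows):

// Assumed record shape; the input file holds lines such as "anna,25".
case class User(name: String, age: Int)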

Getting the Writer

In order to get the monad writer from the Typelevel Cats library, we add the following line to our build.sbt file:

libraryDependencies += "org.typelevel" %% "cats" % "0.6.1"
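The examples that follow also assume a set of imports roughly like this one (your exact imports may vary):

import cats.data.Writer
import cats.implicits._

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD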

Playing with our Data

Now, let’s define the transformations we are going to use. First, let’s load the data.

def loadPeopleFrom(path: String)(implicit sc: SparkContext) =
  s"loading people from $path" ~> sc.textFile(path).map { line =>
    val fields = line.split(",")
    User(fields(0), fields(1).toInt)
  }

Here, the ~> operator is defined via an implicit conversion as follows:

implicit class toW(s: String) {
  // Pairs a log message with an RDD inside a Writer, so the message travels with the value.
  def ~>[A](rdd: RDD[A]): Writer[List[String], RDD[A]] = Writer(s :: Nil, rdd)
}

If you look closely, our loading operation is not returning an RDD; in fact, it returns the monad writer that keeps track of the logs.

Let’s define the filter that we want to apply over the collection of users.

def filter(rdd: RDD[User])(f: User => Boolean) = "filtering users" ~> rdd.filter(f)

Again, we are applying the same function (~>) to keep track of this transformation.

Lastly, we define the mapping, which follows the same pattern we just saw.

def mapUsers(rdd: RDD[User])(prefix: String): Writer[List[String], RDD[String]] =
  "mapping users" ~> rdd.map(p => prefix + p.name)

Putting it together

So far we have only defined our transformations, but we need to stick them together. Scala’s for comprehension is a very convenient way to work with monadic structures. Let’s see how.

val result = for {
  person          <- loadPeopleFrom("~/users_dataset/")(sc)
  filtered        <- filter(person)(_.age < 20)
  namesWithPrefix <- mapUsers(filtered)("hello")
} yield namesWithPrefix

val (log, rdd) = result.run

Please note that the result is of type Writer[List[String], RDD[String]].

Calling result.run gives us the log (log: List[String]) and the final computation (rdd: RDD[String]).
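The for comprehension above is just sugar for flatMap/map on the writer, which is what concatenates the individual log entries into a single List[String]:

// What the for comprehension desugars to: each flatMap appends to the log.
val result: Writer[List[String], RDD[String]] =
  loadPeopleFrom("~/users_dataset/")(sc).flatMap { person =>
    filter(person)(_.age < 20).flatMap { filtered =>
      mapUsers(filtered)("hello").map(namesWithPrefix => namesWithPrefix)
    }
  }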

At this point, we could use the Spark logger to write down the log generated by the chain of transformations. Note that this operation is executed on the Spark driver, which means a single log file is created that contains all of the log information. We also avoid potential contention problems during log writes: because the file is created and written from one place, in a serial way, there is no need to lock it, and the performance issues that come with locking disappear.
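A minimal sketch of that last step might look like this (driverLogger is just a name chosen here; any logger or sink would do, and log and rdd come from result.run above):

import org.apache.log4j.LogManager

// Runs only on the driver: one unified, ordered log written from a single place.
val driverLogger = LogManager.getRootLogger
log.foreach(entry => driverLogger.warn(entry))

// Trigger the actual Spark computation whenever convenient.
rdd.count()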

Conclusion

In this blog post, I’ve shown you how to improve logging in Apache Spark by using the Monad Writer. This functional approach allows you to distribute the creation of log entries along with your computations, which is something that Spark does very well. However, instead of writing the logs on each worker node, you collect them back to the driver and write them there.

This mechanism has certain advantages over the previous implementation. You can now control exactly how and when your logs are written, you can boost performance by removing I/O operations on the worker nodes, you can remove synchronization issues by writing the logs serially, and you can avoid the hazard of fishing for logs across your entire cluster.

This post was originally published here.
