Nicolas Perez

Spark Custom Streaming Sources

January 14, 2021

Editor’s Note: MapR products and solutions sold prior to the acquisition of such assets by Hewlett Packard Enterprise Company in 2019 may have older product names and model numbers that differ from current solutions. For information about current offerings, which are now part of HPE Ezmeral Data Fabric, please visit https://www.hpe.com/us/en/software/data-fabric.html

Original Post Information:

Author: Nicolas A Perez
Published: 2019-03-14
Tags: apache-spark

Apache Spark is one of the most versatile big data frameworks out there. The ability to mix different kinds of workloads, its in-memory processing, and its functional style make it appealing to anyone writing code in the data processing world.

One important aspect of Spark is that it has been built for extensibility. Writing new connectors for the RDD API or extending the DataFrame/Dataset API allows third parties to integrate with Spark with ease. Most people will use one of the built-in APIs, such as Kafka for stream processing or JSON/CSV for file processing. However, there are times when we need more specific implementations of our own. For example, we might have a proprietary database in our company with no connector for it. We can simply write one, as we explained in the previous post, Spark Data Source API: Extending Our Spark SQL Query Engine.

Starting with Spark 2.0, we can create our own streaming sources, thanks to the Spark Structured Streaming API. As we would imagine, there are some built-in streaming sources, Kafka being one of them, alongside FileStreamSource, TextSocketSource, etc.

The new Structured Streaming API should be preferred over the old DStream API. However, the same problem as before presents itself again: how can we extend this new API so we can use our own streaming sources? The answer is in this blog post.

Extensibility Points

Let's start by reviewing the main components we need to touch in order to create our own streaming source.

First, StreamSourceProvider indicates which source will be used as the stream reader.

Second, DataSourceRegister allows us to register our source within Spark, so it becomes available to stream processing.

Third, Source is the interface we need to implement in order to provide the streaming behavior itself.

Our Streaming Source

For the sake of this post, we will implement a rather simple streaming source, but the same concepts apply to any streaming source you need to implement on your own.

Our streaming source is called InMemoryRandomStrings. It generates a sequence of random strings and their lengths, exposed as a DataFrame of pairs.

Since we want to keep it simple, we will store the batches in memory and discard them when the process is done. InMemoryRandomStrings is not fault-tolerant, since the data is generated at processing time, in contrast to the built-in Kafka source, where the data actually lives in a Kafka cluster. In most real-world scenarios, our data is stored in advanced systems that keep it secure and consistent; MapR Event Store for Apache Kafka and MapR Database are just a couple of examples.

We can start by defining our StreamSourceProvider, which defines how our Source is created.

The class DefaultSource is our StreamSourceProvider, and we need to implement the two required functions, sourceSchema and createSource.
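The listings below omit imports. Assuming the Spark 2.x package layout, a minimal set for the source code would look like this (my own addition, not part of the original listings):

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

import scala.util.Random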

class DefaultSource extends StreamSourceProvider with DataSourceRegister {

  override def sourceSchema(sqlContext: SQLContext,
                            schema: Option[StructType],
                            providerName: String,
                            parameters: Map[String, String]): (String, StructType) = {

    (shortName(), InMemoryRandomStrings.schema)
  }

  override def createSource(sqlContext: SQLContext,
                            metadataPath: String,
                            schema: Option[StructType],
                            providerName: String,
                            parameters: Map[String, String]): Source = {

    new InMemoryRandomStrings(sqlContext)
  }

  override def shortName(): String = "InMemoryRandomStrings"
}

InMemoryRandomStrings.schema is the fixed schema we are going to use for the example, but a schema could also be passed in dynamically, as sketched below.
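For instance, a variation of sourceSchema that honors a user-supplied schema (a hypothetical tweak, not part of the original example) could look like this:

  override def sourceSchema(sqlContext: SQLContext,
                            schema: Option[StructType],
                            providerName: String,
                            parameters: Map[String, String]): (String, StructType) = {

    // fall back to our fixed schema only when the caller did not supply one
    (shortName(), schema.getOrElse(InMemoryRandomStrings.schema))
  }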

The createSource function then returns an instance of InMemoryRandomStrings that is our actual Source.

InMemoryRandomStrings

Now, let's look at the InMemoryRandomStrings code in parts, so we can focus on the details.

class InMemoryRandomStrings(sqlContext: SQLContext) extends Source {

  override def schema: StructType = InMemoryRandomStrings.schema

  override def getOffset: Option[Offset] = ???

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = ???

  override def commit(end: Offset): Unit = ???

  override def stop(): Unit = ???
}

object InMemoryRandomStrings {

  lazy val schema = StructType(List(StructField("value", StringType), StructField("ts", LongType)))

}

schema returns the schema that our source uses; in our case, we know that the schema is fixed.

getOffset should return the latest offset seen by our source.

class InMemoryRandomStrings(sqlContext: SQLContext) extends Source {
  private var offset: LongOffset = LongOffset(-1)

  override def schema: StructType = InMemoryRandomStrings.schema

  override def getOffset: Option[Offset] = this.synchronized {
    println(s"getOffset: $offset")

    if (offset.offset == -1) None else Some(offset)
  }

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = ???

  override def commit(end: Offset): Unit = ???

  override def stop(): Unit = ???
}

object InMemoryRandomStrings {

  lazy val schema = StructType(List(StructField("value", StringType), StructField("ts", LongType)))

}

Notice that we added a variable called offset that keeps track of the data seen so far. We return None if our source has never seen any data, and Some(offset) otherwise.

Now, let's see how our source can produce some data; we will use a running thread for it. Notice the dataGeneratorStartingThread function.

class InMemoryRandomStrings(sqlContext: SQLContext) extends Source {
  private var offset: LongOffset = LongOffset(-1)

  private var batches = collection.mutable.ListBuffer.empty[(String, Long)]

  private val incrementalThread = dataGeneratorStartingThread()

  override def schema: StructType = InMemoryRandomStrings.schema

  override def getOffset: Option[Offset] = this.synchronized {
    println(s"getOffset: $offset")

    if (offset.offset == -1) None else Some(offset)
  }

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = ???

  override def commit(end: Offset): Unit = ???

  override def stop(): Unit = incrementalThread.stop()

  private def dataGeneratorStartingThread() = {
    val t = new Thread("increment") {
      setDaemon(true)
      override def run(): Unit = {

        while (true) {
          try {
            // synchronize on the enclosing source, not on the thread itself,
            // so that getOffset, getBatch, and commit see a consistent view
            InMemoryRandomStrings.this.synchronized {
              offset = offset + 1

              val value = Random.nextString(Random.nextInt(5))

              batches.append((value, offset.offset))
            }
          } catch {
            case e: Exception => println(e)
          }

          Thread.sleep(100)
        }
      }

    }

    t.start()

    t
  }
}

object InMemoryRandomStrings {

  lazy val schema = StructType(List(StructField("value", StringType), StructField("ts", LongType)))

}

Here, we have added a thread that generates random values and increments the offset, while storing each value and its offset in an internal buffer. The thread starts running as soon as our source is created, and the stop function stops it.
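A side note: Thread.stop() is deprecated on the JVM. A gentler alternative (a sketch of my own, not what the original code does) is to have the loop check a volatile flag and let stop simply flip it:

  // hypothetical variation: cooperative shutdown instead of Thread.stop()
  @volatile private var keepRunning = true

  override def stop(): Unit = keepRunning = false

  private def dataGeneratorStartingThread() = {
    val t = new Thread("increment") {
      setDaemon(true)
      override def run(): Unit = {
        while (keepRunning) {
          // ... same body as above: increment the offset and append a random value ...
          Thread.sleep(100)
        }
      }
    }

    t.start()
    t
  }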

At this point, we are only two functions away from our goal.

getBatch returns a DataFrame back to Spark with data within the passed offset range.

override def getBatch(start: Option[Offset], end: Offset): DataFrame = this.synchronized {

    // start is exclusive and end is inclusive, so we keep offsets in (start, end]
    val s = start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset + 1
    val e = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset

    println(s"generating batch range $start ; $end")

    val data = batches
      .par
      .filter { case (_, idx) => idx >= s && idx <= e }
      .map { case (v, _) => (v, v.length) }
      .seq

    val rdd = sqlContext
      .sparkContext
      .parallelize(data)
      .map { case (v, l) => InternalRow(UTF8String.fromString(v), l.toLong) }

    sqlContext.sparkSession.internalCreateDataFrame(rdd, schema, isStreaming = true)
  }

We can see that we are taking the data from our internal buffer, keeping only the entries whose offsets fall within the requested range. From there, we build the DataFrame that we send back to Spark.

Finally, commit is how Spark indicates to us that it will not request offsets less than or equal to the one being passed. In other words, we can remove from our internal buffer all data with an offset less than or equal to the one passed to commit. In this way, we can save some memory and avoid running out of heap space.

override def commit(end: Offset): Unit = this.synchronized {

    val committed = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset

    val toKeep = batches.filter { case (_, idx) => idx > committed }

    batches = toKeep
  }

Now, we have completed our source; the entire code is the following:

class InMemoryRandomStrings(sqlContext: SQLContext) extends Source {
  private var offset: LongOffset = LongOffset(-1)

  private var batches = collection.mutable.ListBuffer.empty[(String, Long)]

  private val incrementalThread = dataGeneratorStartingThread()

  override def schema: StructType = InMemoryRandomStrings.schema

  override def getOffset: Option[Offset] = this.synchronized {
    println(s"getOffset: $offset")

    if (offset.offset == -1) None else Some(offset)
  }

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = this.synchronized {

    // start is exclusive and end is inclusive, so we keep offsets in (start, end]
    val s = start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset + 1
    val e = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset

    println(s"generating batch range $start ; $end")

    val data = batches
      .par
      .filter { case (_, idx) => idx >= s && idx <= e }
      .map { case (v, _) => (v, v.length) }
      .seq

    val rdd = sqlContext
      .sparkContext
      .parallelize(data)
      .map { case (v, l) => InternalRow(UTF8String.fromString(v), l.toLong) }

    sqlContext.sparkSession.internalCreateDataFrame(rdd, schema, isStreaming = true)
  }

  override def commit(end: Offset): Unit = this.synchronized {

    val committed = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset

    val toKeep = batches.filter { case (_, idx) => idx > committed }

    batches = toKeep
  }

  override def stop(): Unit = incrementalThread.stop()

  private def dataGeneratorStartingThread() = {
    val t = new Thread("increment") {
      setDaemon(true)
      override def run(): Unit = {

        while (true) {
          try {
            // synchronize on the enclosing source, not on the thread itself,
            // so that getOffset, getBatch, and commit see a consistent view
            InMemoryRandomStrings.this.synchronized {
              offset = offset + 1

              val value = Random.nextString(Random.nextInt(5))

              batches.append((value, offset.offset))
            }
          } catch {
            case e: Exception => println(e)
          }

          Thread.sleep(100)
        }
      }
    }

    t.start()

    t
  }
}


object InMemoryRandomStrings {
  lazy val schema = StructType(List(StructField("value", StringType), StructField("ts", LongType)))
}

Using Our Custom Source

Now, we need to plug our source into the Spark Structured Streaming API by indicating the correct format to be used.

val r = sparkSession
    .readStream
    .format("com.github.anicolaspp.spark.sql.streaming.DefaultSource")
    .load()

Here, we use the regular .readStream API and specify that the stream format is our implementation of StreamSourceProvider, that is, com.github.anicolaspp.spark.sql.streaming.DefaultSource.
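Because DefaultSource also implements DataSourceRegister, we could alternatively register it with Java's ServiceLoader so that the short name resolves directly. The usual way (assuming an sbt or Maven resource layout) is to add a provider-configuration file at src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister containing a single line with the fully qualified class name:

com.github.anicolaspp.spark.sql.streaming.DefaultSource

With that file on the classpath, .format("InMemoryRandomStrings") should work just as well as the fully qualified name.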

Now we can query our streaming source like any other DataFrame.

r.createTempView("w")

    sparkSession
      .sql("select ts, count(*) as c from w group by ts order by ts, c desc")
      .writeStream
      .format("console")
      .outputMode(OutputMode.Complete())
      .start()
      .awaitTermination()

The output will look similar to this:

-------------------------------------------
Batch: 3
-------------------------------------------
+---+---+
| ts|  c|
+---+---+
|  0| 81|
|  1| 78|
|  2| 74|
|  3| 82|
|  4| 80|
+---+---+

What we see is a continuous aggregation of the data generated by our source.
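If we wanted Spark to produce these batches at a fixed rate rather than as fast as possible, we could also attach a processing-time trigger to the query (an optional tweak, not part of the original example):

import org.apache.spark.sql.streaming.{OutputMode, Trigger}

sparkSession
  .sql("select ts, count(*) as c from w group by ts order by ts, c desc")
  .writeStream
  .format("console")
  .outputMode(OutputMode.Complete())
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
  .awaitTermination()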

Conclusions

Apache Spark is a great choice for processing data at scale, and it is built to be extended in many different ways. As we have seen, we can write our own data sources and streaming sources so that they plug into our Spark code with ease.

Originally posted January 14, 2019, here.

The entire project and source code can be found here: SparkStreamSources.

Happy Coding.
