Search
Ian Downard

How to Persist Kafka Data as JSON in NoSQL Storage Using MapR Event Store and MapR Database

September 25, 2020

Editor’s Note: MapR products and solutions sold prior to the acquisition of such assets by Hewlett Packard Enterprise Company in 2019, may have older product names and model numbers that differ from current solutions. For information about current offerings, which are now part of HPE Ezmeral Data Fabric, please visit https://www.hpe.com/us/en/software/data-fabric.html

Original Post Information:

"authorDisplayName": "Ian Downard",
"publish": "2016-10-31T07:00:00.000Z",
"tags": "nosql"

Streaming data is like the phrase, “Now you see it. Now you don’t!”

One of the challenges when working with streams, especially streams of fast data, is the transitory nature of the data. Kafka streams are characterized by a retention period that defines the point at which messages will be permanently deleted. For many applications, such as those fed by streams of rapidly generated sensor data, the retention period is a desirable and convenient way to purge stale data, but in other cases, such as with insurance or banking applications, record-retention laws may require data to be persisted far beyond the point at which that data has any practical value to streaming analytics. This is challenging in situations where rapidly ingested data creates pressure on stream consumers designed to write streaming records to a database. Even if we can ensure these consumers keep up, we still need to guarantee zero data loss in the unlikely event that they do fail.

HPE Ezmeral Data Fabric Event Data Streams (Formerly MapR Event Store) and HPE Ezmeral Data Fabric Document Database (Formerly MapR Database) work together to provide a scalable and fault tolerant way to save streaming data in long-term storage. They both have a distributed, scale-out design based on the HPE Ezmeral Data Fabric (Formerly MapR Data Platform). Furthermore, as a NoSQL data store, HPE Ezmeral Data Fabric Document Database makes it easy to persist data with the same schema encoded into streams. This is not only convenient for the developer but also minimizes the work required to transform streaming records into persistable objects.

Let's illustrate these concepts with an example that persists streaming data in 5 simple steps:

  1. Setup stream and database connections.
  2. Consume records from a MapR stream using the standard Kafka API.
  3. Convert each consumed record to a JSON object.
  4. Persist that JSON object in HPE Ezmeral Data Fabric Document Database.
  5. Update the stream cursor to ensure graceful recovery should a stream consumer fail.

Step 1: Setup stream and database connections

Before we can do anything interesting we first have to setup our stream and database connections. We'll use the following two options that relate to fault tolerance:

  • We disable the enable.auto.commit consumer option in order to commit stream cursors only after their corresponding records have been writing to the database.
  • We disable the BUFFERWRITE table option in order to ensure database writes are not buffered on the client.

With these options we're sacrificing speed for higher fault tolerance but we can compensate for that tradeoff by creating more topic partitions and running more concurrent consumers in parallel.

So, here is what our database and consumer configurations look like:

Table table;
String tableName = "/user/mapr/ticktable";
if (MapRDB.tableExists(tableName)) {
    table = MapRDB.getTable(tableName);
} else {
    table = MapRDB.createTable(tableName);
}
table.setOption(Table.TableOption.BUFFERWRITE, false);

Properties props = new Properties();
props.put("enable.auto.commit","false");
props.put("group.id", “mygroup”);
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<String, String>(props);
List<String> topics = new ArrayList<>();
topics.add(topic);
consumer.subscribe(topics);

Step 2: Consume records from the stream

To consume records from a stream, you first poll the stream. This gives you a collection of ConsumerRecords which you then iterate through in order to access each individual stream record. This is standard Kafka API stuff, and it looks like this:

ConsumerRecords<String, byte[]> records = consumer.poll(TIMEOUT);
Iterator<ConsumerRecord<String, byte[]>> iter = msg.iterator();
while (iter.hasNext())
{
    ConsumerRecord<String, byte[]> record = iter.next();
}

Step 3: Convert streamed records to JSON

Before we write consumer records to the database, we need to put each record in a format that has columns. In our example we’re streaming byte arrays, which by themselves have no field related attributes, so we need to convert these byte arrays into a type containing attributes that will correspond to columns in our database. We’ll do this with a Java object, defined in Tick.java, which uses the @JsonProperty annotation to conveniently convert Tick objects encoded as byte arrays into a JSON document, like this:

Tick tick = new Tick(record.value());
Document document = MapRDB.newDocument((Object)tick);

Step 4: Persist the JSON document to HPE Ezmeral Data Fabric Document Database

This part is easy. We can insert each JSON document as a new row to a table in MapR Database with one line of code, like this:

table.insertOrReplace(tick.getTradeSequenceNumber(), document);

The first parameter in the insertOrReplace method is Document ID (or rowkey). It’s a property of our dataset that the value returned by tick.getTradeSequenceNumber() is unique for each record, so we’re referencing that as the Document ID for our table insert in order to avoid persisting duplicate records even if duplicate messages are consumed from the stream. This guarantees idempotency in our stream consumer.

Step 5: Update the stream cursor

Finally, we’ll update the cursor in our stream topic. In the unlikely event that our stream consumer fails, this ensures that a new consumer will be able to continue working from where the last consumer left off.

consumer.commitSync();

Summary

The design we just outlined provides a scalable approach to persisting stream data. It ensures thread-safety by processing immutable stream data with idempotent stream consumers and achieves fault tolerance by updating stream cursors only after records have been persisted in HPE Ezmeral Data Fabric Document Database. This represents an elastic, responsive, resilient, and message-driven design consistent with the characteristics of reactive microservices. This is a reliable approach to persisting Kafka streams in long-term NoSQL storage.

If you'd like to see a complete example application that uses the techniques described in this post, check out the Persister.java in the Application blueprint on GitHub.

Editor’s Note: MapR products referenced are now part of the HPE Ezmeral Data Fabric.

To view more articles on this topic, be sure to check back regularly on the HPE DEV blog site.

Related

Kiran Kumar Mavatoor

Accessing HPE Ezmeral Data Fabric Object Storage from Spring Boot S3 Micro Service deployed in K3s cluster

Sep 13, 2021
Don Wake

On-Premise Adventures: How to build an Apache Spark lab on Kubernetes

Jun 15, 2021
Carol McDonald

Using Apache Spark DataFrames for Processing of Tabular Data

Dec 13, 2021
Denis Choukroun

Deep Learning Model Training – A First-Time User’s Experience with Determined – Part 2

May 3, 2022
Didier Lalli

Calling all developers… Make your voices heard!

May 28, 2021
Joel Baxter & Kartik Mathur & Don Wake

Building Dynamic Machine Learning Pipelines with KubeDirector

Aug 14, 2020
Sridhar Reddy

File, objects, databases and streams – Oh my!

Mar 31, 2022
Ellen Friedman

How fine-grained data placement helps optimize application performance

Oct 22, 2021

HPE Developer Newsletter

Stay in the loop.

Sign up for the HPE Developer Newsletter or visit the Newsletter Archive to see past content.

By clicking on “Subscribe Now”, I agree to HPE sending me personalized email communication about HPE and select HPE-Partner products, services, offers and events. I understand that my email address will be used in accordance with HPE Privacy Statement. You may unsubscribe from receiving HPE and HPE-Partner news and offers at any time by clicking on the Unsubscribe button at the bottom of the newsletter.

For more information on how HPE manages, uses, and protects your personal data please refer to HPE Privacy Statement.