HPE Developer Community Portal
Editor’s Note: MapR products and solutions sold prior to the acquisition of such assets by Hewlett Packard Enterprise Company in 2019, may have older product names and model numbers that differ from current solutions. For information about current offerings, which are now part of HPE Ezmeral Data Fabric, please visit https://www.hpe.com/us/en/software/data-fabric.html
Original Post Information:
"authorDisplayName": "Mathieu Dumoulin", "publish": "2017-05-31T12:00:00.000", "tags": "apache-spark"
Real-world case study in the telecom industry
In a previous post, I pointed out how we were successfully able to accelerate an Apache Kafka/Spark Streaming/Apache Ignite application and turn a development prototype into a useful, stable streaming application – one that actually exceeded the performance goals set for the application. In this post, I’ll cover how we were able to tune a Kafka/Spark Streaming system and run it stably, without backing up under maximum production load.
Many of the lessons learned during this project would also apply to a similar system implemented using the MapR Data Platform. However, as we’ll explain later, a lot of the issues could have been avoided entirely, or at least greatly mitigated by using a converged platform instead of a multi-cluster approach.
The MapR Data Platform is the only currently available production-ready implementation of such a platform as of this writing.
Goal of the System
The Kafka/Spark Streaming system aims to provide better customer support by providing their support staff with always up-to-date call quality information for all their mobile customers.
Mobile customers, while making calls and using data, connect to the operator’s infrastructure and generate logs in many different systems. Three specific logs were identified that, if correlated with each other, give visibility in the actual quality of service experienced by each individual customer. The three logs were selected because they can be correlated through a simple relational database-like join operation.
For improving customer support, the quality of call information needs to be kept updated in near to real time; otherwise, it has no value. This has led, down the road, to building a streaming architecture rather than a batch job. The data volume at production load reaches several GB/s, generated by several million mobile customers, 24 hours a day, 365 days a year. Performance and stability at that scale is required for the system to reach production.
Project SLA Goals
The application has clear performance requirements based on the known worst-case throughput of the input data. This log data is generated by real-world use of the services of the company. If the application is to be useful at all, as a real-time streaming application, it must be able to handle this data without getting behind.
In term of numbers, the goal is to handle up to 3GB/min of input data. For this large mobile operator, such throughput represents about 150-200,000 events/second. Ordinarily, the throughput is about half of that value or 1.5GB/min and 60,000-80,000 events/second.
The raw data source are the logs of three remote systems, labeled A, B, and C here, where the log from A comprises about 84-85% of the entries, the log from B about 1-2%, and the log from C about 14-15%. The fact that the data is unbalanced is one of the (many) sources of difficulty in this application.
The raw data is ingested into the system by a single Kafka producer into Kafka running on 6 servers. The producer reads the various logs and adds each log's records into its own topic. As there are three logs, there are three Kafka topics.
The data is consumed by a Spark Streaming application, which picks up each topic, does a simple filter to cut out unnecessary fields, a map operation to transform the data, and then a foreachRDD operation (each micro-batch generates an RDD in Spark Streaming) that saves the data to Ignite and to HDFS as Hive tables for backup.
A second batch Spark application runs once per hour on the data stored in-memory in Ignite to join the records from the three separate logs into a single table. The batch job has a maximum data size of about 100GB. The cluster CPU resources should be sufficient to process this amount of data in one hour or less.
Ignite stores 3 hours’ worth of data at all time to account for calls that begin in one hour and end in the hour getting processed, as well as calls that begin in the target hour and end in the next one. The telecom operator judges that calls that are so long they aren’t captured in this scheme can be ignored, as they are very rare.
It’s worth noting that a better all-streaming architecture could have avoided the whole issue with the intermediate representation in the first place. An illustrative, real-world case with more time and thought upfront can make the entire project end faster than just rushing headlong into coding the first working solution that comes to mind.
System Hardware and Software: At the Bleeding Edge of Open Source Big Data
The cluster has a lot of CPU and memory resources. It has 12 nodes of enterprise-grade servers, each equipped with two E5 Xeon CPUs (16 physical cores), 256GB memory, and eight 6TB spinning HDD (2 for OS in RAID 1). Each server has one 10GbE network interface.
The technology stack selected for this project are centered around Kafka 0.8 for streaming the data into the system, Apache Spark 1.6 for the ETL operations (essentially a bit of filter and transformation of the input, then a join), and the use of Apache Ignite 1.6 as an in-memory shared cache to make it easy to connect the streaming input part of the application with joining the data. Backup is done to HDFS, as Hive ORC tables are also used to serve as a just-in-case backup for Ignite and to serve future need for other analytics use cases (none at the time).
The Spark applications are both coded in Scala 2.10 and Kafka’s direct approach (no receivers). Apache Ignite has a really nice Scala API with a magic IgniteRDD that can allow applications to share in-memory data, a key feature for this system to reduce coding complexity.
The cluster is running Apache Hadoop's HDFS as a distributed storage layer, with resources managed by Mesos 0.28. Finally, HBase is used as the ultimate data store for the final joined data. It will be queried by other systems outside the scope of this project. The cluster design with all relevant services is shown in the table above.
The original system had several issues:
- First Spark Streaming job is not stable
- Second Spark batch job can’t process 1 hour of data before the next hour of data arrives
- Stability: The application crashes under load
A Spark Streaming application is said to be stable if the processing time of each micro-batch is less than or equal to that micro-batch time. In this case, the application processes each 30 seconds of data in as much as 6 minutes. We need a 12x speedup.
Second, there is a batch process to join data one hour at a time that was targeted to run in 30 minutes but was taking over 2 hours to complete.
Third, the application was randomly crashing after running for a few hours. Stability of such a complex, fully open-source stack should never be assumed. Rather, it is the result of a constant effort by the team to better understand the system. We can expect that there will still be a lot of learning required to keep the system up and running once it is moved to production as well.
In my opinion, all performance and stability issues stem from the terrible idea of management to push a very good POC project developed on AWS into production on some on-premises hardware. It’s hard to believe, but they fully expected the POC code to run as-is on a production system it was never tested on.
Regardless, the task was set, and we had only a few short days to identify what could be done and get the system up to production speed. Final QA testing of the system was barely 1 week away, and management wasn’t in the mood to accept delays. We got to work...
First target: Improve Spark Streaming Performance
At maximum load, the Spark Streaming application is taking between 4.5 to 6 minutes for each micro-batch of 30 seconds. We need to find 9-12x speedup worth of improvements.
Spark has a lot of moving parts, but it will always be true that fast algorithms beat tweaking the configuration. In this case, there is nothing to get from the code; it’s all very parallelizable with no obvious issues, like doing two computations separately when they could be combined or any O(n^2) loop-in-another loop issues. The job is nothing more than a filter and a map.
What we need to determine, then, is whether the job is indeed being processed in parallel to make the most of all those CPU cores. In a Spark Streaming job, Kafka partitions map 1 to 1 with Spark partitions.
Increase Parallelism: Increase Number of Partitions in Kafka
A quick check of the Spark UI shows 36 partitions. As each server has 6 physical disks, I assume the choice of partitioning was selected by the formula node * physical disks = partition count per topic. Quickly checking online reveals that partitioning is quite a bit more complex than that and the formula to decide on partition number isn’t from any known Kafka best practices guide. (Ref: https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/ )
The input data was unbalanced and most of the application processing time was spent processing Topic 1 (with 85% of the throughput). Kafka partitions are matched 1:1 with the number of partitions in the input RDD, leading to only 36 partitions, meaning we can only keep 36 cores busy on this task. To increase the parallelism, we need to increase the number of partitions. What we did was split topic 1 into 12 topics each with 6 partitions for a total of 72 partitions. The way it was done was a simple modification to the producer to evenly divide the data from the first log into 12 topics instead of just one. Zero code needed to be modified on the consumer side.
We also right-sized the number of partitions for the two other topics, in proportion to their relative importance in the input data, so we set topic 2 to two partitions and topic 3 to eight partitions.
Running more tasks in parallel. Before tuning, each stage always had 36 partitions!
Fix RPC Timeout Exceptions
When looking at the application logs, we could see a lot of RPC timeout exceptions. We do a web search and find what we believe is the relevant JIRA (SPARK-14140 in JIRA). The recommended fix is to increase the spark.executor.heartbeatInterval from 10s (default) to 20s.
I think this could be caused by nodes getting busy from disk or CPU spikes because of Kafka, Ignite, or garbage collector pauses. Since Spark runs on all nodes, the issue was random (see the cluster services layout table in the first section).
The configuration change fixed this issue completely. We haven’t seen it happen since. (Yay!)
Increase Driver and Executor Memory
Out of memory issues and random crashes of the application were solved by increasing the memory from 20g per executor to 40g per executor as well as 40g for the driver. Happily, the machines in the production cluster were heavily provisioned with memory. This is a good practice with a new application, since you don’t know how much you will need at first.
The issue was difficult to debug with precision and reliable information, since the Spark UI reports very little memory consumption. In practice, as this setting is easy to change, we empirically settled on 40g being the smallest memory size for the application to run stably.
Right Size the Executors
The original application was running only 3 executors with 72 total cores. We configured the application to run with 80 cores with a maximum of 10 cores per executor, for a total of 8 executors. Note that with 16 real cores per node on a 10 node cluster, we’re leaving plenty of resources for Kafka brokers, Ignite, and HDFS/NN to run on.
Increase the Batch Window from 30s to 1m
The data is pushed into Kafka by the producer as batches every 30s, as it is gathered by FTP batches from the remote systems. Such an arrangement is common in telecom applications due to a need to deal with equipment and systems from a bewildering range of manufacturers, technology, and age.
This meant that the input stream was very spiky, when looking at the processing time from the Spark UI’s streaming tab.
Increasing the window to 1m allowed us to smooth out the input and gave the system a chance to process the data in 1 minute or less and still be stable.
To make sure of it, the team had a test data which simulated the known worst-case data, and with the new settings, the Spark Streaming job was now indeed stable. We also tried it on real production data, and everything looked good. Win!
Drop Requirement to Save Hive Tables to HDFS
Discussion with the project managers revealed that Hive was not actually part of the requirements for the streaming application! Mainly, this is because the other analytics, mostly SQL requests, could be serviced from the data in HBase.
Considering the goal of the system, the worst-case scenario for missing data is that a customer's call quality information cannot be found... which is already the case. In other words, the consequence of data loss is not negative; rather, the consequence of gaining data is additional insights. If the great majority of the data is processed and stored, the business goals can be reached.
There wasn’t much point in saving the data to Hive mid-flight for increased fault-tolerance either, as once the data is in Ignite, it’s safe even if the Spark application crashes. This made Ignite an even more critical part of the application, despite it having some issues of its own. It was a difficult decision that we made entirely due to the advanced stage of the project. As we’ll explain in more detail in the conclusion, the architecture itself was problematic, and it’s not time to play with architecture when you’re a week or two from production.
Spark Performance Tuning Results
The Spark Streaming application finally became stable, with an optimized runtime of 30-35s.
As it turns out, cutting out Hive also sped up the second Spark application that joins the data together, so that it now ran in 35m, both now well within the project requirements.
With improvements from the next part, the final performance of the Spark Streaming job went down in the low 20s range, for a final speedup of a bit over 12 times.
Second target: Improve System Stability
We had to work quite hard on stability. Several strategies were required, as we will explain below.
Make the Spark Streaming Application Stable
The work we did to fix the performance had a direct impact on system stability. If both applications are stable themselves and running on right-sized resources, then the system has the best chance to be stable overall.
Remove Mesos and Use Spark Standalone
The initial choice of Mesos to manage resources was forward-looking, but ultimately we decided to drop it from the final production system. At the onset, the plan was to have Mesos manage all the applications. But the team never could get Kafka and Ignite to play nice with Mesos, and so they were running in standalone mode, leaving only Spark to be managed by Mesos. Surely, with more time, there is little doubt all applications could be properly configured to work with Mesos.
Proposing to remove Mesos was a bit controversial, as Mesos is much more advanced and cool than Spark running in standalone mode.
But the issue with Mesos was twofold:
- Control over executor size and number was poor, a known issue (SPARK-5095) with Spark 1.6 and now fixed in Spark 2.X.
- Ignite and Kafka aren’t running on Mesos, only Spark is. Given the schedule pressure, the team had given up trying to get those two services running in Mesos.
Mesos can only ever allocate resources well if it controls resources. In the case of this system, Kafka and Ignite are running outside of Mesos’ knowledge, meaning it’s going to assign resources to the Spark applications incorrectly.
In addition, it’s a single purpose cluster, so we can live with customizing the sizing of the resources for each application with a global view of the system’s resources. There is little need for dynamic resource allocations, scheduling queues, multi-tenancy, and other buzzwords.
Change the Ignite Memory Model
It is a known issue that when the Heap controlled by the JVM gets very big (> 32GB), the cost of garbage collection is quite large. We could indeed see this when the join application runs, where the stages with 25GB shuffle had some rows with spikes in GC time from 10 seconds range up to more than a minute.
The initial configuration of Ignite was to run ONHEAP_TIERED, with 48GB worth of data cached on heap, then overflow drops to 12GB of off-heap memory. That setting was changed to the OFFHEAP_TIERED model. While slightly slower due to serialization cost, OFFHEAP_TIERED doesn't rely on the JVM’s garbage collection. It still runs in memory, so we estimated it would be a net gain.
With this change, the run time for each batch dutifully came down by about five seconds, from 30 seconds down to about 25 seconds. In addition, successive batches tended to have much more similar processing time, with a delta of 1-3 seconds, whereas it would vary by over 5 to 10 seconds, previously.
Update the Ignite JVM Settings
We followed the recommended JVM options as found in Ignite documentation’s performance tuning section (http://apacheignite.gridgain.org/docs/jvm-and-system-tuning).
Improve the Spark Code
Some parts of the code assumed reliability, like queries to Ignite, when in fact there was possibility of the operations failing. These problems can be fixed in the code, which now handles exceptions more gracefully, though there is probably work left to increase the robustness of the code. We can only find these spots by letting the application run now.
Reassign ZooKeeper to Nodes 10-12
Given the cluster is of medium size, it’s worth spreading the services as much as possible. We moved the ZooKeeper services from nodes 1-3 to nodes 10-12.
Final System Architecture
Tuning this application took about 1 week of full-time work. The main information we used was Spark UI and Spark logs, easily accessible from the Spark UI. The view of Jobs and Stages as well as the streaming UI are really very useful.
- Migrating a streaming application from a prototype on AWS to an on-site cluster requires schedule time for testing
- Not testing the AWS prototype with realistic data was a big mistake
- Including many “bleeding-edge” OSS components (Apache Ignite and Mesos) with expectations of very high reliability is unrealistic
- A better architecture design could have simplified the system tremendously
- Tuning a Kafka/Spark Streaming application requires a holistic understanding of the entire system; it’s not just about changing parameter values of Spark. It’s a combination of the data flow characteristics, the application goals and value to the customer, the hardware and services, the application code, and then playing with Spark parameters.
- The MapR Data Platform would have cut the development time, complexity, and cost for this project.
This project was a hell of a dive in the deep end of the pool for a telecom operator with very little experience with the open-source enterprise big data world. They should be applauded for ambition and desire to take up such a challenge with the goal of benefiting their customers. But a better choice of platform and application architecture could have made their life a lot easier.
A Converged Platform is the Correct Approach
In fact, the requirements for this project show the real-world business need for a state-of-the-art converged platform with a fast distributed file system, high performance key-value store for persistence and real-time, and high performance streaming capabilities.
A MapR-based solution would have been a lot easier to build and maintain, absolutely for sure. Since MapR Event Store is built-in, there is one less cluster to manage (bye bye, Kafka brokers). The Spark application could run with the same code but without needing to rely on a speculative open-source project like Apache Ignite.
Saving to MapR Database uses the same HBase API, so likely no code change there either, and you’re saving to a DB that’s built into the native C MapR XD, so that’s going to be super fast as well. Finally, sharing the resources is simplified by running only Spark on YARN or standalone-mode, while the platform is left to deal with the resource requirements of the MapR Event Store, MapR XD, and MapR Database with reliability and performance, guaranteed, since highly trained support engineers are available 24/7 to support every single part of this application.
Given this system is heading into production for a telecom operator with 24/7 reliability expectation, I’d argue that built-in simplicity, performance, and support are pretty compelling and hopefully will be adopted by this customer for the next iteration of the system. (Stay tuned!)