A Journey from 8K records/sec to 88K records/sec and towards 1 Million records/sec.
In our previous series of blog posts on Apache Storm, we presented our use-case implementation along with its performance figures. Those figures worried us a bit, as 8000 records/sec is not enough for any use case that has to manage the tremendous amount of data generated by telecom nodes. Right after writing that blog, we felt the urge to analyze in depth what had happened in our previous work and to improve the numbers. The rest of this blog is a look back at what we did to increase the performance to 88K records/sec: a journey from 8K to 88K.
Having a good understanding of Storm's internals is important when working in a production environment.
MYSTERIES TO BE SOLVED
We finally narrowed our worries down to 3 major questions:
- Why on earth were we only able to crunch 8000 records per second, and what is the real bottleneck?
- Does the topology really scale when we do aggregation?
- Does a process like aggregation become a bottleneck while processing data?
There were several other things to solve, but we expected that answering these questions would automatically answer all of our other questions.
First and foremost, you need to understand your data and what kind of problem you are trying to solve with a real-time framework.
My hardware setup consists of 3 servers:
Server 1: Nimbus, Supervisor
CPU(s): 8, Thread(s) per core: 1, RAM: 24 GB
Server 2: Supervisor
CPU(s): 8, Thread(s) per core: 1, RAM: 24 GB
Server 3: Supervisor
CPU(s): 16, Thread(s) per core: 2, RAM: 16 GB
My query pattern consists mostly of queries that aggregate 20 measures across 62 dimensions in real time and emit the aggregated output every 5 minutes. For those of you who are familiar with SQL, the pattern is a typical SELECT sum(M1), sum(M2), sum(M3), …, sum(M20) … GROUP BY D1, D2, …, D60
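To make the query pattern concrete, here is a minimal sketch of a group-by-and-sum aggregator in plain Java. It is not the code of our Aggregator bolt: the class name `GroupBySum` and its methods are hypothetical, and the measures are assumed to be whole numbers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: sums the measures of all records that share
// the same combination of dimension values (the GROUP BY key).
final class GroupBySum {
    private final int measureCount;
    // Key: dimension values of a record; value: running sums of its measures.
    private final Map<List<String>, long[]> sums = new HashMap<>();

    GroupBySum(int measureCount) {
        this.measureCount = measureCount;
    }

    // Adds one record's measures to the running sums of its group.
    void add(List<String> dimensions, long[] measures) {
        if (measures.length != measureCount) {
            throw new IllegalArgumentException("expected " + measureCount + " measures");
        }
        // Copy the key so later changes to the caller's list cannot corrupt the map.
        long[] acc = sums.computeIfAbsent(new ArrayList<>(dimensions),
                                          k -> new long[measureCount]);
        for (int i = 0; i < measureCount; i++) {
            acc[i] += measures[i];
        }
    }

    // Returns the aggregated rows and resets the state, as one would do
    // at the end of each emit interval (5 minutes in our case).
    Map<List<String>, long[]> flush() {
        Map<List<String>, long[]> out = new HashMap<>(sums);
        sums.clear();
        return out;
    }
}
```

The number of entries in the map equals the number of distinct dimension combinations seen in the interval, so memory use depends on that combined cardinality.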
HITTING 1 MILLION RECORDS/SEC
To establish a baseline before jumping into solving the problems, I tried to find out how a plain aggregation performs. I tested different scenarios and concluded that real-time aggregation is not a bottleneck when using Storm.
Lesson 1: Cardinality of dimensions doesn’t matter
With 4 instances of the Generator Spout and 6 instances of the Aggregator, the performance of aggregating 2 dimensions with 2 measures at different cardinalities looks like this:
Lesson 2: Increasing the number of dimensions and measures doesn’t affect the results
With 4 instances of the Generator Spout and 6 instances of the Aggregator, the performance of aggregating different numbers of dimensions and measures at a cardinality of 10000 looks like this:
Lesson 3: Increasing the number of workers increases the performance almost linearly as long as their capacity is greater than 1.
With 4 instances of the Generator Spout and 6 instances of the Aggregator, the performance of aggregating 6 dimensions and 4 measures at a cardinality of 10000 looks like this:
Analysis: The above three results clearly show that aggregation is not a bottleneck, and we were able to achieve 1 million records/sec with this limited hardware setup.
Surprise, Surprise, Surprise!!! After our previous study, all of us thought that aggregation was the bottleneck behind the figure of 8K records/sec. A true surprise it is… I was able to hit 1 million records per sec easily when I did aggregation alone.
It’s clear that I missed the sun on a cloudy day ☺
Looking again at the topology, which had a file spout, a flattener bolt and an aggregator bolt, our focus now shifted to the Flattener bolt.
The Flattener Bolt contains code which parses the network probe data in JSON format and extracts the useful information from it. This is a CPU-intensive module; it was saturating all the available CPUs in the servers and so became the bottleneck for the whole topology.
For us, it meant that we needed to rethink the way we had implemented the flattener code. One major change was switching from Java Parser to Google Parser. Together with rewriting the whole code, this gave us a performance increase from 1000 records/sec to 28000 records/sec for a single thread.
Now it was time to determine the parallelism of each of the bolts and of the spout, to achieve evenly distributed processing without a bottleneck. We ended up evaluating each bolt and spout as a single-threaded POJO and measuring its performance. The ratio between these figures showed us how to design a better topology in terms of parallelism.
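The idea of deriving parallelism from single-thread measurements can be sketched as below. This is only an illustration under simplifying assumptions: it ignores messaging overhead and CPU contention, the class `ParallelismRatio` is hypothetical, and apart from the 28000 records/sec measured for the flattener, the throughput figures in the usage note are made up.

```java
// Hypothetical helper: turns single-thread throughputs into relative
// executor counts, so that slower components get proportionally more threads.
final class ParallelismRatio {
    // perThreadRates[i] is the measured records/sec of component i on one thread.
    // baseCount is the number of executors given to the fastest component.
    static int[] executorCounts(double[] perThreadRates, int baseCount) {
        double fastest = 0;
        for (double rate : perThreadRates) {
            if (rate <= 0) {
                throw new IllegalArgumentException("rates must be positive");
            }
            fastest = Math.max(fastest, rate);
        }
        int[] counts = new int[perThreadRates.length];
        for (int i = 0; i < counts.length; i++) {
            // A component that is k times slower than the fastest one
            // needs about k times as many executors to keep up.
            counts[i] = (int) Math.ceil(baseCount * fastest / perThreadRates[i]);
        }
        return counts;
    }
}
```

For example, with assumed single-thread rates of 112000 (spout), 28000 (flattener) and 56000 (aggregator) records/sec, `executorCounts(new double[] {112000, 28000, 56000}, 1)` yields a 1:4:2 ratio. Such a ratio is only a starting point; the final numbers still have to be tuned on the real cluster.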
INTRA-MESSAGING vs INTER-MESSAGING
As we know, internal messaging within a worker relies on the LMAX Disruptor, while communication between different workers happens through Netty (ZeroMQ in older versions). Our tests across different scenarios show that performance increases when we keep the processing within as few workers as possible while keeping the capacity of each worker below 1.
Considering 30 instances of File Spout, 120 instances of Flattening Bolt and 90 instances of Aggregation Bolt, we were able to get a performance of 88000 records/sec.
The above graph clearly shows that when we increase the number of workers from 1 to 12, the performance increases because more of the resources are used. A further increase to 24 workers decreases the performance, as more time is spent on communication between workers than on processing the data.
The study shows that we need to balance the resources used (in our case, CPU) against the number of workers to get good performance from the topology.
We learned several points that need to be remembered while designing a topology in Storm:
- If several executors share the same worker, which is a single JVM, this affects the CPU performance of the worker, as the executors run in parallel inside that one worker.
- If more than 1 task runs per executor, the tasks run serially. This helps only when we want to rebalance: the number of executors can be increased or decreased while the number of tasks remains the same.
- Keep the capacity of each worker close to 1.000. Up to that point, add more executors to that worker; once it exceeds 1.000, increase the number of workers.
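The capacity figure mentioned above can be approximated by hand. As far as I understand the Storm UI, capacity is the share of a time window that was spent executing tuples, i.e. the number of executed tuples multiplied by the average execute latency, divided by the window length. The sketch below is my own restatement of that formula, not Storm code, and the class name `Capacity` is hypothetical.

```java
// Hypothetical helper restating the capacity formula: the fraction of the
// measurement window that was spent inside execute().
final class Capacity {
    static double capacity(long executedCount, double avgExecuteLatencyMs, long windowMs) {
        if (windowMs <= 0) {
            throw new IllegalArgumentException("window must be positive");
        }
        // Total busy time in ms divided by the length of the window in ms.
        return executedCount * avgExecuteLatencyMs / windowMs;
    }
}
```

For instance, 300000 tuples executed at an average of 1.5 ms each over a 10 minute window (600000 ms) give a capacity of 0.75, which leaves some headroom; a value near or above 1 means the component cannot keep up.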
We also gathered some implementation learnings (Storm version 0.9.2):
- Storm has a bug: it does not pick up worker arguments set through the Java API.
- Once the worker memory is full, the worker gets killed and then restarted without any indication of the cause of the failure in the log.
- Storm bolts run in threads, so it is good practice to write thread-safe code, e.g. use ConcurrentHashMap or Collections.synchronizedMap instead of a plain HashMap.
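As an illustration of the thread-safety point, here is a minimal sketch of several threads updating one shared counter map. It uses only standard JDK classes and is not taken from our bolts; the class `ConcurrentSum` and the key name are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: several threads add to the same key of a shared map.
final class ConcurrentSum {
    // Starts `threads` workers that each add 1 to the same key `perThread` times
    // and returns the final total stored in the map.
    static long sumConcurrently(int threads, int perThread) {
        ConcurrentHashMap<String, Long> totals = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    // merge() updates the key atomically, so no increment is lost.
                    totals.merge("records", 1L, Long::sum);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return totals.getOrDefault("records", 0L);
    }
}
```

With a plain HashMap and a separate get followed by put, concurrent updates could be lost or the map could be corrupted. The atomic `merge` call avoids that without an explicit lock.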
So, by configuring the workers and executors carefully and applying good implementation practices, I was able to improve the performance to 88000 records per second. This can easily be increased to 1 million records per second once I can scale my CPU capacity to the level required by the Flattener Bolt.
There are other performance improvements, like serialization tuning, buffer tuning and the use of distributed in-memory databases, which I will leave for my next post, so stay tuned!
– N. Hari Kumar, Ericsson Research