Trident - benchmarking performance
Our experiments show that Trident, a distributed stream-processing platform, is better suited for real-time applications with low memory usage than for those with high memory usage.
There are a number of big data frameworks, Hadoop for example, for processing data that is readily available as files or database tables. Real-time applications, however, need to process data as and when it is produced (a few records or thousands at a time, depending on the data production rate), and many batch-oriented frameworks like Hadoop MapReduce are not efficient for that purpose. For real-time applications, Trident is one of the suitable frameworks.
Our team in Ericsson Research has performed experiments to compare the performance of Trident for real-time applications with a high memory footprint (such as aggregation) and those with a low memory footprint (such as word count or the flattening of complex substructures).
Based on the results, we found that Trident performs much better for flattening compared to aggregation.
In this blog post, I’ll share the essentials about how we did this and elaborate a bit on the results and learnings. I hope you will find this useful!
Trident in short:
Trident is a distributed framework for real-time applications, capable of intermixing high-throughput stream processing, state manipulation, and low-latency querying.
In the Trident world, a job is called a topology. A topology can be thought of as a directed acyclic graph (DAG) of tasks.
There are two kinds of nodes in a Storm cluster: the master node and the worker nodes. The master node runs a daemon called “Nimbus” that is similar to Hadoop’s “JobTracker”. Nimbus is responsible for distributing code around the cluster, assigning tasks to machines, and monitoring for failures.
Each worker node runs a daemon called the “Supervisor”. The supervisor listens for work assigned to its machine and starts and stops worker processes as necessary, based on the jobs assigned by Nimbus. Each worker process executes a subset of a topology, and a topology can consist of many worker processes spread across multiple machines. The communication between Nimbus and the worker nodes is managed through a ZooKeeper cluster.
The core abstraction in Trident is the “stream”. A stream is an unbounded sequence of tuples. Trident provides the primitives for transforming a stream into a new stream in a distributed and reliable way. For example, you may transform a stream of tweets into a stream of trending topics. A spout is the source of streams. For example, a spout may read tuples from a Kestrel queue and emit them as a stream. Bolts perform operations on input streams and emit new, processed streams; a bolt can consume any number of input streams. Networks of spouts and bolts are packaged into a topology, which is the job that a user submits to a Storm cluster for execution.
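As a rough analogy (plain Java, deliberately not the actual Storm API), a spout can be pictured as a tuple source and a bolt as a transformation from input tuples to output tuples:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Conceptual sketch only: real Storm spouts/bolts implement dedicated
// interfaces and run distributed across worker processes. Here a "spout"
// is just a finite tuple source and a "bolt" a per-tuple transformation.
public class MiniTopology {
    // "Spout": emits a stream of raw tuples (a real spout is unbounded).
    public static List<String> spout() {
        return List.of("trident storm", "storm", "trident");
    }

    // "Bolt": consumes an input stream and emits a new, processed stream.
    public static List<String> bolt(List<String> in, Function<String, String> f) {
        return in.stream().map(f).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Wiring spout -> bolt corresponds to a two-node topology DAG.
        List<String> out = bolt(spout(), String::toUpperCase);
        System.out.println(out); // [TRIDENT STORM, STORM, TRIDENT]
    }
}
```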
The use case we selected for our experiment was “aggregation of KPIs in ESRs at an IMSI level”. An ESR is typically a 5-minute log file that contains, in a hierarchical JSON structure, all the KPIs and their respective dimensions for a given subscriber. A dimension denotes a parameter along which KPIs are grouped. Aggregating ESRs means that long-term statistics are calculated for selected KPIs, for example “total number of downlink bytes per subscriber” or “average transport layer throughput per terminal type per mobile cell”. To make the aggregated data fit for its intended use case, the KPIs and dimensions are carefully selected beforehand.
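To illustrate what flattening means here, the sketch below collapses a nested per-subscriber structure into flat, dotted key paths. The field names and values are our illustration, not the actual ESR schema; the point is that the operation is stateless per record:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EsrFlattener {
    // Recursively flatten a nested map into "a.b.c" -> value pairs.
    // Field names below are illustrative, not the real ESR schema.
    public static Map<String, Object> flatten(String prefix, Map<String, Object> node,
                                              Map<String, Object> out) {
        for (Map.Entry<String, Object> e : node.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            if (e.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) e.getValue();
                flatten(key, child, out);   // descend into the substructure
            } else {
                out.put(key, e.getValue()); // leaf: emit flat key path
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> esr = new LinkedHashMap<>();
        esr.put("imsi", "001010000000001");         // hypothetical values
        Map<String, Object> kpi = new LinkedHashMap<>();
        kpi.put("downlinkBytes", 123456L);
        kpi.put("throughputKbps", 842);
        esr.put("kpi", kpi);

        System.out.println(flatten("", esr, new LinkedHashMap<>()));
        // {imsi=001010000000001, kpi.downlinkBytes=123456, kpi.throughputKbps=842}
    }
}
```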
The aggregated values are used by operators mostly for analytics, for example root cause analysis of service degradation experienced by customers, and for various reporting purposes. They are also helpful in next-generation networks, such as SON, where tuning and optimization can be driven by changes in trends in the aggregated values at various granularities (say, 5-minute or weekly aggregates).
The aggregation is performed over relatively short time batches, e.g. fifteen-minute aggregates, so that further time aggregations can be done almost arbitrarily.
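The aggregation step, by contrast, must hold running values per IMSI in memory until the batch closes. A minimal single-process sketch of that state (with made-up field names, standing in for the aggregator bolt's in-memory store) looks like this:

```java
import java.util.HashMap;
import java.util.Map;

public class ImsiAggregator {
    // Running per-IMSI sums. In the real topology this state lives in the
    // aggregator & in-memory store bolt and grows with the number of distinct
    // IMSIs seen in the batch -- hence the heavy memory footprint.
    private final Map<String, Long> downlinkBytesByImsi = new HashMap<>();

    public void accumulate(String imsi, long downlinkBytes) {
        downlinkBytesByImsi.merge(imsi, downlinkBytes, Long::sum);
    }

    // Emitted when the (e.g. fifteen-minute) batch closes.
    public Map<String, Long> closeBatch() {
        Map<String, Long> result = new HashMap<>(downlinkBytesByImsi);
        downlinkBytesByImsi.clear();
        return result;
    }

    public static void main(String[] args) {
        ImsiAggregator agg = new ImsiAggregator();
        agg.accumulate("imsi-1", 1000);
        agg.accumulate("imsi-2", 500);
        agg.accumulate("imsi-1", 2000);
        System.out.println(agg.closeBatch()); // imsi-1 -> 3000, imsi-2 -> 500
    }
}
```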
Fig 1 General topology
Number of workers: 3
Grouping strategy: shuffle grouping
Cluster: 3 nodes, each with 64 GB of memory and dual 6-core processors with hyperthreading. The machines are interconnected through a 1 Gbps switch and each contains approximately 3 TB of hard disk.
The experiment was carried out in three phases.
- In the first phase, ESRs were flattened using the flattener bolt and then aggregated using the aggregator & in-memory store bolt.
- In the second phase, the aggregator & in-memory store bolt was disabled and the performance of the flattener bolt alone was measured. The motivation for this phase was to benchmark the performance of Trident for a use case like flattening, which has low memory usage.
- In the third phase, the logic to flatten ESRs was placed inside the spout so that the spout directly generated flattened ESRs. This was done to reduce the number of bolts in the topology.
Fig 2 Topology used for the third phase
Fig 3 Results
From the results, it is quite evident that Trident performs much better for flattening ESRs than for aggregating the KPIs in them. This is because flattening the complex substructures in ESRs involves practically no memory usage: as and when an ESR arrives, it is flattened and sent out as an output event stream. Aggregation, in contrast, uses memory heavily, since the aggregated values for each IMSI must be kept in memory so that future events for that IMSI can be added to them. This is true for both time-based and length-based aggregations.
Performance measurements were first made with the same topology doing only flattening, i.e. with the aggregator and persistor bolts disabled. Approximately 25,000 events per second could be flattened when three nodes were used. When the aggregator and persistor bolts were enabled, this dropped to approximately 7,375 records per second on three nodes. When the flattening logic was placed inside the spout (phase 3 above), there was a sharp drop in performance to nearly 3,000 events per second across all three nodes, measured at the aggregator bolt. The performance drop in the spout was a major factor in the reduction in this case.
Periodic garbage collection: When the latency and CPU usage of the topology were plotted as line graphs, regular periodic peaks could be observed in both, as shown in figure 4. We found that this was due to garbage collection being triggered automatically at regular intervals. We increased the memory allocated to each worker and configured the JVM to use all cores for garbage collection. This increased the intervals between successive garbage collection pauses, but once garbage collection was triggered, the time to stabilize was longer.
The configuration settings used were these:
worker.childopts: "-XX:+HeapDumpOnOutOfMemoryError -Xmx28192m -Xverify:none -XX:NewSize=15G -XX:MaxNewSize=15G -XX:SurvivorRatio=512 -XX:+CMSConcurrentMTEnabled"
Fig 4 CPU load average and latency plots
- Reducing the number of ackers increased the performance by a few hundred raw ESRs per second.
- Having a lightweight spout increased the performance twofold. For example, letting the spout do the flattening halved the performance compared to having a bolt perform it.
- The grouping strategy used to split events among multiple bolts plays an important role in performance. For example, shuffle grouping performs better than fields grouping. Shuffle grouping randomly splits the event stream across the bolts so that all bolts get the same load. Fields grouping groups tuples by the values of certain fields in the events and sends each group of tuples to the same bolt; even load distribution is not guaranteed in this case.
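The difference between the two strategies can be shown with a plain-Java sketch (our simplification; Storm's actual implementations differ, e.g. shuffle grouping is random rather than round-robin). Shuffle spreads tuples evenly by construction, while fields grouping pins every tuple with the same key to one bolt instance, so a skewed key distribution skews the load:

```java
import java.util.Arrays;

public class GroupingSketch {
    // Shuffle grouping (simplified here as round-robin): even load.
    public static int[] shuffle(String[] tuples, int numBolts) {
        int[] load = new int[numBolts];
        for (int i = 0; i < tuples.length; i++) load[i % numBolts]++;
        return load;
    }

    // Fields grouping: a hash of the grouping field picks the bolt, so all
    // tuples sharing a key value land on the same bolt instance.
    public static int[] fields(String[] keys, int numBolts) {
        int[] load = new int[numBolts];
        for (String k : keys) load[Math.floorMod(k.hashCode(), numBolts)]++;
        return load;
    }

    public static void main(String[] args) {
        // A skewed stream: one hot key dominates.
        String[] keys = {"imsi-1", "imsi-1", "imsi-1", "imsi-1", "imsi-2", "imsi-3"};
        System.out.println(Arrays.toString(shuffle(keys, 3))); // [2, 2, 2]
        System.out.println(Arrays.toString(fields(keys, 3)));  // uneven: hot key on one bolt
    }
}
```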
- The data structure used to hold data in a bolt’s memory (if no in-memory DB is used) plays a vital role in performance.
- A larger number of workers spread across different machines decreased the performance; one or two workers per machine gave better performance.
- Increasing the memory allocated to a worker gives a good improvement in performance (approximately 2,000 ESRs more) with a single worker, but scales poorly.
- With default memory allocations to workers, scaling is linear up to 5 workers (on 3 machines). If the number of workers is increased beyond 5, the performance drops from 10,000 records per second to 4,500 records per second.
- When max.threads is increased, workers are shut down periodically. supervisor.worker.timeout.secs and worker.heartbeat.frequency.seconds are the parameters to tune in this case.
- Enabling acknowledgements per tuple reduces the flattening performance from 1.8 million ESRs per minute to 0.7 million ESRs per minute.
- A frequent error with Netty: “Timeout value is negative”. The default value of storm.messaging.netty.max_retries is 30. This can cause the computed sleep time to exceed the maximum integer value and hence evaluate to a negative value. Reducing the value of this property avoids the error.
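The shape of the problem can be sketched as follows (our illustration, mirroring the arithmetic rather than Storm's exact code, with made-up constants): an exponential backoff computed in 32-bit int arithmetic wraps around to a negative value once the retry count is large enough.

```java
public class BackoffOverflow {
    // Exponential backoff: sleep = base * 2^retries, computed in 32-bit ints.
    // Constants are illustrative, not Storm's actual values.
    public static int sleepMs(int baseMs, int retries) {
        return baseMs * (1 << retries); // overflows for large retry counts
    }

    public static void main(String[] args) {
        System.out.println(sleepMs(150, 5));  // 4800 -- fine
        int sleep = sleepMs(150, 30);         // int overflow: wraps negative
        System.out.println(sleep);
        // Passing this to Thread.sleep(sleep) would throw
        // IllegalArgumentException: timeout value is negative
    }
}
```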
- Having more distinct bolts/stages in the topology reduces performance. Deciding what functionality should go into a single bolt plays a major role in performance.
- The Storm master got a higher load, in terms of number of workers, than the other worker machines in the cluster in most cases. No further conclusions were drawn from this in our experiment.
- Runtime rebalancing of topologies works, but does not improve performance in the long run. Performance seems to improve immediately after rebalancing, but soon rolls back to the previous level.
To conclude, further tuning is always possible to increase performance in Trident. But our general recommendation is that Trident is extremely useful for real-time applications with a low memory footprint (like flattening, word count, etc.). We plan to do further experiments to optimize Trident for real-time applications with a heavy memory footprint. So stay tuned for another post comparing Storm and Trident benchmarks.