This is Part II of the Data Streaming from DynamoDB series. You can read Part I where the primary focus is on a use case where stream processing is helpful(indexing data in ElasticSearch). We evaluated the available options to process streams and discussed in detail how Stream processing can be done using Kinesis Client Library(KCL). In this post we will see how to scale up stream processing for very large throughput. As a use case, we will look at online migration of a Cassandra database to DynamoDB and processing streams to index the same data in ElasticSearch.
Why scale up stream processing?
Let us look at a service that has about 250000 active users. They do about 2 million writes to this database. So over the years, the database, currently in Cassandra, has acquired a lot of data. Now, we want to migrate this database to DynamoDB. (Note: To serve 2 million writes a day, one worker seems to be enough to index the data in ElasticSearch in near-real time, as there is no complex processing involved.) Since we cannot take downtime and would like to complete this migration in a reasonable amount of time, we decide to migrate data on a per-user basis from Cassandra to DynamoDB. Now the rate of migration is really dependent on the rate of writes to DynamoDB; so also the writes to ElasticSearch.
Now, there is a difference between how DynamoDB scales up and how ElasticSearch scales up. We can provision a range of values to autoscale for WCUs to a DynamoDB table, but we setup a cluster with predefined capacity for ElasticSearch. Assuming ElasticSearch is appropriately provisioned to support current and a little into the future of the estimated traffic and data, the data migration use case really boils down to DynamoDB. Again, DynamoDB can scale up to a certain limit(40000 WCUs shared by the table and LSIs and Global tables if any) and so do the corresponding streams. These are soft limits which can be raised by support. Then the rate of migration ultimately comes down to how fast the stream is processed so corresponding data appears in ElasticSearch.
Imagine that there is only one worker, but DynamoDB writes are at 40000 WCUs( with a few indexes, let us say effective rate is 10000 WCU to the table and therefore the stream). In that case, it will be several hours before the corresponding data appears in ElasticSearch. Moreover, the table continues to get writes, streams continue to grow. After 24 hours streams become unavailable for reads and if the worker is really lagging at that point, there will be data loss. Even in a less dramatic scenario, a service that does 2 million writes a day, provided that the partition key is somewhat uniformly distributed, can benefit from more than a single worker and make use of the parallelism inbuilt to the system. Let’s learn more.
DynamoDB Streams and Shards
As shown in the picture above, one DynamoDB partition corresponds to one shard in DynamoDB stream, which can be processed by one KCL worker. So if the table has multiple partitions, stream processing can benefit from multiple workers.
How many workers do we need?
How do we calculate the number of partitions in a table? There are several equations floating around claiming to help calculate this number. Even older AWS documents point to a link which now does not show this calculation. Based on their documentation however we can somewhat guess that a partition cannot process more than 1000 WCUs roughly. Again, I’m not sure how accurate this number is. What has been useful for me is to actually see it in action.
This is one foolproof way to find the number of shards in the stream, therefore the number of workers and therefore the number of partitions in the base table. As discussed in Part I of this post series, configure a KCL worker to process the table stream. Then login to AWS console, look for a new table by the name of the worker you have configured. KCL workers checkpoint using a DynamoDB table of the same name and therefore we can see a table that we didn’t create after the worker has started processing stream.
For a table with min WCU configured as 250(has 3 LSIs), the corresponding worker table has 8 items. There are 6 open shards and 2 that are checkpointed with SHARD_END. LeaseOwner column shows the workerId of the shard that holds the current lease to the shard. LeaseKey is the shardId. So now we know how many shards we have in the stream and therefore how many workers can process the stream in parallel.
Tips on scaling up writes and workers
Initially I had these configurations for the table:
RCU: Autoscale. Min: 250 Max: 3000
WCU: Autoscale. Min: 250 Max: 40000
What I observed is every time it auto scaled, there was slowness in production performance for 2 minutes. After all this is a live system that does online migrations. Also the number of workers was very small, which do not autoscale. So we started seeing data loss, most likely due to workers not scaling as much as the write throughput on DynamoDB table and stream.
The other issue was that the DynamoDB table automatically created for a worker doesn’t auto-scale. It has RCU and WCU of 5 when it is created. Soon, reads to this table were getting throttled.
It is best to do a few trial runs to learn about the nature of throttling and throughput values of the main table, worker table and the efficiency of workers in processing the stream. A few metrics that will help are Throttled write requests and events.
Most often these throttling events don’t appear in the application logs as throttling errors are retriable. So looking at these metrics in AWS console help understand if there is a need to increase WCU for the table. In this case, clearly it will help as requests are throttled by the hundreds every once in a while.
We can also see metrics from Streams with the number of records returned per batch and the latency in the following:
The other metric to watch out is the capacity on the worker table. As we can see, for this table, the rate at which data is read/written the default values of 5 without autoScaling is no good. WCU required is 50–70.
How to prevent data loss and achieve near real time processing
From our use case to do high throughput online migration, we have a few useful insights. Here in addition to writing to DynamoDB, all data also need to be indexed in near real time in ElasticSearch via KCL workers configured to process DynamoDB streams.
Create a large number of workers ahead of time: DynamoDB can auto-scale. Lambda functions can auto-scale. But KCL workers that process streams will not auto-scale. They will continue to process one shard per worker. When DynamoDB autoscales and increases capacity, shards split into two. When shards split, there will be double the number of shards as before and therefore require double the number of workers prior to the split to cover all shards, if the write throughput sustains at large numbers.
PS1: A worker in KCL is a thread which takes an id. Each worker is uniquely identified by the workerId. KCL configuration allows you to name your workerIds within an application. Here is the sample code to create ‘multiple workers’.
PS2: If you cannot create enough threads to process the stream in one EC2 instance/JVM, it is beneficial to scale out EC2 instances/JVMs with the same KCL worker configuration, only workerIds need to be different so they are unique across all threads that process the same stream so that the stream shards are distributed correctly. (Similar to Kafka consumer group and consumers). In our case we have a Kubernetes pod for processing DynamoDB stream that we deploy as a ReplicaSet. We can spin up more workers if we need to across multiple EC2 instances by increasing the number of replicas.
In the sample code, there are 9 worker threads are created with application fooWorker. Each worker is uniquely identified by the workerId. One streamWorker can process a single shard. Maximum throughput for processing streams to achieve near real time processing is by configuring same number of workers as the number of shards. More workers will sit idle. Having more workers is not a bad idea if there is a chance for the table/stream scale out further.
Configure DynamoDB table to use high WCU from the beginning: We did see data loss in ElasticSearch during our test runs, possibly due to workers not catching up fast enough and a few shards that were never processed. In order to avoid running into this uncertainty, if we anticipate sustained high throughput over a long period of time, it is better to provision that table with the large throughput than to wait for it to be throttled and thereafter autoscale. While autoscaling shards split up and increase in number. Since migration is only going to take a finite amount of time, we can afford to have the high WCUs provisioned on the table, instead of having to worry about proving that all data migrated correctly, the latter being much harder.
Configure auto-scaling for the worker table: The fooWorker KCL application in this case will create a fooWorker DynamoDB table for checkpointing. This table needs to have higher capacity than the defaults with which it gets automatically created. Ensure that you either provision the required capacity or enable autoScaling/onDemand so stream workers are not throttled while processing streams.
With these settings enabled, ElasticSearch indexes are being updated mostly near real time. We are able to verify that all data eventually reaches ElasticSearch without significant lag and that there is no data loss.
That brings us to the end of 2 part series for processing DynamoDB Streams for near real time indexing in ElasticSearch at high throughput.