DynamoDB Stream Processing
DynamoDB Streams makes change data capture from a database table available as an event stream. One common use case for processing DynamoDB streams is indexing the data in ElasticSearch for full-text search or analytics. In this post, we will evaluate technology options to process streams for this use case. In a subsequent post, we will dive into the details of scaling up stream processing with this approach.
DynamoDB Streams
Enable DynamoDB Streams in the table specification:
"StreamSpecification": {
"StreamEnabled": true,
"StreamViewType": "NEW_AND_OLD_IMAGES"
}
Note: If you are planning to use Global Tables for DynamoDB, where a copy of your table is maintained in a different AWS region, “NEW_AND_OLD_IMAGES” must be enabled.
After streams are enabled on a table, the streamArn is required to configure a client application to process streams. It will look like this:
"arn:aws:dynamodb:{aws-region}:{aws-account-number}:table/{table-name}/stream/2019-11-07T20:49:20.459"
More on how table activity is captured in DynamoDB Streams
Two approaches to process streams
Serverless approach:
The easiest approach to index data from DynamoDB into ElasticSearch is to use a Lambda function, as documented here: https://docs.aws.amazon.com/elasticsearch-service/latest/developerguide/es-aws-integrations.html#es-aws-integrations-dynamodb-es
There are several reasons why I do not prefer a Lambda function for our use case. Some of them are:
- Deployment complexity: We run our services in Kubernetes pods, one for each type of application. Adding a Lambda function would change the deployment topology and bring more complexity into our deployment automation.
- Observability: The only way to observe what happens inside a Lambda function is through the CloudWatch service. We already have a different observability stack to collect and analyze application logs and would like to continue to leverage it. If we used a Lambda function, we would need to capture logs from CloudWatch and publish them to S3 buckets to push into that stack.
- Skill set of the team: We are primarily application engineers who switch to DevOps mode when needed. We prefer to work with client libraries in Java/Kotlin over other languages/tools/frameworks for production systems that we need to maintain as a team of 3 engineers.
Here are the reasons why AWS advocates the use of Lambda functions:
- Ability to autoscale stream processing: Unless you have a really large workload and really complicated processing, Lambda functions work well, with no additional effort needed to scale up stream processing.
- CloudWatch metrics: All metrics go to CloudWatch, which helps with observability if you already have that in place.
- Limitation on throughput: There is a 100-records-per-shard limit on how many records are processed at a time; KCL workers reportedly allow a larger batch size and therefore more throughput.
Hosted service approach:
Since we ruled out Lambda functions, the other approach is to use a KCL (Kinesis Client Library) worker with the DynamoDB Streams Adapter to process DynamoDB streams. Since we are building Java/Kotlin services and are primarily application developers, this option is better aligned with the skill set of the team for long-term maintainability of the stack.
In this case, an application is built around the KCL with the DynamoDB Adapter; it creates a worker configured to listen for changes on the stream and process them.
The disadvantage of using KCL workers is that we need to scale the workers ourselves based on the performance requirements of stream processing; more about that in the upcoming post. The advantage is that it is really just another application deployed alongside your main service, so you can leverage your existing deployment infrastructure (a separate pod on a Kubernetes cluster), code infrastructure (a Spring Boot application), and the telemetry/observability stack you are already familiar with for logging and troubleshooting.
Stream processing requires KCL to instantiate a worker. We must provide the worker with configuration information for the application, such as the streamArn and AWS credentials, and a record processor factory implementation.
As mentioned in the documentation, the worker performs the following tasks. In most cases, we don’t have to tweak any of these settings, but it is good to know what happens behind the scenes. The worker:
- Connects to the stream.
- Enumerates the shards within the stream.
- Coordinates shard associations with other workers (if any).
- Instantiates a record processor for every shard it manages.
- Pulls records from the stream.
- Pushes the records to the corresponding record processor.
- Checkpoints processed records.
- Balances shard-worker associations when the worker instance count changes.
- Balances shard-worker associations when shards are split.
DynamoDB writes data into shards (based on the partition key). Each shard is open for writes for 4 hours and open for reads for 24 hours. Essentially, the KCL worker subscribes to this stream, pulls records from it, and pushes them to the record processor implementation that we provide. KCL assigns each shard to at most one worker at a time, and the data lives in the stream for 24 hours. These are important limits to remember. We will discuss throughput and latency of stream processing in a bit.
Worker configuration
So far we know that we need a KCL worker with the right configuration and a record processor implementation that processes the stream and does the checkpointing. How do we actually go about doing it?
Let’s say we have 4 DynamoDB tables whose data need to be indexed in ElasticSearch. Each table produces a stream, identified by its streamArn, so we need 4 KCL workers, one for each stream. Here is a sample; most values can be left at their defaults, except the AWS credentials and the identifiers of the stream and the worker.
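Below is a minimal sketch of one such worker in Kotlin, assuming KCL v1 together with the DynamoDB Streams Kinesis Adapter; StreamConfig and its fields (appName, streamArn, workerId, batchSize) are hypothetical stand-ins for our own configuration properties:

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker

// Hypothetical holder for our own configuration properties
data class StreamConfig(
    val appName: String,   // also names the KCL lease/checkpoint table in DynamoDB
    val streamArn: String, // the streamArn of the table, passed as the stream name
    val workerId: String,  // unique per worker instance
    val batchSize: Int     // maps to the KCL maxRecords setting
)

fun buildWorker(streamConfig: StreamConfig, factory: IRecordProcessorFactory): Worker {
    // The adapter makes a DynamoDB stream look like a Kinesis stream to the KCL
    val adapterClient = AmazonDynamoDBStreamsAdapterClient(
        AmazonDynamoDBStreamsClientBuilder.defaultClient()
    )
    val workerConfig = KinesisClientLibConfiguration(
        streamConfig.appName,
        streamConfig.streamArn,
        DefaultAWSCredentialsProviderChain(),
        streamConfig.workerId
    )
        .withMaxRecords(streamConfig.batchSize)
        .withIdleTimeBetweenReadsInMillis(500)
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)

    return StreamsWorkerFactory.createDynamoDbStreamsWorker(
        factory,
        workerConfig,
        adapterClient,
        AmazonDynamoDBClientBuilder.defaultClient(),  // hosts the KCL lease table
        AmazonCloudWatchClientBuilder.defaultClient() // receives KCL metrics
    )
}

Each Worker is a Runnable, so the 4 workers in our example would each get their own thread (or pod) and be started with worker.run().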
StreamRecordProcessor implementation
KCL requires us to provide a record processor factory (our StreamRecordProcessorFactory) to actually process the stream. Details in the docs: https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-implementation-app-java.html
Provide implementations for IRecordProcessor and IRecordProcessorFactory. Refer to https://github.com/aws/aws-sdk-java/blob/master/src/samples/AmazonKinesis/AmazonKinesisApplicationSampleRecordProcessor.java
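A minimal factory sketch in Kotlin, assuming the KCL’s v2 record processor interfaces (which match the ProcessRecordsInput signature below); StreamRecordProcessor is our own hypothetical processor class:

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory

class StreamRecordProcessorFactory : IRecordProcessorFactory {
    // The KCL calls this once for every shard the worker ends up owning
    override fun createProcessor(): IRecordProcessor = StreamRecordProcessor()
}

The interesting work happens in the processor’s processRecords method: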
override fun processRecords(processRecordsInput: ProcessRecordsInput) {
    // Index/delete the batch in ElasticSearch, retrying transient failures
    processRecordsWithRetries(processRecordsInput.records)
    // Record how far into the shard we have successfully processed
    checkpoint(processRecordsInput)
}
processRecordsWithRetries: This is where the stream processing logic lives. In our specific case, we generate an id for the document based on the keys of the DynamoDB table and create an index/delete request in ElasticSearch. Note that it is advantageous to use bulk indexing in ElasticSearch to reduce round-trip time, thereby increasing throughput and reducing the latency for data to appear in ElasticSearch. At the rate of indexing a few hundred records every second, I have seen them appear in ElasticSearch within 200 ms. Note that KCL swallows any exception thrown from processRecords and moves on to process the next batch of events, so it is critical to have an effective exception-handling strategy: one that retries retryable errors (intermittent technical glitches) and another that handles non-retryable errors (e.g. a document that is invalid with respect to the ElasticSearch mapping).
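Here is a minimal sketch of that method, assuming the ElasticSearch REST high-level client; toBulkOperation is a hypothetical helper that derives the document id from the table keys and turns a stream record into an index or delete request, and the retry bounds are illustrative:

import com.amazonaws.services.kinesis.model.Record
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.action.bulk.BulkRequest
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.RestHighLevelClient
import java.io.IOException

private const val MAX_RETRIES = 3
private const val BACKOFF_MILLIS = 1000L

// Hypothetical: build an IndexRequest (INSERT/MODIFY) or DeleteRequest (REMOVE)
// keyed by an id derived from the DynamoDB table keys
fun toBulkOperation(record: Record): DocWriteRequest<*>? = TODO()

fun processRecordsWithRetries(esClient: RestHighLevelClient, records: List<Record>) {
    // One bulk request per KCL batch keeps ElasticSearch round trips to a minimum
    val bulk = BulkRequest()
    records.mapNotNull { toBulkOperation(it) }.forEach { bulk.add(it) }
    if (bulk.numberOfActions() == 0) return

    var attempts = 0
    while (true) {
        try {
            val response = esClient.bulk(bulk, RequestOptions.DEFAULT)
            if (response.hasFailures()) {
                // Non-retryable, e.g. a document invalid w.r.t. the index mapping:
                // log and move on rather than blocking the shard forever
                System.err.println(response.buildFailureMessage())
            }
            return
        } catch (e: IOException) {
            // Retryable: intermittent connectivity to the ElasticSearch cluster
            if (++attempts >= MAX_RETRIES) throw e
            Thread.sleep(BACKOFF_MILLIS * attempts)
        }
    }
}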
checkpoint: This is the mechanism the KCL worker uses to keep track of how much of the stream has been read. If the worker terminates or the application restarts, it catches up from the point in the stream where it was last checkpointed. This is similar to committing offsets in Kafka.
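A minimal checkpoint sketch, loosely modeled on the AWS sample linked above; how aggressively to retry throttling is a judgment call:

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput

fun checkpoint(processRecordsInput: ProcessRecordsInput) {
    try {
        // Persist our position in the shard to the KCL lease table
        processRecordsInput.checkpointer.checkpoint()
    } catch (e: ThrottlingException) {
        // Retryable: the lease table is throttled; a real implementation would
        // back off and retry a bounded number of times
    } catch (e: ShutdownException) {
        // This processor lost its shard lease; the new owner will checkpoint
    } catch (e: InvalidStateException) {
        // The lease table is unreachable or corrupt; surface this loudly
        throw e
    }
}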
Code samples & References
AWS documentation on using KCL to process DynamoDB Stream is here: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html
Here is some sample code from the docs that get one started on the record processing:
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.Walkthrough.html
Throughput and Latency
What we have done so far will create a single worker to process the stream. What if that is not enough? We can determine whether we need more workers based on the volume of writes to DynamoDB versus ElasticSearch. There are two ways to compare:
- Analyze the number of DynamoDB writes per minute and compare that to ElasticSearch writes.
- Instrument logging to trace a single record through the entire pipeline, across both DynamoDB and ElasticSearch. Monitoring a single item can tell you how much lag there is for a record to move from DynamoDB to ElasticSearch, as sketched below.
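Inside the record processor, one way to approximate that lag is to compare the record’s approximate creation time in the stream with the wall clock at processing time. A sketch, assuming the adapter’s RecordAdapter wrapper:

import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter
import com.amazonaws.services.kinesis.model.Record
import java.time.Duration
import java.time.Instant

fun logLag(record: Record) {
    // With the DynamoDB adapter, each Kinesis record wraps a DynamoDB stream record
    val streamRecord = (record as RecordAdapter).internalObject
    val createdAt = streamRecord.dynamodb.approximateCreationDateTime.toInstant()
    // approximateCreationDateTime is coarse-grained, so treat this as a rough signal
    val lag = Duration.between(createdAt, Instant.now())
    println("record ${streamRecord.eventID} lag=${lag.toMillis()} ms")
}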
If the application writes a few hundred records at a time to DynamoDB, one worker is usually enough. It also depends on how well distributed the partition key is. Let’s say we find that it takes several minutes for data to appear in ElasticSearch after it is written to DynamoDB. In that case, the first parameter to examine is streamConfig.batchSize in the configuration above.
KinesisClientLibrary::maxRecords
If your application writes thousands of items to DynamoDB, there is no point in keeping maxRecords low, e.g. 100. A higher number (default: 1000) will improve throughput and therefore reduce the latency for your data to appear in ElasticSearch. For most cases there is no reason to lower this value.
Now, there will be cases where you have high-throughput writes (i.e. several thousand writes per second) to your DynamoDB tables. In such cases a single worker is not going to be enough. We will discuss scaling up stream processing with KCL workers in the next post in this series.