DynamoDB Streams
Real-time change data capture for DynamoDB tables
DynamoDB Streams captures a time-ordered sequence of item-level changes in your table. Use streams to replicate data, trigger workflows, and build event-driven architectures.
Streams Overview
What are Streams?
DynamoDB Streams records every modification (insert, update, delete) to items in your table. Each record contains the item's primary key and, depending on the stream view type, the item's before and/or after images.
Enabling Streams
```bash
aws dynamodb update-table \
  --table-name Orders \
  --stream-specification \
    StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
```

Stream View Types
| View Type | Contains | Use Case |
|---|---|---|
| KEYS_ONLY | Primary key only | Trigger actions, fetch separately |
| NEW_IMAGE | Item after modification | Event processing |
| OLD_IMAGE | Item before modification | Audit, rollback |
| NEW_AND_OLD_IMAGES | Both images | Full change tracking |
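The table above can be illustrated with a small sketch showing which parts of an item end up in a stream record under each view type (`imagesFor` is a hypothetical helper, not part of any SDK):

```javascript
// Sketch: which images a stream record carries per StreamViewType.
// `imagesFor` is a hypothetical illustration helper.
const imagesFor = (viewType, keys, oldItem, newItem) => {
  const record = { Keys: keys };
  if (viewType === "NEW_IMAGE" || viewType === "NEW_AND_OLD_IMAGES") {
    record.NewImage = newItem;
  }
  if (viewType === "OLD_IMAGE" || viewType === "NEW_AND_OLD_IMAGES") {
    record.OldImage = oldItem;
  }
  return record;
};

const keys = { orderId: "order-123" };
const oldItem = { ...keys, status: "pending" };
const newItem = { ...keys, status: "shipped" };

console.log(imagesFor("KEYS_ONLY", keys, oldItem, newItem));
// → { Keys: { orderId: 'order-123' } }
```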
```bash
aws dynamodb create-table \
  --table-name AuditLog \
  --attribute-definitions \
    AttributeName=id,AttributeType=S \
  --key-schema \
    AttributeName=id,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST \
  --stream-specification \
    StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
```

Stream Records
Each stream record contains:
```json
{
  "eventID": "abc123",
  "eventName": "INSERT" | "MODIFY" | "REMOVE",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-1",
  "dynamodb": {
    "Keys": {
      "orderId": {"S": "order-123"}
    },
    "NewImage": {
      "orderId": {"S": "order-123"},
      "status": {"S": "pending"},
      "total": {"N": "99.99"}
    },
    "OldImage": null,
    "SequenceNumber": "123456789",
    "SizeBytes": 256,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/Orders/stream/2024-01-01T00:00:00.000"
}
```

Processing Streams with Lambda
The most common way to process streams:
```bash
aws lambda create-event-source-mapping \
  --function-name ProcessOrderChanges \
  --event-source-arn arn:aws:dynamodb:us-east-1:123456789012:table/Orders/stream/2024-01-01T00:00:00.000 \
  --batch-size 100 \
  --starting-position LATEST \
  --maximum-batching-window-in-seconds 5 \
  --parallelization-factor 2
```

```javascript
import { unmarshall } from "@aws-sdk/util-dynamodb";

export const handler = async (event) => {
  const batchItemFailures = [];
  for (const record of event.Records) {
    try {
      const { eventName, dynamodb } = record;
      switch (eventName) {
        case 'INSERT':
          await handleInsert(dynamodb.NewImage);
          break;
        case 'MODIFY':
          await handleModify(dynamodb.OldImage, dynamodb.NewImage);
          break;
        case 'REMOVE':
          await handleRemove(dynamodb.OldImage);
          break;
      }
    } catch (error) {
      console.error(`Failed to process ${record.eventID}:`, error);
      batchItemFailures.push({ itemIdentifier: record.eventID });
    }
  }
  return { batchItemFailures };
};

const handleInsert = async (newImage) => {
  const item = unmarshall(newImage);
  console.log('New item:', item);
  // Send notification, update search index, etc.
};

const handleModify = async (oldImage, newImage) => {
  const oldItem = unmarshall(oldImage);
  const newItem = unmarshall(newImage);
  // Detect status change
  if (oldItem.status !== newItem.status) {
    console.log(`Status changed: ${oldItem.status} → ${newItem.status}`);
    await sendStatusNotification(newItem);
  }
};

const handleRemove = async (oldImage) => {
  const item = unmarshall(oldImage);
  console.log('Deleted item:', item);
  // Clean up related data
};
```

Event Filtering
Reduce Lambda invocations by filtering events:
```bash
aws lambda update-event-source-mapping \
  --uuid abc123-def456 \
  --filter-criteria '{
    "Filters": [
      {
        "Pattern": "{\"eventName\": [\"INSERT\", \"MODIFY\"]}"
      }
    ]
  }'
```

You can also filter on attribute values:

```bash
aws lambda update-event-source-mapping \
  --uuid abc123-def456 \
  --filter-criteria '{
    "Filters": [
      {
        "Pattern": "{\"dynamodb\": {\"NewImage\": {\"status\": {\"S\": [\"pending\", \"processing\"]}}}}"
      }
    ]
  }'
```

Stream Processing Options
AWS Lambda

Best for:
- Serverless processing
- Simple transformations
- AWS service integrations

```javascript
import { unmarshall } from "@aws-sdk/util-dynamodb";

export const handler = async (event) => {
  // Lambda delivers up to 10,000 records per batch
  console.log(`Processing ${event.Records.length} records`);
  // Batch operations
  const items = event.Records
    .filter(r => r.eventName !== 'REMOVE')
    .map(r => unmarshall(r.dynamodb.NewImage));
  // Bulk write to another service
  await bulkIndex(items);
};
```

Kinesis Client Library (KCL)

Best for:
- Long-running consumers
- Complex processing logic
- Multi-consumer scenarios
```java
public class OrderStreamProcessor implements ShardRecordProcessor {
    @Override
    public void processRecords(ProcessRecordsInput input) {
        for (KinesisClientRecord record : input.records()) {
            // Each record's payload is a serialized DynamoDB stream record
            handleRecord(record);
        }
    }
}
```

Kinesis Data Streams

Enable Kinesis Data Streams for DynamoDB for advanced streaming:

```bash
aws dynamodb enable-kinesis-streaming-destination \
  --table-name Orders \
  --stream-arn arn:aws:kinesis:us-east-1:123456789012:stream/order-stream
```

Benefits:
- Longer retention (up to 365 days)
- Multiple consumers
- Fan-out capabilities
- Enhanced data capture format
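One of those benefits, multiple consumers, amounts to fan-out: each change record is handed to several independent downstream processors. A minimal sketch (the consumer names and logic here are hypothetical):

```javascript
// Sketch: fan-out of one change record to several independent
// consumers, which a Kinesis destination makes practical.
// Consumer names and logic are hypothetical.
const consumers = {
  searchIndexer: (rec) => `indexed ${rec.Keys.orderId}`,
  auditWriter: (rec) => `audited ${rec.Keys.orderId}`,
  cacheInvalidator: (rec) => `invalidated ${rec.Keys.orderId}`,
};

const fanOut = (record) =>
  Object.entries(consumers).map(([name, fn]) => [name, fn(record)]);

console.log(fanOut({ Keys: { orderId: "order-123" } }).length); // 3
```

With native DynamoDB Streams you are limited to two concurrent readers per shard, which is why multi-consumer designs usually route through Kinesis.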
Common Use Cases
1. Cross-Region Replication
```javascript
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { PutCommand, DeleteCommand, DynamoDBDocumentClient } from "@aws-sdk/lib-dynamodb";
import { unmarshall } from "@aws-sdk/util-dynamodb";

const replicaClient = DynamoDBDocumentClient.from(
  new DynamoDBClient({ region: "eu-west-1" })
);

export const handler = async (event) => {
  for (const record of event.Records) {
    const { eventName, dynamodb } = record;
    if (eventName === 'REMOVE') {
      await replicaClient.send(new DeleteCommand({
        TableName: "Orders-Replica",
        Key: unmarshall(dynamodb.Keys)
      }));
    } else {
      await replicaClient.send(new PutCommand({
        TableName: "Orders-Replica",
        Item: unmarshall(dynamodb.NewImage)
      }));
    }
  }
};
```

For production cross-region replication, use DynamoDB Global Tables instead of custom stream processing.
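The `unmarshall` helper used throughout these examples comes from `@aws-sdk/util-dynamodb`. Conceptually it converts DynamoDB's typed attribute-value format into plain objects; a simplified sketch that handles only a few types (not a replacement for the real helper):

```javascript
// Simplified sketch of what unmarshall does. The real helper lives in
// @aws-sdk/util-dynamodb and supports every DynamoDB attribute type.
const simpleUnmarshall = (image) =>
  Object.fromEntries(
    Object.entries(image).map(([name, av]) => [name, fromAttr(av)])
  );

const fromAttr = (av) => {
  if ("S" in av) return av.S;
  if ("N" in av) return Number(av.N);      // numbers arrive as strings
  if ("BOOL" in av) return av.BOOL;
  if ("NULL" in av) return null;
  if ("L" in av) return av.L.map(fromAttr);
  if ("M" in av) return simpleUnmarshall(av.M);
  throw new Error(`Unsupported type: ${Object.keys(av)}`);
};

console.log(simpleUnmarshall({
  orderId: { S: "order-123" },
  total: { N: "99.99" },
}));
// → { orderId: 'order-123', total: 99.99 }
```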
2. Search Index Sync
```javascript
import { Client } from '@opensearch-project/opensearch';
import { unmarshall } from '@aws-sdk/util-dynamodb';

const osClient = new Client({ node: process.env.OPENSEARCH_ENDPOINT });

export const handler = async (event) => {
  const operations = [];
  for (const record of event.Records) {
    const { eventName, dynamodb } = record;
    const id = dynamodb.Keys.id.S;
    if (eventName === 'REMOVE') {
      operations.push(
        { delete: { _index: 'orders', _id: id } }
      );
    } else {
      operations.push(
        { index: { _index: 'orders', _id: id } },
        unmarshall(dynamodb.NewImage)
      );
    }
  }
  if (operations.length > 0) {
    await osClient.bulk({ body: operations });
  }
};
```

3. Audit Trail
```javascript
import { DynamoDBClient, BatchWriteItemCommand } from "@aws-sdk/client-dynamodb";

const dynamodb = new DynamoDBClient({});

export const handler = async (event) => {
  const auditRecords = event.Records.map(record => ({
    PutRequest: {
      Item: {
        auditId: { S: record.eventID },
        tableName: { S: 'Orders' },
        eventName: { S: record.eventName },
        timestamp: { S: new Date().toISOString() },
        keys: { M: record.dynamodb.Keys },
        oldImage: record.dynamodb.OldImage
          ? { M: record.dynamodb.OldImage }
          : { NULL: true },
        newImage: record.dynamodb.NewImage
          ? { M: record.dynamodb.NewImage }
          : { NULL: true }
      }
    }
  }));

  // Batch write to audit table. Note: BatchWriteItem accepts at most
  // 25 items per request; chunk larger batches in production.
  await dynamodb.send(new BatchWriteItemCommand({
    RequestItems: {
      'AuditLog': auditRecords
    }
  }));
};
```

4. Materialized Views
```javascript
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, UpdateCommand } from "@aws-sdk/lib-dynamodb";
import { unmarshall } from "@aws-sdk/util-dynamodb";

const dynamodb = DynamoDBDocumentClient.from(new DynamoDBClient({}));

export const handler = async (event) => {
  const updates = {};
  for (const record of event.Records) {
    if (record.eventName === 'INSERT') {
      const item = unmarshall(record.dynamodb.NewImage);
      const key = item.category;
      if (!updates[key]) {
        updates[key] = { count: 0, total: 0 };
      }
      updates[key].count += 1;
      updates[key].total += item.price;
    }
  }

  // Update category aggregates
  for (const [category, stats] of Object.entries(updates)) {
    await dynamodb.send(new UpdateCommand({
      TableName: 'CategoryStats',
      Key: { category },
      UpdateExpression: 'ADD itemCount :count, totalValue :total',
      ExpressionAttributeValues: {
        ':count': stats.count,
        ':total': stats.total
      }
    }));
  }
};
```

Stream Processing Best Practices
Enable Partial Batch Failure
Report failed items instead of reprocessing the entire batch:
```javascript
return {
  batchItemFailures: [
    { itemIdentifier: failedRecordEventId }
  ]
};
```

This requires `ReportBatchItemFailures` in the event source mapping's function response types.

Use Event Filtering
Filter events at the source to reduce Lambda invocations:
```json
{
  "Pattern": "{\"eventName\": [\"INSERT\"]}"
}
```

Implement Idempotency
Use eventID or SequenceNumber as idempotency keys:
```javascript
// Inside the record-processing loop:
const processed = await cache.get(record.eventID);
if (processed) {
  console.log('Already processed, skipping');
  continue;
}
```

Handle Ordering
Records within a shard are ordered. Use parallelization carefully:
```bash
--parallelization-factor 1   # Preserve order
--parallelization-factor 10  # Maximum parallelism
```

Stream Shards
Streams are divided into shards:
```bash
aws dynamodbstreams describe-stream \
  --stream-arn arn:aws:dynamodb:us-east-1:123456789012:table/Orders/stream/2024-01-01T00:00:00.000
```

| Concept | Description |
|---|---|
| Shard | Ordered sequence of stream records |
| Shard Iterator | Position within a shard |
| Parent Shard | Previous shard (for shard splits) |
Lambda automatically manages shard iterators. For custom consumers (KCL), you must track shard lineage.
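Tracking shard lineage means a parent shard must be fully processed before its children to preserve ordering. A sketch of how a custom consumer might order shards returned by `DescribeStream` (the shape of the shard objects is simplified):

```javascript
// Sketch: order shards so every parent is processed before its
// children, as a custom (non-Lambda) consumer must ensure.
// Shard objects are simplified to { shardId, parentShardId }.
const lineageOrder = (shards) => {
  const byId = new Map(shards.map((s) => [s.shardId, s]));
  const ordered = [];
  const visit = (shard) => {
    if (!shard || ordered.includes(shard)) return;
    visit(byId.get(shard.parentShardId)); // parent first
    ordered.push(shard);
  };
  shards.forEach((s) => visit(s));
  return ordered.map((s) => s.shardId);
};

console.log(lineageOrder([
  { shardId: "child-1", parentShardId: "parent" },
  { shardId: "parent" },
  { shardId: "child-2", parentShardId: "parent" },
]));
// → [ 'parent', 'child-1', 'child-2' ]
```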
Stream Retention
- Stream records are retained for 24 hours
- After 24 hours, records are automatically deleted
- For longer retention, use Kinesis Data Streams destination
Disabling Streams
```bash
aws dynamodb update-table \
  --table-name Orders \
  --stream-specification StreamEnabled=false
```

Disabling streams is irreversible for that stream ARN. Re-enabling creates a new stream with a new ARN.
Monitoring Streams
```bash
aws cloudwatch get-metric-statistics \
  --namespace AWS/Lambda \
  --metric-name IteratorAge \
  --dimensions Name=FunctionName,Value=ProcessOrderChanges \
  --start-time $(date -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ) \
  --end-time $(date +%Y-%m-%dT%H:%M:%SZ) \
  --period 300 \
  --statistics Maximum
```

Key metrics:
- IteratorAge: How far behind processing is (milliseconds)
- GetRecords.Success: Successful read operations
- GetRecords.IteratorAgeMilliseconds: Iterator age per shard
Best Practices
Stream Best Practices
- Use NEW_AND_OLD_IMAGES when possible for complete context
- Enable partial batch failure to avoid reprocessing
- Filter events to reduce Lambda invocations
- Monitor IteratorAge for processing lag
- Implement idempotency for at-least-once processing
- Use DLQ for failed records
- Consider Kinesis destination for longer retention
- Test with production-like volume before going live
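Several of these practices fit naturally into one handler skeleton. A sketch combining idempotency with partial batch failure (the in-memory `seen` set stands in for a durable idempotency store, and `handle` is a hypothetical business function):

```javascript
// Sketch: idempotency + partial batch failure in one handler.
// `seen` stands in for a durable store (e.g., a DynamoDB table
// with TTL); `handle` is a hypothetical business function.
const seen = new Set();

const makeHandler = (handle) => async (event) => {
  const batchItemFailures = [];
  for (const record of event.Records) {
    if (seen.has(record.eventID)) continue; // already processed
    try {
      await handle(record);
      seen.add(record.eventID);
    } catch {
      // Report only the failed record; the rest of the batch succeeds
      batchItemFailures.push({ itemIdentifier: record.eventID });
    }
  }
  return { batchItemFailures };
};
```

Because Lambda execution environments are recycled, an in-memory set is not sufficient in production; persist processed event IDs somewhere durable.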