
DynamoDB Streams

Real-time change data capture for DynamoDB tables

DynamoDB Streams captures a time-ordered sequence of item-level changes in your table. Use streams to replicate data, trigger workflows, and build event-driven architectures.

Streams Overview

What are Streams?

DynamoDB Streams records every modification (insert, update, delete) to items in your table. Each record contains the item's key and, depending on the stream view type, the item's before and/or after images.

Enabling Streams

Enable streams on table
aws dynamodb update-table \
  --table-name Orders \
  --stream-specification \
    StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
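
Streams can also be enabled from code. A minimal sketch with the AWS SDK for JavaScript v3, mirroring the CLI call above:

Enable streams via the SDK
import { DynamoDBClient, UpdateTableCommand } from "@aws-sdk/client-dynamodb";

const client = new DynamoDBClient({});

// Turn on the stream for an existing table
await client.send(new UpdateTableCommand({
  TableName: "Orders",
  StreamSpecification: {
    StreamEnabled: true,
    StreamViewType: "NEW_AND_OLD_IMAGES"
  }
}));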

Stream View Types

View Type          | Contains                 | Use Case
KEYS_ONLY          | Primary key only         | Trigger actions, fetch separately
NEW_IMAGE          | Item after modification  | Event processing
OLD_IMAGE          | Item before modification | Audit, rollback
NEW_AND_OLD_IMAGES | Both images              | Full change tracking

Create table with streams
aws dynamodb create-table \
  --table-name AuditLog \
  --attribute-definitions \
    AttributeName=id,AttributeType=S \
  --key-schema \
    AttributeName=id,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST \
  --stream-specification \
    StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
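
Consumers need the stream's ARN. A minimal sketch for looking it up after the table is created (the table name is illustrative):

Get the stream ARN
import { DynamoDBClient, DescribeTableCommand } from "@aws-sdk/client-dynamodb";

const client = new DynamoDBClient({});

const { Table } = await client.send(new DescribeTableCommand({ TableName: "Orders" }));
// LatestStreamArn identifies the stream currently enabled on the table
console.log(Table.LatestStreamArn);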

Stream Records

Each stream record contains the item's keys and, depending on the view type, the old and new images. OldImage is omitted for inserts and NewImage for removals:

Stream record structure
{
  "eventID": "abc123",
  "eventName": "INSERT" | "MODIFY" | "REMOVE",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-1",
  "dynamodb": {
    "Keys": {
      "orderId": {"S": "order-123"}
    },
    "NewImage": {
      "orderId": {"S": "order-123"},
      "status": {"S": "pending"},
      "total": {"N": "99.99"}
    },
    "OldImage": null,
    "SequenceNumber": "123456789",
    "SizeBytes": 256,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/Orders/stream/2024-01-01T00:00:00.000"
}
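
Images use DynamoDB's attribute-value encoding. The SDK's unmarshall helper converts them to plain objects, as the handlers below assume:

Unmarshall a stream image
import { unmarshall } from "@aws-sdk/util-dynamodb";

// { orderId: { S: "order-123" }, total: { N: "99.99" } } → { orderId: "order-123", total: 99.99 }
const item = unmarshall({
  orderId: { S: "order-123" },
  total: { N: "99.99" }
});
console.log(item.total); // 99.99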

Processing Streams with Lambda

The most common way to process streams:

Create Lambda event source mapping
aws lambda create-event-source-mapping \
  --function-name ProcessOrderChanges \
  --event-source-arn arn:aws:dynamodb:us-east-1:123456789012:table/Orders/stream/2024-01-01T00:00:00.000 \
  --batch-size 100 \
  --starting-position LATEST \
  --maximum-batching-window-in-seconds 5 \
  --parallelization-factor 2 \
  --function-response-types ReportBatchItemFailures
Lambda stream handler
import { unmarshall } from "@aws-sdk/util-dynamodb";

export const handler = async (event) => {
  const batchItemFailures = [];

  for (const record of event.Records) {
    try {
      const { eventName, dynamodb } = record;
      
      switch (eventName) {
        case 'INSERT':
          await handleInsert(dynamodb.NewImage);
          break;
        case 'MODIFY':
          await handleModify(dynamodb.OldImage, dynamodb.NewImage);
          break;
        case 'REMOVE':
          await handleRemove(dynamodb.OldImage);
          break;
      }
    } catch (error) {
      console.error(`Failed to process ${record.eventID}:`, error);
      batchItemFailures.push({ itemIdentifier: record.eventID });
    }
  }
  
  return { batchItemFailures };
};

const handleInsert = async (newImage) => {
  const item = unmarshall(newImage);
  console.log('New item:', item);
  // Send notification, update search index, etc.
};

const handleModify = async (oldImage, newImage) => {
  const oldItem = unmarshall(oldImage);
  const newItem = unmarshall(newImage);
  
  // Detect status change
  if (oldItem.status !== newItem.status) {
    console.log(`Status changed: ${oldItem.status} → ${newItem.status}`);
    await sendStatusNotification(newItem);
  }
};

const handleRemove = async (oldImage) => {
  const item = unmarshall(oldImage);
  console.log('Deleted item:', item);
  // Clean up related data
};

Event Filtering

Reduce Lambda invocations by filtering events:

Add event filter
aws lambda update-event-source-mapping \
  --uuid abc123-def456 \
  --filter-criteria '{
    "Filters": [
      {
        "Pattern": "{\"eventName\": [\"INSERT\", \"MODIFY\"]}"
      }
    ]
  }'
Filter by attribute value
aws lambda update-event-source-mapping \
  --uuid abc123-def456 \
  --filter-criteria '{
    "Filters": [
      {
        "Pattern": "{\"dynamodb\": {\"NewImage\": {\"status\": {\"S\": [\"pending\", \"processing\"]}}}}"
      }
    ]
  }'

Stream Processing Options

Lambda

Best for:

  • Serverless processing
  • Simple transformations
  • AWS service integrations
Lambda with batch processing
import { unmarshall } from "@aws-sdk/util-dynamodb";

export const handler = async (event) => {
  // Process up to 10,000 records per batch
  console.log(`Processing ${event.Records.length} records`);
  
  // Batch operations
  const items = event.Records
    .filter(r => r.eventName !== 'REMOVE')
    .map(r => unmarshall(r.dynamodb.NewImage));
  
  // Bulk write to another service
  await bulkIndex(items);
};

Kinesis Client Library (KCL)

Best for:

  • Long-running consumers
  • Complex processing logic
  • Multi-consumer scenarios
KCL worker example
// Requires the DynamoDB Streams Kinesis Adapter to run KCL against a DynamoDB stream
public class OrderStreamProcessor implements ShardRecordProcessor {
    @Override
    public void processRecords(ProcessRecordsInput input) {
        for (KinesisClientRecord record : input.records()) {
            // Each record wraps a DynamoDB stream record
            handleRecord(record);
        }
    }
    // initialize, leaseLost, shardEnded, and shutdownRequested omitted for brevity
}

Kinesis Data Streams

Enable Kinesis Data Streams for DynamoDB when you need longer retention or multiple consumers:

Enable Kinesis destination
aws dynamodb enable-kinesis-streaming-destination \
  --table-name Orders \
  --stream-arn arn:aws:kinesis:us-east-1:123456789012:stream/order-stream

Benefits:

  • Longer retention (up to 365 days)
  • Multiple consumers
  • Fan-out capabilities
  • Enhanced data capture format
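
With the Kinesis destination, consumers receive the Kinesis event shape instead of the DynamoDB Streams shape. A minimal Lambda consumer sketch, assuming each Kinesis record carries the change record as base64-encoded JSON:

Kinesis consumer sketch
export const handler = async (event) => {
  for (const record of event.Records) {
    // Decode the base64 payload into the DynamoDB change record
    const change = JSON.parse(
      Buffer.from(record.kinesis.data, "base64").toString("utf8")
    );
    console.log(change.eventName, change.dynamodb?.Keys);
  }
};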

Common Use Cases

1. Cross-Region Replication

Replicate to another region
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, PutCommand, DeleteCommand } from "@aws-sdk/lib-dynamodb";
import { unmarshall } from "@aws-sdk/util-dynamodb";

const replicaClient = DynamoDBDocumentClient.from(
  new DynamoDBClient({ region: "eu-west-1" })
);

export const handler = async (event) => {
  for (const record of event.Records) {
    const { eventName, dynamodb } = record;
    
    if (eventName === 'REMOVE') {
      await replicaClient.send(new DeleteCommand({
        TableName: "Orders-Replica",
        Key: unmarshall(dynamodb.Keys)
      }));
    } else {
      await replicaClient.send(new PutCommand({
        TableName: "Orders-Replica",
        Item: unmarshall(dynamodb.NewImage)
      }));
    }
  }
};

For production cross-region replication, use DynamoDB Global Tables instead of custom stream processing.

2. Search Index Sync

Sync to Elasticsearch/OpenSearch
import { Client } from '@opensearch-project/opensearch';
import { unmarshall } from '@aws-sdk/util-dynamodb';

const osClient = new Client({ node: process.env.OPENSEARCH_ENDPOINT });

export const handler = async (event) => {
  const operations = [];
  
  for (const record of event.Records) {
    const { eventName, dynamodb } = record;
    const id = dynamodb.Keys.id.S;
    
    if (eventName === 'REMOVE') {
      operations.push(
        { delete: { _index: 'orders', _id: id } }
      );
    } else {
      operations.push(
        { index: { _index: 'orders', _id: id } },
        unmarshall(dynamodb.NewImage)
      );
    }
  }
  
  if (operations.length > 0) {
    await osClient.bulk({ body: operations });
  }
};

3. Audit Trail

Write to audit table
import { DynamoDBClient, BatchWriteItemCommand } from "@aws-sdk/client-dynamodb";

const client = new DynamoDBClient({});

export const handler = async (event) => {
  const auditRecords = event.Records.map(record => ({
    PutRequest: {
      Item: {
        auditId: { S: record.eventID },
        tableName: { S: 'Orders' },
        eventName: { S: record.eventName },
        timestamp: { S: new Date().toISOString() },
        keys: { M: record.dynamodb.Keys },
        oldImage: record.dynamodb.OldImage
          ? { M: record.dynamodb.OldImage }
          : { NULL: true },
        newImage: record.dynamodb.NewImage
          ? { M: record.dynamodb.NewImage }
          : { NULL: true }
      }
    }
  }));

  // BatchWriteItem accepts at most 25 requests per call, so write in chunks
  for (let i = 0; i < auditRecords.length; i += 25) {
    await client.send(new BatchWriteItemCommand({
      RequestItems: {
        'AuditLog': auditRecords.slice(i, i + 25)
      }
    }));
  }
};

4. Materialized Views

Update aggregates
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, UpdateCommand } from "@aws-sdk/lib-dynamodb";
import { unmarshall } from "@aws-sdk/util-dynamodb";

const docClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));

export const handler = async (event) => {
  const updates = {};
  
  for (const record of event.Records) {
    if (record.eventName === 'INSERT') {
      const item = unmarshall(record.dynamodb.NewImage);
      const key = item.category;
      
      if (!updates[key]) {
        updates[key] = { count: 0, total: 0 };
      }
      updates[key].count += 1;
      updates[key].total += item.price;
    }
  }
  
  // Update category aggregates
  for (const [category, stats] of Object.entries(updates)) {
    await docClient.send(new UpdateCommand({
      TableName: 'CategoryStats',
      Key: { category },
      UpdateExpression: 'ADD itemCount :count, totalValue :total',
      ExpressionAttributeValues: {
        ':count': stats.count,
        ':total': stats.total
      }
    }));
  }
};

Stream Processing Best Practices

Enable Partial Batch Failure

Report failed items instead of reprocessing the entire batch (requires ReportBatchItemFailures in the mapping's --function-response-types, as shown above):

return {
  batchItemFailures: [
    { itemIdentifier: failedRecordEventId }
  ]
};

Use Event Filtering

Filter events at the source to reduce Lambda invocations:

{
  "Pattern": "{\"eventName\": [\"INSERT\"]}"
}

Implement Idempotency

Use eventID or SequenceNumber as idempotency keys:

const processed = await cache.get(record.eventID);
if (processed) {
  console.log('Already processed, skipping');
  continue;
}
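
An external cache is one option; a conditional write to a small tracking table avoids the extra dependency. A minimal sketch, assuming a hypothetical ProcessedEvents table keyed on eventId:

Claim an event ID before processing
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, PutCommand } from "@aws-sdk/lib-dynamodb";

const docClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));

const claimEvent = async (eventId) => {
  try {
    // The conditional write succeeds only for the first processor of this event
    await docClient.send(new PutCommand({
      TableName: "ProcessedEvents", // hypothetical tracking table
      Item: { eventId },
      ConditionExpression: "attribute_not_exists(eventId)"
    }));
    return true;
  } catch (err) {
    if (err.name === "ConditionalCheckFailedException") return false; // duplicate
    throw err;
  }
};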

Handle Ordering

Records for a given item arrive in order within a shard. A parallelization factor above 1 still preserves ordering per partition key, but total shard order requires a factor of 1:

--parallelization-factor 1  # Preserve total shard order
--parallelization-factor 10 # Maximum parallelism (per-key order still preserved)

Stream Shards

Streams are divided into shards:

List stream shards
aws dynamodbstreams describe-stream \
  --stream-arn arn:aws:dynamodb:us-east-1:123456789012:table/Orders/stream/2024-01-01T00:00:00.000

Concept        | Description
Shard          | Ordered sequence of stream records
Shard Iterator | Position within a shard
Parent Shard   | Previous shard (for shard splits)

Lambda automatically manages shard iterators. For custom consumers (KCL), you must track shard lineage.
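
To see what a custom consumer handles, here is a minimal sketch that reads every shard of a stream once with the low-level streams SDK (no lease tracking, checkpointing, or re-shard handling):

Read a stream's shards directly
import {
  DynamoDBStreamsClient,
  DescribeStreamCommand,
  GetShardIteratorCommand,
  GetRecordsCommand
} from "@aws-sdk/client-dynamodb-streams";

const streams = new DynamoDBStreamsClient({});

const readStream = async (streamArn) => {
  const { StreamDescription } = await streams.send(
    new DescribeStreamCommand({ StreamArn: streamArn })
  );

  for (const shard of StreamDescription.Shards) {
    // TRIM_HORIZON starts at the oldest record still retained in the shard
    let { ShardIterator } = await streams.send(new GetShardIteratorCommand({
      StreamArn: streamArn,
      ShardId: shard.ShardId,
      ShardIteratorType: "TRIM_HORIZON"
    }));

    while (ShardIterator) {
      const { Records, NextShardIterator } = await streams.send(
        new GetRecordsCommand({ ShardIterator })
      );
      for (const record of Records) {
        console.log(record.eventName, record.dynamodb?.Keys);
      }
      if (Records.length === 0) break; // open shard drained; stop polling in this sketch
      ShardIterator = NextShardIterator; // undefined once a closed shard is fully read
    }
  }
};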

Stream Retention

  • Stream records are retained for 24 hours
  • After 24 hours, records are automatically deleted
  • For longer retention, use Kinesis Data Streams destination

Disabling Streams

Disable streams
aws dynamodb update-table \
  --table-name Orders \
  --stream-specification StreamEnabled=false

Disabling streams is irreversible for that stream ARN. Re-enabling creates a new stream with a new ARN.

Monitoring Streams

Get stream iterator age
aws cloudwatch get-metric-statistics \
  --namespace AWS/Lambda \
  --metric-name IteratorAge \
  --dimensions Name=FunctionName,Value=ProcessOrderChanges \
  --start-time $(date -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ) \
  --end-time $(date +%Y-%m-%dT%H:%M:%SZ) \
  --period 300 \
  --statistics Maximum

Key metrics:

  • IteratorAge: How far behind processing is (milliseconds)
  • GetRecords.Success: Successful read operations
  • GetRecords.IteratorAgeMilliseconds: Iterator age per shard
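
To catch processing lag early, alarm on IteratorAge. A minimal sketch with the CloudWatch SDK, assuming a hypothetical SNS topic for notifications:

Alarm on iterator age
import { CloudWatchClient, PutMetricAlarmCommand } from "@aws-sdk/client-cloudwatch";

const cw = new CloudWatchClient({});

// Fires when the function falls more than 5 minutes behind the stream
await cw.send(new PutMetricAlarmCommand({
  AlarmName: "ProcessOrderChanges-IteratorAge",
  Namespace: "AWS/Lambda",
  MetricName: "IteratorAge",
  Dimensions: [{ Name: "FunctionName", Value: "ProcessOrderChanges" }],
  Statistic: "Maximum",
  Period: 300,
  EvaluationPeriods: 1,
  Threshold: 300000, // milliseconds
  ComparisonOperator: "GreaterThanThreshold",
  AlarmActions: ["arn:aws:sns:us-east-1:123456789012:alerts"] // hypothetical topic
}));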

Best Practices

Stream Best Practices

  1. Use NEW_AND_OLD_IMAGES when possible for complete context
  2. Enable partial batch failure to avoid reprocessing
  3. Filter events to reduce Lambda invocations
  4. Monitor IteratorAge for processing lag
  5. Implement idempotency for at-least-once processing
  6. Send records that exhaust retries to an on-failure destination (DLQ)
  7. Consider Kinesis destination for longer retention
  8. Test with production-like volume before going live
