Event-Driven Architecture
Build reactive systems with asynchronous event-based communication
Event-Driven Architecture (EDA) is a software design pattern where the flow of the program is determined by events—significant changes in state. Components communicate through events rather than direct calls, enabling loose coupling and high scalability.
Core Concepts
Events
Immutable records of something that happened. Events are facts, not requests.
Producers
Components that detect state changes and publish events.
Consumers
Components that subscribe to and react to events.
Event Broker
Infrastructure that routes events from producers to consumers.
Event Store
Persistent log of all events for replay and audit.
Eventual Consistency
System reaches consistent state over time, not immediately.
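To make these roles concrete, here is a minimal in-memory sketch (illustrative only; the broker here is a plain `Map`, standing in for real infrastructure such as SNS or Kafka, shown later):
type Handler = (event: { type: string; payload: unknown }) => Promise<void>;
// Event broker: routes events from producers to consumers
const createEventBus = () => {
  const subscribers = new Map<string, Handler[]>();
  return {
    // Consumers subscribe to event types
    subscribe: (type: string, handler: Handler) => {
      subscribers.set(type, [...(subscribers.get(type) ?? []), handler]);
    },
    // Producers publish events without knowing who consumes them
    publish: async (event: { type: string; payload: unknown }) => {
      for (const handler of subscribers.get(event.type) ?? []) {
        await handler(event);
      }
    },
  };
};
// Usage: one event, two independent consumers
const bus = createEventBus();
bus.subscribe('OrderCreated', async (e) => { /* send confirmation email */ });
bus.subscribe('OrderCreated', async (e) => { /* update inventory */ });
await bus.publish({ type: 'OrderCreated', payload: { orderId: 'order-1' } });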
Event Types
// Base event interface
interface DomainEvent<T = unknown> {
id: string;
type: string;
aggregateId: string;
aggregateType: string;
payload: T;
metadata: {
timestamp: Date;
version: number;
correlationId: string;
causationId?: string;
userId?: string;
};
}
// Domain Events - What happened in the domain
interface OrderCreatedEvent extends DomainEvent<{
customerId: string;
items: Array<{ productId: string; quantity: number; price: number }>;
total: number;
shippingAddress: Address;
}> {
type: 'OrderCreated';
aggregateType: 'Order';
}
interface PaymentCompletedEvent extends DomainEvent<{
orderId: string;
amount: number;
method: 'card' | 'paypal' | 'bank';
transactionId: string;
}> {
type: 'PaymentCompleted';
aggregateType: 'Payment';
}
// Integration Events - For cross-service communication
interface OrderCreatedIntegrationEvent {
eventId: string;
eventType: 'integration.order.created';
timestamp: Date;
data: {
orderId: string;
customerId: string;
items: Array<{ productId: string; quantity: number }>;
total: number;
};
}
// Event factory
const createEvent = <T>(
type: string,
aggregateId: string,
aggregateType: string,
payload: T,
correlationId?: string
): DomainEvent<T> => ({
id: crypto.randomUUID(),
type,
aggregateId,
aggregateType,
payload,
metadata: {
timestamp: new Date(),
version: 1,
correlationId: correlationId || crypto.randomUUID(),
},
});
Messaging Patterns
Publish/Subscribe Pattern
Producers publish events without knowing who will consume them.
import { SNSClient, PublishCommand } from '@aws-sdk/client-sns';
const sns = new SNSClient({ region: 'us-east-1' });
const createEventPublisher = (topicArn: string) => ({
publish: async <T extends DomainEvent>(event: T) => {
const command = new PublishCommand({
TopicArn: topicArn,
Message: JSON.stringify(event),
MessageAttributes: {
eventType: {
DataType: 'String',
StringValue: event.type,
},
aggregateType: {
DataType: 'String',
StringValue: event.aggregateType,
},
},
});
await sns.send(command);
console.log(`Published event: ${event.type}`, { eventId: event.id });
},
});
// Usage
const orderEvents = createEventPublisher(process.env.ORDER_TOPIC_ARN!);
await orderEvents.publish({
id: crypto.randomUUID(),
type: 'OrderCreated',
aggregateId: orderId,
aggregateType: 'Order',
payload: { customerId, items, total },
metadata: {
timestamp: new Date(),
version: 1,
correlationId: crypto.randomUUID(),
},
});
import type { SQSEvent } from 'aws-lambda';
// SNS -> SQS -> Lambda subscriber
export const handler = async (event: SQSEvent) => {
for (const record of event.Records) {
// SNS wraps the message
const snsMessage = JSON.parse(record.body);
const domainEvent = JSON.parse(snsMessage.Message) as DomainEvent;
console.log(`Received event: ${domainEvent.type}`, {
eventId: domainEvent.id,
aggregateId: domainEvent.aggregateId,
});
await handleEvent(domainEvent);
}
};
const handleEvent = async (event: DomainEvent) => {
switch (event.type) {
    case 'OrderCreated':
      await sendOrderConfirmationEmail((event as OrderCreatedEvent).payload);
      break;
    case 'PaymentCompleted':
      await updateOrderStatus((event as PaymentCompletedEvent).payload.orderId, 'paid');
break;
default:
console.log(`Unhandled event type: ${event.type}`);
}
};
Event Streaming (Kafka-style)
Persistent, ordered event log that consumers can replay.
// Using Upstash Kafka (serverless-friendly)
import { Kafka } from '@upstash/kafka';
const kafka = new Kafka({
url: process.env.UPSTASH_KAFKA_URL!,
username: process.env.UPSTASH_KAFKA_USERNAME!,
password: process.env.UPSTASH_KAFKA_PASSWORD!,
});
const producer = kafka.producer();
const publishToStream = async <T extends DomainEvent>(
topic: string,
event: T,
partitionKey?: string
) => {
await producer.produce(topic, JSON.stringify(event), {
key: partitionKey || event.aggregateId,
headers: {
'event-type': event.type,
'correlation-id': event.metadata.correlationId,
},
});
};
// Publish with ordering guarantee (same partition key = same partition)
await publishToStream('orders', orderCreatedEvent, order.customerId);
import { Kafka } from '@upstash/kafka';
const kafka = new Kafka({
url: process.env.UPSTASH_KAFKA_URL!,
username: process.env.UPSTASH_KAFKA_USERNAME!,
password: process.env.UPSTASH_KAFKA_PASSWORD!,
});
const consumer = kafka.consumer();
// Consume from a specific offset (enables replay)
const consumeFromStream = async (
topic: string,
groupId: string,
handler: (event: DomainEvent) => Promise<void>
) => {
const messages = await consumer.consume({
consumerGroupId: groupId,
instanceId: `${groupId}-${process.pid}`,
topics: [topic],
autoCommit: false,
});
for (const message of messages) {
const event = JSON.parse(message.value) as DomainEvent;
try {
await handler(event);
// Commit offset after successful processing
await consumer.commit({
consumerGroupId: groupId,
instanceId: `${groupId}-${process.pid}`,
topics: [{ topic, partition: message.partition, offset: message.offset }],
});
} catch (error) {
console.error('Failed to process event:', error);
// Don't commit - message will be redelivered
throw error;
}
}
};
Point-to-Point Message Queue
Each message is processed by exactly one consumer.
import { SQSClient, SendMessageCommand, SendMessageBatchCommand } from '@aws-sdk/client-sqs';
const sqs = new SQSClient({ region: 'us-east-1' });
// Split an array into fixed-size chunks (SQS caps batch sends at 10 messages)
const chunk = <T>(items: T[], size: number): T[][] =>
  Array.from({ length: Math.ceil(items.length / size) }, (_, i) =>
    items.slice(i * size, i * size + size)
  );
const createQueueProducer = (queueUrl: string) => ({
send: async <T>(message: T, options?: {
delaySeconds?: number;
deduplicationId?: string;
groupId?: string;
}) => {
const command = new SendMessageCommand({
QueueUrl: queueUrl,
MessageBody: JSON.stringify(message),
DelaySeconds: options?.delaySeconds,
// For FIFO queues
MessageDeduplicationId: options?.deduplicationId,
MessageGroupId: options?.groupId,
});
const result = await sqs.send(command);
return result.MessageId;
},
sendBatch: async <T>(messages: T[]) => {
const entries = messages.map((msg, i) => ({
Id: `msg-${i}`,
MessageBody: JSON.stringify(msg),
}));
// SQS allows max 10 messages per batch
const batches = chunk(entries, 10);
for (const batch of batches) {
await sqs.send(new SendMessageBatchCommand({
QueueUrl: queueUrl,
Entries: batch,
}));
}
},
});
// Usage
const orderQueue = createQueueProducer(process.env.ORDER_QUEUE_URL!);
await orderQueue.send({ orderId, action: 'process' });
import type { SQSEvent, SQSBatchResponse } from 'aws-lambda';
// Partial batch responses require ReportBatchItemFailures enabled on the event source mapping
export const handler = async (event: SQSEvent): Promise<SQSBatchResponse> => {
const batchItemFailures: { itemIdentifier: string }[] = [];
// Process messages in parallel
const results = await Promise.allSettled(
event.Records.map(async (record) => {
const message = JSON.parse(record.body);
await processMessage(message);
return record.messageId;
})
);
// Report failures for retry
results.forEach((result, index) => {
if (result.status === 'rejected') {
batchItemFailures.push({
itemIdentifier: event.Records[index].messageId,
});
}
});
return { batchItemFailures };
};
Request/Reply Pattern
Asynchronous request-response over messaging.
import { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';
const sqs = new SQSClient({ region: 'us-east-1' });
const requestReply = async <TReq, TRes>(
requestQueue: string,
replyQueue: string,
request: TReq,
timeoutMs = 30000
): Promise<TRes> => {
const correlationId = crypto.randomUUID();
// Send request
await sqs.send(new SendMessageCommand({
QueueUrl: requestQueue,
MessageBody: JSON.stringify(request),
MessageAttributes: {
correlationId: { DataType: 'String', StringValue: correlationId },
replyTo: { DataType: 'String', StringValue: replyQueue },
},
}));
// Poll for the matching response. Note: replies meant for other requesters that are
// received here stay invisible until their visibility timeout expires; production code
// typically uses a dedicated reply queue per requester.
const startTime = Date.now();
while (Date.now() - startTime < timeoutMs) {
const response = await sqs.send(new ReceiveMessageCommand({
QueueUrl: replyQueue,
MessageAttributeNames: ['correlationId'],
WaitTimeSeconds: 5,
MaxNumberOfMessages: 10,
}));
const match = response.Messages?.find(
m => m.MessageAttributes?.correlationId?.StringValue === correlationId
);
if (match) {
// Delete the message
await sqs.send(new DeleteMessageCommand({
QueueUrl: replyQueue,
ReceiptHandle: match.ReceiptHandle!,
}));
return JSON.parse(match.Body!) as TRes;
}
}
throw new Error('Request timeout');
};
// Usage
const inventory = await requestReply<CheckInventoryRequest, InventoryResponse>(
INVENTORY_REQUEST_QUEUE,
INVENTORY_REPLY_QUEUE,
{ productIds: ['prod-1', 'prod-2'] }
);
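The snippets above show only the requester. A minimal sketch of the responder side (assuming the same SQS client as above; `checkInventory` is a hypothetical domain function):
import type { SQSEvent } from 'aws-lambda';
// Responder: processes the request and replies to the queue named in `replyTo`,
// echoing `correlationId` so the requester can match the response
export const inventoryHandler = async (event: SQSEvent) => {
  for (const record of event.Records) {
    const request = JSON.parse(record.body);
    const correlationId = record.messageAttributes?.correlationId?.stringValue;
    const replyTo = record.messageAttributes?.replyTo?.stringValue;
    if (!correlationId || !replyTo) continue;
    const result = await checkInventory(request.productIds); // hypothetical domain logic
    await sqs.send(new SendMessageCommand({
      QueueUrl: replyTo,
      MessageBody: JSON.stringify(result),
      MessageAttributes: {
        correlationId: { DataType: 'String', StringValue: correlationId },
      },
    }));
  }
};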
Event Sourcing
Store state as a sequence of events instead of current state.
interface StoredEvent {
id: string;
streamId: string;
type: string;
data: unknown;
metadata: {
timestamp: Date;
version: number;
correlationId: string;
};
}
const createEventStore = (db: Database) => ({
append: async (
streamId: string,
events: Omit<StoredEvent, 'id' | 'streamId' | 'metadata'>[],
expectedVersion?: number
) => {
return db.transaction(async (tx) => {
// Optimistic concurrency check
const currentVersion = await tx.eventStream
.findUnique({ where: { id: streamId } })
.then(s => s?.version ?? 0);
if (expectedVersion !== undefined && currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, got ${currentVersion}`
);
}
// Append events
const storedEvents = events.map((event, i) => ({
id: crypto.randomUUID(),
streamId,
type: event.type,
data: event.data,
metadata: {
timestamp: new Date(),
version: currentVersion + i + 1,
correlationId: crypto.randomUUID(),
},
}));
await tx.event.createMany({ data: storedEvents });
// Update stream version
await tx.eventStream.upsert({
where: { id: streamId },
create: { id: streamId, version: currentVersion + events.length },
update: { version: currentVersion + events.length },
});
return storedEvents;
});
},
getStream: async (streamId: string, fromVersion = 0) => {
return db.event.findMany({
where: {
streamId,
metadata: { path: ['version'], gte: fromVersion },
},
orderBy: { metadata: { path: ['version'], sort: 'asc' } },
});
},
getAllStreams: async (fromTimestamp?: Date) => {
return db.event.findMany({
where: fromTimestamp ? {
metadata: { path: ['timestamp'], gte: fromTimestamp },
} : undefined,
orderBy: { metadata: { path: ['timestamp'], sort: 'asc' } },
});
},
});
// Order aggregate with event sourcing
type OrderEvent =
| { type: 'OrderCreated'; data: { customerId: string; items: OrderItem[] } }
| { type: 'OrderConfirmed'; data: { confirmedAt: Date } }
| { type: 'OrderShipped'; data: { trackingNumber: string } }
| { type: 'OrderCancelled'; data: { reason: string } };
type OrderState = {
id: string;
customerId: string;
items: OrderItem[];
status: 'pending' | 'confirmed' | 'shipped' | 'cancelled';
trackingNumber?: string;
};
// Rebuild state from events (the aggregate id is passed in, since OrderCreated does not carry it)
const rebuildOrder = (orderId: string, events: OrderEvent[]): OrderState | null => {
return events.reduce<OrderState | null>((state, event) => {
switch (event.type) {
case 'OrderCreated':
return {
        id: orderId,
customerId: event.data.customerId,
items: event.data.items,
status: 'pending',
};
case 'OrderConfirmed':
return state ? { ...state, status: 'confirmed' } : null;
case 'OrderShipped':
return state ? {
...state,
status: 'shipped',
trackingNumber: event.data.trackingNumber,
} : null;
case 'OrderCancelled':
return state ? { ...state, status: 'cancelled' } : null;
default:
return state;
}
}, null);
};
// Order aggregate with business rules
type EventStore = ReturnType<typeof createEventStore>;
const createOrderAggregate = (eventStore: EventStore) => ({
create: async (customerId: string, items: OrderItem[]) => {
const orderId = crypto.randomUUID();
await eventStore.append(`order-${orderId}`, [{
type: 'OrderCreated',
data: { customerId, items },
}]);
return orderId;
},
confirm: async (orderId: string) => {
const events = await eventStore.getStream(`order-${orderId}`);
const order = rebuildOrder(orderId, events as OrderEvent[]);
if (!order) throw new NotFoundError('Order not found');
if (order.status !== 'pending') {
throw new InvalidOperationError('Can only confirm pending orders');
}
await eventStore.append(
`order-${orderId}`,
[{ type: 'OrderConfirmed', data: { confirmedAt: new Date() } }],
events.length // Optimistic concurrency
);
},
ship: async (orderId: string, trackingNumber: string) => {
const events = await eventStore.getStream(`order-${orderId}`);
const order = rebuildOrder(orderId, events as OrderEvent[]);
if (!order) throw new NotFoundError('Order not found');
if (order.status !== 'confirmed') {
throw new InvalidOperationError('Can only ship confirmed orders');
}
await eventStore.append(
`order-${orderId}`,
[{ type: 'OrderShipped', data: { trackingNumber } }],
events.length
);
},
getState: async (orderId: string) => {
const events = await eventStore.getStream(`order-${orderId}`);
return rebuildOrder(orderId, events as OrderEvent[]);
},
});
Projections (Read Models)
Build optimized read models from events.
// Projection that builds a read model from events
type OrderSummaryProjection = {
orderId: string;
customerId: string;
customerName: string;
totalItems: number;
totalAmount: number;
status: string;
trackingNumber?: string;
createdAt: Date;
updatedAt: Date;
};
const createOrderSummaryProjector = (db: Database, eventStore: EventStore) => {
  const project = async (event: DomainEvent) => {
switch (event.type) {
case 'OrderCreated': {
const payload = event.payload as OrderCreatedPayload;
await db.orderSummary.create({
data: {
orderId: event.aggregateId,
customerId: payload.customerId,
totalItems: payload.items.reduce((sum, i) => sum + i.quantity, 0),
totalAmount: payload.total,
status: 'pending',
createdAt: event.metadata.timestamp,
updatedAt: event.metadata.timestamp,
},
});
break;
}
case 'OrderConfirmed':
await db.orderSummary.update({
where: { orderId: event.aggregateId },
data: {
status: 'confirmed',
updatedAt: event.metadata.timestamp,
},
});
break;
case 'OrderShipped': {
const payload = event.payload as OrderShippedPayload;
await db.orderSummary.update({
where: { orderId: event.aggregateId },
data: {
status: 'shipped',
trackingNumber: payload.trackingNumber,
updatedAt: event.metadata.timestamp,
},
});
break;
}
}
  };
  // Rebuild projection from scratch by replaying the full event log
  const rebuild = async () => {
    await db.orderSummary.deleteMany();
    const events = await eventStore.getAllStreams();
    for (const event of events) {
      await project(event);
    }
  };
  return { project, rebuild };
};
// Subscribe to events and update projection
// (assumes an event bus that supports wildcard subscriptions by aggregate type)
const orderSummaryProjector = createOrderSummaryProjector(db, eventStore);
eventBus.subscribe('Order.*', async (event) => {
await orderSummaryProjector.project(event);
});
Error Handling & Reliability
import type { SQSEvent } from 'aws-lambda';
// DLQ processor - handle failed messages
export const dlqHandler = async (event: SQSEvent) => {
for (const record of event.Records) {
const originalMessage = JSON.parse(record.body);
const attributes = record.messageAttributes;
// Log for investigation
console.error('DLQ Message:', {
messageId: record.messageId,
originalQueue: attributes?.SourceQueue?.stringValue,
errorMessage: attributes?.ErrorMessage?.stringValue,
retryCount: attributes?.RetryCount?.stringValue,
body: originalMessage,
});
// Store in database for manual review
await db.deadLetterMessage.create({
data: {
messageId: record.messageId,
body: originalMessage,
sourceQueue: attributes?.SourceQueue?.stringValue,
errorMessage: attributes?.ErrorMessage?.stringValue,
receivedAt: new Date(),
},
});
// Alert if critical
if (isCriticalEvent(originalMessage)) {
await alertOps('Critical event in DLQ', originalMessage);
}
}
};
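Once the underlying failure is fixed, stored DLQ messages can be redriven to their source queue. A sketch under the same assumptions as above (`redrivenAt` is a hypothetical column for tracking which messages have been resent):
// Redrive reviewed DLQ messages back to their source queue for reprocessing
const redriveMessages = async (sourceQueueUrl: string) => {
  const pending = await db.deadLetterMessage.findMany({
    where: { sourceQueue: sourceQueueUrl, redrivenAt: null },
  });
  for (const msg of pending) {
    await sqs.send(new SendMessageCommand({
      QueueUrl: sourceQueueUrl,
      MessageBody: JSON.stringify(msg.body),
    }));
    await db.deadLetterMessage.update({
      where: { messageId: msg.messageId },
      data: { redrivenAt: new Date() }, // hypothetical column marking the redrive
    });
  }
};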
// Ensure events are processed exactly once
const createIdempotentHandler = <T>(
handler: (event: DomainEvent<T>) => Promise<void>,
storage: IdempotencyStorage
) => {
return async (event: DomainEvent<T>) => {
const key = `${event.type}:${event.id}`;
// Check if already processed. Note: this get-then-set is not atomic; for strict
// guarantees, use a conditional write (e.g. a DynamoDB condition expression).
const existing = await storage.get(key);
if (existing) {
console.log(`Event already processed: ${key}`);
return;
}
try {
// Process the event
await handler(event);
// Mark as processed
await storage.set(key, {
processedAt: new Date(),
eventId: event.id,
}, { ttl: 86400 * 7 }); // 7 days TTL
} catch (error) {
// Don't mark as processed if failed
throw error;
}
};
};
// DynamoDB-based idempotency storage
const dynamoIdempotencyStorage: IdempotencyStorage = {
get: async (key: string) => {
const result = await dynamodb.get({
TableName: 'IdempotencyKeys',
Key: { pk: key },
});
return result.Item;
},
set: async (key: string, value: unknown, options?: { ttl?: number }) => {
await dynamodb.put({
TableName: 'IdempotencyKeys',
Item: {
pk: key,
...value,
ttl: options?.ttl
? Math.floor(Date.now() / 1000) + options.ttl
: undefined,
},
});
},
};
// Usage
const idempotentOrderHandler = createIdempotentHandler(
processOrderEvent,
dynamoIdempotencyStorage
);
type RetryConfig = {
maxRetries: number;
baseDelayMs: number;
maxDelayMs: number;
backoffMultiplier: number;
};
const defaultRetryConfig: RetryConfig = {
maxRetries: 3,
baseDelayMs: 1000,
maxDelayMs: 30000,
backoffMultiplier: 2,
};
const withRetry = async <T>(
fn: () => Promise<T>,
config: Partial<RetryConfig> = {}
): Promise<T> => {
const { maxRetries, baseDelayMs, maxDelayMs, backoffMultiplier } = {
...defaultRetryConfig,
...config,
};
let lastError: Error | undefined;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await fn();
} catch (error) {
lastError = error as Error;
if (attempt === maxRetries) break;
// Don't retry non-retryable errors
if (!isRetryableError(error)) throw error;
// Exponential backoff with jitter
const delay = Math.min(
baseDelayMs * Math.pow(backoffMultiplier, attempt) + Math.random() * 1000,
maxDelayMs
);
console.log(`Retry ${attempt + 1}/${maxRetries} after ${delay}ms`);
await sleep(delay);
}
}
throw lastError!;
};
const isRetryableError = (error: unknown): boolean => {
if (error instanceof Error) {
// Network errors, timeouts, rate limits
return [
'ECONNRESET',
'ETIMEDOUT',
'ENOTFOUND',
'TooManyRequestsException',
'ServiceUnavailable',
].some(code => error.message.includes(code));
}
return false;
};
When to Use Event-Driven Architecture
✅ Good For
- Loose coupling - Services don't need to know about each other
- Scalability - Producers and consumers scale independently
- Audit trails - Full history of what happened
- Real-time features - Notifications, dashboards
- Complex workflows - Sagas, choreography
- Multiple consumers - Same event, different reactions
❌ Avoid When
- Simple CRUD - Overhead not worth it
- Strong consistency - Need immediate confirmation
- Simple request/response - Direct calls are simpler
- Small teams - The operational overhead may outweigh the benefits
- Debugging priority - Distributed event flows are harder to trace than direct calls
Summary
Event-Driven Architecture enables:
- Decoupling - Services communicate through events, not direct calls
- Scalability - Producers and consumers scale independently
- Resilience - Failed consumers don't affect producers
- Flexibility - Add new consumers without changing producers
- Audit Trail - Complete history of all state changes
Key patterns to master:
- Pub/Sub for broadcasting events
- Event Streaming for ordered, replayable logs
- Event Sourcing for audit trails and temporal queries
- Projections for optimized read models
- Sagas for distributed transactions (sketched below)
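Sagas are the one pattern above without an example. A minimal choreography-style sketch, reusing the `createEvent` factory from earlier and assuming the hypothetical `eventBus` from the projections section (`chargeCustomer` and `scheduleShipment` are stand-ins for real service calls):
// Choreography saga: order -> payment -> shipping, with compensation on failure.
// Each service reacts to the previous event and publishes the next.
eventBus.subscribe('OrderCreated', async (event) => {
  try {
    const payment = await chargeCustomer(event.payload); // hypothetical payment service
    await eventBus.publish(createEvent('PaymentCompleted', event.aggregateId, 'Payment', payment));
  } catch {
    // Compensating event: undo the step that triggered this one
    await eventBus.publish(createEvent('OrderCancelled', event.aggregateId, 'Order', {
      reason: 'payment_failed',
    }));
  }
});
eventBus.subscribe('PaymentCompleted', async (event) => {
  const shipment = await scheduleShipment(event.payload); // hypothetical shipping service
  await eventBus.publish(createEvent('OrderShipped', event.aggregateId, 'Order', shipment));
});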