Mastering the Saga Pattern for Distributed Transactions in Node.js & TypeScript: A Practical Guide to Reliable Microservices Coordination
Introduction
Microservice architectures give you the freedom to scale, deploy, and evolve services independently.
That freedom comes at a price: data consistency across service boundaries.
Traditional ACID transactions don’t work when a business operation spans several databases and network hops.
The Saga pattern solves this problem by breaking a distributed transaction into a series of local steps, each with a compensating action that undoes its work if something later fails.
In this article we’ll:
- Explain the two classic saga coordination models (choreography vs. orchestration).
- Show how to model sagas with TypeScript types that make intent explicit.
- Build a lightweight orchestration engine using Node.js, Express, and BullMQ (Redis‑backed job queues).
- Demonstrate compensations, idempotency, and observability.
- Provide a test harness that simulates failures and validates the saga’s correctness.
No external framework magic—just plain Node.js libraries you can drop into any existing code‑base.
1. Saga Basics Refresher
A saga is a sequence of local transactions T1 … Tn.
Each Ti:
- Executes within its own service and database (so it can commit instantly).
- Emits an event (or returns a result) that triggers the next step.
If step Ti fails, the steps that already committed are compensated in reverse order by running C(i-1), …, C1, where each Ci is the logical inverse of Ti.
T1 → T2 → T3 → … → Tn
│    │    │        │
C1 ← C2 ← C3 ← … ← Cn   (run backwards on error)
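The forward/backward flow above can be sketched in a few lines. This is a minimal in-memory illustration, not the engine built later in this article: the `Step` shape and `runSaga` are hypothetical names, and the steps are synchronous only to keep the sketch short.

```typescript
// Hypothetical step shape: `run` plays the role of Ti, `undo` the role of Ci.
interface Step {
  name: string;
  run: () => void;
  undo: () => void;
}

// Runs steps in order. On the first failure, compensates the steps that
// already succeeded, newest first, and reports the outcome.
function runSaga(steps: Step[], log: string[]): "Completed" | "Failed" {
  const done: Step[] = [];
  for (const step of steps) {
    try {
      step.run();
      log.push(`T${step.name}`);
      done.push(step);
    } catch {
      for (let i = done.length - 1; i >= 0; i--) {
        done[i].undo();
        log.push(`C${done[i].name}`);
      }
      return "Failed";
    }
  }
  return "Completed";
}
```

Note that the failing step itself is not compensated: it never committed, so only its predecessors need undoing.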
Two coordination styles exist:
| Model | Description | When to use |
|---|---|---|
| Choreography | Services react to events published on a message broker. No central controller. | Simple linear flows, low latency, few steps. |
| Orchestration | A dedicated saga orchestrator decides the next step, sends commands, and tracks state. | Complex branching, retries, time‑outs, or when you need a single source of truth. |
Our guide focuses on orchestration, because it gives deterministic visibility and fits well with TypeScript’s static typing.
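For contrast, here is roughly what choreography looks like. The sketch uses Node's built-in `EventEmitter` as an in-memory stand-in for a message broker (an assumption for illustration only), and the handlers are hypothetical placeholders for real services.

```typescript
import { EventEmitter } from "node:events";

// In-memory stand-in for a message broker (illustrative assumption).
const broker = new EventEmitter();
const trail: string[] = [];

// "Payment service": reacts to OrderCreated, then publishes PaymentReserved.
broker.on("OrderCreated", (orderId: string) => {
  trail.push(`payment reserved for ${orderId}`);
  broker.emit("PaymentReserved", orderId);
});

// "Shipping service": reacts to PaymentReserved, then publishes OrderShipped.
broker.on("PaymentReserved", (orderId: string) => {
  trail.push(`shipment scheduled for ${orderId}`);
  broker.emit("OrderShipped", orderId);
});

// "Order service" starts the flow; no single component knows the whole sequence.
broker.emit("OrderCreated", "order-1");
```

The flow exists only implicitly, spread across subscriptions. That is the visibility trade-off that pushes more complex sagas toward an orchestrator.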
2. Defining a Typed Saga Contract
We start by describing the saga’s shape in a shared TypeScript module. This contract lives in a separate @myorg/saga-contracts package that both orchestrator and participants can import.
// saga-contracts/src/orders.ts
export type OrderId = string & { readonly __brand: unique symbol };
export type PaymentId = string & { readonly __brand: unique symbol };
export type ShipmentId = string & { readonly __brand: unique symbol };
export interface CreateOrderCmd {
type: 'CreateOrder';
payload: { orderId: OrderId; amount: number };
}
export interface ReservePaymentCmd {
type: 'ReservePayment';
payload: { orderId: OrderId; paymentId: PaymentId; amount: number };
}
export interface ShipOrderCmd {
type: 'ShipOrder';
payload: { orderId: OrderId; shipmentId: ShipmentId };
}
/** Union of all commands the orchestrator can issue */
export type SagaCommand = CreateOrderCmd | ReservePaymentCmd | ShipOrderCmd;
/** Events emitted by services */
export interface OrderCreatedEvt {
type: 'OrderCreated';
payload: { orderId: OrderId };
}
export interface PaymentReservedEvt {
type: 'PaymentReserved';
payload: { orderId: OrderId; paymentId: PaymentId };
}
export interface OrderShippedEvt {
type: 'OrderShipped';
payload: { orderId: OrderId; shipmentId: ShipmentId };
}
export type SagaEvent = OrderCreatedEvt | PaymentReservedEvt | OrderShippedEvt;
/** Compensation commands */
export interface CancelOrderCmd {
type: 'CancelOrder';
payload: { orderId: OrderId };
}
export interface ReleasePaymentCmd {
type: 'ReleasePayment';
payload: { paymentId: PaymentId };
}
export interface CancelShipmentCmd {
type: 'CancelShipment';
payload: { shipmentId: ShipmentId };
}
export type CompensationCmd =
| CancelOrderCmd
| ReleasePaymentCmd
| CancelShipmentCmd;
Why brand types?
They prevent accidental mixing of IDs (orderId vs paymentId) while still being plain strings at runtime.
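A small sketch of what branding buys you. It uses a generic `Brand` helper, which is an alternative spelling of the same idea rather than the exact declarations above; `toOrderId`, `toPaymentId`, and `cancelOrder` are hypothetical helpers.

```typescript
// Generic brand helper; the symbol exists only at the type level.
declare const brand: unique symbol;
type Brand<T, Name extends string> = T & { readonly [brand]: Name };

type OrderId = Brand<string, "OrderId">;
type PaymentId = Brand<string, "PaymentId">;

// Hypothetical constructors: the only places where a cast is needed.
const toOrderId = (raw: string): OrderId => raw as OrderId;
const toPaymentId = (raw: string): PaymentId => raw as PaymentId;

function cancelOrder(orderId: OrderId): string {
  return `cancel ${orderId}`;
}

const someOrderId = toOrderId("order-123");
const somePaymentId = toPaymentId("pay-1");

cancelOrder(someOrderId); // compiles
// @ts-expect-error PaymentId is not assignable to OrderId
cancelOrder(somePaymentId);
```

Keeping the casts inside a couple of constructor functions means the rest of the code never needs `as any` to produce an ID.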
3. Building the Orchestrator
The orchestrator’s responsibilities:
- Persist saga state (Pending, Completed, Compensating, Failed).
- Issue commands to services via a job queue (BullMQ).
- Listen for events (via a Redis pub/sub channel).
- Trigger compensations when needed.
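One way to keep the persisted state honest is to make the legal status changes explicit. The transition table below is an assumption inferred from how the four statuses are used in this article, not a rule of the pattern itself.

```typescript
type SagaStatus = "Pending" | "Completed" | "Compensating" | "Failed";

// Assumed legal moves; Completed and Failed are terminal.
const allowedTransitions: Record<SagaStatus, SagaStatus[]> = {
  Pending: ["Completed", "Compensating"],
  Compensating: ["Failed"],
  Completed: [],
  Failed: [],
};

// Returns the new status, or throws if the move is not allowed.
function transition(from: SagaStatus, to: SagaStatus): SagaStatus {
  if (allowedTransitions[from].indexOf(to) === -1) {
    throw new Error(`Illegal saga transition: ${from} -> ${to}`);
  }
  return to;
}
```

Routing every status change through a guard like this surfaces bugs such as a late event reviving an already failed saga.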
3.1 Project Layout
/orchestrator
│─ src/
│ ├─ saga/
│ │ ├─ state.ts // saga state model
│ │ └─ engine.ts // core orchestration logic
│ ├─ infra/
│ │ ├─ queue.ts // BullMQ wrapper
│ │ └─ eventBus.ts // Redis pub/sub wrapper
│ └─ server.ts // Express entry point
└─ tsconfig.json
3.2 Saga State Model
// src/saga/state.ts
import {
OrderId,
PaymentId,
ShipmentId,
SagaCommand,
CompensationCmd,
} from '@myorg/saga-contracts';
export type SagaStep = {
command: SagaCommand;
compensating: CompensationCmd;
completed: boolean;
};
export interface SagaInstance {
id: string; // UUID of the saga run
orderId: OrderId;
steps: SagaStep[];
status: 'Pending' | 'Completed' | 'Compensating' | 'Failed';
}
Persist the SagaInstance in Redis (HASH) or any durable store; for brevity we’ll use an in‑memory map in the example.
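To make the later swap to Redis or SQL painless, the store can sit behind a small interface. The sketch below is hypothetical: `SagaStore`, `StoredSaga`, and `InMemorySagaStore` are illustrative names, and the saga shape is trimmed so the example stands alone.

```typescript
// Trimmed-down saga record so the sketch is self-contained.
interface StoredSaga {
  id: string;
  orderId: string;
  status: "Pending" | "Completed" | "Compensating" | "Failed";
  completedSteps: number;
}

// Hypothetical storage port; a Redis- or SQL-backed class could implement it too.
interface SagaStore {
  save(saga: StoredSaga): Promise<void>;
  findByOrderId(orderId: string): Promise<StoredSaga | undefined>;
}

class InMemorySagaStore implements SagaStore {
  // Sagas are kept as JSON strings, mimicking what a remote store would hold.
  private readonly json = new Map<string, string>();
  // Secondary index so lookups by order ID do not scan every saga.
  private readonly sagaIdByOrder = new Map<string, string>();

  async save(saga: StoredSaga): Promise<void> {
    this.json.set(saga.id, JSON.stringify(saga));
    this.sagaIdByOrder.set(saga.orderId, saga.id);
  }

  async findByOrderId(orderId: string): Promise<StoredSaga | undefined> {
    const sagaId = this.sagaIdByOrder.get(orderId);
    const raw = sagaId === undefined ? undefined : this.json.get(sagaId);
    return raw === undefined ? undefined : (JSON.parse(raw) as StoredSaga);
  }
}
```

Serialising on every save also forces explicit writes: mutating a loaded saga object does nothing until it is saved again, which is how a remote store behaves.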
3.3 Queue Wrapper
// src/infra/queue.ts
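import { Queue, Worker, Job } from 'bullmq';
import IORedis from 'ioredis';
import { SagaCommand, CompensationCmd } from '@myorg/saga-contracts';
// BullMQ workers require maxRetriesPerRequest: null on their ioredis connection
const connection = new IORedis({ maxRetriesPerRequest: null });
// Queue names use '-' because recent BullMQ versions reject ':' in queue names
export const commandQueue = new Queue<SagaCommand>('saga-commands', {
  connection,
});
export const compensationQueue = new Queue<CompensationCmd>('saga-compensations', {
  connection,
});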
/** Workers are created by each microservice; here we expose a helper for the orchestrator to listen to results */
export function createResultWorker<T>(name: string, handler: (job: Job<T>) => Promise<void>) {
return new Worker<T>(name, async (job) => {
await handler(job);
}, { connection });
}
3.4 Event Bus
// src/infra/eventBus.ts
import IORedis from 'ioredis';
import { SagaEvent } from '@myorg/saga-contracts';
const sub = new IORedis();
const pub = new IORedis();
const CHANNEL = 'saga:events';
export async function publish(event: SagaEvent) {
await pub.publish(CHANNEL, JSON.stringify(event));
}
export function subscribe(handler: (event: SagaEvent) => void) {
  sub.subscribe(CHANNEL, (err, count) => {
    if (err) throw err;
    console.log(`Subscribed to ${CHANNEL} (${count} channels)`);
  });
  sub.on('message', (_, message) => {
    try {
      handler(JSON.parse(message) as SagaEvent);
    } catch (err) {
      // A malformed message or a throwing handler must not take down the subscriber
      console.error('Failed to handle saga event', err);
    }
  });
}
3.5 Orchestration Engine
// src/saga/engine.ts
import { v4 as uuidv4 } from 'uuid';
import { commandQueue, compensationQueue } from '../infra/queue';
import { subscribe } from '../infra/eventBus';
import {
  OrderId,
  PaymentId,
  ShipmentId,
  SagaCommand,
  SagaEvent,
} from '@myorg/saga-contracts';
import { SagaInstance, SagaStep } from './state';
// Exported so tests can inspect saga state; replace with Redis in prod
export const sagaStore = new Map<string, SagaInstance>();
/** Helper to build a new saga definition */
function buildOrderSaga(rawOrderId: string, amount: number): SagaInstance {
  // Brand the IDs once at the boundary instead of casting payloads with `as any`
  const orderId = rawOrderId as OrderId;
  const paymentId = `${rawOrderId}-pay` as PaymentId;
  const shipmentId = `${rawOrderId}-ship` as ShipmentId;
  const steps: SagaStep[] = [
    {
      command: { type: 'CreateOrder', payload: { orderId, amount } },
      compensating: { type: 'CancelOrder', payload: { orderId } },
      completed: false,
    },
    {
      command: { type: 'ReservePayment', payload: { orderId, paymentId, amount } },
      compensating: { type: 'ReleasePayment', payload: { paymentId } },
      completed: false,
    },
    {
      command: { type: 'ShipOrder', payload: { orderId, shipmentId } },
      compensating: { type: 'CancelShipment', payload: { shipmentId } },
      completed: false,
    },
  ];
  return {
    id: uuidv4(),
    orderId,
    steps,
    status: 'Pending',
  };
}
/** Starts a saga */
export async function startOrderSaga(orderId: string, amount: number) {
const saga = buildOrderSaga(orderId, amount);
sagaStore.set(saga.id, saga);
await dispatchNext(saga);
}
/** Send the next unapplied command */
async function dispatchNext(saga: SagaInstance) {
const nextStep = saga.steps.find((s) => !s.completed);
if (!nextStep) {
saga.status = 'Completed';
console.log(`Saga ${saga.id} completed`);
return;
}
await commandQueue.add('command', nextStep.command);
}
/** Event handler – called for every service event */
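function onEvent(event: SagaEvent) {
  // Find the saga that owns this orderId (every event payload carries one)
  const saga = [...sagaStore.values()].find((s) => s.orderId === event.payload.orderId);
  if (!saga || saga.status !== 'Pending') return; // unknown or already finished saga
  // If the event signals failure we start compensation
  if (isFailure(event)) {
    saga.status = 'Compensating';
    compensate(saga).catch((err) => console.error(`Compensation of saga ${saga.id} failed`, err));
    return;
  }
  // Mark the matching step as completed; ignore unknown or duplicate events
  const step = saga.steps.find((s) => s.command.type === mapEventToCommand(event.type));
  if (!step || step.completed) return;
  step.completed = true;
  // Otherwise continue; log rejections instead of leaving the promise unhandled
  dispatchNext(saga).catch((err) => console.error(`Dispatch for saga ${saga.id} failed`, err));
}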
/** Simple mapping – in real code use a lookup table */
function mapEventToCommand(eventType: SagaEvent['type']): SagaCommand['type'] {
switch (eventType) {
case 'OrderCreated':
return 'CreateOrder';
case 'PaymentReserved':
return 'ReservePayment';
case 'OrderShipped':
return 'ShipOrder';
default:
return '' as any;
}
}
/** Detect failure events. Placeholder: no failure events exist in the contract yet, so this always returns false */
function isFailure(event: SagaEvent): boolean {
  // Extend with explicit failure events, or drive compensation from a timeout watchdog
  return false;
}
/** Enqueue compensations in reverse order (exported so tests can trigger it directly) */
export async function compensate(saga: SagaInstance) {
  for (let i = saga.steps.length - 1; i >= 0; i--) {
    const step = saga.steps[i];
    if (!step.completed) continue; // nothing to undo
    await compensationQueue.add('compensate', step.compensating);
  }
  // Compensations are only enqueued here; the services apply them asynchronously
  saga.status = 'Failed';
  console.log(`Saga ${saga.id}: compensations enqueued`);
}
/** Wire everything up */
export function initSagaEngine() {
subscribe(onEvent);
}
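The `isFailure` placeholder mentions timeouts, but nothing in the engine tracks them yet. One possible shape for that bookkeeping is sketched below. `StepDeadlines` is a hypothetical helper that takes the current time as a parameter, so it can be exercised without real timers.

```typescript
// Tracks one deadline per saga: the time by which the awaited event must arrive.
class StepDeadlines {
  private readonly deadlines = new Map<string, number>();
  private readonly timeoutMs: number;

  constructor(timeoutMs: number) {
    this.timeoutMs = timeoutMs;
  }

  // Call when a command is dispatched.
  arm(sagaId: string, now: number): void {
    this.deadlines.set(sagaId, now + this.timeoutMs);
  }

  // Call when the matching event arrives.
  clear(sagaId: string): void {
    this.deadlines.delete(sagaId);
  }

  // Returns the sagas whose deadline has passed and stops tracking them.
  expired(now: number): string[] {
    const overdue: string[] = [];
    this.deadlines.forEach((deadline, sagaId) => {
      if (now >= deadline) overdue.push(sagaId);
    });
    overdue.forEach((sagaId) => this.deadlines.delete(sagaId));
    return overdue;
  }
}
```

In the engine above, `dispatchNext` could call `arm(saga.id, Date.now())`, `onEvent` could call `clear(saga.id)`, and a periodic timer could pass each ID returned by `expired(Date.now())` to `compensate`.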
3.6 Express Entry Point
// src/server.ts
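import express from 'express';
import { startOrderSaga, initSagaEngine } from './saga/engine';
const app = express();
app.use(express.json()); // built into Express since 4.16, so body-parser is not needed
app.post('/api/sagas/orders', async (req, res) => {
  const { orderId, amount } = req.body;
  if (typeof orderId !== 'string' || !orderId || typeof amount !== 'number')
    return res.status(400).json({ error: 'Invalid payload' });
  try {
    await startOrderSaga(orderId, amount);
    res.status(202).json({ message: 'Saga started' });
  } catch (err) {
    // Express 4 does not catch rejected promises from async handlers
    console.error('Failed to start saga', err);
    res.status(500).json({ error: 'Could not start saga' });
  }
});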
const PORT = process.env.PORT ?? 3000;
app.listen(PORT, () => {
console.log(`Saga orchestrator listening on ${PORT}`);
initSagaEngine();
});
4. Implementing Service Workers
Each microservice runs a BullMQ worker that consumes commands, performs its local transaction, and publishes an event.
4.1 Example: Payment Service
// payment-service/src/worker.ts
import { createResultWorker } from '@myorg/orchestrator/infra/queue';
import { publish } from '@myorg/orchestrator/infra/eventBus';
import { ReservePaymentCmd, PaymentReservedEvt } from '@myorg/saga-contracts';
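// This queue belongs to the payment service; the orchestrator must add ReservePayment commands to it.
// The name uses '-' because recent BullMQ versions reject ':' in queue names.
createResultWorker<ReservePaymentCmd>('payment-commands', async (job) => {
  const { orderId, paymentId, amount } = job.data.payload;
  // Idempotency guard: check a Redis key or DB row before doing any work
  // ...
  // Simulate DB write
  console.log(`Reserving $${amount} for order ${orderId}`);
  const evt: PaymentReservedEvt = {
    type: 'PaymentReserved',
    payload: { orderId, paymentId },
  };
  await publish(evt);
});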
4.2 Compensation Worker
// payment-service/src/compWorker.ts
import { createResultWorker } from '@myorg/orchestrator/infra/queue';
import { ReleasePaymentCmd } from '@myorg/saga-contracts';
// Queue name uses '-' because recent BullMQ versions reject ':' in queue names
createResultWorker<ReleasePaymentCmd>('payment-compensations', async (job) => {
const { paymentId } = job.data.payload;
console.log(`Releasing payment ${paymentId}`);
// Undo DB changes, refund, etc.
});
Similar workers exist for order and shipping services.
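One gap worth closing: the payment worker consumes its own queue, while `dispatchNext` adds every command to a single shared queue. A possible way to connect the two is a routing table from command type to per-service queue. The queue names and the `QueueLike` interface below are assumptions for illustration. `QueueLike` only mirrors the `add(name, data)` call already used in the engine.

```typescript
type CommandType = "CreateOrder" | "ReservePayment" | "ShipOrder";

// Hypothetical queue names, one per participant service.
const queueForCommand: Record<CommandType, string> = {
  CreateOrder: "order-commands",
  ReservePayment: "payment-commands",
  ShipOrder: "shipping-commands",
};

// Minimal structural view of a job queue, so the sketch runs without Redis.
interface QueueLike {
  add(name: string, data: unknown): Promise<unknown>;
}

// Adds the command to the queue owned by the service that handles it.
async function routeCommand(
  queues: Map<string, QueueLike>,
  command: { type: CommandType; payload: unknown },
): Promise<string> {
  const queueName = queueForCommand[command.type];
  const queue = queues.get(queueName);
  if (!queue) throw new Error(`No queue registered for ${queueName}`);
  await queue.add(command.type, command);
  return queueName;
}
```

Because the table is typed as `Record<CommandType, string>`, adding a new command type without a route is a compile-time error.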
5. Idempotency & Exactly‑Once Guarantees
Distributed systems inevitably see retries. To keep compensations safe:
| Concern | Technique |
|---|---|
| Command processing | Record each processed command ID in Redis (SET … NX, or the older SETNX) before executing, and skip any command whose ID is already present. |
| Event publishing | Use outbox pattern: write the event to the same DB transaction that updates state, then a background poller forwards it. |
| Compensation | Compensating actions must also be idempotent (e.g., DELETE WHERE id = … or SET status = 'canceled' if not already). |
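In the sample code we omitted persistence for brevity. The contract above does not define a command ID either, so a real implementation would add a unique ID field to each command and check it in the handler.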
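A minimal sketch of the first and third rows of the table. The in-memory `ProcessedIds` class stands in for an atomic "set if absent" store such as Redis, and `commandId` is a hypothetical field that the contract in this article does not define.

```typescript
// In-memory stand-in for an atomic "set if absent" store (for example Redis SET ... NX).
class ProcessedIds {
  private readonly seen = new Set<string>();

  // Returns true only the first time a given ID is claimed.
  claim(id: string): boolean {
    if (this.seen.has(id)) return false;
    this.seen.add(id);
    return true;
  }
}

// Hypothetical command shape with an explicit ID for deduplication.
interface ReservePayment {
  commandId: string;
  paymentId: string;
  amount: number;
}

const processedCommands = new ProcessedIds();
const reservations = new Map<string, number>();

// Applies the command once; redeliveries are detected and skipped.
function handleReservePayment(cmd: ReservePayment): "applied" | "duplicate" {
  if (!processedCommands.claim(cmd.commandId)) return "duplicate";
  reservations.set(cmd.paymentId, cmd.amount);
  return "applied";
}

// Idempotent compensation: releasing an already released payment is a no-op.
function releasePayment(paymentId: string): boolean {
  return reservations.delete(paymentId);
}
```

One caveat: claiming the ID before doing the work means a crash between the two loses the command. In a real service the claim and the state change would ideally share one database transaction.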
6. Observability
Add OpenTelemetry traces around each step:
import { trace } from '@opentelemetry/api';
const tracer = trace.getTracer('saga-orchestrator');
async function dispatchNext(saga: SagaInstance) {
const span = tracer.startSpan('dispatchNext', {
attributes: { 'saga.id': saga.id, 'order.id': saga.orderId },
});
try {
// ... existing logic
} finally {
span.end();
}
}
Export metrics (e.g., saga_running, saga_failed) via Prometheus for dashboards.
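As a rough idea of what those two metrics involve, here is a dependency-free sketch that renders them in the Prometheus text format. `SagaMetrics` is a hypothetical class, and a real service would more likely use an existing Prometheus client library than hand-roll this.

```typescript
// Tiny in-process registry for the two saga metrics named above.
class SagaMetrics {
  private running = 0;
  private failed = 0;

  sagaStarted(): void {
    this.running += 1;
  }

  sagaFinished(outcome: "Completed" | "Failed"): void {
    this.running -= 1;
    if (outcome === "Failed") this.failed += 1;
  }

  // Renders the Prometheus text exposition format.
  render(): string {
    return [
      "# TYPE saga_running gauge",
      `saga_running ${this.running}`,
      "# TYPE saga_failed counter",
      `saga_failed ${this.failed}`,
    ].join("\n") + "\n";
  }
}
```

In the orchestrator, `sagaStarted()` would be called from `startOrderSaga` and `sagaFinished()` wherever the status becomes Completed or Failed. The output of `render()` can be served from a `/metrics` route.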
7. Testing the Saga
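A robust test suite simulates the normal flow, a failure in the middle, and a retry scenario. The two integration tests below cover the first two cases and need a running Redis instance; a retry test follows the same shape.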
// tests/saga.int.test.ts
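import { setTimeout as delay } from 'node:timers/promises';
// sagaStore and compensate must be exported from the engine module for these tests
import { startOrderSaga, initSagaEngine, sagaStore, compensate } from '../src/saga/engine';
import { publish } from '../src/infra/eventBus';
import { OrderCreatedEvt, PaymentReservedEvt } from '@myorg/saga-contracts';
// Subscribe the engine to the event channel; without this no published event reaches it
beforeAll(() => initSagaEngine());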
test('happy path completes', async () => {
await startOrderSaga('order-123', 42);
// Simulate services emitting events
await publish({ type: 'OrderCreated', payload: { orderId: 'order-123' } } as OrderCreatedEvt);
await publish({ type: 'PaymentReserved', payload: { orderId: 'order-123', paymentId: 'pay-1' } } as PaymentReservedEvt);
await publish({ type: 'OrderShipped', payload: { orderId: 'order-123', shipmentId: 'ship-1' } } as any);
// Wait for orchestrator to process
await delay(200);
const saga = [...sagaStore.values()].find(s => s.orderId === 'order-123');
expect(saga?.status).toBe('Completed');
});
test('failure triggers compensation', async () => {
await startOrderSaga('order-456', 99);
await publish({ type: 'OrderCreated', payload: { orderId: 'order-456' } } as OrderCreatedEvt);
// Simulate payment service failing to emit PaymentReserved
await delay(500); // timeout logic would detect missing event
// Force failure for demo
const saga = [...sagaStore.values()].find(s => s.orderId === 'order-456')!;
saga.status = 'Compensating';
await compensate(saga);
expect(saga.status).toBe('Failed');
});
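Running the tests checks that the orchestrator moves forward on success and that compensation leaves the saga in the Failed state. The second test triggers compensation by hand because the engine has no timeout detection yet.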
8. Deploying in Production
| Concern | Recommendation |
|---|---|
| State storage | Use Redis hash + snapshot to PostgreSQL for durability. |
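| Message broker | BullMQ works with Redis; for higher scale consider Kafka or RabbitMQ. Neither removes the need for idempotent consumers, because delivery to a handler with external side effects is at‑least‑once in practice. |
| Scaling workers | Each service can run multiple BullMQ workers. A job is handed to one worker at a time, but stalled jobs can be redelivered (at‑least‑once), so handlers must be idempotent. |
| Graceful shutdown | Listen for SIGTERM, call close() on each worker so it stops taking new jobs and lets active ones finish, then exit. |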
| Security | Sign commands/events with HMAC or JWT to avoid spoofing between services. |
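The HMAC option from the last row can be sketched with Node's built-in `crypto` module. The function names, and the idea of signing the JSON-serialised message body, are assumptions for illustration. Key distribution and rotation are out of scope here.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Computes a hex-encoded HMAC-SHA256 signature over the serialised message.
function signMessage(body: string, secret: string): string {
  return createHmac("sha256", secret).update(body).digest("hex");
}

// Verifies a signature in constant time.
function verifyMessage(body: string, signature: string, secret: string): boolean {
  const expected = Buffer.from(signMessage(body, secret), "hex");
  const received = Buffer.from(signature, "hex");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return expected.length === received.length && timingSafeEqual(expected, received);
}
```

A producer would attach `signMessage(JSON.stringify(command), secret)` to each job, and the consumer would call `verifyMessage` on the exact string it received before acting on it.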
9. When Not to Use Saga
- Very short-lived transactions that can be handled within a single service.
- Operations requiring strict ACID semantics (e.g., financial ledger entries) – consider using a two‑phase commit with a dedicated transaction manager.
Sagas excel when business processes are event‑driven and you can tolerate eventual consistency.
10. Recap
- Saga orchestration gives you a deterministic, observable way to coordinate distributed work.
- Strong TypeScript contracts keep command/event shapes consistent across services.
- A lightweight engine built on BullMQ and Redis handles ordering and compensation; retries can be layered on with BullMQ's job options (attempts, backoff).
- Idempotent handlers and an outbox‑style event publisher protect against duplicates.
- Adding OpenTelemetry makes debugging production failures feasible.
Armed with the patterns and sample code in this guide, you can start converting ad‑hoc “fire‑and‑forget” calls into reliable, testable sagas that keep your Node.js microservices eventually consistent, even when individual steps fail.