Type‑Safe Event Sourcing in Node.js & Next.js with TypeScript: Building Reliable State Machines and Auditable Streams
Introduction
Event sourcing flips the traditional CRUD model on its head: instead of persisting the current state of an entity, you persist the series of intent‑driven events that led to that state. The benefits are well‑known—complete audit trails, easy debugging, and the ability to rebuild any version of the system by replaying events.
When you combine event sourcing with TypeScript’s static type system you get an extra safety net: the shape of every event is verified at compile time, and the state reconstruction logic can be expressed as a type‑safe state machine. In this article we’ll build a minimal yet production‑ready event‑sourced core that works both in a Node.js backend (e.g., an API route or a microservice) and in a Next.js application that needs to render data from the same stream.
What you’ll walk away with:
- A folder‑structured, generic event store backed by PostgreSQL.
- Strongly‑typed event definitions using discriminated unions.
- A deterministic “apply” function that builds the current aggregate state.
- A simple command‑handler pattern that validates intent before emitting events.
- Replay utilities for debugging, migrations, and snapshots.
No external frameworks are required for the core logic: just TypeScript, node-postgres (pg), zod for runtime validation of inbound payloads, and uuid for generating identifiers.
1. The Domain: A Tiny Inventory Service
We’ll model a very small domain: an inventory of products where each product can be created, stocked, reserved, and sold. The business rules are:
1. A product must be created before any other action.
2. Stock can be added only to an existing product.
3. Reservations cannot exceed available stock.
4. Sales can only occur for previously reserved items.
These rules are enforced by the command handlers; the resulting events are stored immutably.
2. Defining Events with Discriminated Unions
// src/events/inventory.ts
import { z } from "zod";
export const ProductCreated = z.object({
type: z.literal("ProductCreated"),
data: z.object({
productId: z.string().uuid(),
name: z.string(),
sku: z.string(),
}),
});
export const StockAdded = z.object({
type: z.literal("StockAdded"),
data: z.object({
productId: z.string().uuid(),
quantity: z.number().int().positive(),
}),
});
export const StockReserved = z.object({
type: z.literal("StockReserved"),
data: z.object({
productId: z.string().uuid(),
reservationId: z.string().uuid(),
quantity: z.number().int().positive(),
}),
});
export const StockSold = z.object({
type: z.literal("StockSold"),
data: z.object({
productId: z.string().uuid(),
reservationId: z.string().uuid(),
quantity: z.number().int().positive(),
}),
});
export const InventoryEventSchema = z.discriminatedUnion("type", [
ProductCreated,
StockAdded,
StockReserved,
StockSold,
]);
export type InventoryEvent = z.infer<typeof InventoryEventSchema>;
Why discriminated unions?
TypeScript can narrow the union based on the type field, giving us exhaustive switch statements without runtime casts. The same schema can be used at runtime (via zod) to validate inbound JSON from HTTP requests or message queues.
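As a small illustration of both halves, the sketch below validates an untrusted payload with safeParse and then lets the compiler narrow the union inside a switch. The parseIncoming and describe helpers are hypothetical and are not used elsewhere in this article.
// sketch: runtime validation at the boundary, compile-time narrowing inside
import { InventoryEvent, InventoryEventSchema } from "./inventory";
export function parseIncoming(json: unknown): InventoryEvent | null {
  const result = InventoryEventSchema.safeParse(json);
  if (!result.success) return null; // reject malformed input before it reaches the domain
  return result.data;
}
export function describe(event: InventoryEvent): string {
  switch (event.type) {
    // inside each case, event.data is narrowed to the matching shape
    case "ProductCreated":
      return `created ${event.data.name} (${event.data.sku})`;
    case "StockAdded":
      return `added ${event.data.quantity} units`;
    case "StockReserved":
      return `reserved ${event.data.quantity} units`;
    case "StockSold":
      return `sold ${event.data.quantity} units`;
  }
}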
3. The Aggregate State Machine
The aggregate is the in‑memory representation of a product. It is built by folding over its events.
// src/aggregates/product.ts
import { InventoryEvent } from "../events/inventory";
export interface ProductState {
productId: string;
name: string;
sku: string;
totalStock: number;
reserved: Map<string, number>; // reservationId → quantity
}
export const initialState = (): ProductState => ({
productId: "",
name: "",
sku: "",
totalStock: 0,
reserved: new Map(),
});
export function apply(
state: ProductState,
event: InventoryEvent
): ProductState {
switch (event.type) {
case "ProductCreated":
return {
...state,
productId: event.data.productId,
name: event.data.name,
sku: event.data.sku,
};
case "StockAdded":
return {
...state,
totalStock: state.totalStock + event.data.quantity,
};
case "StockReserved": {
const reserved = new Map(state.reserved);
reserved.set(event.data.reservationId, event.data.quantity);
return {
...state,
reserved,
totalStock: state.totalStock - event.data.quantity,
};
}
case "StockSold": {
// the reserved quantity was already deducted from totalStock; just drop the reservation
const reserved = new Map(state.reserved);
reserved.delete(event.data.reservationId);
return { ...state, reserved };
}
}
}
Key points
- The function is pure: it performs no I/O and never mutates its arguments, making it trivially testable.
- The reserved map is copied before each change, so previously computed states remain valid when you replay a stream step by step.
- Exhaustiveness checking: if a new event type is added to the union, TypeScript will flag the missing case.
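If you prefer an explicit guard over relying on the implicit return-type check, a common pattern is an assertNever helper in a default branch. This is a sketch, not part of the apply function above:
// a hypothetical guard that turns a forgotten case into both a compile-time and a runtime error
function assertNever(value: never): never {
  throw new Error(`Unhandled event: ${JSON.stringify(value)}`);
}
// inside apply(), after the known cases:
// default:
//   return assertNever(event);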
4. Event Store – Minimal PostgreSQL Implementation
// src/store/eventStore.ts
import { Pool } from "pg";
import { InventoryEvent, InventoryEventSchema } from "../events/inventory";
const pool = new Pool({
connectionString: process.env.DATABASE_URL,
});
export async function appendEvent(
streamId: string,
event: InventoryEvent,
expectedVersion: number
): Promise<void> {
// Serialize the event payload
const payload = JSON.stringify(event);
const client = await pool.connect();
try {
await client.query("BEGIN");
// Optimistic concurrency check
const { rows } = await client.query(
`SELECT version FROM event_streams WHERE stream_id = $1 FOR UPDATE`,
[streamId]
);
const currentVersion = rows[0]?.version ?? 0;
if (currentVersion !== expectedVersion) {
throw new Error(
`Concurrency conflict: expected ${expectedVersion}, got ${currentVersion}`
);
}
// Insert event
await client.query(
`INSERT INTO events (stream_id, version, type, payload) VALUES ($1, $2, $3, $4)`,
[streamId, expectedVersion + 1, event.type, payload]
);
// bump stream version
await client.query(
`INSERT INTO event_streams (stream_id, version)
VALUES ($1, $2)
ON CONFLICT (stream_id) DO UPDATE SET version = $2`,
[streamId, expectedVersion + 1]
);
await client.query("COMMIT");
} catch (e) {
await client.query("ROLLBACK");
throw e;
} finally {
client.release();
}
}
export async function loadEvents(
streamId: string
): Promise<InventoryEvent[]> {
const { rows } = await pool.query(
`SELECT payload FROM events WHERE stream_id = $1 ORDER BY version ASC`,
[streamId]
);
// Runtime validation of each stored payload.
// node-postgres already parses JSONB columns into objects, so r.payload is not a string here.
return rows.map((r) => {
const parsed = InventoryEventSchema.safeParse(r.payload);
if (!parsed.success) {
throw new Error(`Corrupt event data for stream ${streamId}`);
}
return parsed.data;
});
}
Schema (run once):
CREATE TABLE event_streams (
stream_id TEXT PRIMARY KEY,
version INTEGER NOT NULL
);
CREATE TABLE events (
id SERIAL PRIMARY KEY,
stream_id TEXT NOT NULL REFERENCES event_streams(stream_id),
version INTEGER NOT NULL,
type TEXT NOT NULL,
payload JSONB NOT NULL,
UNIQUE (stream_id, version)
);
The store uses optimistic concurrency (expected version) to guarantee that two commands cannot write conflicting events.
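Callers can treat a version conflict as retriable: re-running the command re-reads the stream and re-applies its checks against the fresh state. The sketch below is an assumed policy (three attempts, matching on the error message thrown by appendEvent above), not part of the store itself.
// retry a command when the optimistic-concurrency check fails
async function withRetry<T>(fn: () => Promise<T>, attempts = 3): Promise<T> {
  let lastError: unknown;
  for (let i = 0; i < attempts; i++) {
    try {
      return await fn();
    } catch (e) {
      lastError = e;
      const message = e instanceof Error ? e.message : "";
      if (!message.startsWith("Concurrency conflict")) throw e; // only conflicts are retried
    }
  }
  throw lastError;
}
For example, wrapping a call such as withRetry(() => addStock(productId, 5)) reloads the events and retries the append if another command won the race.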
5. Command Handlers – Guarding Invariants
// src/commands/inventory.ts
import { v4 as uuid } from "uuid";
import { appendEvent, loadEvents } from "../store/eventStore";
import {
ProductCreated,
StockAdded,
StockReserved,
StockSold,
InventoryEvent,
} from "../events/inventory";
import { initialState, apply } from "../aggregates/product";
type CommandResult = { success: true } | { success: false; error: string };
export async function createProduct(
name: string,
sku: string
): Promise<CommandResult> {
const productId = uuid();
const ev: InventoryEvent = ProductCreated.parse({
type: "ProductCreated",
data: { productId, name, sku },
});
await appendEvent(productId, ev, 0);
return { success: true };
}
export async function addStock(
productId: string,
quantity: number
): Promise<CommandResult> {
const events = await loadEvents(productId);
const state = events.reduce(apply, initialState());
if (!state.productId) {
return { success: false, error: "Product does not exist" };
}
const ev: InventoryEvent = StockAdded.parse({
type: "StockAdded",
data: { productId, quantity },
});
await appendEvent(productId, ev, events.length);
return { success: true };
}
export async function reserveStock(
productId: string,
reservationId: string,
quantity: number
): Promise<CommandResult> {
const events = await loadEvents(productId);
const state = events.reduce(apply, initialState());
if (state.totalStock < quantity) {
return { success: false, error: "Insufficient stock" };
}
const ev: InventoryEvent = StockReserved.parse({
type: "StockReserved",
data: { productId, reservationId, quantity },
});
await appendEvent(productId, ev, events.length);
return { success: true };
}
export async function sellStock(
productId: string,
reservationId: string
): Promise<CommandResult> {
const events = await loadEvents(productId);
const state = events.reduce(apply, initialState());
const reservedQty = state.reserved.get(reservationId);
if (!reservedQty) {
return { success: false, error: "Reservation not found" };
}
const ev: InventoryEvent = StockSold.parse({
type: "StockSold",
data: { productId, reservationId, quantity: reservedQty },
});
await appendEvent(productId, ev, events.length);
return { success: true };
}
Why use zod parsing inside the handler?
Even though the caller constructs the event object, parsing guarantees that no malformed data ever reaches the store. In a microservice architecture you could replace the in‑process calls with a message broker; the same validation works automatically.
6. Replaying Streams – Debugging & Snapshots
Rebuilding state is as simple as loading events and folding:
// src/util/replay.ts
import { loadEvents } from "../store/eventStore";
import { initialState, apply, ProductState } from "../aggregates/product";
export async function getCurrentState(
productId: string
): Promise<ProductState> {
const events = await loadEvents(productId);
return events.reduce(apply, initialState());
}
Snapshotting (optional)
When a stream grows large, you can store a snapshot of the aggregate state and replay only newer events.
// src/store/snapshot.ts
import { Pool } from "pg";
import { ProductState } from "../aggregates/product";
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
export async function saveSnapshot(
streamId: string,
version: number,
state: ProductState
) {
// Map is not JSON-serializable, so flatten `reserved` into a plain object first
const serializable = { ...state, reserved: Object.fromEntries(state.reserved) };
await pool.query(
`INSERT INTO snapshots (stream_id, version, state)
VALUES ($1, $2, $3)
ON CONFLICT (stream_id) DO UPDATE SET version = $2, state = $3`,
[streamId, version, JSON.stringify(serializable)]
);
}
export async function loadSnapshot(
streamId: string
): Promise<{ version: number; state: ProductState } | null> {
const { rows } = await pool.query(
`SELECT version, state FROM snapshots WHERE stream_id = $1`,
[streamId]
);
if (!rows.length) return null;
const raw = JSON.parse(rows[0].state);
return {
version: rows[0].version,
// restore the `reserved` Map that was flattened for storage
state: { ...raw, reserved: new Map(Object.entries(raw.reserved)) } as ProductState,
};
}
When loading, first fetch the snapshot, then load only the events recorded after snapshot.version, as in the sketch below.
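Putting the two pieces together could look like this. Note that loadEventsAfter is an assumed helper (a variant of loadEvents whose query adds AND version > $2); it is not defined earlier in this article.
// sketch: rebuild state from the latest snapshot plus the events recorded after it
import { loadSnapshot } from "../store/snapshot";
import { loadEventsAfter } from "../store/eventStore"; // assumed variant of loadEvents
import { initialState, apply, ProductState } from "../aggregates/product";
export async function getStateWithSnapshot(
  productId: string
): Promise<ProductState> {
  const snapshot = await loadSnapshot(productId);
  const base = snapshot?.state ?? initialState();
  const newerEvents = await loadEventsAfter(productId, snapshot?.version ?? 0);
  return newerEvents.reduce(apply, base);
}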
7. Using the Core in a Next.js API Route
// pages/api/products/[id]/stock.ts
import type { NextApiRequest, NextApiResponse } from "next";
import {
addStock,
reserveStock,
sellStock,
} from "../../../src/commands/inventory";
export default async function handler(
req: NextApiRequest,
res: NextApiResponse
) {
const { id } = req.query;
if (typeof id !== "string") {
return res.status(400).json({ error: "Invalid product id" });
}
switch (req.method) {
case "POST": {
const { quantity } = req.body;
const result = await addStock(id, Number(quantity));
return result.success
? res.status(200).json({ message: "Stock added" })
: res.status(400).json({ error: result.error });
}
case "PUT": {
const { reservationId, quantity } = req.body;
const result = await reserveStock(id, reservationId, Number(quantity));
return result.success
? res.status(200).json({ message: "Reserved" })
: res.status(400).json({ error: result.error });
}
case "DELETE": {
const { reservationId } = req.body;
const result = await sellStock(id, reservationId);
return result.success
? res.status(200).json({ message: "Sold" })
: res.status(400).json({ error: result.error });
}
default:
res.setHeader("Allow", ["POST", "PUT", "DELETE"]);
res.status(405).end(`Method ${req.method} Not Allowed`);
}
}
The same command functions can be imported into a background worker that consumes a message queue, demonstrating isomorphic usage across server‑side rendering, API routes, and off‑process jobs.
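As an illustration, a queue consumer might look like the minimal sketch below. The message shape and the wiring to a concrete broker (BullMQ, SQS, RabbitMQ, and so on) are assumptions; only the imported commands come from this article.
// hypothetical worker handler; plug it into whatever broker client you use
import { addStock, reserveStock } from "../src/commands/inventory";
type StockMessage =
  | { kind: "add"; productId: string; quantity: number }
  | { kind: "reserve"; productId: string; reservationId: string; quantity: number };
export async function handleMessage(msg: StockMessage): Promise<void> {
  const result =
    msg.kind === "add"
      ? await addStock(msg.productId, msg.quantity)
      : await reserveStock(msg.productId, msg.reservationId, msg.quantity);
  if (!result.success) {
    // surface domain rejections to the broker's retry or dead-letter handling
    throw new Error(result.error);
  }
}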
8. Testing the State Machine
// tests/product.test.ts
import { apply, initialState } from "../src/aggregates/product";
import {
ProductCreated,
StockAdded,
StockReserved,
StockSold,
} from "../src/events/inventory";
test("full lifecycle", () => {
const productId = "11111111-1111-1111-1111-111111111111";
const reservationId = "22222222-2222-2222-2222-222222222222";
const events = [
ProductCreated.parse({
type: "ProductCreated",
data: { productId, name: "Widget", sku: "WGT-01" },
}),
StockAdded.parse({
type: "StockAdded",
data: { productId, quantity: 10 },
}),
StockReserved.parse({
type: "StockReserved",
data: { productId, reservationId, quantity: 3 },
}),
StockSold.parse({
type: "StockSold",
data: { productId, reservationId, quantity: 3 },
}),
];
const finalState = events.reduce(apply, initialState());
expect(finalState.totalStock).toBe(7);
expect(finalState.reserved.size).toBe(0);
});
Because the apply function is pure, unit tests are fast and deterministic—no database needed.
9. Auditing & Debugging
Every event row contains:
- stream_id – the aggregate identifier.
- version – the sequential position within the stream (great for replay ordering).
- type – the human‑readable event name.
- payload – the JSON payload (validated on write).
You can query the event table directly for an audit trail:
SELECT version, type, payload
FROM events
WHERE stream_id = $1
ORDER BY version;
Because events are immutable, you can replay any point in time by loading up to a given version. This is invaluable for post‑mortem analysis or for implementing “time‑travel” UI features.
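A minimal sketch of that idea, filtering in memory here (a WHERE version <= $2 query would work just as well); getStateAtVersion is a hypothetical helper.
// rebuild the aggregate as it looked at a given version ("time travel")
import { loadEvents } from "../store/eventStore";
import { initialState, apply, ProductState } from "../aggregates/product";
export async function getStateAtVersion(
  productId: string,
  version: number
): Promise<ProductState> {
  const events = await loadEvents(productId);
  // events come back ordered by version (1, 2, 3, ...), so the first `version`
  // entries are exactly the events up to and including that version
  return events.slice(0, version).reduce(apply, initialState());
}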
10. Scaling Considerations
| Concern | Simple solution | Production‑grade upgrade |
|---|---|---|
| High write throughput | Single PostgreSQL instance | Partition streams across multiple DBs or use an append‑only log (Kafka, Pulsar) |
| Event size | JSONB payload | Use Protobuf/Avro + schema registry for compact binary format |
| Snapshot frequency | Manual calls | Background job that creates snapshots every N events or time interval |
| Consistency across services | Optimistic concurrency per stream | Use a dedicated event store (EventStoreDB, DynamoDB Streams) with built‑in idempotency |
The core concepts—type‑safe events, pure reducers, and explicit version checks—remain the same regardless of the storage layer.
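As one concrete take on the snapshot-frequency row above, a command handler or background job could call something like the sketch below after appending events; the threshold of 100 is an arbitrary assumption.
// sketch: persist a snapshot whenever the stream length hits a multiple of N
import { loadEvents } from "./store/eventStore";
import { saveSnapshot } from "./store/snapshot";
import { initialState, apply } from "./aggregates/product";
const SNAPSHOT_EVERY = 100; // arbitrary policy, tune per workload
export async function maybeSnapshot(productId: string): Promise<void> {
  const events = await loadEvents(productId);
  if (events.length === 0 || events.length % SNAPSHOT_EVERY !== 0) return;
  const state = events.reduce(apply, initialState());
  await saveSnapshot(productId, events.length, state);
}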
11. Conclusion
Event sourcing can feel daunting, especially when you add static typing to the mix. By modeling events as discriminated unions, building a pure reducer, and using a thin, version‑checked store, you get:
- Compile‑time guarantees that only known events ever exist.
- Runtime safety via Zod validation of inbound data.
- Auditable, replayable streams that double as a debugging tool.
- Isomorphic code that works in Node.js services, Next.js API routes, and background workers.
Give it a try in a small bounded context—once you see the clarity of intent‑driven code, extending it to larger domains becomes a natural evolution.