Type‑Safe Event Sourcing in TypeScript + PostgreSQL: A Hands‑On Guide
Introduction
Event sourcing flips the traditional CRUD model on its head: instead of persisting the current state of an entity, you persist every state‑changing event that ever happened. Re‑building the current state becomes a matter of replaying those events.
When you combine this pattern with TypeScript’s discriminated unions and PostgreSQL’s JSONB column, you get a system where:
- Every event is a first‑class, type‑checked value.
- The database stores events as immutable rows, yet you can query them with SQL.
- Snapshots and optimistic concurrency are easy to add without sacrificing type safety.
This article walks through a complete, production‑ready implementation that you can drop into a Node.js service. No external event‑store libraries, just pg (or any PostgreSQL driver) and plain TypeScript.
1. Domain Modeling – The Event Types
Start by describing the domain in terms of events. Suppose we are building a simple BankAccount aggregate.
// src/domain/events.ts
export type AccountCreated = {
type: "AccountCreated";
data: {
accountId: string;
owner: string;
createdAt: string; // ISO timestamp
};
};
export type MoneyDeposited = {
type: "MoneyDeposited";
data: {
accountId: string;
amount: number; // cents to avoid floating point
depositedAt: string;
};
};
export type MoneyWithdrawn = {
type: "MoneyWithdrawn";
data: {
accountId: string;
amount: number;
withdrawnAt: string;
};
};
export type AccountClosed = {
type: "AccountClosed";
data: {
accountId: string;
closedAt: string;
};
};
// Union of all possible events for the aggregate
export type BankAccountEvent =
| AccountCreated
| MoneyDeposited
| MoneyWithdrawn
| AccountClosed;
Each event is a discriminated union (type field) – the compiler can narrow the shape automatically.
2. Storing Events in PostgreSQL
Create a single table that holds every event for every aggregate. JSONB lets us keep the payload flexible while still indexing on columns we care about.
-- migrations/2024-01-create-event-table.sql
CREATE TABLE IF NOT EXISTS event_store (
id BIGSERIAL PRIMARY KEY,
aggregate_id UUID NOT NULL,
version BIGINT NOT NULL, -- monotonically increasing per aggregate
type TEXT NOT NULL, -- duplicate of the discriminant for fast filtering
payload JSONB NOT NULL, -- the `data` object
recorded_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Ensure version uniqueness per aggregate
CREATE UNIQUE INDEX uq_event_store_agg_version
ON event_store (aggregate_id, version);
2.1. Type‑Safe Row Mapping
When reading rows back, we need to reconstruct the concrete event type. A small helper does the job:
// src/infrastructure/eventMapper.ts
import { BankAccountEvent } from "../domain/events";
export function mapRowToEvent(row: any): BankAccountEvent {
const { type, payload } = row as {
type: string;
payload: unknown;
};
// The `as` cast is safe because we control the write path.
switch (type) {
case "AccountCreated":
return { type, data: payload as AccountCreated["data"] };
case "MoneyDeposited":
return { type, data: payload as MoneyDeposited["data"] };
case "MoneyWithdrawn":
return { type, data: payload as MoneyWithdrawn["data"] };
case "AccountClosed":
return { type, data: payload as AccountClosed["data"] };
default:
throw new Error(`Unknown event type: ${type}`);
}
}
Because the type field is a literal, TypeScript narrows the return type to the exact shape, preserving full type safety downstream.
3. The Aggregate – Replaying Events
An aggregate encapsulates business rules and knows how to apply events to its internal state.
// src/domain/BankAccount.ts
import {
BankAccountEvent,
AccountCreated,
MoneyDeposited,
MoneyWithdrawn,
AccountClosed,
} from "./events";
export interface BankAccountState {
accountId: string;
owner: string;
balance: number; // cents
isClosed: boolean;
createdAt: string;
closedAt?: string;
}
export class BankAccount {
private state!: BankAccountState;
private pending: BankAccountEvent[] = [];
// Rehydrate from a list of past events
static rehydrate(events: BankAccountEvent[]): BankAccount {
const account = new BankAccount();
for (const ev of events) account.apply(ev, false);
return account;
}
// Public API – command methods
static create(accountId: string, owner: string): BankAccount {
const acct = new BankAccount();
const ev: AccountCreated = {
type: "AccountCreated",
data: { accountId, owner, createdAt: new Date().toISOString() },
};
acct.apply(ev, true);
return acct;
}
deposit(amount: number) {
if (this.state.isClosed) throw new Error("Closed accounts cannot receive deposits");
if (amount <= 0) throw new Error("Deposit amount must be positive");
const ev: MoneyDeposited = {
type: "MoneyDeposited",
data: {
accountId: this.state.accountId,
amount,
depositedAt: new Date().toISOString(),
},
};
this.apply(ev, true);
}
withdraw(amount: number) {
if (this.state.isClosed) throw new Error("Closed accounts cannot withdraw");
if (amount <= 0) throw new Error("Withdrawal amount must be positive");
if (this.state.balance < amount) throw new Error("Insufficient funds");
const ev: MoneyWithdrawn = {
type: "MoneyWithdrawn",
data: {
accountId: this.state.accountId,
amount,
withdrawnAt: new Date().toISOString(),
},
};
this.apply(ev, true);
}
close() {
if (this.state.isClosed) throw new Error("Account already closed");
const ev: AccountClosed = {
type: "AccountClosed",
data: {
accountId: this.state.accountId,
closedAt: new Date().toISOString(),
},
};
this.apply(ev, true);
}
// Returns events that need to be persisted
getUncommittedEvents(): readonly BankAccountEvent[] {
return this.pending;
}
// Clears the pending list after successful write
clearUncommitted() {
this.pending = [];
}
// -----------------------------------------------------------------
// Private helpers
private apply(event: BankAccountEvent, isNew: boolean) {
// Mutate internal state
switch (event.type) {
case "AccountCreated":
this.state = {
accountId: event.data.accountId,
owner: event.data.owner,
balance: 0,
isClosed: false,
createdAt: event.data.createdAt,
};
break;
case "MoneyDeposited":
this.state.balance += event.data.amount;
break;
case "MoneyWithdrawn":
this.state.balance -= event.data.amount;
break;
case "AccountClosed":
this.state.isClosed = true;
this.state.closedAt = event.data.closedAt;
break;
}
if (isNew) this.pending.push(event);
}
}
Notice how no any appears; every mutation is driven by a concrete event type, guaranteeing that the aggregate cannot enter an invalid state.
4. Repository – Persisting and Loading Events
The repository hides the SQL details and guarantees optimistic concurrency using the version column.
// src/infrastructure/BankAccountRepository.ts
import { Pool, QueryResult } from "pg";
import { BankAccount, BankAccountState } from "../domain/BankAccount";
import { BankAccountEvent } from "../domain/events";
import { mapRowToEvent } from "./eventMapper";
export class BankAccountRepository {
constructor(private readonly db: Pool) {}
// Load an aggregate by its ID
async load(accountId: string): Promise<BankAccount> {
const res: QueryResult = await this.db.query(
`SELECT type, payload FROM event_store
WHERE aggregate_id = $1
ORDER BY version ASC`,
[accountId]
);
const events = res.rows.map(mapRowToEvent);
return BankAccount.rehydrate(events);
}
// Append new events atomically
async save(account: BankAccount, expectedVersion: number): Promise<void> {
const client = await this.db.connect();
try {
await client.query("BEGIN");
// Verify version hasn't changed
const versionRes = await client.query(
`SELECT MAX(version) as version FROM event_store WHERE aggregate_id = $1`,
[account.getUncommittedEvents()[0].data.accountId] // safe because all events belong to same aggregate
);
const currentVersion = versionRes.rows[0].version ?? 0;
if (currentVersion !== expectedVersion) {
throw new Error(
`Concurrency conflict: expected ${expectedVersion}, got ${currentVersion}`
);
}
// Insert pending events
const pending = account.getUncommittedEvents();
for (let i = 0; i < pending.length; i++) {
const ev = pending[i];
await client.query(
`INSERT INTO event_store
(aggregate_id, version, type, payload)
VALUES ($1, $2, $3, $4)`,
[
ev.data.accountId,
expectedVersion + i + 1,
ev.type,
ev.data, // pg will serialize JS object to JSON automatically
]
);
}
await client.query("COMMIT");
account.clearUncommitted();
} catch (err) {
await client.query("ROLLBACK");
throw err;
} finally {
client.release();
}
}
}
Why This Is Type‑Safe
- The
pendingarray is typed asBankAccountEvent[]. - The
INSERTstatement receivesev.type(a literal) andev.data(the exact payload shape). - If a new event type is added, the compiler forces us to update both the union and the repository’s
INSERTmapping, preventing silent mismatches.
5. Snapshots – Keeping Reads Fast
Replaying thousands of events on every request can become costly. A snapshot stores the current state alongside the last applied version.
-- migrations/2024-02-create-snapshot-table.sql
CREATE TABLE IF NOT EXISTS account_snapshot (
aggregate_id UUID PRIMARY KEY,
version BIGINT NOT NULL,
state JSONB NOT NULL,
saved_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
Snapshot Helper
// src/infrastructure/SnapshotStore.ts
import { Pool, QueryResult } from "pg";
import { BankAccountState } from "../domain/BankAccount";
export class SnapshotStore {
constructor(private readonly db: Pool) {}
async load(accountId: string): Promise<{ state: BankAccountState; version: number } | null> {
const res: QueryResult = await this.db.query(
`SELECT version, state FROM account_snapshot WHERE aggregate_id = $1`,
[accountId]
);
if (res.rowCount === 0) return null;
const { version, state } = res.rows[0];
return { state, version };
}
async save(accountId: string, version: number, state: BankAccountState): Promise<void> {
await this.db.query(
`INSERT INTO account_snapshot (aggregate_id, version, state)
VALUES ($1, $2, $3)
ON CONFLICT (aggregate_id) DO UPDATE
SET version = EXCLUDED.version,
state = EXCLUDED.state,
saved_at = now()`,
[accountId, version, state]
);
}
}
Using Snapshots in the Repository
// src/infrastructure/BankAccountRepository.ts (excerpt)
import { SnapshotStore } from "./SnapshotStore";
export class BankAccountRepository {
constructor(
private readonly db: Pool,
private readonly snapshots: SnapshotStore
) {}
async load(accountId: string): Promise<BankAccount> {
const snap = await this.snapshots.load(accountId);
let events: BankAccountEvent[] = [];
if (snap) {
// Load only events after the snapshot version
const res = await this.db.query(
`SELECT type, payload FROM event_store
WHERE aggregate_id = $1 AND version > $2
ORDER BY version ASC`,
[accountId, snap.version]
);
events = res.rows.map(mapRowToEvent);
const acct = BankAccount.rehydrate(events);
// Overwrite the rehydrated state with the snapshot (faster)
(acct as any).state = snap.state; // internal hack – in real code expose a method
return acct;
}
// No snapshot – load everything
const res = await this.db.query(
`SELECT type, payload FROM event_store
WHERE aggregate_id = $1
ORDER BY version ASC`,
[accountId]
);
events = res.rows.map(mapRowToEvent);
return BankAccount.rehydrate(events);
}
// After persisting, optionally store a new snapshot
async save(account: BankAccount, expectedVersion: number): Promise<void> {
// …same as before…
await this.db.query("COMMIT");
account.clearUncommitted();
// Create a snapshot every 100 events
const latestVersion = expectedVersion + account.getUncommittedEvents().length;
if (latestVersion % 100 === 0) {
await this.snapshots.save(
account.getUncommittedEvents()[0].data.accountId,
latestVersion,
(account as any).state // expose a getter in production
);
}
}
}
The snapshot logic stays type‑safe because the state column is typed as BankAccountState, and any change to the aggregate’s shape forces a compile‑time update to the snapshot schema.
6. Wiring It All Together
A minimal service layer demonstrates how a consumer would interact with the repository.
// src/service/AccountService.ts
import { Pool } from "pg";
import { BankAccountRepository } from "../infrastructure/BankAccountRepository";
import { SnapshotStore } from "../infrastructure/SnapshotStore";
import { BankAccount } from "../domain/BankAccount";
export class AccountService {
private readonly repo: BankAccountRepository;
constructor(pool: Pool) {
const snapshots = new SnapshotStore(pool);
this.repo = new BankAccountRepository(pool, snapshots);
}
async createAccount(owner: string): Promise<string> {
const accountId = crypto.randomUUID();
const account = BankAccount.create(accountId, owner);
await this.repo.save(account, 0); // no prior version
return accountId;
}
async deposit(accountId: string, amount: number): Promise<void> {
const account = await this.repo.load(accountId);
const currentVersion = await this.currentVersion(accountId);
account.deposit(amount);
await this.repo.save(account, currentVersion);
}
async withdraw(accountId: string, amount: number): Promise<void> {
const account = await this.repo.load(accountId);
const currentVersion = await this.currentVersion(accountId);
account.withdraw(amount);
await this.repo.save(account, currentVersion);
}
async getBalance(accountId: string): Promise<number> {
const account = await this.repo.load(accountId);
// Access internal state via a safe getter (omitted for brevity)
return (account as any).state.balance;
}
private async currentVersion(accountId: string): Promise<number> {
const res = await this.repo["db"].query(
`SELECT MAX(version) as version FROM event_store WHERE aggregate_id = $1`,
[accountId]
);
return res.rows[0].version ?? 0;
}
}
All public methods are purely synchronous from the caller’s perspective; the async I/O lives inside the repository. The service never touches raw SQL or JSON parsing – it works with fully typed domain objects.
7. Testing the Event‑Sourced Flow
A quick Jest test proves the type safety end‑to‑end.
// tests/BankAccount.test.ts
import { Pool } from "pg";
import { AccountService } from "../src/service/AccountService";
let pool: Pool;
let service: AccountService;
beforeAll(async () => {
pool = new Pool({ connectionString: process.env.DATABASE_URL });
service = new AccountService(pool);
// Clean tables
await pool.query("DELETE FROM event_store");
await pool.query("DELETE FROM account_snapshot");
});
test("deposit and withdraw produce correct balance", async () => {
const id = await service.createAccount("Alice");
await service.deposit(id, 5000); // $50.00
await service.withdraw(id, 1200); // $12.00
const balance = await service.getBalance(id);
expect(balance).toBe(380
Member discussion