Type‑Safe Event Sourcing in TypeScript + PostgreSQL: A Hands‑On Guide

Learn how to model, store, and replay domain events with full TypeScript type safety using PostgreSQL’s JSONB and generic repositories.

Introduction

Event sourcing flips the traditional CRUD model on its head: instead of persisting the current state of an entity, you persist every state‑changing event that ever happened. Re‑building the current state becomes a matter of replaying those events.

When you combine this pattern with TypeScript’s discriminated unions and PostgreSQL’s JSONB column, you get a system where:

  • Every event is a first‑class, type‑checked value.
  • The database stores events as immutable rows, yet you can query them with SQL.
  • Snapshots and optimistic concurrency are easy to add without sacrificing type safety.

This article walks through a compact, working implementation that you can adapt for a Node.js service. A few shortcuts are marked in the code where production code should do better. No external event‑store libraries, just pg (or any PostgreSQL driver) and plain TypeScript.


1. Domain Modeling – The Event Types

Start by describing the domain in terms of events. Suppose we are building a simple BankAccount aggregate.

// src/domain/events.ts
export type AccountCreated = {
  type: "AccountCreated";
  data: {
    accountId: string;
    owner: string;
    createdAt: string; // ISO timestamp
  };
};

export type MoneyDeposited = {
  type: "MoneyDeposited";
  data: {
    accountId: string;
    amount: number; // cents to avoid floating point
    depositedAt: string;
  };
};

export type MoneyWithdrawn = {
  type: "MoneyWithdrawn";
  data: {
    accountId: string;
    amount: number;
    withdrawnAt: string;
  };
};

export type AccountClosed = {
  type: "AccountClosed";
  data: {
    accountId: string;
    closedAt: string;
  };
};

// Union of all possible events for the aggregate
export type BankAccountEvent =
  | AccountCreated
  | MoneyDeposited
  | MoneyWithdrawn
  | AccountClosed;

BankAccountEvent is a discriminated union: every member carries a literal type field, so checking event.type lets the compiler narrow to the exact payload shape automatically.
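
To see the narrowing at work, here is a minimal sketch of a consumer that formats events for a log line. The describeEvent function is hypothetical and not part of the article's code; two event types are redeclared so the snippet stands alone.

```typescript
// Two members of the union, redeclared here so the snippet is self-contained.
type MoneyDeposited = {
  type: "MoneyDeposited";
  data: { accountId: string; amount: number; depositedAt: string };
};

type AccountClosed = {
  type: "AccountClosed";
  data: { accountId: string; closedAt: string };
};

type BankAccountEvent = MoneyDeposited | AccountClosed;

// Hypothetical helper: inside each case, `event.data` has the exact payload type.
function describeEvent(event: BankAccountEvent): string {
  switch (event.type) {
    case "MoneyDeposited":
      // `amount` exists only on MoneyDeposited; the compiler knows that here.
      return `deposited ${event.data.amount} cents into ${event.data.accountId}`;
    case "AccountClosed":
      // Accessing event.data.amount here would be a compile-time error.
      return `closed ${event.data.accountId} at ${event.data.closedAt}`;
  }
}
```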


2. Storing Events in PostgreSQL

Create a single table that holds every event for every aggregate. JSONB lets us keep the payload flexible while still indexing on columns we care about.

-- migrations/2024-01-create-event-table.sql
CREATE TABLE IF NOT EXISTS event_store (
  id            BIGSERIAL PRIMARY KEY,
  aggregate_id  UUID NOT NULL,
  version       BIGINT NOT NULL,          -- monotonically increasing per aggregate
  type          TEXT   NOT NULL,          -- duplicate of the discriminant for fast filtering
  payload       JSONB  NOT NULL,          -- the `data` object
  recorded_at   TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Ensure version uniqueness per aggregate
CREATE UNIQUE INDEX uq_event_store_agg_version
  ON event_store (aggregate_id, version);
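
One driver detail matters for the version column: node-postgres returns BIGINT values as strings by default, because a 64‑bit integer may not fit in a JavaScript number. The sketch below shows one way to convert a raw version value before doing arithmetic on it. parseVersion is a hypothetical helper name, not part of pg.

```typescript
// Hypothetical helper: converts a raw `version` value from a query result
// into a number. MAX(version) is NULL when the aggregate has no events yet.
function parseVersion(raw: string | number | null | undefined): number {
  if (raw === null || raw === undefined) return 0;
  const version = typeof raw === "number" ? raw : Number(raw);
  // Reject NaN, fractions, negatives, and values beyond 2^53 - 1.
  if (!Number.isSafeInteger(version) || version < 0) {
    throw new Error(`Invalid event version: ${String(raw)}`);
  }
  return version;
}
```

Without a conversion like this, an expression such as version + 1 concatenates strings instead of adding numbers.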

2.1. Type‑Safe Row Mapping

When reading rows back, we need to reconstruct the concrete event type. A small helper does the job:

// src/infrastructure/eventMapper.ts
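import type {
  BankAccountEvent,
  AccountCreated,
  MoneyDeposited,
  MoneyWithdrawn,
  AccountClosed,
} from "../domain/events";

export function mapRowToEvent(row: { type: string; payload: unknown }): BankAccountEvent {
  const { type, payload } = row;

  // The `as` casts below are unchecked assertions: they hold only as long as
  // every row was written through the typed write path.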
  switch (type) {
    case "AccountCreated":
      return { type, data: payload as AccountCreated["data"] };
    case "MoneyDeposited":
      return { type, data: payload as MoneyDeposited["data"] };
    case "MoneyWithdrawn":
      return { type, data: payload as MoneyWithdrawn["data"] };
    case "AccountClosed":
      return { type, data: payload as AccountClosed["data"] };
    default:
      throw new Error(`Unknown event type: ${type}`);
  }
}

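Inside each case the type variable is narrowed to its string literal, so every returned object matches exactly one member of BankAccountEvent. The payload casts are not verified at runtime, however, so downstream type safety holds only while the stored rows match the declared shapes.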
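
If rows can ever be written outside the typed write path (manual fixes, older deployments, other services), a runtime check can replace the bare cast. Below is a minimal sketch for one payload, assuming hand‑written guards rather than a validation library. The names isMoneyDepositedData and parseMoneyDeposited are illustrative only.

```typescript
type MoneyDepositedData = { accountId: string; amount: number; depositedAt: string };
type MoneyDeposited = { type: "MoneyDeposited"; data: MoneyDepositedData };

// Type guard: checks the fields at runtime instead of trusting an `as` cast.
function isMoneyDepositedData(value: unknown): value is MoneyDepositedData {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.accountId === "string" &&
    typeof v.amount === "number" &&
    Number.isInteger(v.amount) && // amounts are whole cents
    typeof v.depositedAt === "string"
  );
}

// Builds a typed event from an untrusted payload, or throws.
function parseMoneyDeposited(payload: unknown): MoneyDeposited {
  if (!isMoneyDepositedData(payload)) {
    throw new Error("Malformed MoneyDeposited payload");
  }
  return { type: "MoneyDeposited", data: payload };
}
```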


3. The Aggregate – Replaying Events

An aggregate encapsulates business rules and knows how to apply events to its internal state.

// src/domain/BankAccount.ts
import {
  BankAccountEvent,
  AccountCreated,
  MoneyDeposited,
  MoneyWithdrawn,
  AccountClosed,
} from "./events";

export interface BankAccountState {
  accountId: string;
  owner: string;
  balance: number; // cents
  isClosed: boolean;
  createdAt: string;
  closedAt?: string;
}

export class BankAccount {
  private state!: BankAccountState;
  private pending: BankAccountEvent[] = [];

  // Rehydrate from a list of past events
  static rehydrate(events: BankAccountEvent[]): BankAccount {
    const account = new BankAccount();
    for (const ev of events) account.apply(ev, false);
    return account;
  }

  // Public API – command methods
  static create(accountId: string, owner: string): BankAccount {
    const acct = new BankAccount();
    const ev: AccountCreated = {
      type: "AccountCreated",
      data: { accountId, owner, createdAt: new Date().toISOString() },
    };
    acct.apply(ev, true);
    return acct;
  }

  deposit(amount: number) {
    if (this.state.isClosed) throw new Error("Closed accounts cannot receive deposits");
    if (amount <= 0) throw new Error("Deposit amount must be positive");
    const ev: MoneyDeposited = {
      type: "MoneyDeposited",
      data: {
        accountId: this.state.accountId,
        amount,
        depositedAt: new Date().toISOString(),
      },
    };
    this.apply(ev, true);
  }

  withdraw(amount: number) {
    if (this.state.isClosed) throw new Error("Closed accounts cannot withdraw");
    if (amount <= 0) throw new Error("Withdrawal amount must be positive");
    if (this.state.balance < amount) throw new Error("Insufficient funds");
    const ev: MoneyWithdrawn = {
      type: "MoneyWithdrawn",
      data: {
        accountId: this.state.accountId,
        amount,
        withdrawnAt: new Date().toISOString(),
      },
    };
    this.apply(ev, true);
  }

  close() {
    if (this.state.isClosed) throw new Error("Account already closed");
    const ev: AccountClosed = {
      type: "AccountClosed",
      data: {
        accountId: this.state.accountId,
        closedAt: new Date().toISOString(),
      },
    };
    this.apply(ev, true);
  }

  // Returns events that need to be persisted
  getUncommittedEvents(): readonly BankAccountEvent[] {
    return this.pending;
  }

  // Clears the pending list after successful write
  clearUncommitted() {
    this.pending = [];
  }

  // -----------------------------------------------------------------
  // Private helpers
  private apply(event: BankAccountEvent, isNew: boolean) {
    // Mutate internal state
    switch (event.type) {
      case "AccountCreated":
        this.state = {
          accountId: event.data.accountId,
          owner: event.data.owner,
          balance: 0,
          isClosed: false,
          createdAt: event.data.createdAt,
        };
        break;
      case "MoneyDeposited":
        this.state.balance += event.data.amount;
        break;
      case "MoneyWithdrawn":
        this.state.balance -= event.data.amount;
        break;
      case "AccountClosed":
        this.state.isClosed = true;
        this.state.closedAt = event.data.closedAt;
        break;
    }

    if (isNew) this.pending.push(event);
  }
}

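Notice that no any appears: every mutation is driven by a concrete event type, and each command method checks its invariants before emitting an event. Two caveats remain: state is unset until an AccountCreated event has been applied, and apply itself trusts the events it is given.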
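
Replaying events is a left fold over the event list. The same idea can be written as a pure function, which is convenient for unit tests. This is a sketch with a reduced state and two event types, not a replacement for the class above; evolve and replay are hypothetical names.

```typescript
type Deposited = { type: "MoneyDeposited"; data: { amount: number } };
type Withdrawn = { type: "MoneyWithdrawn"; data: { amount: number } };
type LedgerEvent = Deposited | Withdrawn;

type Balance = { balance: number }; // cents

// Pure transition: returns a new state instead of mutating the old one.
function evolve(state: Balance, event: LedgerEvent): Balance {
  switch (event.type) {
    case "MoneyDeposited":
      return { balance: state.balance + event.data.amount };
    case "MoneyWithdrawn":
      return { balance: state.balance - event.data.amount };
  }
}

// Replaying is a fold over the events, starting from an initial state.
function replay(
  events: readonly LedgerEvent[],
  initial: Balance = { balance: 0 }
): Balance {
  return events.reduce(evolve, initial);
}
```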


4. Repository – Persisting and Loading Events

The repository hides the SQL details and guarantees optimistic concurrency using the version column.

// src/infrastructure/BankAccountRepository.ts
import { Pool, QueryResult } from "pg";
import { BankAccount } from "../domain/BankAccount";
import { BankAccountEvent } from "../domain/events";
import { mapRowToEvent } from "./eventMapper";

export class BankAccountRepository {
  constructor(private readonly db: Pool) {}

  // Load an aggregate by its ID
  async load(accountId: string): Promise<BankAccount> {
    const res: QueryResult = await this.db.query(
      `SELECT type, payload FROM event_store
       WHERE aggregate_id = $1
       ORDER BY version ASC`,
      [accountId]
    );

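    const events = res.rows.map(mapRowToEvent);
    // An unknown ID yields no events; fail here rather than return an aggregate with no state.
    if (events.length === 0) throw new Error(`Account not found: ${accountId}`);
    return BankAccount.rehydrate(events);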
  }

  // Append new events atomically
  async save(account: BankAccount, expectedVersion: number): Promise<void> {
    const pending = account.getUncommittedEvents();
    if (pending.length === 0) return; // nothing to persist

    // All pending events belong to the same aggregate
    const aggregateId = pending[0].data.accountId;

    const client = await this.db.connect();
    try {
      await client.query("BEGIN");

      // Verify version hasn't changed. This check alone cannot stop two
      // concurrent writers; the unique index on (aggregate_id, version)
      // rejects whichever INSERT arrives second.
      const versionRes = await client.query(
        `SELECT MAX(version) AS version FROM event_store WHERE aggregate_id = $1`,
        [aggregateId]
      );
      // pg returns BIGINT as a string by default, so convert before comparing
      const currentVersion = Number(versionRes.rows[0].version ?? 0);
      if (currentVersion !== expectedVersion) {
        throw new Error(
          `Concurrency conflict: expected ${expectedVersion}, got ${currentVersion}`
        );
      }

      // Insert pending events
      for (let i = 0; i < pending.length; i++) {
        const ev = pending[i];
        await client.query(
          `INSERT INTO event_store
           (aggregate_id, version, type, payload)
           VALUES ($1, $2, $3, $4)`,
          [
            ev.data.accountId,
            expectedVersion + i + 1,
            ev.type,
            ev.data, // pg will serialize JS object to JSON automatically
          ]
        );
      }

      await client.query("COMMIT");
      account.clearUncommitted();
    } catch (err) {
      await client.query("ROLLBACK");
      throw err;
    } finally {
      client.release();
    }
  }
}
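
The SELECT MAX(version) check narrows the race window but cannot close it: two transactions can both read the same version before either one inserts. The unique index on (aggregate_id, version) is what rejects the second writer. PostgreSQL reports that with SQLSTATE 23505 (unique_violation), which node-postgres exposes as the code property of the error. A sketch of translating it into a domain error follows; ConcurrencyError, isUniqueViolation, and toConcurrencyError are hypothetical names.

```typescript
// Hypothetical domain error for a lost optimistic-concurrency race.
class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ConcurrencyError";
  }
}

// SQLSTATE for unique_violation in PostgreSQL.
const UNIQUE_VIOLATION = "23505";

function isUniqueViolation(err: unknown): boolean {
  return (
    typeof err === "object" &&
    err !== null &&
    (err as { code?: unknown }).code === UNIQUE_VIOLATION
  );
}

// Maps a driver error to ConcurrencyError; other errors pass through unchanged.
function toConcurrencyError(err: unknown, aggregateId: string): unknown {
  return isUniqueViolation(err)
    ? new ConcurrencyError(`Concurrent write detected for aggregate ${aggregateId}`)
    : err;
}
```

In save, the catch block could rethrow the mapped error after the ROLLBACK, so callers can tell a conflict apart from other database failures.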

Why This Is Type‑Safe

  • The pending array is typed as BankAccountEvent[].
  • The INSERT statement receives ev.type (a literal) and ev.data (the exact payload shape).
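  • If a new event type is added, the INSERT needs no change because it is generic over the union. The places that do need a new case are mapRowToEvent and BankAccount.apply. Neither switch is checked for exhaustiveness as written, so a forgotten case surfaces at runtime unless you add a never check.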
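
One common way to have the compiler flag a missing case is an exhaustiveness check with never. Here is a minimal sketch, assuming a helper named assertNever (a convention, not a built‑in) and a reduced two‑event union:

```typescript
type Opened = { type: "AccountCreated"; data: { owner: string } };
type Closed = { type: "AccountClosed"; data: { closedAt: string } };
type AccountEvent = Opened | Closed;

// If a case is missing, the argument is not `never` at the call site and the
// call fails to compile. At runtime it guards against unexpected data.
function assertNever(value: never): never {
  throw new Error(`Unhandled event: ${JSON.stringify(value)}`);
}

function isOpenAfter(event: AccountEvent): boolean {
  switch (event.type) {
    case "AccountCreated":
      return true;
    case "AccountClosed":
      return false;
    default:
      // Adding a third member to AccountEvent turns this line into a type
      // error until a matching case is written.
      return assertNever(event);
  }
}
```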

5. Snapshots – Keeping Reads Fast

Replaying thousands of events on every request can become costly. A snapshot stores the current state alongside the last applied version.

-- migrations/2024-02-create-snapshot-table.sql
CREATE TABLE IF NOT EXISTS account_snapshot (
  aggregate_id UUID PRIMARY KEY,
  version      BIGINT NOT NULL,
  state        JSONB NOT NULL,
  saved_at     TIMESTAMPTZ NOT NULL DEFAULT now()
);
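
Loading with a snapshot means starting from the stored state and applying only the events recorded after the snapshot's version. Below is a pure sketch of that idea, using a reduced state and hypothetical names (Snapshot, VersionedEvent, restore):

```typescript
type State = { balance: number }; // cents
type Snapshot = { version: number; state: State };
type VersionedEvent = {
  version: number;
  type: "MoneyDeposited" | "MoneyWithdrawn";
  amount: number;
};

function applyEvent(state: State, ev: VersionedEvent): State {
  const delta = ev.type === "MoneyDeposited" ? ev.amount : -ev.amount;
  return { balance: state.balance + delta };
}

// Start from the snapshot (or an empty state) and apply only newer events.
function restore(
  snapshot: Snapshot | null,
  events: readonly VersionedEvent[]
): { state: State; version: number } {
  let state: State = snapshot ? snapshot.state : { balance: 0 };
  let version = snapshot ? snapshot.version : 0;
  for (const ev of events) {
    if (ev.version <= version) continue; // already reflected in the snapshot
    state = applyEvent(state, ev);
    version = ev.version;
  }
  return { state, version };
}
```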

Snapshot Helper

// src/infrastructure/SnapshotStore.ts
import { Pool, QueryResult } from "pg";
import { BankAccountState } from "../domain/BankAccount";

export class SnapshotStore {
  constructor(private readonly db: Pool) {}

  async load(accountId: string): Promise<{ state: BankAccountState; version: number } | null> {
    const res: QueryResult = await this.db.query(
      `SELECT version, state FROM account_snapshot WHERE aggregate_id = $1`,
      [accountId]
    );
    if (res.rowCount === 0) return null;
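    const { version, state } = res.rows[0];
    // pg returns BIGINT as a string by default; state is parsed JSON and is not validated here
    return { state: state as BankAccountState, version: Number(version) };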
  }

  async save(accountId: string, version: number, state: BankAccountState): Promise<void> {
    await this.db.query(
      `INSERT INTO account_snapshot (aggregate_id, version, state)
       VALUES ($1, $2, $3)
       ON CONFLICT (aggregate_id) DO UPDATE
       SET version = EXCLUDED.version,
           state   = EXCLUDED.state,
           saved_at = now()`,
      [accountId, version, state]
    );
  }
}

Using Snapshots in the Repository

// src/infrastructure/BankAccountRepository.ts (excerpt)
import { SnapshotStore } from "./SnapshotStore";

export class BankAccountRepository {
  constructor(
    private readonly db: Pool,
    private readonly snapshots: SnapshotStore
  ) {}

  async load(accountId: string): Promise<BankAccount> {
    const snap = await this.snapshots.load(accountId);
    let events: BankAccountEvent[] = [];

    if (snap) {
      // Load only events after the snapshot version
      const res = await this.db.query(
        `SELECT type, payload FROM event_store
         WHERE aggregate_id = $1 AND version > $2
         ORDER BY version ASC`,
        [accountId, snap.version]
      );
      events = res.rows.map(mapRowToEvent);
      // Seed the aggregate with the snapshot state, then apply the newer events on top
      const acct = new BankAccount();
      (acct as any).state = snap.state; // internal hack – in real code expose a method
      for (const ev of events) (acct as any).apply(ev, false); // same hack: apply is private
      return acct;
    }

    // No snapshot – load everything
    const res = await this.db.query(
      `SELECT type, payload FROM event_store
       WHERE aggregate_id = $1
       ORDER BY version ASC`,
      [accountId]
    );
    events = res.rows.map(mapRowToEvent);
    return BankAccount.rehydrate(events);
  }

  // After persisting, optionally store a new snapshot
  async save(account: BankAccount, expectedVersion: number): Promise<void> {
    // Copy the pending events first: clearUncommitted() empties the list
    const pending = [...account.getUncommittedEvents()];
    // …same as before…
    await client.query("COMMIT");
    account.clearUncommitted();

    // Create a snapshot every 100 events
    const latestVersion = expectedVersion + pending.length;
    if (pending.length > 0 && latestVersion % 100 === 0) {
      await this.snapshots.save(
        pending[0].data.accountId,
        latestVersion,
        (account as any).state // expose a getter in production
      );
    }
  }
}
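
A check of the form latestVersion % 100 === 0 fires only when a save lands exactly on a multiple of 100. If a single save appends several events, the version can jump from 99 to 101 and skip the snapshot. One alternative, sketched here with a hypothetical shouldSnapshot helper, is to test whether the save crossed an interval boundary:

```typescript
// True when at least one multiple of `interval` lies in the range
// (previousVersion, newVersion], i.e. the save crossed a snapshot boundary.
function shouldSnapshot(
  previousVersion: number,
  newVersion: number,
  interval = 100
): boolean {
  if (interval <= 0) throw new Error("interval must be positive");
  return Math.floor(newVersion / interval) > Math.floor(previousVersion / interval);
}
```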

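The snapshot logic is typed at its boundaries: SnapshotStore accepts and returns BankAccountState, so callers get compile‑time checks. The state column itself is untyped JSONB, though, and rows written before a change to BankAccountState keep their old shape. When the state shape changes, discard or migrate the existing snapshots; they can always be rebuilt from the events.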


6. Wiring It All Together

A minimal service layer demonstrates how a consumer would interact with the repository.

// src/service/AccountService.ts
import * as crypto from "node:crypto";
import { Pool } from "pg";
import { BankAccountRepository } from "../infrastructure/BankAccountRepository";
import { SnapshotStore } from "../infrastructure/SnapshotStore";
import { BankAccount } from "../domain/BankAccount";

export class AccountService {
  private readonly repo: BankAccountRepository;

  constructor(pool: Pool) {
    const snapshots = new SnapshotStore(pool);
    this.repo = new BankAccountRepository(pool, snapshots);
  }

  async createAccount(owner: string): Promise<string> {
    const accountId = crypto.randomUUID();
    const account = BankAccount.create(accountId, owner);
    await this.repo.save(account, 0); // no prior version
    return accountId;
  }

  async deposit(accountId: string, amount: number): Promise<void> {
    // Read the version before loading: if another writer appends in between,
    // save() sees a newer version than expected and rejects the write.
    const currentVersion = await this.currentVersion(accountId);
    const account = await this.repo.load(accountId);
    account.deposit(amount);
    await this.repo.save(account, currentVersion);
  }

  async withdraw(accountId: string, amount: number): Promise<void> {
    // Same ordering as deposit: version first, then state
    const currentVersion = await this.currentVersion(accountId);
    const account = await this.repo.load(accountId);
    account.withdraw(amount);
    await this.repo.save(account, currentVersion);
  }

  async getBalance(accountId: string): Promise<number> {
    const account = await this.repo.load(accountId);
    // Access internal state via a safe getter (omitted for brevity)
    return (account as any).state.balance;
  }

  private async currentVersion(accountId: string): Promise<number> {
    const res = await this.repo["db"].query(
      `SELECT MAX(version) as version FROM event_store WHERE aggregate_id = $1`,
      [accountId]
    );
    // pg returns BIGINT as a string by default, so convert to a number
    return Number(res.rows[0].version ?? 0);
  }
}

Every public method is async and returns a Promise, so callers must await it. Apart from the currentVersion helper, which reaches into the repository's private pool and would be better exposed as a repository method, the service never touches raw SQL or JSON parsing; it works with typed domain objects.
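
Optimistic concurrency means a save can fail when another writer got there first. The usual response is to repeat the whole load, decide, save cycle. A minimal sketch of such a retry wrapper follows. The helper name retryOnConflict is hypothetical, and recognising conflicts by the "Concurrency conflict" message prefix (the text the repository throws) is an assumption made for illustration; a dedicated error class would be more robust.

```typescript
// Recognises the repository's conflict error by its message prefix.
function isConflict(err: unknown): boolean {
  return err instanceof Error && err.message.startsWith("Concurrency conflict");
}

// Runs `operation` up to `maxAttempts` times, retrying only on conflicts.
// `operation` must redo the full load/modify/save cycle on each attempt.
async function retryOnConflict<T>(
  operation: () => Promise<T>,
  maxAttempts = 3
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (err) {
      if (!isConflict(err)) throw err; // unrelated failure: do not retry
      lastError = err;
    }
  }
  throw lastError;
}
```

A caller could wrap a service call, for example retryOnConflict(() => service.deposit(id, 5000)), since a rejected save persists nothing and the next attempt reloads fresh state.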


7. Testing the Event‑Sourced Flow

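A quick Jest integration test exercises the flow end‑to‑end against a real database. Type safety itself is checked by the compiler; the test confirms that the stored events replay to the expected balance.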

// tests/BankAccount.test.ts
import { Pool } from "pg";
import { AccountService } from "../src/service/AccountService";

let pool: Pool;
let service: AccountService;

beforeAll(async () => {
  pool = new Pool({ connectionString: process.env.DATABASE_URL });
  service = new AccountService(pool);
  // Clean tables
  await pool.query("DELETE FROM event_store");
  await pool.query("DELETE FROM account_snapshot");
});

test("deposit and withdraw produce correct balance", async () => {
  const id = await service.createAccount("Alice");
  await service.deposit(id, 5000); // $50.00
  await service.withdraw(id, 1200); // $12.00
  const balance = await service.getBalance(id);
  expect(balance).toBe(380