
Type‑Safe Asynchronous Resource Pools in TypeScript: Building Generic Worker Pools and Connection Managers for Node.js & Next.js

Learn how to craft reusable, type‑safe pools for workers and DB connections in TypeScript, with concrete Node.js and Next.js examples.

Introduction

Resource‑intensive operations—CPU‑bound work, database connections, HTTP clients—are usually shared among many concurrent requests.
A resource pool limits the number of active instances, recycles idle ones, and shields the rest of the application from throttling or leaks.

When the pool is built in plain JavaScript you quickly run into two problems:

  1. Runtime errors caused by mismatched resource types (e.g., treating a pg.PoolClient as a generic any).
  2. Leaked resources because the asynchronous acquire/release contract is hard to enforce.

TypeScript can solve both. By encoding the acquire/release lifecycle in the type system we get:

  • Compile‑time safety for the resource you receive.
  • An explicit, hard‑to‑misuse pairing of every acquire() call with a release().
  • A single, generic implementation that can serve worker threads, database clients, Redis connections, or any async‑initialised object.

This article walks through a type‑safe generic pool and shows two realistic use‑cases:

  • A worker‑thread pool for CPU‑heavy jobs in a Next.js API route.
  • A PostgreSQL connection manager for a traditional Node.js service.

Both examples use only built‑in Node APIs plus the sharp and pg packages, keeping the focus on the TypeScript patterns rather than third‑party pooling abstractions.


1. The Core Abstractions

1.1. The Resource<T> Interface

/** A resource that can be used asynchronously and then returned to the pool. */
export interface Resource<T> {
  /** The underlying value (e.g., a Worker, a DB client). */
  readonly value: T;

  /** Return the resource to its pool. Must be called exactly once. */
  release(): Promise<void>;
}

Resource<T> is a thin wrapper that makes the release contract explicit: the consumer receives the value together with its release function, and release returns a Promise, encouraging await usage.
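Here is a minimal sketch of the consumer side of that contract, with the interface restated inline so the snippet stands alone. The hand‑built resource and the `useResource` helper are illustrative, not part of the pool:

```typescript
interface Resource<T> {
  readonly value: T;
  release(): Promise<void>;
}

// Hypothetical hand-built resource standing in for one a pool would hand out.
let releases = 0;
const res: Resource<string> = {
  value: 'connection-1',
  release: async () => {
    releases++;
  },
};

async function useResource(): Promise<string> {
  try {
    // Compile-time safety: res.value is known to be a string here.
    return res.value.toUpperCase();
  } finally {
    // Runs on success *and* on a throw, so the resource always goes back.
    await res.release();
  }
}

useResource().then((v) => console.log(v, releases)); // CONNECTION-1 1
```

The try/finally shape is the pattern both use‑cases below rely on.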

1.2. The AsyncPool<T> Interface

export interface AsyncPool<T> {
  /** Acquire a resource. May wait if the pool is exhausted. */
  acquire(): Promise<Resource<T>>;

  /** Shut down the pool, releasing all idle resources and rejecting pending acquires. */
  close(): Promise<void>;

  /** Current statistics – useful for monitoring. */
  readonly stats: {
    total: number;
    idle: number;
    pending: number;
  };
}

The pool is deliberately minimal: only acquire, close, and read‑only stats. Anything more complex (prioritisation, weighted resources, etc.) can be built on top later.
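One thing worth building on top right away is a `withResource` helper that pairs every acquire() with its release(), so call sites cannot forget the finally block. This is a sketch against the minimal AsyncPool<T> surface; the stub pool exists only to exercise the helper:

```typescript
interface Resource<T> {
  readonly value: T;
  release(): Promise<void>;
}
interface AsyncPool<T> {
  acquire(): Promise<Resource<T>>;
  close(): Promise<void>;
  readonly stats: { total: number; idle: number; pending: number };
}

// Acquire, run the callback, and always release - even if the callback throws.
async function withResource<T, R>(
  pool: AsyncPool<T>,
  fn: (value: T) => Promise<R>
): Promise<R> {
  const res = await pool.acquire();
  try {
    return await fn(res.value);
  } finally {
    await res.release();
  }
}

// Stub pool handing out numbers, just to show the helper in action.
let released = 0;
const stubPool: AsyncPool<number> = {
  acquire: async () => ({ value: 21, release: async () => { released++; } }),
  close: async () => {},
  stats: { total: 1, idle: 0, pending: 0 },
};

withResource(stubPool, async (n) => n * 2).then((r) =>
  console.log(r, released) // 42 1
);
```

With this helper in place, most business code never touches release() directly.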


2. Implementing a Generic Async Pool

The implementation works for any resource that satisfies a factory and a disposer function.

type Factory<T> = () => Promise<T>;
type Disposer<T> = (instance: T) => Promise<void>;

export class GenericAsyncPool<T> implements AsyncPool<T> {
  private readonly factory: Factory<T>;
  private readonly disposer: Disposer<T>;
  private readonly maxSize: number;

  // Internal queues
  private idle: T[] = [];
  // Waiters also accept a PromiseLike so close() can settle them with a rejection.
  private pendingAcquires: ((res: Resource<T> | PromiseLike<Resource<T>>) => void)[] = [];
  private totalInstances = 0;
  private closed = false;

  constructor(opts: {
    factory: Factory<T>;
    disposer: Disposer<T>;
    maxSize?: number; // default = CPU count for workers, 10 for DB connections, etc.
  }) {
    this.factory = opts.factory;
    this.disposer = opts.disposer;
    this.maxSize = opts.maxSize ?? 10;
  }

  // -------------------------------------------------------------------------
  // Public API
  // -------------------------------------------------------------------------
  async acquire(): Promise<Resource<T>> {
    if (this.closed) {
      throw new Error('Pool is closed');
    }

    // Fast path – idle resource available
    if (this.idle.length) {
      const instance = this.idle.pop()!;
      return this.wrap(instance);
    }

    // Need to create a new instance?
    if (this.totalInstances < this.maxSize) {
      // Reserve the slot *before* awaiting, so concurrent acquires cannot
      // race past the maxSize check and over-create instances.
      this.totalInstances++;
      try {
        return this.wrap(await this.factory());
      } catch (err) {
        this.totalInstances--; // roll back the reservation on factory failure
        throw err;
      }
    }

    // Pool exhausted – queue the request
    return new Promise<Resource<T>>((resolve) => {
      this.pendingAcquires.push(resolve);
    });
  }

  async close(): Promise<void> {
    this.closed = true;

    // Reject pending acquires
    this.pendingAcquires.forEach((resolve) =>
      resolve(Promise.reject(new Error('Pool closed')))
    );
    this.pendingAcquires = [];

    // Dispose idle instances
    await Promise.all(this.idle.map((inst) => this.disposer(inst)));
    this.idle = [];
  }

  get stats() {
    return {
      total: this.totalInstances,
      idle: this.idle.length,
      pending: this.pendingAcquires.length,
    };
  }

  // -------------------------------------------------------------------------
  // Private helpers
  // -------------------------------------------------------------------------
  private wrap(instance: T): Resource<T> {
    let released = false;

    const release = async () => {
      if (released) {
        throw new Error('Resource already released');
      }
      released = true;

      if (this.closed) {
        await this.disposer(instance);
        this.totalInstances--;
        return;
      }

      // If someone is waiting, give them this instance immediately
      const waiter = this.pendingAcquires.shift();
      if (waiter) {
        waiter(this.wrap(instance));
      } else {
        this.idle.push(instance);
      }
    };

    return { value: instance, release };
  }
}

Why this design is type‑safe

  • The generic T is inferred from the factory you pass, so acquire() returns a Resource<T> with the exact type.
  • release lives only on the object returned by acquire; a stray T you create yourself has no release to call, and the internal released flag turns a double‑release into an explicit error.
  • The pool tracks its own state (closed, totalInstances) and enforces the contract at runtime, while the compiler guarantees you work with the right shape.
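To make the inference point concrete, here is a sketch where T flows from the factory to acquire() with no explicit type argument. `SimplePool` is a trimmed, illustrative stand‑in for GenericAsyncPool (no waiting queue, no maxSize):

```typescript
interface Resource<T> {
  readonly value: T;
  release(): Promise<void>;
}

class SimplePool<T> {
  private idle: T[] = [];
  constructor(private factory: () => Promise<T>) {}

  async acquire(): Promise<Resource<T>> {
    // Reuse an idle instance or create a fresh one.
    const instance = this.idle.pop() ?? (await this.factory());
    return {
      value: instance,
      release: async () => {
        this.idle.push(instance);
      },
    };
  }
}

// T is inferred as { host: string } from the factory's return type.
const pool = new SimplePool(async () => ({ host: 'db.internal' }));

pool.acquire().then(async (res) => {
  console.log(res.value.host); // compiles: host is a known property
  // res.value.port;           // would be a compile-time error
  await res.release();
});
```

The same inference applies to GenericAsyncPool: the type argument in `new GenericAsyncPool<Worker>` below is optional documentation, not a requirement.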

3. Use‑Case #1 – A Worker‑Thread Pool for Next.js API Routes

3.1. Problem Statement

A Next.js API route needs to compress images, a CPU‑heavy task. Spawning a new Worker per request is expensive—each worker boots a separate V8 isolate—and under load would oversubscribe the CPU and increase latency. Instead we keep a small pool of pre‑started workers.

3.2. Worker Code (worker.ts)

// worker.ts
import { parentPort } from 'worker_threads';
import sharp from 'sharp';

parentPort!.on('message', async (msg: { id: number; src: string; dest: string }) => {
  try {
    const data = await sharp(msg.src).resize(800).toFile(msg.dest);
    parentPort!.postMessage({ id: msg.id, ok: true, data });
  } catch (err) {
    parentPort!.postMessage({ id: msg.id, ok: false, error: (err as Error).message });
  }
});

3.3. Pool Construction

// workerPool.ts
import { Worker } from 'worker_threads';
import { GenericAsyncPool } from './genericPool';

type WorkerMessage = { id: number; ok: boolean; data?: any; error?: string };

export const workerPool = new GenericAsyncPool<Worker>({
  maxSize: 4, // tune to number of CPU cores
  factory: async () => {
    // Note: Worker needs a JavaScript entry at runtime. Point this at the
    // compiled output of worker.ts, or register a TS loader (e.g. tsx).
    const worker = new Worker(new URL('./worker.ts', import.meta.url), {
      workerData: null,
    });

    // Ensure the worker is ready before handing it out
    await new Promise((resolve, reject) => {
      const onError = (err: Error) => {
        worker.removeListener('online', onOnline);
        reject(err);
      };
      const onOnline = () => {
        worker.removeListener('error', onError);
        resolve(undefined);
      };
      worker.once('error', onError);
      worker.once('online', onOnline);
    });

    return worker;
  },
  disposer: async (w) => {
    await w.terminate(); // resolves once the worker thread has exited
  },
});

3.4. Helper to Run a Job

let requestId = 0;
export async function compressImage(src: string, dest: string): Promise<void> {
  const { value: worker, release } = await workerPool.acquire();

  try {
    const id = ++requestId;
    const result = await new Promise<WorkerMessage>((resolve, reject) => {
      const handler = (msg: WorkerMessage) => {
        if (msg.id === id) {
          worker.off('message', handler);
          worker.off('error', onError);
          resolve(msg);
        }
      };
      // Without this, a crashed worker would leave the promise pending forever.
      const onError = (err: Error) => {
        worker.off('message', handler);
        reject(err);
      };
      worker.on('message', handler);
      worker.once('error', onError);
      worker.postMessage({ id, src, dest });
    });

    if (!result.ok) {
      throw new Error(result.error);
    }
  } finally {
    // Guarantees release even if the job throws
    await release();
  }
}

3.5. Using It in a Next.js API Route

// pages/api/compress.ts
import type { NextApiRequest, NextApiResponse } from 'next';
import { compressImage } from '../../lib/workerPool';
import { promises as fs } from 'fs';
import path from 'path';

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  if (req.method !== 'POST') {
    res.status(405).end();
    return;
  }

  const { url } = req.body;
  if (!url) {
    res.status(400).json({ error: 'Missing url' });
    return;
  }

  const tmpSrc = path.join('/tmp', `src-${Date.now()}.png`);
  const tmpDest = path.join('/tmp', `dest-${Date.now()}.png`);

  // Download the image (omitted error handling for brevity)
  const resp = await fetch(url);
  const arrayBuffer = await resp.arrayBuffer();
  await fs.writeFile(tmpSrc, Buffer.from(arrayBuffer));

  try {
    await compressImage(tmpSrc, tmpDest);
    const compressed = await fs.readFile(tmpDest);
    res.setHeader('Content-Type', 'image/png');
    res.send(compressed);
  } finally {
    // Clean up temporary files
    await Promise.all([fs.unlink(tmpSrc), fs.unlink(tmpDest)].map(p => p.catch(() => {})));
  }
}

Takeaways

  • The pool caps the number of OS threads (maxSize: 4), avoiding oversubscription.
  • The Resource<Worker> wrapper forces the caller to release() in a finally block—no leaked workers.
  • All TypeScript types flow from the worker definition (WorkerMessage) to the API response, giving end‑to‑end safety.

4. Use‑Case #2 – A PostgreSQL Connection Manager for a Node.js Service

4.1. Why Not Use pg.Pool Directly?

pg.Pool is already a solid library with good type definitions, but its acquire/release contract is easy to violate: a client you forget to release() silently shrinks the pool until it is exhausted. By wrapping it with our generic pool we gain:

  • Explicit acquire/release with compile‑time safety.
  • Ability to swap the implementation (e.g., switch to pg-native or a mocked client) without touching business logic.
  • Uniform API across all resources in the codebase.

4.2. Minimal pg Wrapper

// pgPool.ts
import { Pool, PoolClient } from 'pg';
import { GenericAsyncPool } from './genericPool';

const pg = new Pool({
  connectionString: process.env.DATABASE_URL,
  max: 20, // underlying pg pool size
});

export const pgPool = new GenericAsyncPool<PoolClient>({
  maxSize: 20,
  factory: async () => {
    const client = await pg.connect();
    // Optionally run a health‑check query
    await client.query('SELECT 1');
    return client;
  },
  disposer: async (client) => {
    client.release(); // returns to pg's internal pool
  },
});

4.3. Business Logic Using the Typed Pool

// services/userService.ts
import type { User } from '../types';
import { pgPool } from '../lib/pgPool';

export async function getUserById(id: string): Promise<User | null> {
  const { value: client, release } = await pgPool.acquire();
  try {
    const res = await client.query<User>(
      'SELECT id, name, email FROM users WHERE id = $1',
      [id]
    );
    return res.rows[0] ?? null;
  } finally {
    await release(); // always returns the client
  }
}

The try/finally pattern, combined with the explicit Resource<PoolClient> wrapper, ensures that every code path (early returns and thrown errors included) releases the client, eliminating the classic “forgot to release the client” bug that causes connection exhaustion.
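The same acquire/release pattern also makes a transaction helper easy to layer on top. This sketch depends only on a minimal `Queryable` shape, so it works with pg's PoolClient or a stub; the `withTransaction` name and the stub pool are illustrative, while BEGIN/COMMIT/ROLLBACK are standard SQL:

```typescript
interface Resource<T> {
  readonly value: T;
  release(): Promise<void>;
}
interface Queryable {
  query(sql: string): Promise<unknown>;
}
interface Pool<T> {
  acquire(): Promise<Resource<T>>;
}

async function withTransaction<T extends Queryable, R>(
  pool: Pool<T>,
  fn: (client: T) => Promise<R>
): Promise<R> {
  const { value: client, release } = await pool.acquire();
  try {
    await client.query('BEGIN');
    const result = await fn(client);
    await client.query('COMMIT');
    return result;
  } catch (err) {
    await client.query('ROLLBACK'); // undo on any failure inside fn
    throw err;
  } finally {
    await release(); // the client goes back to the pool either way
  }
}

// Stub client that records statements, just to show the control flow.
const log: string[] = [];
const stubPool: Pool<Queryable> = {
  acquire: async () => ({
    value: { query: async (sql: string) => { log.push(sql); } },
    release: async () => { log.push('release'); },
  }),
};

withTransaction(stubPool, async () => 'ok').then(() => console.log(log));
```

Because the helper owns BEGIN/COMMIT/ROLLBACK, business code can no longer commit half a transaction or hold a client across an unreleased error path.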

4.4. Graceful Shutdown

// server.ts
import http from 'http';
import { pgPool } from './lib/pgPool';
import { workerPool } from './lib/workerPool';

const server = http.createServer(/* request handler */);

process.on('SIGTERM', async () => {
  console.log('Shutting down...');
  server.close(async () => {
    await Promise.all([pgPool.close(), workerPool.close()]);
    process.exit(0);
  });
});

Both pools expose a close() method that disposes of idle resources and prevents new acquires, making a clean shutdown straightforward.


5. Extending the Generic Pool

5.1. Timeout‑Based Acquire

class PoolWithTimeout<T> extends GenericAsyncPool<T> {
  async acquireWithTimeout(timeoutMs: number): Promise<Resource<T>> {
    let timer: NodeJS.Timeout;
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error('Acquire timeout')), timeoutMs);
    });
    try {
      // Race the real acquire against the timer.
      return await Promise.race([super.acquire(), timeout]);
    } finally {
      clearTimeout(timer!); // don't leave a dangling timer on success
    }
  }
}

Caveat: when the timeout wins, the underlying acquire may still resolve later; a production version should release that late resource back to the pool instead of dropping it.

5.2. Validation Hook

A validator?: (instance: T) => Promise<boolean> can be added to the constructor. Before returning an idle instance, the pool runs the validator; if it fails, the instance is disposed and a fresh one is created.
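One way that hook could slot into the fast path, sketched here as a standalone helper rather than a patch to the class: pop idle instances until one passes validation, disposing the ones that fail. The `popValidIdle` name is hypothetical:

```typescript
type Validator<T> = (instance: T) => Promise<boolean>;
type Disposer<T> = (instance: T) => Promise<void>;

async function popValidIdle<T>(
  idle: T[],
  validate: Validator<T>,
  dispose: Disposer<T>
): Promise<T | undefined> {
  while (idle.length > 0) {
    const candidate = idle.pop()!;
    if (await validate(candidate)) return candidate;
    await dispose(candidate); // stale instance: drop it and try the next one
  }
  return undefined; // caller falls through to the factory
}

// Example: only even numbers count as "healthy".
const idle = [1, 2, 3];
popValidIdle(idle, async (n) => n % 2 === 0, async () => {}).then((v) =>
  console.log(v) // 2 (3 failed validation and was disposed)
);
```

The real pool would also decrement totalInstances for each disposed instance so the counters stay consistent.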

5.3. Prioritised Queues

Replace the pendingAcquires array with a priority queue (e.g., tinyqueue). The API stays the same, but high‑priority requests are served first. This is handy for “admin” vs “background” tasks.
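A sketch of that waiter queue, using a sorted array in place of a dependency so the snippet stands alone (tinyqueue would replace the sort in a real implementation); the `PriorityWaiters` name is illustrative:

```typescript
interface Waiter<T> {
  priority: number;
  resolve: (v: T) => void;
}

class PriorityWaiters<T> {
  private waiters: Waiter<T>[] = [];

  enqueue(priority: number, resolve: (v: T) => void): void {
    this.waiters.push({ priority, resolve });
    this.waiters.sort((a, b) => b.priority - a.priority); // highest first
  }

  dequeue(): ((v: T) => void) | undefined {
    return this.waiters.shift()?.resolve;
  }

  get length(): number {
    return this.waiters.length;
  }
}

// An "admin" waiter (priority 10) is served before a "background" one (1).
const q = new PriorityWaiters<string>();
const served: string[] = [];
q.enqueue(1, () => served.push('background'));
q.enqueue(10, () => served.push('admin'));
q.dequeue()?.('resource');
q.dequeue()?.('resource');
console.log(served); // ['admin', 'background']
```

Inside the pool, only the pendingAcquires field and the two call sites that push/shift it would change; acquire()'s signature gains an optional priority.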


6. Best Practices Checklist

  1. Always acquire within a try / finally block so release() runs even on exceptions.
  2. Keep maxSize realistic – for CPU‑bound work, match the number of logical cores; for I/O, tune based on external limits.
  3. Expose only the generic AsyncPool<T> to the rest of your codebase. Hide the concrete GenericAsyncPool implementation behind a module that knows the factory/disposer.
  4. Instrument stats (total/idle/pending) and surface them via Prometheus or a health endpoint.
  5. Handle shutdown gracefully – call close() before process exit to avoid dangling threads or open sockets.
  6. Write unit tests that simulate acquisition failures (factory throws) and ensure the pool’s internal counters stay consistent.
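Item 6 deserves a concrete shape. Here is a sketch of such a test against a trimmed, illustrative `MiniPool` (with the real GenericAsyncPool the assertion would target .stats.total instead): a factory that throws must not leak a slot in the counter.

```typescript
class MiniPool<T> {
  total = 0;
  constructor(private factory: () => Promise<T>, private maxSize: number) {}

  async acquire(): Promise<T> {
    if (this.total >= this.maxSize) throw new Error('exhausted');
    this.total++; // reserve the slot before awaiting the factory
    try {
      return await this.factory();
    } catch (err) {
      this.total--; // roll back so the failed slot is not leaked
      throw err;
    }
  }
}

async function testFactoryFailure(): Promise<void> {
  const pool = new MiniPool<number>(async () => {
    throw new Error('boom');
  }, 2);
  await pool.acquire().catch(() => {});
  console.assert(pool.total === 0, 'counter must roll back on factory failure');
}
testFactoryFailure();
```

Without the rollback, every failed factory call would permanently shrink the pool's effective capacity.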

7. Conclusion

A type‑safe asynchronous pool gives you:

  • Predictable resource usage – you never exceed the configured ceiling.
  • Compile‑time guarantees that the resource you work with matches its concrete type.
  • Uniform lifecycle handling across vastly different resources (workers, DB clients, HTTP agents).

Because the pool is completely generic, you can add new resource types with a single line of configuration, keeping your code DRY and your runtime behaviour reliable.

Give the GenericAsyncPool a spin in your next Node.js or Next.js project, and you’ll see a noticeable drop in resource leaks and connection‑exhaustion errors—while enjoying the safety net that TypeScript provides.