Type‑Safe Reactive Data Pipelines with WebGPU Compute Shaders in TypeScript
Introduction
Modern JavaScript applications increasingly need to process large streams of data—image/video frames, sensor feeds, or financial tick streams—while keeping latency low enough for interactive experiences. CPUs are great at general‑purpose logic, but they quickly become the bottleneck when the same operation must be applied to millions of elements each frame.
WebGPU, the emerging web standard for low‑level graphics and compute, gives browsers direct access to the GPU’s massive parallelism. Paired with TypeScript’s static type system, we can construct type‑safe reactive pipelines that:
- Declare the shape of input and output buffers at compile time.
- Compose small, reusable compute shaders into larger processing graphs.
- React to data sources (WebSockets, MediaStreams, IndexedDB) and push results back to the UI without copying more than necessary.
This article walks through a real‑world example—a live video filter that detects edges and applies a stylized color map—while teaching the patterns that make the pipeline type‑safe, testable, and ergonomic.
1. Prerequisites
| What you need | Why |
|---|---|
| Browser with WebGPU (Chrome 119+, Edge 119+, or Firefox Nightly) | Only browsers that expose navigator.gpu can run compute shaders. |
| TypeScript 5.x | Leverages template literal types and satisfies for shader‑code validation. |
| Node >=20 (optional) for local dev server | Simple vite or webpack dev server. |
| Basic knowledge of WGSL (WebGPU Shading Language) | The compute kernels are written in WGSL. |
Install the dev tooling:
npm init -y
npm i -D typescript vite
npm i @webgpu/types
Create tsconfig.json with "strict": true, "moduleResolution": "bundler", and "types": ["@webgpu/types", "vite/client"]. The first type package declares navigator.gpu and the GPU* interfaces; the second declares Vite's ?raw imports, which load .wgsl shader files as strings.
2. Defining a Typed Buffer Contract
WebGPU buffers are opaque byte ranges at runtime: the API knows their size but nothing about their element layout. To keep the rest of the application type‑safe we encode the layout in TypeScript:
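// src/bufferTypes.ts
export interface Vec4 {
  r: number; g: number; b: number; a: number;
}
/** A generic GPU buffer description.
 * `T` describes a single element; `N` is the compile‑time length.
 * `T` is a phantom parameter: no field uses it, so it documents the layout
 * but is not enforced by TypeScript's structural checks.
 */
export type GpuBuffer<T, N extends number> = {
  readonly data: Float32Array; // flat view
  readonly length: N;
  readonly stride: number; // bytes per element
};
/** Helper to create a typed buffer from a plain array of flat numeric records. */
export function makeBuffer<T extends object, N extends number>(
  arr: readonly T[],
  length: N
): GpuBuffer<T, N> {
  // Guard against the empty case (arr[0] would be undefined) and length mismatches.
  if (arr.length === 0 || arr.length !== length) {
    throw new RangeError(`expected ${length} element(s), got ${arr.length}`);
  }
  const floatsPerElement = Object.keys(arr[0]).length;
  const stride = floatsPerElement * Float32Array.BYTES_PER_ELEMENT;
  const flat = new Float32Array(floatsPerElement * length);
  arr.forEach((obj, i) => {
    const offset = i * floatsPerElement;
    let j = 0;
    // Fields are written in key order, so every element must declare them identically.
    for (const key of Object.keys(obj) as (keyof T)[]) {
      flat[offset + j++] = Number(obj[key]);
    }
  });
  return { data: flat, length, stride };
}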
The GpuBuffer type tells any function receiving a buffer how many elements it holds (N) and the size of each element (stride). When N is a numeric literal, length mismatches become compile‑time errors; when it is plain number, as with sizes known only at runtime, the lengths must still be checked at runtime.
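N is only as precise as what the caller passes in. The following is a minimal sketch, assuming TypeScript 5.0+ const type parameters, of a helper that infers the length from a tuple literal. The name vec4Buffer is hypothetical and not part of the bufferTypes module; the types are repeated so the snippet stands alone.

```typescript
interface Vec4 { r: number; g: number; b: number; a: number }

type GpuBuffer<T, N extends number> = {
  readonly data: Float32Array;
  readonly length: N;
  readonly stride: number;
};

// Hypothetical helper: infers N from the tuple length of the argument.
function vec4Buffer<const A extends readonly Vec4[]>(
  pixels: A
): GpuBuffer<Vec4, A['length']> {
  const data = new Float32Array(pixels.length * 4);
  pixels.forEach((p, i) => data.set([p.r, p.g, p.b, p.a], i * 4));
  // The cast narrows `number` to the tuple's literal length type.
  return { data, length: pixels.length as A['length'], stride: 16 };
}

const two = vec4Buffer([
  { r: 1, g: 0, b: 0, a: 1 },
  { r: 0, g: 1, b: 0, a: 1 },
]);

// Compiles only because `two.length` has the literal type 2.
const n: 2 = two.length;
```

For buffers sized from runtime values such as video dimensions, the same helper would infer plain number, which is where runtime checks take over.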
3. Loading and Validating WGSL at Compile Time
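WGSL code can live in separate .wgsl files and be imported as a string through Vite's ?raw suffix. For brevity, the edge‑detection shader below is inlined as a template literal in a TypeScript module instead, and a TypeScript contract then describes the buffers it expects: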
// src/shaders/edgeDetect.wgsl.ts
export const edgeDetectWGSL = /* wgsl */`
@group(0) @binding(0) var<storage, read> src : array<vec4<f32>>;
// WGSL storage buffers are either read or read_write; there is no write-only mode.
@group(0) @binding(1) var<storage, read_write> dst : array<vec4<f32>>;
fn luminance(c: vec4<f32>) -> f32 {
  return dot(c.rgb, vec3<f32>(0.299, 0.587, 0.114));
}
@compute @workgroup_size(16, 16)
fn main(@builtin(global_invocation_id) gid : vec3<u32>,
        @builtin(num_workgroups) groups : vec3<u32>) {
  // The kernel only needs a flat pixel index, so the row stride of the
  // invocation grid is derived from the dispatch size (16 = workgroup width).
  let idx = gid.y * (groups.x * 16u) + gid.x;
  if (idx >= arrayLength(&src)) { return; }
  // Simplified horizontal gradient; a full Sobel operator would also weight the rows above and below
  let left = luminance(src[max(i32(idx) - 1, 0)]);
  let right = luminance(src[min(i32(idx) + 1, i32(arrayLength(&src)) - 1)]);
  let edge = abs(right - left);
  dst[idx] = vec4<f32>(edge, edge, edge, 1.0);
}
`;
To make sure the shader matches the expected buffer layout we write a type‑level contract:
// src/shaders/contracts.ts
import type { GpuBuffer, Vec4 } from '../bufferTypes';
export interface EdgeDetectPipeline {
src: GpuBuffer<Vec4, number>;
dst: GpuBuffer<Vec4, number>;
}
When we instantiate the pipeline, a generic constraint makes the compiler verify that the buffers we pass conform to the contract (writing satisfies EdgeDetectPipeline on the object literal at the call site achieves the same check):
import { edgeDetectWGSL } from './edgeDetect.wgsl';
import type { EdgeDetectPipeline } from './contracts';
function createEdgeDetectPipeline<T extends EdgeDetectPipeline>(params: T) {
  // The constraint on T forces the caller to provide buffers that match the contract.
  // This checks names and shapes only; buffer sizes are still runtime values.
return params;
}
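If a developer omits or misspells src or dst, TypeScript raises a compile‑time error because the property names are part of the contract. Swapping the two values is not caught, since both buffers have the same type; telling them apart would require distinct (for example branded) buffer types.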
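A short sketch of what the compiler does and does not catch here, using the satisfies operator (TypeScript 4.9+). The frame helper is hypothetical, and the types are repeated so the snippet stands alone.

```typescript
interface Vec4 { r: number; g: number; b: number; a: number }

type GpuBuffer<T, N extends number> = {
  readonly data: Float32Array;
  readonly length: N;
  readonly stride: number;
};

interface EdgeDetectPipeline {
  src: GpuBuffer<Vec4, number>;
  dst: GpuBuffer<Vec4, number>;
}

// Hypothetical helper: an RGBA float buffer holding `pixels` elements.
const frame = (pixels: number): GpuBuffer<Vec4, number> => ({
  data: new Float32Array(pixels * 4),
  length: pixels,
  stride: 16,
});

// Accepted: both required keys are present with the right shape.
const ok = { src: frame(4), dst: frame(4) } satisfies EdgeDetectPipeline;

// Rejected at compile time: `dst` is missing.
// @ts-expect-error
const missing = { src: frame(4) } satisfies EdgeDetectPipeline;

// Also accepted: the values are swapped, but both are GpuBuffer<Vec4, number>,
// so the type system cannot tell them apart.
const input = frame(4);
const output = frame(4);
const swapped = { src: output, dst: input } satisfies EdgeDetectPipeline;
```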
4. Building the Reactive Pipeline Primitive
A pipeline stage is a small object that knows how to:
- Create a GPUComputePipeline from WGSL.
- Bind the required buffers.
- Dispatch work groups.
- Return a promise that resolves when the GPU work is finished.
// src/pipeline.ts
import type { GpuBuffer } from './bufferTypes';

export type PipelineStage<I, O> = {
  run: (input: I) => Promise<O>;
};
type AnyBuffer = GpuBuffer<any, any>;

export function makeComputeStage<
  I extends AnyBuffer,
  O extends AnyBuffer,
  C extends Record<string, AnyBuffer> = Record<string, AnyBuffer>,
>(
  device: GPUDevice,
  wgsl: string,
  bindings: C,
  workgroup: [number, number, number],
  label?: string
): PipelineStage<I, O> {
  // 1️⃣ Compile WGSL
  const module = device.createShaderModule({ code: wgsl, label });
  const pipeline = device.createComputePipeline({
    label,
    layout: 'auto',
    compute: { module, entryPoint: 'main' },
  });
  // 2️⃣ Allocate one storage buffer per binding, in key order.
  //    Convention: the first key is the input (@binding(0)), the last is the output.
  const descs = Object.values(bindings);
  const gpuBuffers = descs.map(buf =>
    device.createBuffer({
      size: buf.data.byteLength,
      usage: GPUBufferUsage.STORAGE | GPUBufferUsage.COPY_SRC | GPUBufferUsage.COPY_DST,
    })
  );
  const bindGroup = device.createBindGroup({
    layout: pipeline.getBindGroupLayout(0),
    entries: gpuBuffers.map((buffer, binding) => ({ binding, resource: { buffer } })),
  });
  const srcBuf = gpuBuffers[0];
  const dstBuf = gpuBuffers[gpuBuffers.length - 1];
  // MAP_READ may only be combined with COPY_DST, so results are read back
  // through a dedicated staging buffer rather than the storage buffer itself.
  const readback = device.createBuffer({
    size: dstBuf.size,
    usage: GPUBufferUsage.MAP_READ | GPUBufferUsage.COPY_DST,
  });
  // 3️⃣ Return a runnable stage
  return {
    async run(input: I): Promise<O> {
      // Upload input data
      device.queue.writeBuffer(srcBuf, 0, input.data);
      // Encode commands
      const encoder = device.createCommandEncoder();
      const pass = encoder.beginComputePass();
      pass.setPipeline(pipeline);
      pass.setBindGroup(0, bindGroup);
      pass.dispatchWorkgroups(...workgroup);
      pass.end();
      encoder.copyBufferToBuffer(dstBuf, 0, readback, 0, dstBuf.size);
      // Submit; mapAsync resolves once the submitted copy has completed
      device.queue.submit([encoder.finish()]);
      // Read back result
      await readback.mapAsync(GPUMapMode.READ);
      const resultArray = new Float32Array(readback.getMappedRange()).slice();
      readback.unmap();
      // Cast back to the typed buffer shape (the only cast, kept inside the primitive)
      return { ...descs[descs.length - 1], data: resultArray } as O;
    },
  };
}
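Why is this type‑safe?
- The generic C requires every binding to be a GpuBuffer. Bindings are numbered in key insertion order, so the keys must be declared in the same order as the shader's @binding indices; the compiler cannot inspect the WGSL, so this ordering remains a convention.
- The run method returns a concrete type O, typically a GpuBuffer with the same length as the input.
- The single as O cast stays inside the primitive, so callers never need any or casts of their own.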
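The workgroup argument has to cover every element, so it is usually derived by ceiling division from the data size and the shader's @workgroup_size. A minimal sketch follows; dispatchSize is a hypothetical helper name.

```typescript
type Triple = [number, number, number];

// Number of workgroups needed per axis so that groups * groupSize >= extent.
function dispatchSize(extent: Triple, groupSize: Triple): Triple {
  return [
    Math.ceil(extent[0] / groupSize[0]),
    Math.ceil(extent[1] / groupSize[1]),
    Math.ceil(extent[2] / groupSize[2]),
  ];
}

// A 1920x1080 frame with the 16x16 workgroups used by the edge shader.
const groups = dispatchSize([1920, 1080, 1], [16, 16, 1]);
console.log(groups); // [120, 68, 1]
```

Because 1080 is not a multiple of 16, the last row of workgroups extends past the frame, which is why the shader needs its bounds check before writing.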
5. Composing Stages with RxJS (Optional)
Although the core of the article is about WebGPU, most real‑time apps already use a reactive library such as RxJS. We can wrap a PipelineStage as an OperatorFunction:
// src/pipeline.ts (add)
import type { OperatorFunction } from 'rxjs';
import { concatMap } from 'rxjs/operators';
export function gpuOperator<I, O>(stage: PipelineStage<I, O>): OperatorFunction<I, O> {
  // concatMap accepts the promise returned by run() directly. It processes one
  // frame at a time and preserves order; use switchMap or exhaustMap instead if
  // stale frames should be dropped under back‑pressure.
  return concatMap((item: I) => stage.run(item));
}
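The choice of flattening operator decides what happens when frames arrive faster than the GPU finishes them. The library‑free sketch below illustrates the "drop while busy" policy (roughly what exhaustMap does in RxJS), in contrast to queueing every frame. The name dropWhileBusy is hypothetical, and the stage is faked with a timer.

```typescript
type PipelineStage<I, O> = { run: (input: I) => Promise<O> };

// Wraps a stage so that inputs arriving while a run is in flight are dropped.
// Returns null for a dropped input instead of queueing it.
function dropWhileBusy<I, O>(stage: PipelineStage<I, O>) {
  let busy = false;
  return (input: I): Promise<O> | null => {
    if (busy) return null;
    busy = true;
    return stage.run(input).finally(() => { busy = false; });
  };
}

// Fake stage standing in for a GPU pass that takes about 10 ms.
const double: PipelineStage<number, number> = {
  run: (x) => new Promise((resolve) => setTimeout(() => resolve(x * 2), 10)),
};

const submit = dropWhileBusy(double);
const first = submit(1);  // accepted
const second = submit(2); // dropped: the first run is still in flight
```

Dropping suits live video, where a stale frame has no value; queueing suits workloads where every input must be processed.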
Now a video frame source becomes a stream of typed buffers:
import { Observable, fromEvent } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';
import type { GpuBuffer, Vec4 } from './bufferTypes';
import { gpuOperator, makeComputeStage } from './pipeline';
import { edgeDetectWGSL } from './shaders/edgeDetect.wgsl';

type Frame = GpuBuffer<Vec4, number>;
// Assume we have a <video id="cam"> element that is already streaming and has
// loaded its metadata, so videoWidth and videoHeight are non-zero
const video = document.getElementById('cam') as HTMLVideoElement;
const width = video.videoWidth;
const height = video.videoHeight;
const pixelCount = width * height;
const canvas = document.createElement('canvas');
canvas.width = width;
canvas.height = height;
const ctx = canvas.getContext('2d')!;
const adapter = await navigator.gpu.requestAdapter();
if (!adapter) throw new Error('WebGPU is not available in this browser');
const device = await adapter.requestDevice();
// The descriptors size the GPU buffers, so they must match the frame
const emptyFrame = (): Frame => ({
  data: new Float32Array(pixelCount * 4), length: pixelCount, stride: 16,
});
const edgeStage = makeComputeStage<Frame, Frame, { src: Frame; dst: Frame }>(
  device,
  edgeDetectWGSL,
  { src: emptyFrame(), dst: emptyFrame() },
  [Math.ceil(width / 16), Math.ceil(height / 16), 1],
  'EdgeDetect'
);
const frame$ = fromEvent(video, 'play').pipe(
  switchMap(() => new Observable<number>(observer => {
    let handle = 0;
    const tick = () => {
      if (video.paused || video.ended) { observer.complete(); return; }
      observer.next(performance.now());
      handle = requestAnimationFrame(tick);
    };
    tick();
    return () => cancelAnimationFrame(handle); // stop the loop on unsubscribe
  }))
);
frame$
  .pipe(
    map((): Frame => {
      // Capture the current frame and normalise 8-bit RGBA to floats in [0, 1]
      ctx.drawImage(video, 0, 0);
      const rgba = ctx.getImageData(0, 0, width, height).data;
      return { data: Float32Array.from(rgba, v => v / 255), length: pixelCount, stride: 16 };
    }),
    gpuOperator(edgeStage),
    map(buf => {
      // Scale the floats back to 8-bit RGBA and paint
      const bytes = Uint8ClampedArray.from(buf.data, v => v * 255);
      ctx.putImageData(new ImageData(bytes, width, height), 0, 0);
    })
  )
  .subscribe();
The observable chain does not need any any casts; each step’s input and output type is enforced by the compiler.
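The capture and paint steps hinge on converting between the canvas's 8‑bit RGBA bytes and the normalised floats the shader works with; wrapping the same ArrayBuffer in a different typed array reinterprets the bytes rather than converting the values. A minimal sketch, with hypothetical helper names:

```typescript
// 8-bit RGBA (0..255) -> normalised floats (0..1), one float per channel.
function rgbaToFloat(bytes: Uint8ClampedArray): Float32Array {
  return Float32Array.from(bytes, (v) => v / 255);
}

// Normalised floats -> 8-bit RGBA. Uint8ClampedArray rounds to the nearest
// integer and clamps to 0..255, so out-of-range shader output is safe.
function floatToRgba(floats: Float32Array): Uint8ClampedArray {
  return Uint8ClampedArray.from(floats, (v) => v * 255);
}

const pixel = new Uint8ClampedArray([255, 128, 0, 255]);
const roundTrip = floatToRgba(rgbaToFloat(pixel));
const clamped = floatToRgba(new Float32Array([1.5, -0.2]));
```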
6. Real‑World Case Study: Stylized Video Filter
Let’s extend the pipeline with a second stage that maps the edge intensity to a color palette (e.g., a heat map). The WGSL for the color map is tiny:
// src/shaders/colormap.wgsl
@group(0) @binding(0) var<storage, read> edges : array<f32>;
@group(0) @binding(1) var<storage, read_write> out : array<vec4<f32>>;
fn palette(v: f32) -> vec4<f32> {
// simple gradient: black -> red -> yellow -> white
return mix(
mix(vec4<f32>(0.0,0.0,0.0,1.0), vec4<f32>(1.0,0.0,0.0,1.0), v),
mix(vec4<f32>(1.0,1.0,0.0,1.0), vec4<f32>(1.0,1.0,1.0,1.0), v),
v);
}
@compute @workgroup_size(256)
fn main(@builtin(global_invocation_id) gid : vec3<u32>) {
let i = gid.x;
if (i >= arrayLength(&edges)) { return; }
out[i] = palette(edges[i]);
}
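To sanity‑check the gradient without a GPU, the palette function can be mirrored on the CPU. This is a hand‑written reference under the assumption that WGSL mix(a, b, t) computes a * (1 - t) + b * t per component; the TypeScript names below are illustrative only.

```typescript
type RGBA = [number, number, number, number];

// Same arithmetic as WGSL mix(): component-wise a * (1 - t) + b * t.
function mix(a: RGBA, b: RGBA, t: number): RGBA {
  return [0, 1, 2, 3].map((i) => a[i] * (1 - t) + b[i] * t) as RGBA;
}

// CPU mirror of the shader's palette() function.
function palette(v: number): RGBA {
  const low = mix([0, 0, 0, 1], [1, 0, 0, 1], v);  // black -> red
  const high = mix([1, 1, 0, 1], [1, 1, 1, 1], v); // yellow -> white
  return mix(low, high, v);
}

console.log(palette(0));   // [0, 0, 0, 1]
console.log(palette(0.5)); // [0.75, 0.5, 0.25, 1]
console.log(palette(1));   // [1, 1, 1, 1]
```

The midpoint shows that the nested mix blends all four colors at once, giving an orange‑brown at 0.5 rather than passing through pure red or yellow.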
We define a second contract and stage:
// src/shaders/contracts.ts (add)
export interface ColorMapPipeline {
edges: GpuBuffer<number, number>;
out: GpuBuffer<Vec4, number>;
}
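// src/pipelineFactory.ts
import type { GpuBuffer, Vec4 } from './bufferTypes';
import { makeComputeStage } from './pipeline';
import colormapWGSL from './shaders/colormap.wgsl?raw';

export function makeColorMapStage(device: GPUDevice, width: number, height: number) {
  const size = width * height;
  const edges: GpuBuffer<number, number> = { data: new Float32Array(size), length: size, stride: 4 };
  const out: GpuBuffer<Vec4, number> = { data: new Float32Array(size * 4), length: size, stride: 16 };
  // Input is the scalar edge buffer, output the RGBA buffer. All type arguments
  // are spelled out because TypeScript does not infer a partially specified list.
  return makeComputeStage<typeof edges, typeof out, { edges: typeof edges; out: typeof out }>(
    device,
    colormapWGSL,
    { edges, out },
    [Math.ceil(size / 256), 1, 1],
    'ColorMap'
  );
}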
Now we can compose the two stages:
const edgeStage = makeComputeStage(...); // as before
const colorStage = makeColorMapStage(device, video.videoWidth, video.videoHeight);
frame$
  .pipe(
    map(captureFrame),
    gpuOperator(edgeStage), // → GpuBuffer<Vec4>
    map(buf => ({
      // Keep one float per pixel: the edge shader writes the same intensity to
      // r, g and b, so the red channel alone carries the edge strength
      data: buf.data.filter((_, i) => i % 4 === 0),
      length: buf.length,
      stride: 4,
    })),
    gpuOperator(colorStage), // → GpuBuffer<Vec4>
    map(finalBuf => renderToCanvas(finalBuf))
  )
  .subscribe();
The type‑safety chain helps ensure that:
- The output of edgeStage is compatible with the input of colorStage.
- Buffer lengths travel with the type (N extends number). The check is exact when N is a numeric literal and degrades to plain number for sizes known only at runtime, such as video dimensions.
- Any change to a shader's bindings has a single contract to update. The compiler cannot read WGSL, so keeping the contract and the shader in sync remains a manual step.
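The first guarantee can be illustrated without a GPU. Below is a minimal sketch of a hypothetical chain helper that only compiles when the first stage's output type matches the second stage's input type; the stages are fakes standing in for compute passes.

```typescript
type PipelineStage<I, O> = { run: (input: I) => Promise<O> };

// Composes two stages; the shared type parameter B ties the output of the
// first stage to the input of the second.
function chain<A, B, C>(
  first: PipelineStage<A, B>,
  second: PipelineStage<B, C>
): PipelineStage<A, C> {
  return { run: async (input) => second.run(await first.run(input)) };
}

// Fake stages standing in for GPU passes.
const toLength: PipelineStage<string, number> = { run: async (s) => s.length };
const isEven: PipelineStage<number, boolean> = { run: async (n) => n % 2 === 0 };

const combined = chain(toLength, isEven); // PipelineStage<string, boolean>

// Rejected at compile time: isEven outputs boolean, but toLength expects string.
// @ts-expect-error
const invalid = chain(isEven, toLength);
```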
7. Testing Compute Shaders with Jest
Because the heavy lifting happens on the GPU, unit‑testing the pure‑JS glue code is critical. We can mock GPUDevice with the gpu-mock library or, for simple arithmetic kernels, run the WGSL in a software fallback using wgsl‑to‑js (a community tool). Example test for the Sobel kernel:
test('edgeDetect produces higher values on high‑contrast edges', async () => {
  const device = await getMockDevice(); // returns a GPUDevice that runs on CPU
  // 4x4 image with a vertical step edge: two black columns, then two white
  const pixels: Vec4[] = Array.from({ length: 16 }, (_, i) => {
    const v = i % 4 < 2 ? 0 : 1;
    return { r: v, g: v, b: v, a: 1 };
  });
  const input = makeBuffer<Vec4, 16>(pixels, 16);
  // One 16x16 workgroup covers the whole 4x4 image
  const stage = makeComputeStage(device, edgeDetectWGSL, mockBuffers, [1, 1, 1]);
  const result = await stage.run(input);
  // Pixel 0 sits in a flat black region; pixel 1 borders the step
  expect(result.data[0]).toBeCloseTo(0);
  expect(result.data[4]).toBeGreaterThan(0.5);
});
By keeping the shader source as a string, the same file can be fed to the mock device, ensuring logic parity between test and production environments.
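Where no mock device is available, a hand‑written CPU reference of the kernel gives the tests something to compare against. The sketch below mirrors the horizontal‑difference logic of the edge shader on a flat RGBA float array; it is an illustrative reference, not output from any WGSL tooling.

```typescript
// Rec. 601 luma weights, matching luminance() in the WGSL kernel.
const luminance = (rgba: Float32Array, pixel: number): number =>
  0.299 * rgba[pixel * 4] + 0.587 * rgba[pixel * 4 + 1] + 0.114 * rgba[pixel * 4 + 2];

// CPU mirror of the kernel: |lum(right) - lum(left)| on the flat pixel index,
// with neighbours clamped to the ends of the buffer.
function edgeDetectReference(src: Float32Array): Float32Array {
  const pixels = src.length / 4;
  const dst = new Float32Array(src.length);
  for (let i = 0; i < pixels; i++) {
    const left = luminance(src, Math.max(i - 1, 0));
    const right = luminance(src, Math.min(i + 1, pixels - 1));
    const edge = Math.abs(right - left);
    dst.set([edge, edge, edge, 1], i * 4);
  }
  return dst;
}

// One row with a step edge: black, black, white, white.
const row = new Float32Array([0, 0, 0, 1,  0, 0, 0, 1,  1, 1, 1, 1,  1, 1, 1, 1]);
const edges = edgeDetectReference(row);
// Red channel per pixel is approximately 0, 1, 1, 0: only the two pixels
// adjacent to the step see a luminance difference.
```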
8. Performance Tips
| Tip | Reason |
|---|---|
| Reuse GPUBuffers | Creating buffers each frame adds GC pressure. Create them once and only update their contents via queue.writeBuffer. |
| Align workgroup sizes | GPUs execute workgroups in multiples of the wave size (usually 32). Padding the image dimensions to a multiple of 16×16 reduces divergent branches. |
| Avoid MAP_READ on every frame | Mapping a buffer stalls the pipeline. Instead, keep a staging buffer that the GPU writes to, then copy it to a read‑back buffer only when needed (e.g., every nth frame). |
| Batch multiple logical stages into one WGSL module | The driver launch overhead is non‑trivial; fusing small kernels can improve throughput. |
| Profile with gpu-timeline | Chrome’s “WebGPU” devtools panel shows shader execution time, helping you locate bottlenecks. |
9. Going Beyond – Integration with Web Workers
For ultra‑low latency you can move the entire pipeline into a dedicated Web Worker that owns the GPUDevice. The main thread merely posts ArrayBuffers (transferable) and receives the processed buffer back, eliminating the main‑thread copy entirely.
// worker.ts
// Request the device once; repeating this per message would dominate the frame time.
const devicePromise = navigator.gpu.requestAdapter().then(a => a!.requestDevice());
let edge: ReturnType<typeof makeComputeStage> | undefined;
let color: ReturnType<typeof makeColorMapStage> | undefined;
self.onmessage = async (e) => {
  const { videoFrame, width, height } = e.data;
  const device = await devicePromise;
  // Build pipelines once, reuse across frames …
  edge ??= makeComputeStage(...);
  color ??= makeColorMapStage(device, width, height);
  const input = makeBufferFromVideoFrame(videoFrame);
  const edges = await edge.run(input);
  const final = await color.run(edges);
  // Transfer the final Float32Array back
  self.postMessage({ result: final.data.buffer }, [final.data.buffer]);
};
The TypeScript types remain the same; only the execution context changes.
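Data crossing postMessage arrives untyped, so the worker boundary is one place where a runtime guard complements the static types. A minimal sketch of a possible message protocol follows; every name here (FrameRequest, WorkerReply, isFrameRequest, describeReply) is hypothetical, and the payload is assumed to be four 32‑bit floats per pixel.

```typescript
// Hypothetical message shapes for the main-thread <-> worker protocol.
type FrameRequest = { kind: 'frame'; pixels: ArrayBuffer; width: number; height: number };
type FrameResult = { kind: 'result'; pixels: ArrayBuffer };
type FrameError = { kind: 'error'; message: string };
type WorkerReply = FrameResult | FrameError;

// Runtime guard for incoming messages.
function isFrameRequest(msg: unknown): msg is FrameRequest {
  if (typeof msg !== 'object' || msg === null) return false;
  const { kind, pixels, width, height } = msg as Record<string, unknown>;
  return (
    kind === 'frame' &&
    pixels instanceof ArrayBuffer &&
    typeof width === 'number' &&
    typeof height === 'number' &&
    pixels.byteLength === width * height * 16 // 4 floats x 4 bytes per pixel
  );
}

// Exhaustive handling of replies; adding a new kind without a case
// makes the function fail to compile.
function describeReply(reply: WorkerReply): string {
  switch (reply.kind) {
    case 'result': return `received ${reply.pixels.byteLength} bytes`;
    case 'error': return `worker failed: ${reply.message}`;
  }
}
```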
10. Conclusion
WebGPU brings the power of the GPU to the web, but without disciplined typing it is easy to slip into the same pitfalls that plague low‑level graphics code. By:
- Encoding buffer layouts as generic TypeScript types,
- Binding WGSL source to compile‑time contracts, and
- Wrapping compute passes in reusable, type‑safe pipeline stages,
we obtain a reactive data‑flow architecture that is both performant and maintainable. The example of a live edge‑detect + color‑map filter demonstrates how a few concise abstractions let you stitch together arbitrarily complex pipelines while keeping the compiler as your first line of defense.
Give it a try in your next real‑time dashboard, AR/VR preview, or scientific visualization—your users will feel the speed, and your teammates will appreciate the safety.