Durable Work in Postgres, Part 8
Schema and operations
Full Postgres schema, inbox SQL, observability, testing, and schema evolution for order:9182.
A durable work queue is not done when the worker loop runs. It needs schema, indexes, support queries, housekeeping, and tests that prove the guarantees.
This chapter turns the pattern into an operating surface your team can inspect during an incident without reconstructing the design from memory.
The operational contract
The reference table is named inbox, and the same columns work for outbox and projection queues. No Redis, ZooKeeper, workflow engine, or external coordinator is required for this version. Download the migration or read section by section.
The schema carries three kinds of state: worker liveness, row ownership, and retry history. Keep those concepts explicit. If a row is stuck, you should be able to answer who claimed it, when the lease expires, how many attempts ran, and whether the next action is retry, dead-letter, or manual inspection.
The canonical SQL migration lives at schema.sql. The diagram below mirrors the two core tables; per-post source.sql files extend this base.
| Ship this | Purpose | Minimum check |
|---|---|---|
| Worker registry | Know which workers are alive, draining, or dead | Heartbeat upsert updates last_seen_at. |
| Inbox table | Persist work, ownership, attempts, and idempotency | Duplicate enqueue with the same key creates one logical row. |
| Claim query | Give each row to one worker at a time | Concurrent claims never return the same row generation. |
| Lease cleanup | Recover rows from crashed workers | Expired processing rows return to pending or dead_letter. |
| Support queries | Let on-call inspect lag and stuck work | Runbook covers oldest pending, expired processing, and dead letters. |
| Guarantee tests | Protect the contract during refactors | Tests cover dedupe, exclusivity, expiry, stale completion, and dead lettering. |
Workers registry
The worker registry is the source of live membership for hash-ring ownership and housekeeping. It is intentionally small: identity, status, heartbeat time, and optional metadata for deploy version or host diagnostics.
CREATE TABLE workers (
id TEXT PRIMARY KEY, -- e.g. hostname + pid
status TEXT NOT NULL DEFAULT 'alive'
CHECK (status IN ('alive', 'draining', 'dead')),
last_seen_at TIMESTAMPTZ NOT NULL DEFAULT now(),
started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
metadata JSONB DEFAULT '{}' -- version, host, load
);
CREATE INDEX idx_workers_alive
ON workers (last_seen_at)
WHERE status = 'alive';
Queue table: inbox
CREATE TYPE inbox_status AS ENUM (
'pending', 'processing', 'completed', 'failed', 'dead_letter'
);
CREATE TABLE inbox (
id UUID PRIMARY KEY DEFAULT uuidv7(), -- built-in uuidv7() requires PostgreSQL 18+
partition_key TEXT NOT NULL, -- stream id (producer sets this)
partition_bucket INT NOT NULL
CHECK (partition_bucket >= 0 AND partition_bucket < 1024),
payload JSONB NOT NULL,
status inbox_status NOT NULL DEFAULT 'pending',
attempts INT NOT NULL DEFAULT 0,
max_attempts INT NOT NULL DEFAULT 5,
claimed_by TEXT REFERENCES workers(id),
claimed_at TIMESTAMPTZ,
lease_expires_at TIMESTAMPTZ,
lease_generation BIGINT NOT NULL DEFAULT 0, -- fencing token
available_at TIMESTAMPTZ NOT NULL DEFAULT now(),
completed_at TIMESTAMPTZ,
last_error TEXT,
idempotency_key TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_inbox_claim
ON inbox (available_at, created_at, id)
WHERE status = 'pending';
CREATE INDEX idx_inbox_bucket_claim
ON inbox (partition_bucket, available_at, created_at, id)
WHERE status = 'pending';
CREATE INDEX idx_inbox_partition_processing
ON inbox (partition_key)
WHERE status = 'processing';
CREATE INDEX idx_inbox_stale_processing
ON inbox (lease_expires_at, id)
WHERE status = 'processing';
CREATE UNIQUE INDEX idx_inbox_idempotency_key
ON inbox (idempotency_key)
WHERE idempotency_key IS NOT NULL;
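One way a producer might fill partition_bucket and lean on the partial unique index for dedupe is sketched below. The FNV-1a hash, the bucketFor and enqueueStatement names, and the parameter order are assumptions for illustration, not taken from the article's source files. Any stable hash should work as long as every producer uses the same one.

```typescript
const BUCKET_COUNT = 1024; // matches the CHECK on inbox.partition_bucket

// Assumption: 32-bit FNV-1a as the stable hash (hypothetical choice).
function fnv1a(input: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash >>> 0;
}

// Map a stream id (partition_key) to a bucket in [0, 1023].
function bucketFor(partitionKey: string): number {
  return fnv1a(partitionKey) % BUCKET_COUNT;
}

interface Statement {
  text: string;
  values: unknown[];
}

// Hypothetical helper: build an idempotent enqueue statement.
// The ON CONFLICT clause repeats the predicate of the partial unique index
// so Postgres can infer idx_inbox_idempotency_key as the arbiter.
function enqueueStatement(
  partitionKey: string,
  payload: unknown,
  idempotencyKey: string | null
): Statement {
  return {
    text: `INSERT INTO inbox (partition_key, partition_bucket, payload, idempotency_key)
VALUES ($1, $2, $3::jsonb, $4)
ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL DO NOTHING
RETURNING id`,
    values: [partitionKey, bucketFor(partitionKey), JSON.stringify(payload), idempotencyKey],
  };
}
```

With this shape, a producer retry that reuses the same key gets no row back from RETURNING, which it can treat as "already enqueued".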
Heartbeat: upsert on every tick
Heartbeat writes should be small and cheap. They do not claim work and they do not run recovery. They only keep the live worker list fresh enough for ring ownership and stale-worker detection.
-- Called every 10 seconds by each worker
INSERT INTO workers (id, last_seen_at, metadata)
VALUES ('worker-host1-4821', now(), '{"version":"1.2.0"}')
ON CONFLICT (id) DO UPDATE SET
last_seen_at = EXCLUDED.last_seen_at,
status = 'alive',
metadata = EXCLUDED.metadata;
-- Heartbeat only; the stale-worker sweep runs in maybeRunHousekeeping() (see patterns)
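In application code the heartbeat would normally be parameterized rather than written with literals. The sketch below assumes a hypothetical heartbeatStatement helper and an isLive check; the 10-second interval and 30-second liveness window are the values used in this chapter's SQL, while the function and constant names are illustrative.

```typescript
interface Statement {
  text: string;
  values: unknown[];
}

// Hypothetical helper: the heartbeat upsert with bound parameters.
function heartbeatStatement(workerId: string, metadata: Record<string, unknown>): Statement {
  return {
    text: `INSERT INTO workers (id, last_seen_at, metadata)
VALUES ($1, now(), $2::jsonb)
ON CONFLICT (id) DO UPDATE SET
  last_seen_at = EXCLUDED.last_seen_at,
  status = 'alive',
  metadata = EXCLUDED.metadata`,
    values: [workerId, JSON.stringify(metadata)],
  };
}

const HEARTBEAT_MS = 10000; // each worker ticks every 10 seconds
const LIVENESS_WINDOW_MS = 30000; // same window as the live-worker SELECT

// Mirrors "last_seen_at > now() - interval '30 seconds'".
function isLive(lastSeenAt: Date, now: Date): boolean {
  return now.getTime() - lastSeenAt.getTime() < LIVENESS_WINDOW_MS;
}

// The window spans three heartbeat intervals, so one delayed tick
// does not drop a healthy worker from the live list.
const intervalsPerWindow = LIVENESS_WINDOW_MS / HEARTBEAT_MS;
```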
Claim work: filter by bucket in SQL
The claim path is where most performance mistakes happen. Build the ring in application code, turn it into an owned bucket list, and push that filter into SQL. Do not claim rows you plan to discard in memory.
-- Step 1: live workers → build ring → owned_buckets[] in app
SELECT id FROM workers
WHERE status = 'alive'
AND last_seen_at > now() - interval '30 seconds'
ORDER BY id;
-- Step 2: claim only rows this worker owns (partition_bucket in SQL)
WITH candidates AS (
SELECT i.id
FROM inbox i
WHERE i.status = 'pending'
AND i.available_at <= now()
AND i.partition_bucket = ANY($1::int[]) -- owned buckets
ORDER BY i.created_at, i.id
LIMIT 25
FOR UPDATE SKIP LOCKED
)
UPDATE inbox i
SET
status = 'processing',
claimed_by = $2,
claimed_at = now(),
lease_expires_at = now() + interval '90 seconds',
lease_generation = i.lease_generation + 1,
attempts = i.attempts + 1
FROM candidates c
WHERE i.id = c.id
RETURNING i.*;
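Step 1 leaves the bucket assignment to application code. The sketch below uses rendezvous (highest-random-weight) hashing as a stand-in for the ring, because it is short and gives the same key property: every bucket has exactly one owner among the live workers, and buckets only move when their owner leaves. The hash function and the ownerOfBucket and ownedBuckets names are assumptions, not the series' actual ring implementation.

```typescript
const BUCKET_COUNT = 1024; // matches the CHECK on inbox.partition_bucket

// Assumption: 32-bit FNV-1a as the scoring hash (hypothetical choice).
function fnv1a(input: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash >>> 0;
}

// Rendezvous hashing: the live worker with the highest score owns the bucket.
// Ties break on the smaller worker id so the result ignores input order.
function ownerOfBucket(bucket: number, liveWorkers: string[]): string | undefined {
  let best: string | undefined;
  let bestScore = -1;
  for (const worker of liveWorkers) {
    const score = fnv1a(`${worker}:${bucket}`);
    if (score > bestScore || (score === bestScore && best !== undefined && worker < best)) {
      best = worker;
      bestScore = score;
    }
  }
  return best;
}

// Buckets to pass as $1 in the claim query for this worker.
function ownedBuckets(workerId: string, liveWorkers: string[]): number[] {
  const owned: number[] = [];
  for (let bucket = 0; bucket < BUCKET_COUNT; bucket++) {
    if (ownerOfBucket(bucket, liveWorkers) === workerId) owned.push(bucket);
  }
  return owned;
}
```

The liveWorkers array is the result of the Step 1 SELECT, and the returned list is bound to the int[] parameter in Step 2. Two workers can briefly disagree about membership between heartbeats; FOR UPDATE SKIP LOCKED and the lease generation still keep each row with one owner at a time.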
Lease cleanup: recover from crashes
Run these inside maybeRunHousekeeping() when the advisory lock is held, not in a separate cron job.
UPDATE inbox
SET
status = 'pending',
claimed_by = NULL,
claimed_at = NULL,
lease_expires_at = NULL,
available_at = now() + (LEAST(POWER(2, attempts), 3600)::int * interval '1 second') -- cap before the cast so a large attempts value cannot overflow int
WHERE status = 'processing'
AND lease_expires_at < now()
AND attempts < max_attempts;
-- Move exhausted retries to dead letter
UPDATE inbox
SET status = 'dead_letter'
WHERE status = 'processing'
AND lease_expires_at < now()
AND attempts >= max_attempts;
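The retry delay in the recovery UPDATE is a capped exponential: min(2^attempts, 3600) seconds. A small worked example, with backoffSeconds as a hypothetical name, makes the schedule concrete. Because the claim query increments attempts before the handler runs, the first expired lease already sees attempts = 1.

```typescript
// Mirrors the capped exponential backoff used when an expired lease
// is returned to pending: min(2^attempts, 3600) seconds.
function backoffSeconds(attempts: number): number {
  return Math.min(Math.pow(2, attempts), 3600);
}

for (const attempts of [1, 2, 3, 4, 12]) {
  console.log(attempts, backoffSeconds(attempts));
}
// 1 -> 2, 2 -> 4, 3 -> 8, 4 -> 16, 12 -> 3600 (2^12 = 4096 is capped)
```

With the default max_attempts of 5, a row is retried after 2, 4, 8, and 16 seconds and then dead-lettered on the next expiry. The one-hour cap only comes into play if max_attempts is raised to 12 or more.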
Support queries
Operationally, the queue is only as useful as the questions it can answer quickly. Keep these queries in a runbook so support and on-call engineers can inspect backlog health without writing SQL from scratch during an incident.
-- Oldest pending work: user-visible lag signal
SELECT now() - min(created_at) AS oldest_pending_age
FROM inbox
WHERE status = 'pending';
-- Rows stuck in processing beyond their lease
SELECT id, partition_key, claimed_by, lease_expires_at, attempts, last_error
FROM inbox
WHERE status = 'processing'
AND lease_expires_at < now()
ORDER BY lease_expires_at
LIMIT 50;
-- Dead letters grouped by stream (partition_key)
SELECT partition_key, count(*) AS dead_letters, max(last_error) AS sample_error
FROM inbox
WHERE status = 'dead_letter'
GROUP BY partition_key
ORDER BY dead_letters DESC;
Guarantee tests
Test the guarantees, not the implementation trivia. The database contract should prove that duplicate enqueue is blocked, claimed rows are exclusive, expired leases return to pending, stale owners cannot complete, and exhausted rows move to dead_letter.
- Producer retry with the same idempotency_key creates one logical row.
- Two workers claiming concurrently never receive the same row in the same generation.
- An expired lease is returned to pending and can be claimed by a new worker.
- A stale worker completing with an old claimed_by or fence token affects zero rows.
- A row past max_attempts lands in dead_letter and no longer blocks the stream.
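The stale-completion guarantee depends on the completion write being fenced. A minimal sketch of such a write follows, assuming a hypothetical Queryable interface shaped like a typical Postgres client and hypothetical completeStatement and completeRow helpers. The UPDATE only matches while the row is still processing under the same owner and lease_generation, so a worker whose lease was reclaimed affects zero rows.

```typescript
// Values captured from the RETURNING clause of the claim query.
interface Claim {
  id: string;
  claimedBy: string;
  leaseGeneration: string | number; // BIGINT often arrives as a string
}

// Hypothetical minimal client interface (assumption, not a real library type).
interface Queryable {
  query(text: string, values: unknown[]): Promise<{ rowCount: number | null }>;
}

// Fenced completion: owner and fencing token must both still match.
function completeStatement(claim: Claim): { text: string; values: unknown[] } {
  return {
    text: `UPDATE inbox
SET status = 'completed', completed_at = now()
WHERE id = $1
  AND status = 'processing'
  AND claimed_by = $2
  AND lease_generation = $3`,
    values: [claim.id, claim.claimedBy, claim.leaseGeneration],
  };
}

// Returns true only when this worker still held the lease.
async function completeRow(db: Queryable, claim: Claim): Promise<boolean> {
  const { text, values } = completeStatement(claim);
  const result = await db.query(text, values);
  return result.rowCount === 1;
}
```

A guarantee test can claim a row, let housekeeping reclaim it, have a second worker claim it again, and then assert that completeRow with the first worker's claim resolves to false.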
Source
Use the article for explanation, then use these files when you want the complete SQL and TypeScript in one place.