PubSub
Durable topics, consumer groups, partitions, offsets, replay, and dead letters
The PubSub module provides durable publish/subscribe messaging for events that must survive process crashes. It is built on the existing OqronKit storage, broker, and lock adapters, so there is no separate PubSub adapter family to configure or maintain.
PubSub is an at-least-once delivery system. Use idempotencyKey when publishing
and make handlers idempotent when the side effect cannot safely run twice.
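Because delivery is at-least-once, a handler can see the same message more than once. The following is a minimal sketch of a deduplicating guard, not OqronKit's implementation: `applyOnce` is a hypothetical helper, and the `Set` stands in for whatever durable store your application uses.

```typescript
// Runs `sideEffect` at most once per message ID.
// `processed` is an in-memory stand-in; a real deployment would need a
// durable, shared store so that every consumer process sees the same IDs.
function applyOnce<T>(
  processed: Set<string>,
  messageId: string,
  message: T,
  sideEffect: (message: T) => void,
): boolean {
  if (processed.has(messageId)) {
    return false; // redelivery: the side effect already ran
  }
  sideEffect(message);
  // Record the ID only after success so a failed attempt can be retried.
  processed.add(messageId);
  return true;
}

const processedIds = new Set<string>();
let invoiceCount = 0;
const createInvoiceOnce = (messageId: string): boolean =>
  applyOnce(processedIds, messageId, { orderId: "ord_123" }, () => {
    invoiceCount += 1;
  });

createInvoiceOnce("msg_1"); // runs the side effect
createInvoiceOnce("msg_1"); // skipped as a duplicate
```

The sketch is synchronous to keep it short. An asynchronous handler would also need to guard against two concurrent deliveries of the same message.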
Working example → See a complete PubSub implementation with partitioned order events, multiple consumer groups (billing + analytics), and manual-ack notification dispatch: apps/backend/src/triggers/pubsub.ts
How It Works
Messages are persisted to Storage, assigned to a partition, and delivered to every consumer group, each of which tracks its own offsets. The Broker wakes subscriber poll loops, and Locks elect the leader node that runs reconciliation and retention cleanup.
Message Flow
.publish(message, opts)
│
├─ 1. Validate message (topic.validate())
├─ 2. Check idempotency key (skip if duplicate)
├─ 3. Assign partition:
│     ├─ partitionKey provided? → hash(key) % partitions
│     └─ No key? → round-robin assignment
├─ 4. Assign monotonic offset within partition
├─ 5. Persist message record to Storage
└─ 6. Notify Broker (wake subscriber poll loops)
.subscribe({ group, handler })
│
│  ┌──── Poll Loop (per group, per process) ──────────┐
│  │                                                  │
├──┤ 1. Query Storage for messages after group offset │
│  │    (respects partition assignment, batchSize)    │
│  │ 2. Apply subscription filter (if set)            │
│  │ 3. Create delivery record for each message       │
│  │ 4. Execute handler(ctx) per delivery             │
│  │                                                  │
│  │ On handler completion:                           │
│  │ ├─ auto ack mode: ack on resolve, nack on reject │
│  │ └─ manual mode: wait for ctx.ack() / ctx.nack()  │
│  │                                                  │
│  │ 5. Commit offset after successful ack            │
│  │ 6. On nack: apply retry backoff                  │
│  │    All retries exhausted? → Dead letter          │
│  └──────────────────────────────────────────────────┘
Consumer Group Isolation
Each consumer group maintains independent offsets. Multiple groups subscribing to the same topic each receive every message independently:
Topic: "order-events" (16 partitions)
│
├─ Group: "billing-service" offset=142 (processing message 143)
├─ Group: "analytics-service" offset=89 (behind, catching up)
└─ Group: "shipping-service" offset=142 (up to date)
Within a single group, multiple processes compete for work. Messages from the same partition are delivered to only one process at a time, preserving keyed ordering.
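To make the independent-offset idea concrete, here is a minimal in-memory sketch of one partition with a separate offset per group. `PartitionLog` and its methods are hypothetical names for illustration, not OqronKit classes, and the sketch advances the offset at poll time rather than after an ack.

```typescript
// In-memory stand-in for a single partition of a topic.
class PartitionLog<T> {
  private readonly messages: T[] = [];
  // Per-group position: the number of messages the group has consumed.
  private readonly positions = new Map<string, number>();

  // Appends a message and returns its offset within the partition.
  append(message: T): number {
    this.messages.push(message);
    return this.messages.length - 1;
  }

  // Returns up to `batchSize` unread messages for `group` and advances
  // only that group's position.
  poll(group: string, batchSize: number): T[] {
    const start = this.positions.get(group) ?? 0;
    const batch = this.messages.slice(start, start + batchSize);
    this.positions.set(group, start + batch.length);
    return batch;
  }
}

const eventLog = new PartitionLog<string>();
["order.created", "order.paid", "order.shipped"].forEach((e) => eventLog.append(e));

const billingBatch = eventLog.poll("billing-service", 10);     // all three events
const analyticsFirst = eventLog.poll("analytics-service", 2);  // first two only
const analyticsRest = eventLog.poll("analytics-service", 10);  // the remaining one
const billingAgain = eventLog.poll("billing-service", 10);     // nothing new
```

Each group reads the full stream at its own pace. A slow group falls behind without affecting the others.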
Reconciliation (Leader-Only)
A leader node periodically scans for:
- Expired deliveries: Handler timed out without ack/nack. Re-queued for retry.
- Due retries: Deliveries in backoff whose delay has elapsed. Re-published to Broker.
- Retention cleanup: Deletes messages older than retention.maxAgeMs or beyond retention.maxCount.
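The three scans above can be sketched as two pure functions. This is an illustration under stated assumptions: the record shapes, field names, and state labels are hypothetical, and the retention sketch assumes messages are ordered oldest to newest.

```typescript
type DeliveryState = "in_flight" | "retry_scheduled" | "acked";

// Hypothetical delivery record; field names are illustrative only.
interface DeliveryRecord {
  id: string;
  state: DeliveryState;
  ackDeadline: number; // epoch ms after which an in-flight delivery is expired
  retryAt: number;     // epoch ms at which a scheduled retry becomes due
}

interface ScanResult {
  expired: string[];    // in flight past the ack deadline
  dueRetries: string[]; // backoff delay has elapsed
}

// Classifies at most `batchSize` records that need repair at time `now`.
function scanDeliveries(records: DeliveryRecord[], now: number, batchSize: number): ScanResult {
  const result: ScanResult = { expired: [], dueRetries: [] };
  for (const record of records) {
    if (result.expired.length + result.dueRetries.length >= batchSize) break;
    if (record.state === "in_flight" && record.ackDeadline <= now) {
      result.expired.push(record.id);
    } else if (record.state === "retry_scheduled" && record.retryAt <= now) {
      result.dueRetries.push(record.id);
    }
  }
  return result;
}

// Keeps messages that are young enough and within the newest `maxCount`.
function applyRetention<T extends { publishedAt: number }>(
  messages: T[],
  now: number,
  maxAgeMs: number,
  maxCount: number,
): T[] {
  const youngEnough = messages.filter((m) => now - m.publishedAt <= maxAgeMs);
  return youngEnough.slice(Math.max(0, youngEnough.length - maxCount));
}
```

Keeping the scans free of I/O makes them easy to test. The leader would load a batch, call these functions, and then write the repairs back.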
Enable the Module
import { OqronKit, pubsubModule } from "oqronkit";
await OqronKit.init({
  config: {
    project: "billing-platform",
    environment: "production",
    modules: [
      pubsubModule({
        concurrency: 10,
        ackTimeoutMs: 30_000,
        retries: {
          max: 5,
          strategy: "exponential",
          baseDelay: 1_000,
          maxDelay: 60_000,
        },
      }),
    ],
  },
});
project and environment are part of the container isolation prefix. They separate topics, offsets, group state, deliveries, and dead letters across apps and deployment environments.
Define a Topic
import { topic } from "oqronkit";
type OrderEvent =
  | { type: "order.created"; orderId: string; accountId: string; total: number }
  | { type: "order.paid"; orderId: string; accountId: string; paymentId: string };
export const orderEvents = topic<OrderEvent>({
  name: "order-events",
  tags: ["orders", "billing"],
  retention: {
    maxAgeMs: 7 * 24 * 60 * 60 * 1000,
    maxCount: 100_000,
  },
  distribution: {
    partitions: 16,
    partitionKey: (message) => message.accountId,
  },
  validate: (message) =>
    message.orderId ? true : "orderId is required",
});
Publish Messages
const messageId = await orderEvents.publish(
  {
    type: "order.created",
    orderId: "ord_123",
    accountId: "acct_1",
    total: 99.99,
  },
  {
    idempotencyKey: "order-created:ord_123",
    partitionKey: "acct_1",
    correlationId: "req_abc",
    headers: {
      source: "checkout",
    },
  },
);
Batch publishing persists messages and pushes broker notifications in one call:
await orderEvents.publishBatch([
  {
    message: {
      type: "order.created",
      orderId: "ord_124",
      accountId: "acct_1",
      total: 149.5,
    },
    options: { idempotencyKey: "order-created:ord_124" },
  },
  {
    message: {
      type: "order.paid",
      orderId: "ord_124",
      accountId: "acct_1",
      paymentId: "pay_456",
    },
    options: { idempotencyKey: "order-paid:ord_124" },
  },
]);
Consumer Groups
Different groups each receive every message. Multiple processes using the same group compete for work and share that group's offsets.
import { orderEvents } from "./order-events";
await orderEvents.subscribe({
  group: "billing-service",
  concurrency: 8,
  batchSize: 50,
  maxInFlight: 200,
  startFrom: "earliest",
  filter: (event) => event.type === "order.paid",
  handler: async (ctx) => {
    await createInvoice(ctx.message.orderId);
    ctx.log.info(`invoice created for ${ctx.message.orderId}`);
  },
});
import { orderEvents } from "./order-events";
await orderEvents.subscribe({
  group: "analytics-service",
  concurrency: 20,
  handler: async (ctx) => {
    await trackEvent(ctx.message);
  },
});
billing-service and analytics-service both receive the same topic stream. Inside each group, messages are delivered one at a time per partition to preserve keyed ordering.
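One way to picture the one-at-a-time-per-partition rule is as a claim that a process must hold before it works on a partition. The sketch below is an in-memory illustration only. `PartitionClaims` is a hypothetical class, and a real system would use a distributed lock or lease with a TTL so that a crashed process cannot hold a partition forever.

```typescript
// Tracks which process currently holds each partition for one consumer group.
class PartitionClaims {
  private readonly owners = new Map<number, string>();

  // Returns true when `processId` now holds (or already held) the partition.
  tryClaim(partition: number, processId: string): boolean {
    const owner = this.owners.get(partition);
    if (owner !== undefined && owner !== processId) {
      return false; // another process is working on this partition
    }
    this.owners.set(partition, processId);
    return true;
  }

  // Releases the partition only if the caller is the current owner.
  release(partition: number, processId: string): void {
    if (this.owners.get(partition) === processId) {
      this.owners.delete(partition);
    }
  }
}

const claims = new PartitionClaims();
const workerAGotP3 = claims.tryClaim(3, "worker-a"); // true
const workerBGotP3 = claims.tryClaim(3, "worker-b"); // false: partition 3 is busy
const workerBGotP4 = claims.tryClaim(4, "worker-b"); // true: other partitions stay available
claims.release(3, "worker-a");
const workerBRetry = claims.tryClaim(3, "worker-b"); // true after the release
```

Because only the claim holder processes a partition, messages that share a partition key are handled in offset order even when many processes run in the same group.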
Manual Ack
The default ackMode is auto: if the handler resolves, the delivery is
acknowledged. Use manual when acknowledgement must happen only after a
specific side effect completes.
await orderEvents.subscribe({
  group: "warehouse-service",
  ackMode: "manual",
  ackTimeoutMs: 60_000,
  handler: async (ctx) => {
    try {
      await reserveInventory(ctx.message);
      await ctx.ack();
    } catch (err) {
      await ctx.nack(err instanceof Error ? err.message : "reservation failed");
    }
  },
});
If a manual handler returns without ack(), nack(), or discard(), OqronKit nacks the delivery so it is not stranded.
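The fallback described above can be sketched with a context that records its first settlement. This is a simplified, synchronous illustration. `ManualAckContext` and `runManual` are hypothetical names, and treating the first call as final is an assumption rather than documented OqronKit behavior.

```typescript
type Outcome = "acked" | "nacked" | "discarded";

// Hypothetical settle-once context. Method names mirror the text above,
// but the implementation is illustrative only.
class ManualAckContext {
  private outcome: Outcome | undefined;

  ack(): void {
    this.settle("acked");
  }

  nack(): void {
    this.settle("nacked");
  }

  discard(): void {
    this.settle("discarded");
  }

  get settled(): Outcome | undefined {
    return this.outcome;
  }

  // Assumption: the first settlement wins and later calls are ignored.
  private settle(outcome: Outcome): void {
    if (this.outcome === undefined) {
      this.outcome = outcome;
    }
  }
}

// Runs a handler in manual mode. If the handler returns without settling,
// the delivery is treated as nacked so it is not stranded.
function runManual(handler: (ctx: ManualAckContext) => void): Outcome {
  const ctx = new ManualAckContext();
  handler(ctx);
  return ctx.settled ?? "nacked";
}

const explicitAck = runManual((ctx) => ctx.ack());   // "acked"
const forgotToSettle = runManual(() => undefined);   // "nacked" by fallback
```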
Partitioning
Partitioning gives you horizontal distribution while preserving order for a key.
const accountEvents = topic<AccountEvent>({
  name: "account-events",
  distribution: {
    partitions: 32,
    partitionKey: (event) => event.accountId,
  },
});
You can also override the key per publish:
await accountEvents.publish(event, {
  partitionKey: event.accountId,
});
Use stable business keys such as tenantId, accountId, userId, or aggregateId. Avoid random keys when ordered processing matters.
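The rule that a stable key always maps to the same partition, with round-robin as the fallback, can be sketched as follows. This document does not specify which hash function OqronKit uses, so the FNV-1a style hash here is an assumption chosen for brevity, and `createPartitioner` is a hypothetical helper.

```typescript
// FNV-1a style 32-bit hash over UTF-16 code units. Illustrative only:
// the hash OqronKit actually uses is not specified here.
function fnv1a(key: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return hash >>> 0; // force an unsigned 32-bit result
}

// Returns a function that maps an optional key to a partition index.
function createPartitioner(partitions: number): (key?: string | number) => number {
  let next = 0; // round-robin cursor for messages without a key
  return (key) => {
    if (key !== undefined) {
      return fnv1a(String(key)) % partitions;
    }
    const partition = next;
    next = (next + 1) % partitions;
    return partition;
  };
}

const assignPartition = createPartitioner(32);
const firstForAccount = assignPartition("acct_1");
const secondForAccount = assignPartition("acct_1"); // same partition as the first call
const keylessA = assignPartition(); // 0
const keylessB = assignPartition(); // 1
```

A random key would spread one account's events across partitions, which is why ordering is only preserved for stable keys.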
Retries and Dead Letters
await orderEvents.subscribe({
  group: "shipping-service",
  retries: {
    max: 3,
    strategy: "exponential",
    baseDelay: 2_000,
    maxDelay: 30_000,
  },
  deadLetter: {
    enabled: true,
    onDead: async (messageId, message, error) => {
      await alertOps({ messageId, message, error });
    },
  },
  handler: async (ctx) => {
    await scheduleShipment(ctx.message);
  },
});
Dead letters are inspectable and retryable:
const dead = await orderEvents.deadLetters("shipping-service", { limit: 50 });
// Guard against an empty list before retrying a single dead letter.
if (dead.length > 0) await orderEvents.retryDeadLetter("shipping-service", dead[0].messageId);
await orderEvents.retryAllDeadLetters("shipping-service");
Call ctx.discard(reason) inside a handler to skip retries and send the delivery directly to the terminal discarded state.
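To see how the retry options interact, here is a sketch of a delay calculation. It assumes that "exponential" doubles the delay on each attempt and caps it at maxDelay. The exact formula OqronKit applies, including any jitter, is not stated in this document, and `retryDelay` is a hypothetical helper.

```typescript
type RetryStrategy = "fixed" | "exponential";

interface RetryPolicy {
  max: number;       // maximum number of retries
  strategy: RetryStrategy;
  baseDelay: number; // milliseconds
  maxDelay: number;  // milliseconds
}

// Returns the delay before retry number `attempt` (1-based), or null when
// retries are exhausted and the delivery should become a dead letter.
// Assumption: exponential means doubling per attempt, capped at maxDelay.
function retryDelay(policy: RetryPolicy, attempt: number): number | null {
  if (attempt > policy.max) {
    return null;
  }
  const raw =
    policy.strategy === "fixed"
      ? policy.baseDelay
      : policy.baseDelay * 2 ** (attempt - 1);
  return Math.min(raw, policy.maxDelay);
}

const shippingPolicy: RetryPolicy = {
  max: 3,
  strategy: "exponential",
  baseDelay: 2_000,
  maxDelay: 30_000,
};
const retrySchedule = [1, 2, 3, 4].map((attempt) => retryDelay(shippingPolicy, attempt));
// retrySchedule is [2000, 4000, 8000, null]
```

Under these assumptions the shipping-service subscription above would wait 2, 4, and 8 seconds before giving up on the fourth failure.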
Replay and Offsets
Each consumer group owns independent committed offsets per partition.
// Reprocess from the beginning.
await orderEvents.seek("analytics-service", { position: "earliest" });
// Start at the newest published message.
await orderEvents.seek("analytics-service", { position: "latest" });
// Rewind to an offset, timestamp, or message.
await orderEvents.seek("analytics-service", { offset: 42 });
await orderEvents.seek("analytics-service", { timestamp: new Date("2026-01-01") });
await orderEvents.seek("analytics-service", { messageId: "msg_123" });
replay() is shorthand for resetting a group to a timestamp or offset:
await orderEvents.replay({
  group: "analytics-service",
  from: new Date("2026-01-01"),
});
Operations
const lag = await orderEvents.lag("billing-service");
const stats = await orderEvents.stats();
await orderEvents.pause("billing-service");
await orderEvents.resume("billing-service");
// Pause or resume every group on the topic.
await orderEvents.pause();
await orderEvents.resume();
// Remove all persisted PubSub state for this topic.
const removed = await orderEvents.purge();
stats() includes total messages, partition count, oldest and newest message timestamps, per-group lag, pending messages, dead-letter count, and committed offsets.
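Consumer lag is commonly computed as the gap between the newest offset and the committed offset, summed across partitions. The sketch below assumes that definition. The actual shape of the value returned by lag() is not described in this document, and `groupLag` is a hypothetical helper.

```typescript
// Offsets are indexed by partition.
// latest[p]: offset of the newest message in partition p (-1 if empty).
// committed[p]: offset of the last message the group acknowledged (-1 if none).
function groupLag(latest: number[], committed: number[]): number {
  let lag = 0;
  for (let p = 0; p < latest.length; p++) {
    const newest = latest[p] ?? -1;
    const done = committed[p] ?? -1;
    lag += Math.max(0, newest - done);
  }
  return lag;
}

// Partition 0 is caught up, partition 1 is 10 behind,
// and partition 2 has 11 messages that were never consumed.
const exampleLag = groupLag([142, 90, 10], [142, 80, -1]); // 21
```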
Module Configuration
| Option | Type | Default | Description |
|---|---|---|---|
| `concurrency` | `number` | `1` | Default parallel delivery limit per subscription |
| `pollIntervalMs` | `number` | `100` | Broker polling interval |
| `lockTtlMs` | `number` | `30000` | Broker lease TTL for claimed deliveries |
| `ackTimeoutMs` | `number` | `30000` | Handler timeout before a delivery is nacked |
| `retries.max` | `number` | `3` | Default retry count |
| `retries.strategy` | `"fixed" \| "exponential"` | `"exponential"` | Default retry backoff strategy |
| `retries.baseDelay` | `number` | `1000` | Initial retry delay in milliseconds |
| `retries.maxDelay` | `number` | `60000` | Maximum retry delay in milliseconds |
| `deadLetter.enabled` | `boolean` | `true` | Store exhausted deliveries in dead letters |
| `shutdownTimeout` | `number` | `25000` | Graceful shutdown drain timeout |
| `reconciliationIntervalMs` | `number` | `30000` | Leader interval for repairing expired leases and due retries. Set to `0` to disable |
| `reconciliationBatchSize` | `number` | `500` | Max delivery records repaired per scan |
| `retentionIntervalMs` | `number` | `60000` | Leader interval for topic retention cleanup. Set to `0` to disable |
Topic Configuration
| Option | Type | Description |
|---|---|---|
| `name` | `string` | Unique topic name |
| `tags` | `string[]` | Optional labels for organization |
| `validate` | `(message) => boolean \| string` | Reject invalid messages before publish |
| `retention.maxAgeMs` | `number` | Delete messages older than this age |
| `retention.maxCount` | `number` | Keep only the newest N messages |
| `distribution.partitions` | `number` | Number of partitions. Defaults to `1` |
| `distribution.partitionKey` | `(message, meta) => string \| number` | Select the key used for hash partitioning |
| `hooks.onPublish` | `Function` | Runs after publish |
| `hooks.onAck` | `Function` | Runs after a group acknowledges a message |
| `hooks.onDead` | `Function` | Runs when a delivery enters dead letters |
Subscription Configuration
| Option | Type | Default | Description |
|---|---|---|---|
| `group` | `string` | required | Consumer group name |
| `handler` | `(ctx) => Promise<void>` | required | Message processor |
| `concurrency` | `number` | module default | Max parallel deliveries for this subscription |
| `batchSize` | `number` | `50` | Max broker claims per poll |
| `maxInFlight` | `number` | `concurrency` | Max active deliveries in this process |
| `filter` | `(message) => boolean` | optional | Skip messages that do not match |
| `startFrom` | `"latest" \| "earliest" \| Date \| number` | `"latest"` | Initial offset when the group is first created |
| `ackMode` | `"auto" \| "manual"` | `"auto"` | Handler acknowledgement mode |
| `ackTimeoutMs` | `number` | module default | Handler timeout |
| `retries` | `object` | module default | Per-subscription retry overrides |
| `deadLetter` | `object` | module default | Per-subscription dead-letter overrides |
Adapter Notes
PubSub uses the same adapter surfaces as queues and workers:
- Storage holds topic metadata, message records, group offsets, deliveries, idempotency keys, and dead letters.
- Broker notifications wake subscribers and carry message IDs.
- Locks elect one node for reconciliation and retention cleanup.
For production workloads, use Redis or PostgreSQL adapters. The memory adapters are useful for local development and tests, but they do not survive process restart.