PubSub

Durable topics, consumer groups, partitions, offsets, replay, and dead letters

The PubSub module provides durable publish/subscribe messaging for events that must survive process crashes. It is built on the existing OqronKit storage, broker, and lock adapters, so there is no separate PubSub adapter family to configure or maintain.

PubSub delivers at least once, so a handler can receive the same message more than once. Pass an idempotencyKey when publishing so duplicate publishes are skipped, and make handlers idempotent whenever a side effect cannot safely run twice.
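As an illustration of an idempotent handler, here is a minimal sketch that skips a side effect when the same business key has already been processed. It assumes an in-memory Set in place of durable deduplication storage. makeIdempotent and its parameters are hypothetical names, not part of the OqronKit API.

```typescript
// Hypothetical helper for illustration only; not an OqronKit export.

/** Wraps a side effect so repeated deliveries with the same key run it once. */
function makeIdempotent<T>(
  processed: Set<string>,
  keyOf: (message: T) => string,
  effect: (message: T) => void,
): (message: T) => boolean {
  return (message) => {
    const key = keyOf(message);
    if (processed.has(key)) {
      return false; // duplicate delivery: skip the side effect
    }
    effect(message);
    processed.add(key); // recorded only after the effect succeeded
    return true;
  };
}
```

In a real handler the processed-key record would normally live in the same durable store as the side effect, ideally written in the same transaction.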

Working example: apps/backend/src/triggers/pubsub.ts contains a complete PubSub implementation with partitioned order events, multiple consumer groups (billing and analytics), and manual-ack notification dispatch.

How It Works

PubSub stores each message durably, assigns it to a partition, and delivers it to every consumer group. Each group tracks its own offsets.

Message Flow

.publish(message, opts)

  ├─ 1. Validate message (topic.validate())
  ├─ 2. Check idempotency key (skip if duplicate)
  ├─ 3. Assign partition:
  │    ├─ partitionKey provided? → hash(key) % partitions
  │    └─ No key? → round-robin assignment
  ├─ 4. Assign monotonic offset within partition
  ├─ 5. Persist message record to Storage
  └─ 6. Notify Broker (wake subscriber poll loops)


.subscribe({ group, handler })

  │  ┌──── Poll Loop (per group, per process) ──────────┐
  │  │                                                   │
  ├──┤ 1. Query Storage for messages after group offset  │
  │  │    (respects partition assignment, batchSize)     │
  │  │ 2. Apply subscription filter (if set)             │
  │  │ 3. Create delivery record for each message        │
  │  │ 4. Execute handler(ctx) per delivery              │
  │  │                                                   │
  │  │ On handler completion:                            │
  │  │  ├─ auto ack mode: ack on resolve, nack on reject │
  │  │  └─ manual mode: wait for ctx.ack() / ctx.nack()  │
  │  │                                                   │
  │  │ 5. Commit offset after successful ack             │
  │  │ 6. On nack: apply retry backoff                   │
  │  │    All retries exhausted? → Dead letter           │
  │  └───────────────────────────────────────────────────┘

Consumer Group Isolation

Each consumer group maintains independent offsets. Multiple groups subscribing to the same topic each receive every message independently:

Topic: "order-events" (16 partitions)

  ├─ Group: "billing-service"    offset=142  (processing message 143)
  ├─ Group: "analytics-service"  offset=89   (behind, catching up)
  └─ Group: "shipping-service"   offset=142  (up to date)

Within a single group, multiple processes compete for work. Messages from the same partition are delivered to only one process at a time, preserving keyed ordering.
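The isolation between groups can be pictured with a small in-memory model. This is a sketch under simplifying assumptions (one partition, no persistence, no competing processes). TopicLog and its methods are hypothetical and do not reflect OqronKit's storage layout.

```typescript
// Hypothetical single-partition model; names are illustrative only.
class TopicLog<T> {
  private readonly messages: T[] = [];
  // group name -> last acknowledged offset (absent means nothing acknowledged yet)
  private readonly committed = new Map<string, number>();

  /** Appends a message and returns its offset. */
  publish(message: T): number {
    this.messages.push(message);
    return this.messages.length - 1;
  }

  /** Returns up to batchSize messages after the group's committed offset. */
  poll(group: string, batchSize: number): Array<{ offset: number; message: T }> {
    const from = (this.committed.get(group) ?? -1) + 1;
    return this.messages
      .slice(from, from + batchSize)
      .map((message, i) => ({ offset: from + i, message }));
  }

  /** Records that the group has acknowledged everything up to offset. */
  commit(group: string, offset: number): void {
    const current = this.committed.get(group) ?? -1;
    this.committed.set(group, Math.max(current, offset));
  }
}
```

Committing for one group never changes what another group sees on its next poll, which is the property the offsets in the diagram above describe.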

Reconciliation (Leader-Only)

A leader node periodically scans for:

  • Expired deliveries — Handler timed out without ack/nack. Re-queued for retry.
  • Due retries — Deliveries in backoff whose delay has elapsed. Re-published to Broker.
  • Retention cleanup — Deletes messages older than retention.maxAgeMs or beyond retention.maxCount.
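The retention rule in the last bullet can be sketched as a pure function. This is an illustration only: StoredMessage, Retention, and applyRetention are hypothetical names, and it assumes the age limit is applied before the count limit, which the text above does not specify.

```typescript
// Hypothetical types and helper; not OqronKit's implementation.
interface StoredMessage {
  offset: number;
  publishedAt: number; // epoch milliseconds
}

interface Retention {
  maxAgeMs?: number;
  maxCount?: number;
}

/** Returns the messages that survive one retention pass (input sorted by offset ascending). */
function applyRetention(
  messages: StoredMessage[],
  retention: Retention,
  now: number,
): StoredMessage[] {
  let kept = messages;
  if (retention.maxAgeMs !== undefined) {
    const cutoff = now - retention.maxAgeMs;
    kept = kept.filter((m) => m.publishedAt >= cutoff); // drop messages older than maxAgeMs
  }
  if (retention.maxCount !== undefined && kept.length > retention.maxCount) {
    kept = kept.slice(kept.length - retention.maxCount); // keep only the newest maxCount
  }
  return kept;
}
```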

Enable the Module

index.ts
import { OqronKit, pubsubModule } from "oqronkit";

await OqronKit.init({
  config: {
    project: "billing-platform",
    environment: "production",
    modules: [
      pubsubModule({
        concurrency: 10,
        ackTimeoutMs: 30_000,
        retries: {
          max: 5,
          strategy: "exponential",
          baseDelay: 1_000,
          maxDelay: 60_000,
        },
      }),
    ],
  },
});

project and environment are part of the container isolation prefix. They separate topics, offsets, group state, deliveries, and dead letters across apps and deployment environments.

Define a Topic

triggers/order-events.ts
import { topic } from "oqronkit";

type OrderEvent =
  | { type: "order.created"; orderId: string; accountId: string; total: number }
  | { type: "order.paid"; orderId: string; accountId: string; paymentId: string };

export const orderEvents = topic<OrderEvent>({
  name: "order-events",
  tags: ["orders", "billing"],

  retention: {
    maxAgeMs: 7 * 24 * 60 * 60 * 1000,
    maxCount: 100_000,
  },

  distribution: {
    partitions: 16,
    partitionKey: (message) => message.accountId,
  },

  validate: (message) =>
    message.orderId ? true : "orderId is required",
});

Publish Messages

const messageId = await orderEvents.publish(
  {
    type: "order.created",
    orderId: "ord_123",
    accountId: "acct_1",
    total: 99.99,
  },
  {
    idempotencyKey: "order-created:ord_123",
    partitionKey: "acct_1",
    correlationId: "req_abc",
    headers: {
      source: "checkout",
    },
  },
);

Batch publishing persists messages and pushes broker notifications in one call:

await orderEvents.publishBatch([
  {
    message: {
      type: "order.created",
      orderId: "ord_124",
      accountId: "acct_1",
      total: 149.5,
    },
    options: { idempotencyKey: "order-created:ord_124" },
  },
  {
    message: {
      type: "order.paid",
      orderId: "ord_124",
      accountId: "acct_1",
      paymentId: "pay_456",
    },
    options: { idempotencyKey: "order-paid:ord_124" },
  },
]);

Consumer Groups

Different groups each receive every message. Multiple processes using the same group compete for work and share that group's offsets.

triggers/billing-subscription.ts
import { orderEvents } from "./order-events";

await orderEvents.subscribe({
  group: "billing-service",
  concurrency: 8,
  batchSize: 50,
  maxInFlight: 200,
  startFrom: "earliest",

  filter: (event) => event.type === "order.paid",

  handler: async (ctx) => {
    await createInvoice(ctx.message.orderId);
    ctx.log.info(`invoice created for ${ctx.message.orderId}`);
  },
});

triggers/analytics-subscription.ts
import { orderEvents } from "./order-events";

await orderEvents.subscribe({
  group: "analytics-service",
  concurrency: 20,

  handler: async (ctx) => {
    await trackEvent(ctx.message);
  },
});

billing-service and analytics-service both receive the same topic stream. Inside each group, messages are delivered one at a time per partition to preserve keyed ordering.

Manual Ack

The default ackMode is auto: the delivery is acknowledged when the handler resolves and nacked when it rejects. Use manual when acknowledgement must happen only after a specific side effect completes.

await orderEvents.subscribe({
  group: "warehouse-service",
  ackMode: "manual",
  ackTimeoutMs: 60_000,

  handler: async (ctx) => {
    try {
      await reserveInventory(ctx.message);
      await ctx.ack();
    } catch (err) {
      await ctx.nack(err instanceof Error ? err.message : "reservation failed");
    }
  },
});

If a manual handler returns without ack(), nack(), or discard(), OqronKit nacks the delivery so it is not stranded.
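The fallback rule above can be modelled as follows. This is a simplified, synchronous sketch: DeliveryContext and runManual are hypothetical names, and the "first settlement wins" behaviour is an assumption made for the example rather than documented OqronKit behaviour.

```typescript
// Hypothetical model of a manual-ack delivery; not the OqronKit context type.
type Settlement = "acked" | "nacked" | "discarded";

class DeliveryContext {
  state: Settlement | null = null;
  reason: string | undefined = undefined;

  ack(): void {
    this.settle("acked");
  }

  nack(reason?: string): void {
    this.settle("nacked", reason);
  }

  discard(reason?: string): void {
    this.settle("discarded", reason);
  }

  private settle(next: Settlement, reason?: string): void {
    if (this.state !== null) return; // assumption: the first settlement wins
    this.state = next;
    this.reason = reason;
  }
}

/** Runs a manual-ack handler and nacks the delivery if the handler never settled it. */
function runManual(handler: (ctx: DeliveryContext) => void): DeliveryContext {
  const ctx = new DeliveryContext();
  handler(ctx);
  if (ctx.state === null) {
    ctx.nack("handler returned without ack, nack, or discard");
  }
  return ctx;
}
```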

Partitioning

Partitioning gives you horizontal distribution while preserving order for a key.

const accountEvents = topic<AccountEvent>({
  name: "account-events",
  distribution: {
    partitions: 32,
    partitionKey: (event) => event.accountId,
  },
});

You can also override the key per publish:

await accountEvents.publish(event, {
  partitionKey: event.accountId,
});

Use stable business keys such as tenantId, accountId, userId, or aggregateId. Avoid random keys when ordered processing matters.
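To see why a stable key preserves ordering, here is a sketch of the assignment rule from the Message Flow diagram: hash(key) % partitions when a key is present, round-robin otherwise. The hash function is an assumption (FNV-1a is used as a stand-in, since the text does not name the real one), and createPartitionAssigner is a hypothetical helper.

```typescript
// Hypothetical sketch; the hash OqronKit actually uses is not documented here.

/** FNV-1a 32-bit hash, returned as an unsigned integer. */
function fnv1a(input: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash;
}

/** Returns a function that maps an optional key to a partition index. */
function createPartitionAssigner(partitions: number) {
  let next = 0; // round-robin cursor for messages without a key
  return (partitionKey?: string | number): number => {
    if (partitionKey !== undefined) {
      return fnv1a(String(partitionKey)) % partitions; // same key, same partition
    }
    const assigned = next;
    next = (next + 1) % partitions;
    return assigned;
  };
}
```

Because the mapping depends only on the key and the partition count, every event for one accountId lands in the same partition. A random key would scatter related events across partitions and lose their relative order.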

Retries and Dead Letters

await orderEvents.subscribe({
  group: "shipping-service",
  retries: {
    max: 3,
    strategy: "exponential",
    baseDelay: 2_000,
    maxDelay: 30_000,
  },
  deadLetter: {
    enabled: true,
    onDead: async (messageId, message, error) => {
      await alertOps({ messageId, message, error });
    },
  },
  handler: async (ctx) => {
    await scheduleShipment(ctx.message);
  },
});

Dead letters are inspectable and retryable:

const dead = await orderEvents.deadLetters("shipping-service", { limit: 50 });

// Retry a single dead letter (only if one exists), or retry all of them.
if (dead.length > 0) {
  await orderEvents.retryDeadLetter("shipping-service", dead[0].messageId);
}
await orderEvents.retryAllDeadLetters("shipping-service");

Call ctx.discard(reason) inside a handler to skip retries and send the delivery directly to the terminal discarded state.
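To get a feel for how max, strategy, baseDelay, and maxDelay interact, here is a sketch of a delay calculation. The doubling factor and the absence of jitter are assumptions, since the exact formula is not stated above. RetryPolicy and retryDelay are hypothetical names.

```typescript
// Hypothetical backoff calculation; the real formula may differ (for example, by adding jitter).
type RetryStrategy = "fixed" | "exponential";

interface RetryPolicy {
  max: number;
  strategy: RetryStrategy;
  baseDelay: number; // milliseconds
  maxDelay: number; // milliseconds
}

/** Delay before retry number attempt (1-based), or null once retries are exhausted. */
function retryDelay(policy: RetryPolicy, attempt: number): number | null {
  if (attempt > policy.max) {
    return null; // the caller would move the delivery to dead letters
  }
  const raw =
    policy.strategy === "fixed"
      ? policy.baseDelay
      : policy.baseDelay * 2 ** (attempt - 1);
  return Math.min(raw, policy.maxDelay);
}
```

Under these assumptions the shipping-service settings above (max 3, baseDelay 2_000, maxDelay 30_000) would wait 2 s, 4 s, and 8 s before the delivery is dead-lettered.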

Replay and Offsets

Each consumer group owns independent committed offsets per partition.

// Reprocess from the beginning.
await orderEvents.seek("analytics-service", { position: "earliest" });

// Start at the newest published message.
await orderEvents.seek("analytics-service", { position: "latest" });

// Rewind to an offset, timestamp, or message.
await orderEvents.seek("analytics-service", { offset: 42 });
await orderEvents.seek("analytics-service", { timestamp: new Date("2026-01-01") });
await orderEvents.seek("analytics-service", { messageId: "msg_123" });

replay() is shorthand for resetting a group to a timestamp or offset:

await orderEvents.replay({
  group: "analytics-service",
  from: new Date("2026-01-01"),
});
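Seeking by timestamp amounts to finding the first offset published at or after that time. The sketch below shows one way to do that with a binary search over publish times. Whether OqronKit resolves timestamps this way is an assumption, and offsetForTimestamp is a hypothetical helper.

```typescript
// Hypothetical helper; illustrates the idea only.

/**
 * Returns the first offset whose publish time is at or after timestamp.
 * publishedAt[i] is the publish time (epoch ms) of offset i, sorted ascending.
 * Returns publishedAt.length when every message is older than timestamp.
 */
function offsetForTimestamp(publishedAt: number[], timestamp: number): number {
  let lo = 0;
  let hi = publishedAt.length;
  while (lo < hi) {
    const mid = (lo + hi) >>> 1;
    if (publishedAt[mid]! < timestamp) {
      lo = mid + 1; // everything up to mid is too old
    } else {
      hi = mid; // mid could be the answer; keep searching to the left
    }
  }
  return lo;
}
```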

Operations

const lag = await orderEvents.lag("billing-service");
const stats = await orderEvents.stats();

await orderEvents.pause("billing-service");
await orderEvents.resume("billing-service");

// Pause or resume every group on the topic.
await orderEvents.pause();
await orderEvents.resume();

// Remove all persisted PubSub state for this topic.
const removed = await orderEvents.purge();

stats() includes total messages, partition count, oldest and newest message timestamps, per-group lag, pending messages, dead-letter count, and committed offsets.
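Per-group lag is commonly defined as the distance between the newest offset in each partition and the group's committed offset, summed over partitions. The sketch below uses that definition as an assumption. PartitionState and groupLag are hypothetical names, and the value returned by lag() may be shaped differently.

```typescript
// Hypothetical lag calculation under the definition stated above.
interface PartitionState {
  latestOffset: number; // newest published offset in the partition
  committedOffset: number; // last offset the group has acknowledged
}

/** Total number of messages the group has not yet acknowledged. */
function groupLag(partitions: PartitionState[]): number {
  return partitions.reduce(
    (sum, p) => sum + Math.max(0, p.latestOffset - p.committedOffset),
    0,
  );
}
```

With the numbers from the Consumer Group Isolation diagram, a group at offset 89 on a partition whose newest offset is 142 would have a lag of 53.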

Module Configuration

Option                    Type                     Default        Description
concurrency               number                   1              Default parallel delivery limit per subscription
pollIntervalMs            number                   100            Broker polling interval
lockTtlMs                 number                   30000          Broker lease TTL for claimed deliveries
ackTimeoutMs              number                   30000          Handler timeout before a delivery is nacked
retries.max               number                   3              Default retry count
retries.strategy          "fixed" | "exponential"  "exponential"  Default retry backoff strategy
retries.baseDelay         number                   1000           Initial retry delay in milliseconds
retries.maxDelay          number                   60000          Maximum retry delay in milliseconds
deadLetter.enabled        boolean                  true           Store exhausted deliveries in dead letters
shutdownTimeout           number                   25000          Graceful shutdown drain timeout
reconciliationIntervalMs  number                   30000          Leader interval for repairing expired leases and due retries. Set to 0 to disable
reconciliationBatchSize   number                   500            Max delivery records repaired per scan
retentionIntervalMs       number                   60000          Leader interval for topic retention cleanup. Set to 0 to disable

Topic Configuration

Option                     Type                                Description
name                       string                              Unique topic name
tags                       string[]                            Optional labels for organization
validate                   (message) => boolean | string       Reject invalid messages before publish
retention.maxAgeMs         number                              Delete messages older than this age
retention.maxCount         number                              Keep only the newest N messages
distribution.partitions    number                              Number of partitions. Defaults to 1
distribution.partitionKey  (message, meta) => string | number  Select the key used for hash partitioning
hooks.onPublish            Function                            Runs after publish
hooks.onAck                Function                            Runs after a group acknowledges a message
hooks.onDead               Function                            Runs when a delivery enters dead letters

Subscription Configuration

Option        Type                                   Default         Description
group         string                                 required        Consumer group name
handler       (ctx) => Promise<void>                 required        Message processor
concurrency   number                                 module default  Max parallel deliveries for this subscription
batchSize     number                                 50              Max broker claims per poll
maxInFlight   number                                 concurrency     Max active deliveries in this process
filter        (message) => boolean                   optional        Skip messages that do not match
startFrom     "latest" | "earliest" | Date | number  "latest"        Initial offset when the group is first created
ackMode       "auto" | "manual"                      "auto"          Handler acknowledgement mode
ackTimeoutMs  number                                 module default  Handler timeout
retries       object                                 module default  Per-subscription retry overrides
deadLetter    object                                 module default  Per-subscription dead-letter overrides

Adapter Notes

PubSub uses the same adapter surfaces as queues and workers:

  • Storage holds topic metadata, message records, group offsets, deliveries, idempotency keys, and dead letters.
  • Broker notifications wake subscribers and carry message IDs.
  • Locks elect one node for reconciliation and retention cleanup.

For production workloads, use Redis or PostgreSQL adapters. The memory adapters are useful for local development and tests, but they do not survive process restart.
