OqronKitOqronKit

PubSub

Durable topics and fan-out consumer groups

PubSub

The PubSub module provides durable publish/subscribe messaging with topic-based routing, consumer groups, and guaranteed at-least-once delivery.

Roadmap Module — PubSub is part of the v1.x Enterprise release. This page documents the planned API.

Planned API

Define a Topic

import { topic } from 'oqronkit'

const orderEvents = topic<OrderEvent>({
  name: 'order-events',

  // Message retention
  retention: {
    maxAge: '7d',       // Keep messages for 7 days
    maxCount: 100_000,  // Or max 100k messages
  },

  // Dead letter for unprocessable messages
  deadLetter: {
    enabled: true,
    maxRetries: 5,
  },
})

Publish Messages

// Publish a single message
await orderEvents.publish({
  event: 'order.created',
  orderId: 'ord_123',
  amount: 99.99,
})

// Publish batch
await orderEvents.publishBatch([
  { event: 'order.created', orderId: 'ord_123', amount: 99.99 },
  { event: 'order.created', orderId: 'ord_124', amount: 149.50 },
])

Subscribe with Consumer Groups

// Consumer group — each message is processed by exactly one consumer in the group
orderEvents.subscribe({
  group: 'billing-service',
  concurrency: 5,

  handler: async (ctx) => {
    const { event, orderId, amount } = ctx.message
    await chargeBilling(orderId, amount)
    ctx.ack() // Acknowledge processing
  },
})

// Fan-out — each subscriber gets every message independently
orderEvents.subscribe({
  group: 'analytics-service',
  concurrency: 10,

  handler: async (ctx) => {
    await trackEvent(ctx.message)
    ctx.ack()
  },
})

// Both billing-service AND analytics-service receive every message

Consumer Group Semantics

PatternBehavior
Competing consumersMultiple instances in the same group — messages load-balanced across them
Fan-outDifferent groups — each group receives every message independently
ExclusiveSingle consumer group with concurrency 1 — strict ordering

Message Filtering

orderEvents.subscribe({
  group: 'shipping',
  filter: (msg) => msg.event === 'order.paid',  // Only paid orders

  handler: async (ctx) => {
    await scheduleShipment(ctx.message.orderId)
    ctx.ack()
  },
})

Replay

// Replay messages from a specific offset or timestamp
await orderEvents.replay({
  group: 'analytics-service',
  from: new Date('2026-01-01'),
  // or: fromOffset: 42000
})

Next Steps

  • Webhook — Event-driven external endpoint distribution
  • Pipeline — Streaming ETL with backpressure

On this page