PubSub
Durable topics and fan-out consumer groups
The PubSub module provides durable publish/subscribe messaging with topic-based routing, consumer groups, and guaranteed at-least-once delivery.
Roadmap Module — PubSub is part of the v1.x Enterprise release. This page documents the planned API.
Planned API
Define a Topic
import { topic } from 'oqronkit'
const orderEvents = topic<OrderEvent>({
  name: 'order-events',
  // Message retention
  retention: {
    maxAge: '7d', // Keep messages for 7 days
    maxCount: 100_000, // Or max 100k messages
  },
  // Dead letter for unprocessable messages
  deadLetter: {
    enabled: true,
    maxRetries: 5,
  },
})

Publish Messages
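Published payloads are typed by the `OrderEvent` parameter passed to `topic<OrderEvent>`, which this page does not define. The sketch here shows one plausible shape, inferred only from the fields used in the examples (`event`, `orderId`, `amount`). The `isOrderEvent` guard is a hypothetical helper for illustration and is not part of the planned oqronkit API.

```typescript
// Assumed shape of OrderEvent, inferred from the fields used on this page.
// The real type may carry more fields or a narrower `event` union.
interface OrderEvent {
  event: string // e.g. 'order.created', 'order.paid'
  orderId: string
  amount: number
}

// Hypothetical runtime guard for payloads that arrive untyped (e.g. parsed JSON).
function isOrderEvent(value: unknown): value is OrderEvent {
  if (typeof value !== 'object' || value === null) return false
  const v = value as Record<string, unknown>
  return (
    typeof v.event === 'string' &&
    typeof v.orderId === 'string' &&
    typeof v.amount === 'number' &&
    isFinite(v.amount)
  )
}
```

Checking a payload with such a guard before publishing keeps malformed messages out of the topic, so they do not have to be caught later by consumers or the dead-letter path.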
// Publish a single message
await orderEvents.publish({
  event: 'order.created',
  orderId: 'ord_123',
  amount: 99.99,
})
// Publish batch
await orderEvents.publishBatch([
  { event: 'order.created', orderId: 'ord_123', amount: 99.99 },
  { event: 'order.created', orderId: 'ord_124', amount: 149.50 },
])

Subscribe with Consumer Groups
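Subscription handlers acknowledge each message with `ctx.ack()`. Under at-least-once delivery, a message that is not acknowledged is expected to be delivered again, and `deadLetter.maxRetries` in the topic definition bounds how often. The following in-memory sketch models that loop. `deliver`, `Delivery`, and `DeliveryOutcome` are hypothetical names, the handler is synchronous for brevity, and reading `maxRetries` as "redeliveries after the first attempt" is an assumption rather than documented oqronkit behavior.

```typescript
// In-memory model of at-least-once delivery with a dead-letter cutoff.
// Not the oqronkit implementation; every name here is hypothetical.
interface Delivery<T> {
  message: T
  attempt: number // 1 for the first delivery
  ack: () => void
}

interface DeliveryOutcome {
  acked: boolean
  attempts: number
  deadLettered: boolean
}

// Assumption: maxRetries counts redeliveries after the first attempt,
// so a message is tried at most maxRetries + 1 times.
function deliver<T>(
  message: T,
  handler: (ctx: Delivery<T>) => void,
  maxRetries: number,
): DeliveryOutcome {
  for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
    const state = { acked: false }
    try {
      handler({ message, attempt, ack: () => { state.acked = true } })
    } catch {
      // A thrown error is treated like a missing ack: the message is redelivered.
    }
    if (state.acked) return { acked: true, attempts: attempt, deadLettered: false }
  }
  // Retries exhausted: park the message instead of redelivering forever.
  return { acked: false, attempts: maxRetries + 1, deadLettered: true }
}
```

Because the same message can reach a handler more than once, handlers with side effects (charging a customer, for instance) should tolerate repeats, for example by keying the operation on `orderId`.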
// Consumer group: each message is processed by exactly one consumer in the group
orderEvents.subscribe({
  group: 'billing-service',
  concurrency: 5,
  handler: async (ctx) => {
    const { orderId, amount } = ctx.message
    await chargeBilling(orderId, amount)
    ctx.ack() // Acknowledge processing
  },
})
// Fan-out: a second group receives its own copy of every message
orderEvents.subscribe({
  group: 'analytics-service',
  concurrency: 10,
  handler: async (ctx) => {
    await trackEvent(ctx.message)
    ctx.ack()
  },
})
// Both billing-service AND analytics-service receive every message

Consumer Group Semantics
| Pattern | Behavior |
|---|---|
| Competing consumers | Multiple instances in the same group — messages load-balanced across them |
| Fan-out | Different groups — each group receives every message independently |
| Exclusive | A single group with one consumer instance and concurrency 1: messages are processed one at a time, in order |
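The routing rules in the table can be modeled in a few lines. This is an in-memory sketch, not the oqronkit implementation: `GroupRouter`, `join`, and the round-robin choice within a group are assumptions made for illustration (the page only says messages are load-balanced).

```typescript
// In-memory model of group routing: one copy per group, one consumer per copy.
// Hypothetical names throughout; this is not the oqronkit API.
type Consumer<T> = (message: T) => void

interface Group<T> {
  consumers: Consumer<T>[]
  next: number // round-robin cursor within the group
}

class GroupRouter<T> {
  private groups: Record<string, Group<T> | undefined> = {}

  // Register one consumer instance under a group name.
  join(group: string, consumer: Consumer<T>): void {
    const entry = this.groups[group] ?? { consumers: [], next: 0 }
    entry.consumers.push(consumer)
    this.groups[group] = entry
  }

  // Fan-out across groups; load-balance (round-robin) within each group.
  publish(message: T): void {
    for (const name of Object.keys(this.groups)) {
      const entry = this.groups[name]
      if (!entry || entry.consumers.length === 0) continue
      const consumer = entry.consumers[entry.next % entry.consumers.length]
      entry.next++
      consumer(message)
    }
  }
}

const router = new GroupRouter<string>()
const seen: string[] = []
router.join('billing-service', (m) => { seen.push('billing#1 ' + m) })
router.join('billing-service', (m) => { seen.push('billing#2 ' + m) })
router.join('analytics-service', (m) => { seen.push('analytics#1 ' + m) })
router.publish('ord_123')
router.publish('ord_124')
```

In this run the two `billing-service` instances each handle one of the two messages (competing consumers), while the single `analytics-service` instance handles both (fan-out). A group with exactly one consumer behaves like the exclusive pattern: nothing else shares its messages, so they arrive in publish order.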
Message Filtering
orderEvents.subscribe({
  group: 'shipping',
  filter: (msg) => msg.event === 'order.paid', // Only paid orders
  handler: async (ctx) => {
    await scheduleShipment(ctx.message.orderId)
    ctx.ack()
  },
})

Replay
// Replay messages from a specific offset or timestamp
await orderEvents.replay({
  group: 'analytics-service',
  from: new Date('2026-01-01'),
  // or: fromOffset: 42000
})
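One way to picture replay is as rewinding a per-group cursor over the retained message log. The sketch below models that idea in memory. `ReplayableLog`, `append`, and `poll` are hypothetical names, offsets are assumed to be zero-based positions in the log, and timestamps are assumed to be non-decreasing in append order. None of this is confirmed oqronkit behavior.

```typescript
// In-memory model of per-group cursors over a retained log.
// Hypothetical names; not the oqronkit implementation.
interface LogRecord<T> {
  timestamp: Date
  message: T
}

type ReplayTarget = { from: Date } | { fromOffset: number }

class ReplayableLog<T> {
  private records: LogRecord<T>[] = []
  private cursors: Record<string, number | undefined> = {}

  // Append a message; its offset is its position in the log.
  append(message: T, timestamp: Date): number {
    this.records.push({ timestamp, message })
    return this.records.length - 1
  }

  // Return the group's unread messages and advance its cursor to the end.
  poll(group: string): T[] {
    const start = this.cursors[group] ?? 0
    this.cursors[group] = this.records.length
    return this.records.slice(start).map((r) => r.message)
  }

  // Rewind one group's cursor; other groups are unaffected.
  replay(group: string, target: ReplayTarget): void {
    if ('fromOffset' in target) {
      this.cursors[group] = Math.max(0, Math.min(target.fromOffset, this.records.length))
      return
    }
    const cutoff = target.from.getTime()
    // With non-decreasing timestamps, the count of earlier records is the
    // index of the first record at or after the cutoff.
    this.cursors[group] = this.records.filter((r) => r.timestamp.getTime() < cutoff).length
  }
}

const log = new ReplayableLog<string>()
log.append('a', new Date('2025-12-31T00:00:00Z'))
log.append('b', new Date('2026-01-01T00:00:00Z'))
log.append('c', new Date('2026-01-02T00:00:00Z'))
log.poll('analytics-service') // consumes a, b, c
log.replay('analytics-service', { from: new Date('2026-01-01') })
const replayed = log.poll('analytics-service') // b and c are delivered again
```

Replayed messages pass through the same handlers a second time, so the usual at-least-once caveat applies: handlers should be safe to run more than once for the same message. Messages already dropped by the topic's retention settings would not be available to replay.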