# Distributed Worker

Decoupled publisher/consumer architecture for horizontal scaling.
The Distributed Worker pattern separates publishers (API nodes) from consumers (worker nodes). Publishers create a `queue()` without a handler and only push jobs; consumers create a `worker()`, which only processes jobs and exposes no `.add()` method.
## Architecture
```text
┌──────────────────┐     ┌──────────┐     ┌──────────────────┐
│    API Server    │────▶│  Redis/  │◀────│  Worker Server   │
│  queue() — push  │     │ Postgres │     │ worker() — pull  │
│  zero CPU cost   │     └──────────┘     │  heavy compute   │
└──────────────────┘                      └──────────────────┘
```

## Publisher Queue (API Side)
Use `queue()` without a handler to create a publisher that only pushes jobs. It consumes no CPU and incurs no polling overhead.
```ts
import { queue } from 'oqronkit'

type VideoMetadata = {
  videoId: string
  s3ResourceUri: string
  codec: 'h264' | 'hevc' | 'av1'
  bitrate: number
}

// Publisher only — no handler, no polling engine
export const videoEncodeQueue = queue<VideoMetadata, string>({
  name: 'video-encode-topic',
})
```

Push jobs from your API routes:
```ts
import { videoEncodeQueue } from './triggers/video-encode.js'

app.post('/api/upload', async (req, res) => {
  const job = await videoEncodeQueue.add(
    {
      videoId: `vid_${Date.now().toString(36)}`,
      s3ResourceUri: req.body.filePath,
      codec: 'hevc',
      bitrate: 4500,
    },
    { jobId: `vid-${req.body.fileHash}` } // Idempotency: same file hash, same job
  )
  res.json({ trackingId: job.id })
})
```

## Consumer Worker (Worker Side)
Use `worker()` to create a consumer. Workers have no `.add()` method — they only process.
```ts
import { worker } from 'oqronkit'

export const videoEncodeWorker = worker<VideoMetadata, string>({
  topic: 'video-encode-topic',
  concurrency: 2, // 2 heavy videos per server
  guaranteedWorker: true, // Heartbeat crash-safety
  heartbeatMs: 5_000,
  lockTtlMs: 30_000,
  retries: {
    max: 1,
    strategy: 'fixed',
    baseDelay: 10_000,
  },
  hooks: {
    onSuccess: (job, finalUrl) => {
      console.log(`Video ${job.data.videoId} uploaded to: ${finalUrl}`)
    },
    onFail: (job, err) => {
      console.error(`Video ${job.data.videoId} failed:`, err)
    },
  },
  handler: async (ctx) => {
    const { videoId, codec, s3ResourceUri } = ctx.data

    ctx.log('info', `Fetching source from ${s3ResourceUri}`)
    ctx.progress(10, 'Downloading source')
    await download(s3ResourceUri)

    // Bail out if the job was cancelled while downloading
    if (ctx.signal.aborted) {
      throw new Error('Transcoding cancelled')
    }

    ctx.progress(40, `Transcoding to ${codec}`)
    await transcode(s3ResourceUri, codec)

    ctx.progress(90, 'Uploading chunks')
    await uploadToCDN(videoId)

    ctx.progress(100, 'Done')
    return `https://cdn.example.com/videos/${videoId}.mp4`
  },
})
```

## Bootstrap
API and worker servers initialize OqronKit independently with only the modules they need:
API server:

```ts
import { OqronKit, queueModule } from 'oqronkit'

await OqronKit.init({
  config: { modules: [queueModule] }, // Push only — no worker polling
})
```

Worker server:

```ts
import { OqronKit, workerModule } from 'oqronkit'

await OqronKit.init({
  config: { modules: [workerModule] }, // Process only
})
```

## Worker Configuration
| Option | Type | Default | Description |
|---|---|---|---|
| `topic` | `string` | required | Queue name to listen on |
| `handler` | `(ctx) => Promise<R>` | required | Job processor function |
| `concurrency` | `number` | `1` | Max parallel job processing |
| `guaranteedWorker` | `boolean` | `true` | Heartbeat crash-safety — set `false` for lightweight tasks |
| `heartbeatMs` | `number` | `5000` | Lock renewal interval (ms) |
| `lockTtlMs` | `number` | `30000` | Lock time-to-live (ms) |
| `retries` | `RetryConfig` | — | Retry policy |
| `hooks` | `{ onSuccess, onFail }` | — | Lifecycle hooks |
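To make the retry options concrete, here is a minimal sketch of how a delay schedule could be derived from a `RetryConfig`. Note the assumptions: only `'fixed'` with `baseDelay` appears in this page's example, so the `'exponential'` strategy and the exact `RetryConfig` shape below are illustrative, not confirmed OqronKit behavior.

```ts
// Hypothetical sketch of a RetryConfig delay schedule.
// 'exponential' and this exact type shape are assumptions for illustration.
type RetryConfig = {
  max: number // maximum number of retries
  strategy: 'fixed' | 'exponential'
  baseDelay: number // ms
}

function retryDelayMs(cfg: RetryConfig, attempt: number): number | null {
  if (attempt > cfg.max) return null // retries exhausted — job fails for good
  return cfg.strategy === 'fixed'
    ? cfg.baseDelay // same delay every time
    : cfg.baseDelay * 2 ** (attempt - 1) // delay doubles per attempt
}

const fixed: RetryConfig = { max: 1, strategy: 'fixed', baseDelay: 10_000 }
console.log(retryDelayMs(fixed, 1)) // 10000 — one retry after 10 s
console.log(retryDelayMs(fixed, 2)) // null — max of 1 retry reached
```

With the worker config above (`max: 1`, `strategy: 'fixed'`, `baseDelay: 10_000`), a failed encode is retried exactly once, ten seconds later.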
## Crash Recovery Flow

- Worker claims job → writes `workerId` + TTL to lock adapter
- Worker runs heartbeat `setInterval` to renew lock
- If worker crashes (SIGKILL/OOM), heartbeat stops
- Lock expires in Redis/Postgres after `lockTtlMs`
- `StallDetector` finds expired lock → marks job as `stalled`
- Job is re-queued and routed to a healthy worker within ~15 seconds
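The lock-expiry check at the heart of this flow can be sketched in plain TypeScript. This is illustrative only — `LockRecord` and `findStalled` are hypothetical names, not OqronKit APIs:

```ts
// Illustrative stall detection: a lock whose TTL elapsed without a heartbeat
// renewal means its worker died without releasing the job.
type LockRecord = {
  jobId: string
  workerId: string
  renewedAt: number // epoch ms of last heartbeat renewal
  lockTtlMs: number
}

function findStalled(locks: LockRecord[], now: number): string[] {
  // A healthy worker renews every heartbeatMs (well under lockTtlMs),
  // so an expired lock is treated as a crashed worker.
  return locks
    .filter((l) => now - l.renewedAt > l.lockTtlMs)
    .map((l) => l.jobId)
}

const now = Date.now()
const locks: LockRecord[] = [
  { jobId: 'vid-a', workerId: 'w1', renewedAt: now - 4_000, lockTtlMs: 30_000 }, // healthy
  { jobId: 'vid-b', workerId: 'w2', renewedAt: now - 45_000, lockTtlMs: 30_000 }, // crashed
]
console.log(findStalled(locks, now)) // ['vid-b']
```

This is why `heartbeatMs` must be comfortably smaller than `lockTtlMs`: with the defaults (5 s heartbeat, 30 s TTL), several missed renewals are tolerated before a job is declared stalled.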
## Next Steps
- Crash Safety — Deep-dive into heartbeat locks and stall detection
- Scheduler — Add time-based job execution