OqronKit

Distributed Worker

Decoupled publisher/consumer architecture for horizontal scaling

The Distributed Worker pattern separates publishers (API nodes) from consumers (worker nodes). Publishers call queue() without a handler, so they only enqueue jobs. Consumers call worker(), which only processes jobs and exposes no .add() method.

Architecture

┌──────────────────┐     ┌──────────┐     ┌──────────────────┐
│  API Server      │────▶│  Redis/  │◀────│  Worker Server   │
│  queue() — push  │     │ Postgres │     │  worker() — pull │
│  zero CPU cost   │     └──────────┘     │  heavy compute   │
└──────────────────┘                      └──────────────────┘
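
The split in the diagram can be modelled in a few lines. This is a minimal in-memory sketch, not OqronKit code: SharedStore stands in for Redis/Postgres, and makePublisher and makeConsumer are hypothetical names chosen for illustration. The point it shows is that the publisher object can only add jobs and the consumer object can only drain them.

```typescript
// In-memory stand-in for the shared Redis/Postgres store (illustration only).
type Job<T> = { id: string; topic: string; data: T }

class SharedStore<T> {
  private pending: Job<T>[] = []

  push(job: Job<T>): void {
    this.pending.push(job)
  }

  // Hand out the oldest pending job for a topic, or undefined if there is none.
  claim(topic: string): Job<T> | undefined {
    const i = this.pending.findIndex((j) => j.topic === topic)
    return i === -1 ? undefined : this.pending.splice(i, 1)[0]
  }
}

// Publisher side (hypothetical helper): exposes add() and nothing else.
function makePublisher<T>(store: SharedStore<T>, topic: string) {
  let seq = 0
  return {
    add(data: T): Job<T> {
      const job: Job<T> = { id: `${topic}-${++seq}`, topic, data }
      store.push(job)
      return job
    },
  }
}

// Consumer side (hypothetical helper): processes jobs, exposes no add().
function makeConsumer<T, R>(
  store: SharedStore<T>,
  topic: string,
  handler: (data: T) => R,
) {
  return {
    drain(): R[] {
      const results: R[] = []
      let job: Job<T> | undefined
      while ((job = store.claim(topic)) !== undefined) {
        results.push(handler(job.data))
      }
      return results
    },
  }
}

const store = new SharedStore<{ videoId: string }>()
const publisher = makePublisher(store, 'video-encode-topic')
const consumer = makeConsumer(store, 'video-encode-topic', (d) => `encoded:${d.videoId}`)

publisher.add({ videoId: 'a' })
publisher.add({ videoId: 'b' })
const encoded = consumer.drain()
```

In a real deployment the two halves run in separate processes and only the store is shared, which is what lets each side scale independently.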

Publisher Queue (API Side)

Use queue() without a handler to create a publisher that only pushes jobs. With no handler there is no polling engine, so the publisher adds no polling or job-processing load to the API server.

triggers/video-encode.ts
import { queue } from 'oqronkit'

// Exported so the worker file can reuse the same payload type
export type VideoMetadata = {
  videoId: string
  s3ResourceUri: string
  codec: 'h264' | 'hevc' | 'av1'
  bitrate: number
}

// Publisher only — no handler, no polling engine
export const videoEncodeQueue = queue<VideoMetadata, string>({
  name: 'video-encode-topic',
})

Push jobs from your API routes:

api-routes.ts
import { videoEncodeQueue } from './triggers/video-encode.js'

app.post('/api/upload', async (req, res) => {
  const job = await videoEncodeQueue.add(
    {
      videoId: `vid_${Date.now().toString(36)}`,
      s3ResourceUri: req.body.filePath,
      codec: 'hevc',
      bitrate: 4500,
    },
    { jobId: `vid-${req.body.fileHash}` } // Idempotency
  )
  res.json({ trackingId: job.id })
})
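
The jobId option above is marked as an idempotency key. The following sketch illustrates one plausible reading of that: adding a second job with an existing jobId returns the job already stored instead of creating a duplicate. IdempotentQueue is a hypothetical in-memory model, and the dedupe-and-return behaviour is an assumption; the exact behaviour of OqronKit on a duplicate jobId is not stated here.

```typescript
type StoredJob<T> = { id: string; data: T }

// Hypothetical in-memory model of jobId-based deduplication.
class IdempotentQueue<T> {
  private jobs = new Map<string, StoredJob<T>>()
  private seq = 0

  add(data: T, opts: { jobId?: string } = {}): StoredJob<T> {
    // Without an explicit jobId every call gets a fresh id, so nothing dedupes.
    const id = opts.jobId ?? `auto-${++this.seq}`
    const existing = this.jobs.get(id)
    if (existing) return existing // assumed behaviour: duplicate add is a no-op
    const job: StoredJob<T> = { id, data }
    this.jobs.set(id, job)
    return job
  }

  get size(): number {
    return this.jobs.size
  }
}

const uploads = new IdempotentQueue<{ filePath: string }>()

// A client retries the same upload: same file hash, so same jobId.
const first = uploads.add({ filePath: '/tmp/a.mov' }, { jobId: 'vid-abc123' })
const retry = uploads.add({ filePath: '/tmp/a.mov' }, { jobId: 'vid-abc123' })
```

Deriving the jobId from a content hash, as the route above does with fileHash, means a retried HTTP request maps to the same job even though the generated videoId differs between attempts.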

Consumer Worker (Worker Side)

Use worker() to create a consumer. Workers have no .add() method — they only process.

triggers/video-worker.ts
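import { worker } from 'oqronkit'
// Type-only import (erased at compile time); assumes VideoMetadata is exported from the publisher file
import type { VideoMetadata } from './video-encode.js'
// download, transcode, and uploadToCDN are application-specific helpers, not part of OqronKit;
// './media.js' is a hypothetical module path
import { download, transcode, uploadToCDN } from './media.js'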

export const videoEncodeWorker = worker<VideoMetadata, string>({
  topic: 'video-encode-topic',

  concurrency: 2,              // 2 heavy videos per server
  guaranteedWorker: true,       // Heartbeat crash-safety
  heartbeatMs: 5_000,
  lockTtlMs: 30_000,

  retries: {
    max: 1,
    strategy: 'fixed',
    baseDelay: 10_000,
  },

  hooks: {
    onSuccess: (job, finalUrl) => {
      console.log(`Video ${job.data.videoId} uploaded to: ${finalUrl}`)
    },
    onFail: (job, err) => {
      console.error(`Video ${job.data.videoId} failed:`, err)
    },
  },

  handler: async (ctx) => {
    const { videoId, codec, s3ResourceUri } = ctx.data

    ctx.log('info', `Fetching source from ${s3ResourceUri}`)
    ctx.progress(10, 'Downloading source')
    await download(s3ResourceUri)

    if (ctx.signal.aborted) {
      throw new Error('Transcoding cancelled')
    }

    ctx.progress(40, `Transcoding to ${codec}`)
    await transcode(s3ResourceUri, codec)

    ctx.progress(90, 'Uploading chunks')
    await uploadToCDN(videoId)

    ctx.progress(100, 'Done')
    return `https://cdn.example.com/videos/${videoId}.mp4`
  },
})
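
The handler above checks ctx.signal.aborted once, after the download. A long pipeline may want that check before every expensive step. The sketch below shows the idea with the standard AbortController API; runSteps, Step, and Outcome are hypothetical names, and the steps are synchronous stubs so the example stays self-contained.

```typescript
type Step = { label: string; run: () => void }
type Outcome = { completed: string[]; cancelledBefore?: string }

// Run steps in order, checking the abort signal before each one.
function runSteps(steps: Step[], signal: AbortSignal): Outcome {
  const completed: string[] = []
  for (const step of steps) {
    if (signal.aborted) {
      // Stop before starting more work; report where the pipeline halted.
      return { completed, cancelledBefore: step.label }
    }
    step.run()
    completed.push(step.label)
  }
  return { completed }
}

const controller = new AbortController()

const outcome = runSteps(
  [
    // Simulate a cancellation request arriving while the download is running.
    { label: 'download', run: () => controller.abort() },
    { label: 'transcode', run: () => {} },
    { label: 'upload', run: () => {} },
  ],
  controller.signal,
)
```

Cancellation here is cooperative: a step that has already started runs to completion, and the signal only prevents later steps from starting.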

Bootstrap

API and worker servers initialize OqronKit independently with only the modules they need:

api-server.ts
import { OqronKit, queueModule } from 'oqronkit'

await OqronKit.init({
  config: { modules: [queueModule] }, // Push only — no worker polling
})

worker-server.ts
import { OqronKit, workerModule } from 'oqronkit'

await OqronKit.init({
  config: { modules: [workerModule] }, // Process only
})

Worker Configuration

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| topic | string | required | Queue name to listen on |
| handler | (ctx) => Promise<R> | required | Job processor function |
| concurrency | number | 1 | Max parallel job processing |
| guaranteedWorker | boolean | true | Heartbeat crash-safety; set false for lightweight tasks |
| heartbeatMs | number | 5000 | Lock renewal interval, in milliseconds |
| lockTtlMs | number | 30000 | Lock time-to-live, in milliseconds |
| retries | RetryConfig | | Retry policy |
| hooks | { onSuccess, onFail } | | Lifecycle hooks |
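
As a way to reason about the defaults in the table, here is a small sketch that fills in missing options and checks that the heartbeat interval is shorter than the lock TTL. The defaults are the ones listed above; resolveOptions, WorkerTuning, and the validation rules are hypothetical and are not claimed to match what OqronKit does internally.

```typescript
// The tunable subset of the worker options from the table.
type WorkerTuning = {
  concurrency?: number
  guaranteedWorker?: boolean
  heartbeatMs?: number
  lockTtlMs?: number
}

// Defaults as documented in the table.
const DEFAULTS: Required<WorkerTuning> = {
  concurrency: 1,
  guaranteedWorker: true,
  heartbeatMs: 5000,
  lockTtlMs: 30000,
}

// Hypothetical helper: apply defaults, then sanity-check the combination.
function resolveOptions(opts: WorkerTuning = {}): Required<WorkerTuning> {
  const resolved: Required<WorkerTuning> = {
    concurrency: opts.concurrency ?? DEFAULTS.concurrency,
    guaranteedWorker: opts.guaranteedWorker ?? DEFAULTS.guaranteedWorker,
    heartbeatMs: opts.heartbeatMs ?? DEFAULTS.heartbeatMs,
    lockTtlMs: opts.lockTtlMs ?? DEFAULTS.lockTtlMs,
  }
  if (resolved.concurrency < 1) {
    throw new RangeError('concurrency must be at least 1')
  }
  // If renewals are not more frequent than the TTL, a healthy worker can lose its lock.
  if (resolved.guaranteedWorker && resolved.heartbeatMs >= resolved.lockTtlMs) {
    throw new RangeError('heartbeatMs must be shorter than lockTtlMs')
  }
  return resolved
}

const videoTuning = resolveOptions({ concurrency: 2 })
// Number of heartbeat intervals that fit into one lock TTL.
const beatsPerTtl = videoTuning.lockTtlMs / videoTuning.heartbeatMs
```

With the documented defaults the lock is renewed several times per TTL window, so a single delayed heartbeat should not by itself let the lock lapse.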

Crash Recovery Flow

  1. Worker claims job → writes workerId + TTL to lock adapter
  2. Worker renews the lock on a setInterval heartbeat (every heartbeatMs)
  3. If worker crashes (SIGKILL/OOM), heartbeat stops
  4. Lock expires in Redis/Postgres once lockTtlMs passes without a renewal
  5. StallDetector finds expired lock → marks job as stalled
  6. Job is re-queued and routed to a healthy worker within ~15 seconds
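
The flow above can be traced with a small simulation. This is a sketch under stated assumptions, not the library's implementation: LockTable is a hypothetical in-memory stand-in for the lock adapter, findStalled plays the role of the StallDetector, and time is passed in explicitly so the run is deterministic.

```typescript
type Lock = { workerId: string; expiresAt: number }

// Hypothetical in-memory lock adapter with TTL-based expiry.
class LockTable {
  private locks = new Map<string, Lock>()
  private lockTtlMs: number

  constructor(lockTtlMs: number) {
    this.lockTtlMs = lockTtlMs
  }

  // Step 1: claim succeeds only if no live lock exists for the job.
  claim(jobId: string, workerId: string, now: number): boolean {
    const held = this.locks.get(jobId)
    if (held && held.expiresAt > now) return false
    this.locks.set(jobId, { workerId, expiresAt: now + this.lockTtlMs })
    return true
  }

  // Step 2: a heartbeat extends the lock, but only for its current, live owner.
  renew(jobId: string, workerId: string, now: number): boolean {
    const held = this.locks.get(jobId)
    if (!held || held.workerId !== workerId || held.expiresAt <= now) return false
    held.expiresAt = now + this.lockTtlMs
    return true
  }

  // Step 5: report jobs whose lock has expired without renewal.
  findStalled(now: number): string[] {
    const stalled: string[] = []
    this.locks.forEach((lock, jobId) => {
      if (lock.expiresAt <= now) stalled.push(jobId)
    })
    return stalled
  }
}

const table = new LockTable(30_000)

const claimedByA = table.claim('job-1', 'worker-a', 0)
table.renew('job-1', 'worker-a', 5_000)
table.renew('job-1', 'worker-a', 10_000) // last heartbeat; lock now expires at 40_000
// worker-a crashes here (steps 3 and 4): no further renewals arrive.

const earlySteal = table.claim('job-1', 'worker-b', 20_000) // lock is still live
const stalledBefore = table.findStalled(39_999)
const stalledAfter = table.findStalled(40_000)
const reclaimedByB = table.claim('job-1', 'worker-b', 40_000) // step 6
const lateRenewByA = table.renew('job-1', 'worker-a', 41_000) // old owner is rejected
```

The TTL is measured from the last successful heartbeat, so in this model the time until a crashed job becomes reclaimable depends on when the final renewal happened, not on when the job was first claimed.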

Next Steps

  • Crash Safety — Deep-dive into heartbeat locks and stall detection
  • Scheduler — Add time-based job execution
