Task Queue
Unified, monolithic queue where publisher and consumer live together
Task Queue
The Task Queue is OqronKit's simplest module. Publisher and consumer are defined together — perfect for monolithic apps and fast prototypes.
Working example → See a complete Task Queue implementation with image processing, email dispatch, PDF generation, ETL DAG pipeline, and crash-safe workers: apps/backend/src/triggers/task-queues.ts
How It Works
When you call queue() with a handler, OqronKit creates a monolithic queue where both the producer (.add()) and the consumer (polling engine) live in the same process.
Job Lifecycle
.add(data, opts)
│
├─ 1. Create OqronJob record in Storage (status: "waiting")
├─ 2. Publish job ID to Broker (with optional delay/priority)
│
│ ┌──── Poll Loop (runs every heartbeatMs) ────┐
│ │ │
├──┤ 3. Check concurrency: free slots available? │
│ │ Check throttle gate: budget remaining? │
│ │ Check lag monitor: event loop healthy? │
│ │ │
│ │ 4. Broker.claim(queueName, limit, strategy) │
│ │ ── fifo / lifo / priority ordering ── │
│ └─────────────────────────────────────────────┘
│
├─ 5. Acquire heartbeat lock (Lock adapter, TTL-based)
├─ 6. Mark job as "active" in Storage
├─ 7. Start heartbeat renewal interval
├─ 8. Execute handler(ctx) with AbortSignal
│
├─ SUCCESS:
│ ├─ Mark job "completed" in Storage
│ ├─ Ack job in Broker (removes from queue)
│ ├─ Stop heartbeat, release lock
│ ├─ Run onSuccess hook
│ └─ Apply retention policy (removeOnComplete)
│
└─ FAILURE:
├─ Check retry config: attempts remaining?
│ YES → Nack to Broker with backoff delay
│ Mark job "delayed", set runAt
│ NO → Mark job "failed"
│ Check DLQ: enabled?
│ YES → Run onDead hook
├─ Stop heartbeat, release lock
└─ Run onFail hookJob State Machine
| State | Meaning |
|---|---|
waiting | Queued in broker, ready to be claimed |
delayed | Scheduled for future processing (delay or retry backoff) |
active | Claimed by a worker, handler executing |
completed | Handler returned successfully |
failed | All retries exhausted or permanently discarded |
stalled | Heartbeat lock expired — worker presumed crashed |
paused | Queue is disabled; job held for later |
waiting-children | Blocked on parent job dependencies |
cancelled | Explicitly cancelled via admin API |
Crash Safety
When guaranteedWorker: true (default):
- Heartbeat Lock — The worker atomically claims a lock via the Lock adapter with a TTL. A
setIntervalrenews it everyheartbeatMs. - Stall Detector — A background scanner checks all active locks. If a lock has expired (worker crashed), the job is marked
stalledand nacked back to the broker. - Cross-Node Scanner — Optional (
crossNodeStallScanner: true). Scans ALL active jobs in storage, not just local ones — catches orphans from nodes that crashed without cleanup. - Reconciliation Engine — Optional (
reconciliation: true). Heals split-brain scenarios where a job was saved to storage but the broker nack failed.
Concurrency Model
Concurrency is per-queue, per-process. If you define concurrency: 5 and run 3 worker processes, the system can process up to 15 jobs in parallel across the cluster.
Process A: queue("emails", concurrency: 5) → max 5 active
Process B: queue("emails", concurrency: 5) → max 5 active
Process C: queue("emails", concurrency: 5) → max 5 active
Total: max 15 activeEach queue is isolated — concurrency: 5 on the email queue does not affect the PDF queue's slots.
Basic Usage
import { queue } from 'oqronkit'
export const emailQueue = queue<
{ to: string; subject: string; body: string },
{ messageId: string }
>({
name: 'email-sender',
concurrency: 5,
handler: async (ctx) => {
ctx.log('info', `Sending "${ctx.data.subject}" to ${ctx.data.to}`)
ctx.progress(50, 'Dispatching via SES')
await sendEmail(ctx.data)
ctx.progress(100, 'Delivered')
return { messageId: `ses-${Date.now()}` }
},
})Adding Jobs
// Basic
await emailQueue.add({ to: 'user@example.com', subject: 'Welcome!', body: '...' })
// With options
await emailQueue.add(
{ to: 'user@example.com', subject: 'Welcome!', body: '...' },
{
jobId: 'welcome-user@example.com', // Idempotency key
delay: 60_000, // Process after 1 minute
priority: 1, // Lower = processed first
attempts: 5, // Retry count for this job
correlationId: 'req-123', // For distributed tracing
dependsOn: ['parent-job-id'], // Wait for parent to complete
}
)
// Bulk
await emailQueue.addBulk([
{ data: { to: 'a@ex.com', subject: 'Hi', body: '...' } },
{ data: { to: 'b@ex.com', subject: 'Hi', body: '...' }, opts: { priority: 1 } },
])Publisher-Only Queue
Omit the handler to create a publisher-only queue. A separate worker() on a different node consumes these jobs.
import { queue } from 'oqronkit'
// No handler — only pushes jobs, no CPU/polling overhead
export const videoQueue = queue<{ url: string }>({
name: 'video-encode',
})
await videoQueue.add({ url: 'https://...' })Configuration
| Option | Type | Default | Description |
|---|---|---|---|
name | string | required | Unique queue identifier |
handler | (ctx) => Promise<R> | optional | Processing function |
concurrency | number | 5 | Parallel execution limit per instance |
strategy | 'fifo' | 'lifo' | 'priority' | 'fifo' | Job ordering strategy |
guaranteedWorker | boolean | true | Heartbeat crash-safety — set false for lightweight tasks |
heartbeatMs | number | 5000 | Heartbeat renewal interval |
lockTtlMs | number | 30000 | Lock expiry for crash detection |
tags | string[] | [] | Tags for filtering and routing |
priority | number | 0 | Default job priority |
disabledBehavior | 'hold' | 'reject' | 'skip' | 'hold' | Action when queue is disabled |
v0.0.2: The queue-level priority is now persisted on job.opts.priority at creation time. This ensures retries, stall recovery, and rerun operations preserve the original priority through the full job lifecycle.
Throttle
Use throttle to cap the dispatch rate — how many jobs are started per time window. Unlike concurrency (which limits parallelism), throttle controls throughput.
// Send at most 20 emails per 2 seconds, with 5 running in parallel
const emailQueue = queue<{ to: string; subject: string; body: string }>({
name: 'email-sender',
concurrency: 5,
throttle: { max: 20, duration: 2_000 },
handler: async (ctx) => {
await sendEmail(ctx.data)
return { sent: true }
},
})| Option | Type | Description |
|---|---|---|
throttle.max | number | Maximum dispatches allowed per window |
throttle.duration | number | Window duration in milliseconds |
How it works: The throttle gate runs before Broker.claim() — it reduces the claim limit so no jobs are fetched and then returned. Zero wasted broker round-trips.
Per-process vs distributed: throttle is per-process. For cluster-wide rate limiting, compose with rateLimiter — both can be used together.
Retry & Dead Letter Queue
export const pdfQueue = queue<PdfInput, PdfOutput>({
name: 'pdf-generation',
concurrency: 1,
strategy: 'priority',
guaranteedWorker: true,
retries: {
max: 2,
strategy: 'exponential',
baseDelay: 3000, // 3s → 6s → 12s
maxDelay: 60_000,
},
deadLetter: {
enabled: true,
onDead: async (job) => {
await alertSlack(`PDF ${job.data.reportId} permanently failed`)
},
},
hooks: {
onSuccess: async (job, result) => {
console.log(`✅ PDF ready: ${result.pdfUrl}`)
},
onFail: async (job, error) => {
console.error(`❌ PDF failed: ${error.message}`)
},
},
handler: async (ctx) => {
// ctx.discard() — permanently fail, skip all retries
// ctx.signal.aborted — check for cancellation
// ctx.progress(percent, label) — track progress
return { pdfUrl: '...', pages: 24 }
},
})v0.0.2: When a job is delayed for retry, job.runAt is now set to the scheduled retry time. This makes retry schedules visible in the dashboard, queryable via getJob(), and correctly handled by the reconciliation engine.
Handler Context (ctx)
| Property | Type | Description |
|---|---|---|
ctx.id | string | Job UUID |
ctx.data | T | Typed input payload |
ctx.signal | AbortSignal | Fires when job is cancelled |
ctx.progress(pct, label) | Function | Update progress (0-100) |
ctx.log(level, msg, meta?) | Function | Structured logging |
ctx.discard() | Function | Permanently fail — skip all retries |
Queue Management
await emailQueue.pause() // Stop processing
await emailQueue.resume() // Resume processing
const paused = await emailQueue.isPaused()
await emailQueue.drain() // Wait for active jobs, then pause
const deleted = await emailQueue.obliterate() // Delete all jobs
const jobs = await emailQueue.getJobs({ status: 'waiting', limit: 50 })
const count = await emailQueue.count('active')
const job = await emailQueue.getJob('job-id-123')Next Steps
- Distributed Worker — Decouple publishers from processors
- Crash Safety — How heartbeat locks protect your jobs