Task Queue
Unified, monolithic queue where publisher and consumer live together
The Task Queue is OqronKit's simplest module. Publisher and consumer are defined together — perfect for monolithic apps and fast prototypes.
Basic Usage
```ts
import { queue } from 'oqronkit'

export const emailQueue = queue<
  { to: string; subject: string; body: string },
  { messageId: string }
>({
  name: 'email-sender',
  concurrency: 5,
  handler: async (ctx) => {
    ctx.log('info', `Sending "${ctx.data.subject}" to ${ctx.data.to}`)
    ctx.progress(50, 'Dispatching via SES')
    await sendEmail(ctx.data)
    ctx.progress(100, 'Delivered')
    return { messageId: `ses-${Date.now()}` }
  },
})
```

Adding Jobs
```ts
// Basic
await emailQueue.add({ to: 'user@example.com', subject: 'Welcome!', body: '...' })

// With options
await emailQueue.add(
  { to: 'user@example.com', subject: 'Welcome!', body: '...' },
  {
    jobId: 'welcome-user@example.com', // Idempotency key
    delay: 60_000, // Process after 1 minute
    priority: 1, // Lower = processed first
    attempts: 5, // Retry count for this job
    correlationId: 'req-123', // For distributed tracing
    dependsOn: ['parent-job-id'], // Wait for parent to complete
  }
)

// Bulk
await emailQueue.addBulk([
  { data: { to: 'a@ex.com', subject: 'Hi', body: '...' } },
  { data: { to: 'b@ex.com', subject: 'Hi', body: '...' }, opts: { priority: 1 } },
])
```

Publisher-Only Queue
Omit the handler to create a publisher-only queue. A separate `worker()` on a different node consumes these jobs.
```ts
import { queue } from 'oqronkit'

// No handler — only pushes jobs, no CPU/polling overhead
export const videoQueue = queue<{ url: string }>({
  name: 'video-encode',
})

await videoQueue.add({ url: 'https://...' })
```

Configuration
| Option | Type | Default | Description |
|---|---|---|---|
| `name` | `string` | required | Unique queue identifier |
| `handler` | `(ctx) => Promise<R>` | optional | Processing function |
| `concurrency` | `number` | `5` | Parallel execution limit per instance |
| `strategy` | `'fifo' \| 'lifo' \| 'priority'` | `'fifo'` | Job ordering strategy |
| `guaranteedWorker` | `boolean` | `true` | Heartbeat crash safety; set `false` for lightweight tasks |
| `heartbeatMs` | `number` | `5000` | Heartbeat renewal interval (ms) |
| `lockTtlMs` | `number` | `30000` | Lock expiry for crash detection (ms) |
| `tags` | `string[]` | `[]` | Tags for filtering and routing |
| `priority` | `number` | `0` | Default job priority |
| `disabledBehavior` | `'hold' \| 'reject' \| 'skip'` | `'hold'` | Action when the queue is disabled |
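With `strategy: 'priority'`, jobs with a lower `priority` value are processed first. As a minimal illustration of that ordering (plain TypeScript, independent of OqronKit — the tie-breaking on insertion order is an assumption, not documented behavior):

```typescript
// Hypothetical in-memory sketch of priority ordering:
// lower `priority` value wins; ties assumed to fall back to FIFO order.
type PendingJob = { id: string; priority: number; enqueuedAt: number }

function nextJob(jobs: PendingJob[]): PendingJob | undefined {
  return [...jobs].sort(
    (a, b) => a.priority - b.priority || a.enqueuedAt - b.enqueuedAt
  )[0]
}

const jobs: PendingJob[] = [
  { id: 'newsletter', priority: 5, enqueuedAt: 1 },
  { id: 'password-reset', priority: 0, enqueuedAt: 2 },
  { id: 'receipt', priority: 1, enqueuedAt: 3 },
]

console.log(nextJob(jobs)?.id) // 'password-reset'
```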
v0.0.2: The queue-level `priority` is now persisted on `job.opts.priority` at creation time. This ensures retries, stall recovery, and rerun operations preserve the original priority through the full job lifecycle.
Retry & Dead Letter Queue
```ts
export const pdfQueue = queue<PdfInput, PdfOutput>({
  name: 'pdf-generation',
  concurrency: 1,
  strategy: 'priority',
  guaranteedWorker: true,
  retries: {
    max: 2,
    strategy: 'exponential',
    baseDelay: 3000, // 3s → 6s → 12s
    maxDelay: 60_000,
  },
  deadLetter: {
    enabled: true,
    onDead: async (job) => {
      await alertSlack(`PDF ${job.data.reportId} permanently failed`)
    },
  },
  hooks: {
    onSuccess: async (job, result) => {
      console.log(`✅ PDF ready: ${result.pdfUrl}`)
    },
    onFail: async (job, error) => {
      console.error(`❌ PDF failed: ${error.message}`)
    },
  },
  handler: async (ctx) => {
    // ctx.discard() — permanently fail, skip all retries
    // ctx.signal.aborted — check for cancellation
    // ctx.progress(percent, label) — track progress
    return { pdfUrl: '...', pages: 24 }
  },
})
```

v0.0.2: When a job is delayed for retry, `job.runAt` is now set to the scheduled retry time. This makes retry schedules visible in the dashboard, queryable via `getJob()`, and correctly handled by the reconciliation engine.
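The `'exponential'` strategy above doubles the delay on each attempt, starting from `baseDelay` and capped at `maxDelay`. A minimal sketch of that schedule (plain TypeScript; the formula `baseDelay × 2^(attempt − 1)` is inferred from the `3s → 6s → 12s` comment, not taken from the library's internals):

```typescript
// Assumed backoff formula: baseDelay * 2^(attempt - 1), capped at maxDelay.
function retryDelay(
  attempt: number, // 1-based retry attempt
  baseDelay: number,
  maxDelay: number
): number {
  return Math.min(baseDelay * 2 ** (attempt - 1), maxDelay)
}

// With baseDelay 3000 and maxDelay 60_000:
console.log([1, 2, 3].map((a) => retryDelay(a, 3000, 60_000))) // [3000, 6000, 12000]
```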
Handler Context (ctx)
| Property | Type | Description |
|---|---|---|
| `ctx.id` | `string` | Job UUID |
| `ctx.data` | `T` | Typed input payload |
| `ctx.signal` | `AbortSignal` | Fires when the job is cancelled |
| `ctx.progress(pct, label)` | `Function` | Update progress (0–100) |
| `ctx.log(level, msg, meta?)` | `Function` | Structured logging |
| `ctx.discard()` | `Function` | Permanently fail; skip all retries |
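A long-running handler should check `ctx.signal` between units of work so cancellation takes effect promptly. A minimal sketch using a hand-rolled context (the real `ctx` is supplied by OqronKit; the `processChunks` helper and mid-run abort are hypothetical):

```typescript
// Hypothetical stand-in for the ctx that OqronKit passes to handlers.
const controller = new AbortController()
const ctx = { signal: controller.signal }

function processChunks(chunks: string[]): string[] {
  const done: string[] = []
  for (const chunk of chunks) {
    // Bail out between chunks if the job was cancelled.
    if (ctx.signal.aborted) break
    done.push(chunk.toUpperCase())
    if (chunk === 'b') controller.abort() // simulate cancellation mid-run
  }
  return done
}

const result = processChunks(['a', 'b', 'c'])
console.log(result) // ['A', 'B'] — 'c' is skipped after the abort
```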
Queue Management
```ts
await emailQueue.pause() // Stop processing
await emailQueue.resume() // Resume processing
const paused = await emailQueue.isPaused()

await emailQueue.drain() // Wait for active jobs, then pause
const deleted = await emailQueue.obliterate() // Delete all jobs

const jobs = await emailQueue.getJobs({ status: 'waiting', limit: 50 })
const count = await emailQueue.count('active')
const job = await emailQueue.getJob('job-id-123')
```

Next Steps
- Distributed Worker — Decouple publishers from processors
- Crash Safety — How heartbeat locks protect your jobs