OqronKit

Task Queue

Unified, monolithic queue where publisher and consumer live together

The Task Queue is OqronKit's simplest module. Publisher and consumer are defined together — perfect for monolithic apps and fast prototypes.

Basic Usage

triggers/email-queue.ts
import { queue } from 'oqronkit'

export const emailQueue = queue<
  { to: string; subject: string; body: string },
  { messageId: string }
>({
  name: 'email-sender',
  concurrency: 5,

  handler: async (ctx) => {
    ctx.log('info', `Sending "${ctx.data.subject}" to ${ctx.data.to}`)
    ctx.progress(50, 'Dispatching via SES')
    await sendEmail(ctx.data)

    ctx.progress(100, 'Delivered')
    return { messageId: `ses-${Date.now()}` }
  },
})

Adding Jobs

// Basic
await emailQueue.add({ to: 'user@example.com', subject: 'Welcome!', body: '...' })

// With options
await emailQueue.add(
  { to: 'user@example.com', subject: 'Welcome!', body: '...' },
  {
    jobId: 'welcome-user@example.com',  // Idempotency key
    delay: 60_000,                       // Process after 1 minute
    priority: 1,                         // Lower = processed first
    attempts: 5,                         // Retry count for this job
    correlationId: 'req-123',            // For distributed tracing
    dependsOn: ['parent-job-id'],        // Wait for parent to complete
  }
)

// Bulk
await emailQueue.addBulk([
  { data: { to: 'a@ex.com', subject: 'Hi', body: '...' } },
  { data: { to: 'b@ex.com', subject: 'Hi', body: '...' }, opts: { priority: 1 } },
])
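The jobId option acts as an idempotency key: adding a job whose id is already present should not enqueue a duplicate. A minimal in-memory sketch of that dedup behavior (an illustration only, not OqronKit's actual implementation; the Map-based store and DedupQueue name are invented here):

```typescript
import { randomUUID } from 'node:crypto'

type Job<T> = { id: string; data: T }

// Toy store illustrating jobId-based deduplication.
class DedupQueue<T> {
  private jobs = new Map<string, Job<T>>()

  add(data: T, opts?: { jobId?: string }): Job<T> {
    const id = opts?.jobId ?? randomUUID()
    const existing = this.jobs.get(id)
    if (existing) return existing // same jobId: same job, no duplicate enqueued
    const job = { id, data }
    this.jobs.set(id, job)
    return job
  }

  size(): number {
    return this.jobs.size
  }
}
```

Calling add twice with jobId: 'welcome-user@example.com' would then yield a single queued job.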

Publisher-Only Queue

Omit the handler to create a publisher-only queue. A separate worker() on a different node consumes these jobs.

triggers/video-queue.ts
import { queue } from 'oqronkit'

// No handler — only pushes jobs, no CPU/polling overhead
export const videoQueue = queue<{ url: string }>({
  name: 'video-encode',
})

await videoQueue.add({ url: 'https://...' })
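The consuming side registers a handler through worker() on the other node. The exact signature is not shown on this page, so treat the shape below as an assumption: a matching queue name plus a handler, mirroring the queue() options. encodeVideo is a placeholder for your own function.

```typescript
// workers/video-worker.ts — hypothetical shape; check the worker() docs for the real API
import { worker } from 'oqronkit'

export const videoWorker = worker<{ url: string }>({
  name: 'video-encode', // must match the publisher-only queue's name
  concurrency: 2,
  handler: async (ctx) => {
    ctx.log('info', `Encoding ${ctx.data.url}`)
    await encodeVideo(ctx.data.url) // assumed helper, not part of OqronKit
    ctx.progress(100, 'Encoded')
  },
})
```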

Configuration

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| name | string | required | Unique queue identifier |
| handler | (ctx) => Promise<R> | optional | Processing function |
| concurrency | number | 5 | Parallel execution limit per instance |
| strategy | 'fifo' \| 'lifo' \| 'priority' | 'fifo' | Job ordering strategy |
| guaranteedWorker | boolean | true | Heartbeat crash-safety — set false for lightweight tasks |
| heartbeatMs | number | 5000 | Heartbeat renewal interval |
| lockTtlMs | number | 30000 | Lock expiry for crash detection |
| tags | string[] | [] | Tags for filtering and routing |
| priority | number | 0 | Default job priority |
| disabledBehavior | 'hold' \| 'reject' \| 'skip' | 'hold' | Action when queue is disabled |

v0.0.2: The queue-level priority is now persisted on job.opts.priority at creation time. This ensures retries, stall recovery, and rerun operations preserve the original priority through the full job lifecycle.
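Since lower priority values are processed first, ordering under strategy: 'priority' can be sketched as a sort by priority ascending with a FIFO tie-break on enqueue time. This is a simplification for intuition, not OqronKit's internal scheduler; the QueuedJob shape is invented here:

```typescript
type QueuedJob = { id: string; priority: number; enqueuedAt: number }

// Lower priority value wins; ties fall back to FIFO (earlier enqueuedAt first).
function priorityOrder(jobs: QueuedJob[]): QueuedJob[] {
  return [...jobs].sort(
    (a, b) => a.priority - b.priority || a.enqueuedAt - b.enqueuedAt
  )
}
```

A priority-1 job is picked ahead of priority-5 jobs regardless of when it was added.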

Retry & Dead Letter Queue

export const pdfQueue = queue<PdfInput, PdfOutput>({
  name: 'pdf-generation',
  concurrency: 1,
  strategy: 'priority',
  guaranteedWorker: true,

  retries: {
    max: 2,
    strategy: 'exponential',
    baseDelay: 3000,   // 3s → 6s → 12s
    maxDelay: 60_000,
  },

  deadLetter: {
    enabled: true,
    onDead: async (job) => {
      await alertSlack(`PDF ${job.data.reportId} permanently failed`)
    },
  },

  hooks: {
    onSuccess: async (job, result) => {
      console.log(`✅ PDF ready: ${result.pdfUrl}`)
    },
    onFail: async (job, error) => {
      console.error(`❌ PDF failed: ${error.message}`)
    },
  },

  handler: async (ctx) => {
    // ctx.discard() — permanently fail, skip all retries
    // ctx.signal.aborted — check for cancellation
    // ctx.progress(percent, label) — track progress
    return { pdfUrl: '...', pages: 24 }
  },
})

v0.0.2: When a job is delayed for retry, job.runAt is now set to the scheduled retry time. This makes retry schedules visible in the dashboard, queryable via getJob(), and correctly handled by the reconciliation engine.
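The retry delays implied by that config can be computed directly: the 'exponential' strategy doubles baseDelay per attempt, capped at maxDelay. A sketch of the schedule as described above, not OqronKit's exact code:

```typescript
// Delay before retry attempt n (0-based): baseDelay * 2^n, capped at maxDelay.
function exponentialBackoff(
  attempt: number,
  baseDelay: number,
  maxDelay: number
): number {
  return Math.min(baseDelay * 2 ** attempt, maxDelay)
}
```

With baseDelay: 3000 this yields 3s, 6s, 12s, and so on, never exceeding the 60s cap.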

Handler Context (ctx)

| Property | Type | Description |
| --- | --- | --- |
| ctx.id | string | Job UUID |
| ctx.data | T | Typed input payload |
| ctx.signal | AbortSignal | Fires when job is cancelled |
| ctx.progress(pct, label) | Function | Update progress (0-100) |
| ctx.log(level, msg, meta?) | Function | Structured logging |
| ctx.discard() | Function | Permanently fail — skip all retries |
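A handler that exercises these properties: checking ctx.signal.aborted between work units and calling ctx.discard() on unrecoverable input. The Ctx interface below is a local stand-in for illustration (OqronKit supplies the real context type), and importRows is a made-up example handler:

```typescript
// Minimal stand-in for the handler context; OqronKit provides the real type.
interface Ctx<T> {
  id: string
  data: T
  signal: AbortSignal
  progress: (pct: number, label?: string) => void
  log: (level: string, msg: string) => void
  discard: () => never
}

async function importRows(
  ctx: Ctx<{ rows: string[] }>
): Promise<{ imported: number }> {
  if (ctx.data.rows.length === 0) {
    ctx.log('warn', 'Empty payload: nothing to import')
    ctx.discard() // unrecoverable input: fail permanently, skip all retries
  }
  let imported = 0
  for (const row of ctx.data.rows) {
    // Honor cancellation between units of work.
    if (ctx.signal.aborted) throw new Error('Job cancelled')
    imported++
    ctx.progress(Math.round((imported / ctx.data.rows.length) * 100))
  }
  return { imported }
}
```

Checking ctx.signal.aborted inside the loop keeps long jobs responsive to cancellation instead of only noticing it at the end.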

Queue Management

await emailQueue.pause()              // Stop processing
await emailQueue.resume()             // Resume processing
const paused = await emailQueue.isPaused()
await emailQueue.drain()              // Wait for active jobs, then pause
const deleted = await emailQueue.obliterate() // Delete all jobs

const jobs = await emailQueue.getJobs({ status: 'waiting', limit: 50 })
const count = await emailQueue.count('active')
const job = await emailQueue.getJob('job-id-123')
