OqronKitOqronKit

Task Queue

Unified, monolithic queue where publisher and consumer live together

Task Queue

The Task Queue is OqronKit's simplest module. Publisher and consumer are defined together — perfect for monolithic apps and fast prototypes.

Working example → See a complete Task Queue implementation with image processing, email dispatch, PDF generation, ETL DAG pipeline, and crash-safe workers: apps/backend/src/triggers/task-queues.ts

How It Works

When you call queue() with a handler, OqronKit creates a monolithic queue where both the producer (.add()) and the consumer (polling engine) live in the same process.

Job Lifecycle

.add(data, opts)

  ├─ 1. Create OqronJob record in Storage (status: "waiting")
  ├─ 2. Publish job ID to Broker (with optional delay/priority)

  │  ┌──── Poll Loop (runs every heartbeatMs) ────┐
  │  │                                             │
  ├──┤ 3. Check concurrency: free slots available? │
  │  │    Check throttle gate: budget remaining?    │
  │  │    Check lag monitor: event loop healthy?    │
  │  │                                             │
  │  │ 4. Broker.claim(queueName, limit, strategy) │
  │  │    ── fifo / lifo / priority ordering ──     │
  │  └─────────────────────────────────────────────┘

  ├─ 5. Acquire heartbeat lock (Lock adapter, TTL-based)
  ├─ 6. Mark job as "active" in Storage
  ├─ 7. Start heartbeat renewal interval
  ├─ 8. Execute handler(ctx) with AbortSignal

  ├─ SUCCESS:
  │    ├─ Mark job "completed" in Storage
  │    ├─ Ack job in Broker (removes from queue)
  │    ├─ Stop heartbeat, release lock
  │    ├─ Run onSuccess hook
  │    └─ Apply retention policy (removeOnComplete)

  └─ FAILURE:
       ├─ Check retry config: attempts remaining?
       │   YES → Nack to Broker with backoff delay
       │         Mark job "delayed", set runAt
       │   NO  → Mark job "failed"
       │         Check DLQ: enabled?
       │           YES → Run onDead hook
       ├─ Stop heartbeat, release lock
       └─ Run onFail hook

Job State Machine

StateMeaning
waitingQueued in broker, ready to be claimed
delayedScheduled for future processing (delay or retry backoff)
activeClaimed by a worker, handler executing
completedHandler returned successfully
failedAll retries exhausted or permanently discarded
stalledHeartbeat lock expired — worker presumed crashed
pausedQueue is disabled; job held for later
waiting-childrenBlocked on parent job dependencies
cancelledExplicitly cancelled via admin API

Crash Safety

When guaranteedWorker: true (default):

  1. Heartbeat Lock — The worker atomically claims a lock via the Lock adapter with a TTL. A setInterval renews it every heartbeatMs.
  2. Stall Detector — A background scanner checks all active locks. If a lock has expired (worker crashed), the job is marked stalled and nacked back to the broker.
  3. Cross-Node Scanner — Optional (crossNodeStallScanner: true). Scans ALL active jobs in storage, not just local ones — catches orphans from nodes that crashed without cleanup.
  4. Reconciliation Engine — Optional (reconciliation: true). Heals split-brain scenarios where a job was saved to storage but the broker nack failed.

Concurrency Model

Concurrency is per-queue, per-process. If you define concurrency: 5 and run 3 worker processes, the system can process up to 15 jobs in parallel across the cluster.

Process A: queue("emails", concurrency: 5)  → max 5 active
Process B: queue("emails", concurrency: 5)  → max 5 active
Process C: queue("emails", concurrency: 5)  → max 5 active
                                       Total: max 15 active

Each queue is isolated — concurrency: 5 on the email queue does not affect the PDF queue's slots.

Basic Usage

triggers/email-queue.ts
import { queue } from 'oqronkit'

export const emailQueue = queue<
  { to: string; subject: string; body: string },
  { messageId: string }
>({
  name: 'email-sender',
  concurrency: 5,

  handler: async (ctx) => {
    ctx.log('info', `Sending "${ctx.data.subject}" to ${ctx.data.to}`)
    ctx.progress(50, 'Dispatching via SES')
    await sendEmail(ctx.data)

    ctx.progress(100, 'Delivered')
    return { messageId: `ses-${Date.now()}` }
  },
})

Adding Jobs

// Basic
await emailQueue.add({ to: 'user@example.com', subject: 'Welcome!', body: '...' })

// With options
await emailQueue.add(
  { to: 'user@example.com', subject: 'Welcome!', body: '...' },
  {
    jobId: 'welcome-user@example.com',  // Idempotency key
    delay: 60_000,                       // Process after 1 minute
    priority: 1,                         // Lower = processed first
    attempts: 5,                         // Retry count for this job
    correlationId: 'req-123',            // For distributed tracing
    dependsOn: ['parent-job-id'],        // Wait for parent to complete
  }
)

// Bulk
await emailQueue.addBulk([
  { data: { to: 'a@ex.com', subject: 'Hi', body: '...' } },
  { data: { to: 'b@ex.com', subject: 'Hi', body: '...' }, opts: { priority: 1 } },
])

Publisher-Only Queue

Omit the handler to create a publisher-only queue. A separate worker() on a different node consumes these jobs.

triggers/video-queue.ts
import { queue } from 'oqronkit'

// No handler — only pushes jobs, no CPU/polling overhead
export const videoQueue = queue<{ url: string }>({
  name: 'video-encode',
})

await videoQueue.add({ url: 'https://...' })

Configuration

OptionTypeDefaultDescription
namestringrequiredUnique queue identifier
handler(ctx) => Promise<R>optionalProcessing function
concurrencynumber5Parallel execution limit per instance
strategy'fifo' | 'lifo' | 'priority''fifo'Job ordering strategy
guaranteedWorkerbooleantrueHeartbeat crash-safety — set false for lightweight tasks
heartbeatMsnumber5000Heartbeat renewal interval
lockTtlMsnumber30000Lock expiry for crash detection
tagsstring[][]Tags for filtering and routing
prioritynumber0Default job priority
disabledBehavior'hold' | 'reject' | 'skip''hold'Action when queue is disabled

v0.0.2: The queue-level priority is now persisted on job.opts.priority at creation time. This ensures retries, stall recovery, and rerun operations preserve the original priority through the full job lifecycle.

Throttle

Use throttle to cap the dispatch rate — how many jobs are started per time window. Unlike concurrency (which limits parallelism), throttle controls throughput.

// Send at most 20 emails per 2 seconds, with 5 running in parallel
const emailQueue = queue<{ to: string; subject: string; body: string }>({
  name: 'email-sender',
  concurrency: 5,
  throttle: { max: 20, duration: 2_000 },

  handler: async (ctx) => {
    await sendEmail(ctx.data)
    return { sent: true }
  },
})
OptionTypeDescription
throttle.maxnumberMaximum dispatches allowed per window
throttle.durationnumberWindow duration in milliseconds

How it works: The throttle gate runs before Broker.claim() — it reduces the claim limit so no jobs are fetched and then returned. Zero wasted broker round-trips.

Per-process vs distributed: throttle is per-process. For cluster-wide rate limiting, compose with rateLimiter — both can be used together.

Retry & Dead Letter Queue

export const pdfQueue = queue<PdfInput, PdfOutput>({
  name: 'pdf-generation',
  concurrency: 1,
  strategy: 'priority',
  guaranteedWorker: true,

  retries: {
    max: 2,
    strategy: 'exponential',
    baseDelay: 3000,   // 3s → 6s → 12s
    maxDelay: 60_000,
  },

  deadLetter: {
    enabled: true,
    onDead: async (job) => {
      await alertSlack(`PDF ${job.data.reportId} permanently failed`)
    },
  },

  hooks: {
    onSuccess: async (job, result) => {
      console.log(`✅ PDF ready: ${result.pdfUrl}`)
    },
    onFail: async (job, error) => {
      console.error(`❌ PDF failed: ${error.message}`)
    },
  },

  handler: async (ctx) => {
    // ctx.discard() — permanently fail, skip all retries
    // ctx.signal.aborted — check for cancellation
    // ctx.progress(percent, label) — track progress
    return { pdfUrl: '...', pages: 24 }
  },
})

v0.0.2: When a job is delayed for retry, job.runAt is now set to the scheduled retry time. This makes retry schedules visible in the dashboard, queryable via getJob(), and correctly handled by the reconciliation engine.

Handler Context (ctx)

PropertyTypeDescription
ctx.idstringJob UUID
ctx.dataTTyped input payload
ctx.signalAbortSignalFires when job is cancelled
ctx.progress(pct, label)FunctionUpdate progress (0-100)
ctx.log(level, msg, meta?)FunctionStructured logging
ctx.discard()FunctionPermanently fail — skip all retries

Queue Management

await emailQueue.pause()              // Stop processing
await emailQueue.resume()             // Resume processing
const paused = await emailQueue.isPaused()
await emailQueue.drain()              // Wait for active jobs, then pause
const deleted = await emailQueue.obliterate() // Delete all jobs

const jobs = await emailQueue.getJobs({ status: 'waiting', limit: 50 })
const count = await emailQueue.count('active')
const job = await emailQueue.getJob('job-id-123')

Next Steps

On this page