OqronKit

Crash Safety

Heartbeat locks, stall detection, and graceful shutdown

OqronKit guarantees that no job is ever lost — even if your process is killed mid-execution. The guaranteedWorker flag is the foundation of this guarantee and is available across every module in OqronKit.

guaranteedWorker — Universal Guarantee

The guaranteedWorker option enables heartbeat-based crash-safe execution. When enabled, a HeartbeatWorker atomically claims each job and continuously renews a distributed lock while the handler runs. If the process dies, the lock expires and the job is automatically recovered.

Available in Every Module

| Module | Option | Default |
| --- | --- | --- |
| queue() | guaranteedWorker | true — enabled unless explicitly set to false |
| worker() | guaranteedWorker | true — enabled unless explicitly set to false |
| cron() | guaranteedWorker | false — opt-in for critical crons |
| schedule() | guaranteedWorker | false — opt-in for critical schedules |
| webhook() | guaranteedWorker | true — delivery guarantees |
| batch() | guaranteedWorker | true — per-batch lock |
| pipeline() | guaranteedWorker | per-stage opt-in |
| ingest() | guaranteedWorker | per-step durability |

Usage Across Modules

import { queue, worker, cron, schedule, webhook } from 'oqronkit'

// Queue — enabled by default, disable for fast fire-and-forget tasks
const fastQueue = queue({
  name: 'analytics-events',
  guaranteedWorker: false,  // No heartbeat needed for lightweight tasks
  handler: async (ctx) => { /* ... */ },
})

// Queue — default is true, critical financial processing
const billingQueue = queue({
  name: 'billing',
  heartbeatMs: 3_000,    // Renew lock every 3s
  lockTtlMs: 15_000,     // Lock expires after 15s if heartbeat stops
  handler: async (ctx) => { /* ... */ },
})

// Worker — default is true, heavy compute
const videoWorker = worker({
  topic: 'video-encode',
  guaranteedWorker: true,
  heartbeatMs: 5_000,
  lockTtlMs: 30_000,
  handler: async (ctx) => { /* ... */ },
})

// Cron — opt-in for critical scheduled jobs
const billingCron = cron({
  name: 'monthly-billing',
  expression: '0 0 1 * *',
  guaranteedWorker: true,   // Critical — must survive crashes
  heartbeatMs: 5_000,
  lockTtlMs: 20_000,
  handler: async (ctx) => { /* ... */ },
})

// Schedule — opt-in for critical one-off tasks
const migration = schedule({
  name: 'data-migration',
  runAt: new Date('2026-12-01'),
  guaranteedWorker: true,   // Must complete — no lost migrations
  handler: async (ctx) => { /* ... */ },
})

How It Works

  1. Worker atomically claims a job → writes workerId + TTL to the Lock adapter
  2. A setInterval heartbeat renews the lock every heartbeatMs while processing
  3. If the process crashes (SIGKILL / OOM), the heartbeat stops
  4. The lock expires in Redis/Postgres after lockTtlMs
  5. The internal StallDetector finds the expired lock → marks job as stalled
  6. Job is re-queued and routed to a healthy worker shortly after the lock TTL elapses

┌─ Worker Node A ────────────────────────────────────────┐
│  1. Claim job → lock(key, workerId, ttl=30s)          │
│  2. Start heartbeat → renew lock every 5s              │
│  3. Execute handler...                                 │
│  💥 CRASH (SIGKILL / OOM)                              │
└────────────────────────────────────────────────────────┘
                        ⏱️ Lock expires after 30s
┌─ Stall Detector ──────────────────────────────────────┐
│  4. Scan for expired locks                            │
│  5. Mark job as "stalled"                             │
│  6. Re-queue to broker                                │
└───────────────────────────────────────────────────────┘
┌─ Worker Node B ────────────────────────────────────────┐
│  7. Claim re-queued job → execute handler              │
│  8. Job completes successfully ✅                      │
└────────────────────────────────────────────────────────┘
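The claim → heartbeat → expire → recover cycle above can be sketched with an in-memory lock store. This is a conceptual model only — the real Lock adapter is Redis or Postgres, and MemoryLockStore is a hypothetical name:

```typescript
// Sketch of the heartbeat-lock cycle using an in-memory store in place
// of a real Redis/Postgres Lock adapter. All names here are illustrative.
type Lock = { workerId: string; expiresAt: number }

class MemoryLockStore {
  private locks = new Map<string, Lock>()

  // Atomically claim a key: succeeds only if no unexpired lock exists.
  claim(key: string, workerId: string, ttlMs: number, now = Date.now()): boolean {
    const held = this.locks.get(key)
    if (held && held.expiresAt > now) return false
    this.locks.set(key, { workerId, expiresAt: now + ttlMs })
    return true
  }

  // Heartbeat: renew only if this worker still owns the lock.
  renew(key: string, workerId: string, ttlMs: number, now = Date.now()): boolean {
    const held = this.locks.get(key)
    if (!held || held.workerId !== workerId) return false
    held.expiresAt = now + ttlMs
    return true
  }

  // Stall detection: keys whose lock has expired (heartbeat stopped).
  expiredKeys(now = Date.now()): string[] {
    return [...this.locks.entries()]
      .filter(([, lock]) => lock.expiresAt <= now)
      .map(([key]) => key)
  }
}
```

Worker A claims and heartbeats; if it crashes, the renew() calls stop, the key eventually shows up in expiredKeys(), and another worker can claim() it — exactly the sequence in the diagram above.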

Tuning Guide

| Scenario | heartbeatMs | lockTtlMs | Why |
| --- | --- | --- | --- |
| Fast tasks (< 30s) | 5000 | 30000 | Standard protection |
| Heavy compute (minutes) | 10000 | 60000 | Longer TTL prevents premature stall |
| Critical financial | 3000 | 15000 | Aggressive detection, fast recovery |
| Spot instances | 5000 | 20000 | AWS/GCP can kill instances anytime |

Rule of thumb: lockTtlMs should be at least 3 × heartbeatMs to tolerate temporary network delays.
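A minimal sketch of enforcing this rule at configuration time (validateLockConfig is a hypothetical helper, not part of the OqronKit API):

```typescript
// Hypothetical validator for the 3x rule: with lockTtlMs >= 3 * heartbeatMs,
// up to two consecutive heartbeats can be lost to network delay before
// the lock wrongly expires and the job is falsely marked as stalled.
function validateLockConfig(heartbeatMs: number, lockTtlMs: number): void {
  if (lockTtlMs < 3 * heartbeatMs) {
    throw new Error(
      `lockTtlMs (${lockTtlMs}) should be at least 3x heartbeatMs (${heartbeatMs})`
    )
  }
}

validateLockConfig(5_000, 30_000) // ok — every preset in the table above passes
```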

Graceful Shutdown

When SIGINT or SIGTERM is received, OqronKit:

  1. Stops accepting new jobs from all modules
  2. Waits for active jobs to drain (configurable timeout)
  3. Releases all held locks
  4. Stops the TelemetryManager
  5. Cleans up the OqronRegistry

// Manual stop
await OqronKit.stop()
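Conceptually, the drain step (2) is a race between active-job settlement and an unref'd timeout. A sketch under the assumption that active jobs are tracked as promises — drain is a hypothetical function, not OqronKit internals:

```typescript
// Sketch of graceful drain: wait for in-flight jobs, but never block
// shutdown longer than `timeoutMs`. The timer is unref'd so it cannot
// keep the Node.js event loop alive on its own.
async function drain(
  active: Promise<unknown>[],
  timeoutMs: number
): Promise<'drained' | 'timeout'> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<'timeout'>((resolve) => {
    timer = setTimeout(() => resolve('timeout'), timeoutMs)
    timer.unref() // never holds the process open by itself
  })
  const drained = Promise.allSettled(active).then(() => 'drained' as const)
  const result = await Promise.race([drained, timeout])
  if (timer) clearTimeout(timer)
  return result
}
```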

v0.0.2: The shutdown timeout handle is now properly .unref()'d and cleaned up, preventing Node.js from hanging after stop. Signal handlers bind correctly to OqronKit.stop() — a this-binding issue in v0.0.1 has been resolved.

AbortController Support

Active jobs can be cancelled mid-execution:

import { OqronManager } from 'oqronkit'

const mgr = OqronManager.from(OqronKit.getConfig())
await mgr.cancelJob('job-id') // Fires AbortSignal

Handlers should check ctx.signal.aborted periodically:

handler: async (ctx) => {
  for (const [i, chunk] of chunks.entries()) {
    if (ctx.signal.aborted) return // stop promptly on cancellation
    await processChunk(chunk)
    ctx.progress(((i + 1) / chunks.length) * 100)
  }
}
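Under the hood, per-job cancellation can be modeled as a registry of AbortControllers keyed by job ID. This is a sketch with hypothetical startJob/cancelJob helpers, not the actual OqronManager implementation:

```typescript
// Sketch of the cancellation path: each active job gets an AbortController,
// and cancelling looks the controller up by job ID and fires its signal.
const controllers = new Map<string, AbortController>()

function startJob(jobId: string): AbortSignal {
  const ctrl = new AbortController()
  controllers.set(jobId, ctrl)
  return ctrl.signal // passed to the handler as ctx.signal
}

function cancelJob(jobId: string): boolean {
  const ctrl = controllers.get(jobId)
  if (!ctrl) return false
  ctrl.abort() // ctx.signal.aborted flips to true in the handler
  controllers.delete(jobId)
  return true
}
```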

v0.0.2: Webhook HTTP delivery now receives the abort signal. Calling cancelJob() on an active webhook delivery immediately aborts the in-flight HTTP request — no more waiting for the full timeout.

Cancel Behavior

cancelJob() writes a "cancelled" tombstone to storage instead of deleting the job record. This preserves the full audit trail (timeline, error, finishedAt). The retention system handles eventual cleanup.

await mgr.cancelJob('job-id')
// Job still exists with status: "cancelled"
// Visible in dashboard and queryable via getJob()
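Tombstoning versus deleting can be illustrated with a toy storage map (hypothetical names; the real Storage adapter persists the record durably):

```typescript
// Sketch of tombstoning: the record survives with a terminal "cancelled"
// status and a finishedAt timestamp, so the audit trail stays queryable.
type JobRecord = { id: string; status: string; finishedAt?: number }
const storage = new Map<string, JobRecord>()

function cancel(id: string, now = Date.now()): JobRecord | undefined {
  const job = storage.get(id)
  if (!job) return undefined
  job.status = 'cancelled' // tombstone written in place — never deleted
  job.finishedAt = now
  return job
}
```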

Idempotency

Crash recovery gives at-least-once semantics: a handler may run more than once for the same job. Use jobId as an idempotency key:

await emailQueue.add(
  { to: 'user@example.com' },
  { jobId: 'welcome-user@example.com' } // Prevents duplicate processing
)
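Why a deterministic jobId deduplicates: re-adding the same job after a crash is a no-op because the key already exists. A toy sketch (addJob and seen are hypothetical, standing in for broker-side deduplication):

```typescript
// Sketch of jobId-based dedupe: the second add with the same key is rejected,
// so a retried "welcome" email is enqueued at most once per user.
const seen = new Map<string, unknown>()

function addJob(payload: unknown, jobId: string): boolean {
  if (seen.has(jobId)) return false // duplicate — already enqueued or processed
  seen.set(jobId, payload)
  return true
}
```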

Disabled Behavior Engine

All modules support disabledBehavior for controlled pausing:

| Behavior | When Disabled | Best For |
| --- | --- | --- |
| 'hold' | Accepts jobs in paused state, resumes on re-enable | Billing, order processing |
| 'skip' | Silently drops jobs | Cache purges, analytics |
| 'reject' | Throws error on .add() | API rate limiting feedback |
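The three modes can be sketched as a dispatch on the configured behavior (onAddWhileDisabled is a hypothetical helper illustrating the semantics, not OqronKit internals):

```typescript
// Sketch of what happens when .add() is called on a disabled module.
type DisabledBehavior = 'hold' | 'skip' | 'reject'

function onAddWhileDisabled(
  behavior: DisabledBehavior,
  job: unknown,
  held: unknown[]
): 'held' | 'skipped' {
  switch (behavior) {
    case 'hold': // buffer the job; it runs when the module is re-enabled
      held.push(job)
      return 'held'
    case 'skip': // silently drop
      return 'skipped'
    case 'reject': // surface backpressure to the caller
      throw new Error('Module is disabled')
  }
}
```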

Next Steps

  • Architecture — Adapter-driven design and DI container
  • Adapters — Storage, Broker, and Lock adapter configuration
