Examples
Real-world usage patterns and API reference for OqronKit modules
Examples
Production-ready patterns for common use cases. Each example shows the complete API usage with configuration, handler, and error handling.
Email Delivery System
Single-Node: Throttled Queue
import { queue, cron } from 'oqronkit'
// ── Types ──────────────────────────────────────────────
interface EmailPayload {
to: string
subject: string
body: string
templateId?: string
}
interface EmailResult {
messageId: string
delivered: boolean
}
// ── Queue: delivers emails with pacing ─────────────────
export const emailQueue = queue<EmailPayload, EmailResult>({
name: 'email-delivery',
// Parallelism: 5 emails sending simultaneously
concurrency: 5,
// Throughput: max 20 dispatched per 2 seconds (10/s)
throttle: { max: 20, duration: 2_000 },
// Retry failed deliveries with exponential backoff
retries: {
max: 3,
strategy: 'exponential',
baseDelay: 5_000, // 5s, 10s, 20s
},
// Keep last 100 completed jobs for audit
keepHistory: 100,
keepFailedHistory: 500,
// Crash-safe: heartbeat lock ensures at-most-once delivery
guaranteedWorker: true,
heartbeatMs: 5_000,
lockTtlMs: 30_000,
// Lifecycle hooks
hooks: {
onSuccess: async (job, result) => {
console.log(`Email ${result.messageId} delivered to ${job.data.to}`)
},
onFail: async (job, error) => {
console.error(`Email to ${job.data.to} failed: ${error.message}`)
},
},
// Dead letter queue for permanently failed emails
deadLetter: {
enabled: true,
onDead: async (job) => {
await notifyOpsTeam(`Email permanently failed: ${job.data.to}`)
},
},
handler: async (ctx) => {
// Check abort signal for long-running work
if (ctx.signal.aborted) throw new Error('Cancelled')
ctx.progress(10, 'Preparing email')
const result = await ses.sendEmail({
to: ctx.data.to,
subject: ctx.data.subject,
body: ctx.data.body,
})
ctx.progress(100, 'Delivered')
return { messageId: result.id, delivered: true }
},
})API Integration
import { emailQueue } from './triggers/email-queue.js'
// ── Single email ───────────────────────────────────────
app.post('/api/send-email', async (req, res) => {
const job = await emailQueue.add(
{ to: req.body.to, subject: req.body.subject, body: req.body.body },
{ jobId: `email-${req.body.to}-${Date.now()}` } // Idempotency
)
res.json({ jobId: job.id, status: job.status })
})
// ── Bulk email ─────────────────────────────────────────
app.post('/api/send-bulk', async (req, res) => {
const jobs = await emailQueue.addBulk(
req.body.recipients.map((r: any) => ({
data: { to: r.email, subject: req.body.subject, body: req.body.body },
opts: { jobId: `bulk-${r.email}-${req.body.campaignId}` },
}))
)
res.json({ queued: jobs.length })
})
// ── Check job status ───────────────────────────────────
app.get('/api/email-status/:id', async (req, res) => {
const job = await emailQueue.getJob(req.params.id)
if (!job) return res.status(404).json({ error: 'Not found' })
res.json({
id: job.id,
status: job.status,
progress: job.progressPercent,
result: job.result,
error: job.error,
})
})
// ── List failed jobs ───────────────────────────────────
app.get('/api/email-failures', async (req, res) => {
const failed = await emailQueue.getJobs({ status: 'failed', limit: 50 })
res.json(failed.map(j => ({
id: j.id,
to: j.data.to,
error: j.error,
attempts: j.attempts,
})))
})
// ── Queue management ───────────────────────────────────
app.post('/api/email-queue/pause', async (req, res) => {
await emailQueue.pause()
res.json({ paused: true })
})
app.post('/api/email-queue/resume', async (req, res) => {
await emailQueue.resume()
res.json({ paused: false })
})Cron + Queue: Database Scan & Deliver
The recommended pattern for "scan table, process rows":
import { cron, queue } from 'oqronkit'
// ── Step 1: Cron discovers pending work ────────────────
export const discoverGifts = cron({
name: 'discover-pending-gifts',
every: { minutes: 5 },
// Overlap protection: skip if previous run is still active
overlap: 'skip',
handler: async (ctx) => {
const gifts = await GiftingsModel.findAll({
where: { sent: false, to_be_sent_on: { [Op.lte]: new Date() } },
})
if (gifts.length === 0) return { discovered: 0 }
// Enqueue each gift as an independent job
await giftDeliveryQueue.addBulk(
gifts.map(g => ({
data: {
giftId: g.id,
toEmail: g.to_email,
toName: g.to_name,
template: g.template,
},
// Idempotency: prevents double-enqueue if cron re-fires
opts: { jobId: `gift-${g.id}` },
}))
)
// Mark as queued — not "sent", just "discovered"
await GiftingsModel.update(
{ sent: true },
{ where: { id: gifts.map(g => g.id) } }
)
return { discovered: gifts.length }
},
})
// ── Step 2: Queue delivers with throttle + retries ─────
export const giftDeliveryQueue = queue<GiftPayload, GiftResult>({
name: 'gift-email-delivery',
concurrency: 5,
throttle: { max: 20, duration: 2_000 },
retries: {
max: 3,
strategy: 'exponential',
baseDelay: 5_000,
},
hooks: {
onFail: async (job, error) => {
// Reset so next cron rediscovers it
await GiftingsModel.update(
{ sent: false },
{ where: { id: job.data.giftId } }
)
},
},
handler: async (ctx) => {
await sendGiftEmail(ctx.data)
return { delivered: true }
},
})Distributed Worker: Video Encoding
Publisher and consumer on separate servers:
import { queue } from 'oqronkit'
interface VideoJob {
videoId: string
s3Uri: string
codec: 'h264' | 'hevc' | 'av1'
}
// Publisher — no handler, no polling, zero CPU
export const videoQueue = queue<VideoJob, string>({
name: 'video-encode',
})app.post('/api/upload', async (req, res) => {
const job = await videoQueue.add({
videoId: `vid_${Date.now().toString(36)}`,
s3Uri: req.body.filePath,
codec: 'hevc',
}, {
jobId: `vid-${req.body.fileHash}`, // Dedup by file hash
priority: req.body.premium ? 1 : 10, // Premium users get priority
})
res.json({ trackingId: job.id })
})import { worker } from 'oqronkit'
export const videoWorker = worker<VideoJob, string>({
topic: 'video-encode',
concurrency: 2,
guaranteedWorker: true,
heartbeatMs: 5_000,
lockTtlMs: 60_000,
timeout: 300_000, // 5 minute timeout
handler: async (ctx) => {
ctx.progress(10, 'Downloading source')
await download(ctx.data.s3Uri)
if (ctx.signal.aborted) throw new Error('Cancelled')
ctx.progress(40, `Transcoding to ${ctx.data.codec}`)
await transcode(ctx.data.s3Uri, ctx.data.codec)
ctx.progress(90, 'Uploading to CDN')
const url = await uploadToCDN(ctx.data.videoId)
ctx.progress(100, 'Done')
return url
},
})Scheduled Tasks
One-Shot: Run Once at a Specific Time
import { schedule } from 'oqronkit'
export const maintenanceWindow = schedule({
name: 'db-maintenance',
runAt: new Date('2025-01-15T03:00:00Z'), // Run once at 3am UTC
handler: async (ctx) => {
await runVacuumAnalyze()
},
})Recurring: Daily Report
export const dailyReport = schedule<{ region: string }>({
name: 'daily-sales-report',
recurring: {
frequency: 'daily',
at: { hour: 9, minute: 0 },
},
timezone: 'Asia/Kolkata',
payload: { region: 'IN' },
handler: async (ctx) => {
const data = await getSalesData(ctx.payload.region, ctx.firedAt)
await generatePDF(data)
await emailReport(data)
},
})Cron Expression
export const weeklyCleanup = cron({
name: 'weekly-cleanup',
expression: '0 2 * * SUN', // Every Sunday at 2am
missedFire: 'run-once', // If server was down, run once on restart
overlap: 'skip', // Don't overlap if still running
handler: async (ctx) => {
const deleted = await cleanupOldRecords()
return { deletedCount: deleted }
},
})Interval-Based
export const healthCheck = cron({
name: 'health-check',
every: { seconds: 30 },
handler: async (ctx) => {
const status = await checkExternalServices()
if (!status.healthy) {
await alertOpsTeam(status)
}
},
})Webhook Dispatch
import { webhook } from 'oqronkit'
export const orderWebhooks = webhook<OrderEvent>({
name: 'order-events',
concurrency: 10,
throttle: { max: 100, duration: 60_000 }, // 100 deliveries per minute
timeout: 15_000,
retries: {
max: 5,
strategy: 'exponential',
baseDelay: 2_000,
},
security() {
return { signingSecret: process.env.WEBHOOK_SECRET! }
},
async endpoints() {
// Load from database for dynamic endpoint management
const subs = await WebhookSubscription.findAll({ where: { active: true } })
return subs.map(s => ({
name: s.name,
url: s.url,
events: s.events,
headers: { 'x-tenant-id': s.tenantId },
}))
},
})
// Fire events from your application
app.post('/api/orders', async (req, res) => {
const order = await createOrder(req.body)
// Delivers to all matching endpoints
await orderWebhooks.fire('order.created', {
orderId: order.id,
total: order.total,
customer: order.customerId,
})
res.json(order)
})Batch: Analytics Event Flushing
Buffer individual events and bulk-insert on flush:
import { batch } from 'oqronkit'
interface AnalyticsEvent {
event: string
userId: string
properties: Record<string, unknown>
timestamp: number
}
export const analyticsBatch = batch<AnalyticsEvent>({
name: 'analytics-flush',
maxSize: 500, // Flush every 500 events
maxWaitMs: 10_000, // Or every 10 seconds
handler: async (ctx) => {
ctx.log('info', `Flushing ${ctx.batchSize} analytics events`)
ctx.progress(10, 'Preparing insert')
await db.analyticsEvents.insertMany(ctx.batch)
ctx.progress(100, 'Inserted')
return { flushed: ctx.batchSize }
},
})
// Usage in API routes:
app.post('/api/track', async (req, res) => {
await analyticsBatch.add({
event: req.body.event,
userId: req.user.id,
properties: req.body.properties,
timestamp: Date.now(),
})
res.json({ ok: true }) // Returns instantly — item is buffered
})Batch: Multi-Tenant Log Aggregation
Use groupBy to isolate buffers per tenant:
import { batch } from 'oqronkit'
interface LogEntry {
tenantId: string
level: 'info' | 'warn' | 'error'
message: string
meta?: Record<string, unknown>
}
export const logBatch = batch<LogEntry>({
name: 'tenant-logs',
maxSize: 200,
maxWaitMs: 5_000,
// Each tenant gets its own buffer → separate flush per tenant
groupBy: (item) => item.tenantId,
// Retry + DLQ for critical log delivery
retries: { max: 3, strategy: 'exponential', baseDelay: 2_000 },
deadLetter: {
enabled: true,
onDead: async (job) => {
await alertOps(`Tenant ${job.data.groupKey} log flush permanently failed`)
},
},
handler: async (ctx) => {
// ctx.groupKey = "acme" or "globex"
// ctx.batch = only logs for this tenant
await TenantLogStore.bulkInsert(ctx.groupKey!, ctx.batch)
return { tenant: ctx.groupKey, flushed: ctx.batchSize }
},
})
// All logs go to the same batch — groupBy separates them
await logBatch.add({ tenantId: 'acme', level: 'info', message: 'User login' })
await logBatch.add({ tenantId: 'globex', level: 'error', message: 'DB timeout' })
await logBatch.add({ tenantId: 'acme', level: 'warn', message: 'Slow query' })
// acme buffer: 2 items, globex buffer: 1 itemRate Limiting: Cron with External API
import { cron, rateLimit } from 'oqronkit'
const supplierLimiter = rateLimit.create({
name: 'supplier-api',
algorithm: 'sliding-window',
tiers: [{ name: 'global', key: () => 'supplier', max: 100, window: '1h' }],
})
export const inventorySync = cron({
name: 'inventory-sync',
every: { minutes: 15 },
rateLimiter: supplierLimiter,
handler: async (ctx) => {
// This only runs if rateLimiter allows
// If blocked: fire is SKIPPED, nextRunAt advances to :30
const products = await fetchSupplierInventory()
await updateLocalInventory(products)
return { synced: products.length }
},
})Queue API Reference
queue.add(data, opts?)
Push a single job:
const job = await emailQueue.add(
{ to: 'user@example.com', subject: 'Hello' },
{
jobId: 'custom-id', // Idempotency key — prevents duplicates
priority: 1, // Lower = higher priority (default: 0)
delay: 60_000, // Delay execution by 60 seconds
}
)
// Returns: OqronJob { id, status, data, createdAt, ... }queue.addBulk(items)
Push multiple jobs:
const jobs = await emailQueue.addBulk([
{ data: { to: 'a@test.com', subject: 'Hi' }, opts: { priority: 1 } },
{ data: { to: 'b@test.com', subject: 'Hi' } },
])queue.getJob(id)
const job = await emailQueue.getJob('job-123')
// Returns: OqronJob | nullqueue.getJobs(filter?)
const waiting = await emailQueue.getJobs({ status: 'waiting', limit: 20 })
const failed = await emailQueue.getJobs({ status: 'failed', limit: 50 })queue.count(status?)
const total = await emailQueue.count()
const failedCount = await emailQueue.count('failed')queue.pause() / queue.resume()
await emailQueue.pause() // Stops claiming new jobs
const paused = await emailQueue.isPaused()
await emailQueue.resume() // Resumes claimingqueue.drain()
await emailQueue.drain() // Wait for active jobs to finish, then pausequeue.obliterate()
const removed = await emailQueue.obliterate() // Remove ALL jobs from storageHandler Context API
Every handler receives a context object with these properties:
handler: async (ctx) => {
// ── Identity ─────────────────────────────────
ctx.id // Job ID (string)
ctx.name // Queue name (string)
ctx.data // Typed payload (T)
// ── Execution State ──────────────────────────
ctx.attempt // Current attempt number (1-based)
ctx.maxAttempts // Max attempts configured
ctx.createdAt // When job was created (Date)
ctx.duration // Live elapsed ms since handler started
ctx.environment // Environment isolation boundary
ctx.project // Project isolation boundary
// ── Cancellation ─────────────────────────────
ctx.signal // AbortSignal — check for cancellation
ctx.aborted // Shorthand for signal.aborted
// ── Progress ─────────────────────────────────
ctx.progress(50, 'Processing') // Update progress (0-100)
ctx.getProgress() // Read current progress
// ── Logging ──────────────────────────────────
ctx.log('info', 'Message') // Structured logging
ctx.log.info('Message') // Shorthand
ctx.log.warn('Warning')
ctx.log.error('Error')
// ── Control ──────────────────────────────────
ctx.discard() // Permanently fail without retry
// ── Child Jobs ───────────────────────────────
const childId = await ctx.spawnChild('other-queue', { key: 'value' })
}Next Steps
- Rate Limiter — Deep-dive into throttle vs rateLimiter
- Crash Safety — Heartbeat locks and stall detection
- Architecture — Core design principles