OqronKit

Examples

Real-world usage patterns and API reference for OqronKit modules

Examples

Production-ready patterns for common use cases. Each example shows the complete API usage with configuration, handler, and error handling.


Email Delivery System

Single-Node: Throttled Queue

triggers/email-queue.ts
import { queue, cron } from 'oqronkit'

// ── Types ──────────────────────────────────────────────
/** Payload carried by each job on the `email-delivery` queue. */
interface EmailPayload {
  /** Recipient address; forwarded to `ses.sendEmail` as `to`. */
  to: string
  /** Subject line forwarded to `ses.sendEmail`. */
  subject: string
  /** Message body forwarded to `ses.sendEmail`. */
  body: string
  /** Optional template identifier; the handler in this example does not read it. */
  templateId?: string
}

/** Value returned by the `email-delivery` handler for a processed job. */
interface EmailResult {
  /** Message id; the handler copies it from the `id` of the `ses.sendEmail` result. */
  messageId: string
  /** Set to `true` by the handler once `ses.sendEmail` has resolved. */
  delivered: boolean
}

// ── Queue: delivers emails with pacing ─────────────────
/**
 * Email delivery queue: paced, retried, and backed by a dead-letter hook.
 *
 * Jobs carry an `EmailPayload`; the handler resolves to an `EmailResult`.
 */
export const emailQueue = queue<EmailPayload, EmailResult>({
  name: 'email-delivery',

  // Up to 5 handler invocations run side by side
  concurrency: 5,

  // At most 20 dispatches per 2_000 ms window (10/s on average)
  throttle: { duration: 2_000, max: 20 },

  // Failed attempts are retried with exponential backoff: 5s, 10s, 20s
  retries: {
    strategy: 'exponential',
    baseDelay: 5_000,
    max: 3,
  },

  // Retention: the last 100 completed jobs and the last 500 failed jobs
  keepFailedHistory: 500,
  keepHistory: 100,

  // Crash safety: the worker holds a lock (30s TTL) renewed by a 5s heartbeat.
  // NOTE(review): the original text describes this as "at-most-once delivery";
  // with retries enabled that guarantee should be confirmed against the library docs.
  guaranteedWorker: true,
  lockTtlMs: 30_000,
  heartbeatMs: 5_000,

  // Dead letter handling for emails that failed permanently
  deadLetter: {
    enabled: true,
    onDead: async (deadJob) => {
      await notifyOpsTeam(`Email permanently failed: ${deadJob.data.to}`)
    },
  },

  // Lifecycle hooks: log the outcome of each job
  hooks: {
    onFail: async (failedJob, failure) => {
      console.error(`Email to ${failedJob.data.to} failed: ${failure.message}`)
    },
    onSuccess: async (doneJob, outcome) => {
      console.log(`Email ${outcome.messageId} delivered to ${doneJob.data.to}`)
    },
  },

  handler: async (ctx) => {
    // Refuse to start when the job has already been cancelled
    if (ctx.signal.aborted) {
      throw new Error('Cancelled')
    }

    ctx.progress(10, 'Preparing email')

    const { to, subject, body } = ctx.data
    const sent = await ses.sendEmail({ to, subject, body })

    ctx.progress(100, 'Delivered')
    return { messageId: sent.id, delivered: true }
  },
})

API Integration

api-routes.ts
import { emailQueue } from './triggers/email-queue.js'

// ── Single email ───────────────────────────────────────
/**
 * Enqueue one email and respond with the job id and status.
 *
 * Dedup only works when a retried request reuses the SAME jobId, so the key is
 * taken from the caller-supplied `Idempotency-Key` header. A timestamp-based id
 * would be unique on every call and could never match an earlier job.
 */
app.post('/api/send-email', async (req, res) => {
  const { to, subject, body } = req.body

  const idempotencyKey = req.headers['idempotency-key']
  const opts =
    typeof idempotencyKey === 'string' && idempotencyKey.length > 0
      ? { jobId: `email-${idempotencyKey}` }
      : undefined // no key supplied: the queue assigns the id, no dedup

  const job = await emailQueue.add({ to, subject, body }, opts)
  res.json({ jobId: job.id, status: job.status })
})

// ── Bulk email ─────────────────────────────────────────
/**
 * Enqueue one email job per recipient of a campaign.
 *
 * Responds 400 when `recipients` is not an array or `campaignId` is missing:
 * the campaign id is part of every jobId, so without it the keys of unrelated
 * campaigns would collide.
 */
app.post('/api/send-bulk', async (req, res) => {
  const { recipients, subject, body, campaignId } = req.body

  if (!Array.isArray(recipients) || !campaignId) {
    return res.status(400).json({ error: 'recipients[] and campaignId are required' })
  }

  const jobs = await emailQueue.addBulk(
    recipients.map((recipient: { email: string }) => ({
      data: { to: recipient.email, subject, body },
      // One job per (recipient, campaign) pair
      opts: { jobId: `bulk-${recipient.email}-${campaignId}` },
    }))
  )
  res.json({ queued: jobs.length })
})

// ── Check job status ───────────────────────────────────
/** Report status, progress, result and error of one job; 404 when the id is unknown. */
app.get('/api/email-status/:id', async (req, res) => {
  const found = await emailQueue.getJob(req.params.id)

  if (found) {
    const { id, status, progressPercent: progress, result, error } = found
    res.json({ id, status, progress, result, error })
  } else {
    res.status(404).json({ error: 'Not found' })
  }
})

// ── List failed jobs ───────────────────────────────────
/** Return a summary of up to 50 jobs whose status is `failed`. */
app.get('/api/email-failures', async (req, res) => {
  const failedJobs = await emailQueue.getJobs({ status: 'failed', limit: 50 })

  const summary = failedJobs.map(({ id, data, error, attempts }) => ({
    id,
    to: data.to,
    error,
    attempts,
  }))

  res.json(summary)
})

// ── Queue management ───────────────────────────────────
/** Pause the email queue, then confirm the new state to the caller. */
app.post('/api/email-queue/pause', async (_req, res) => {
  await emailQueue.pause()
  const paused = true
  res.json({ paused })
})

/** Resume the email queue, then confirm the new state to the caller. */
app.post('/api/email-queue/resume', async (_req, res) => {
  await emailQueue.resume()
  const paused = false
  res.json({ paused })
})

Cron + Queue: Database Scan & Deliver

The recommended pattern for "scan table, process rows":

triggers/gift-delivery.ts
import { cron, queue } from 'oqronkit'

// ── Step 1: Cron discovers pending work ────────────────
/**
 * Every 5 minutes, find gifts that are due and not yet flagged, enqueue one
 * delivery job per gift, then flag the rows.
 *
 * Resolves to `{ discovered }`, the number of gifts enqueued by this run.
 */
export const discoverGifts = cron({
  name: 'discover-pending-gifts',
  every: { minutes: 5 },

  // When the previous run is still active, this fire is skipped
  overlap: 'skip',

  handler: async (ctx) => {
    const now = new Date()
    const pending = await GiftingsModel.findAll({
      where: { sent: false, to_be_sent_on: { [Op.lte]: now } },
    })

    if (pending.length === 0) return { discovered: 0 }

    // One independent job per gift; the `gift-<id>` jobId guards against a
    // double enqueue if the cron fires again for the same row
    const jobs = pending.map(gift => ({
      data: {
        giftId: gift.id,
        toEmail: gift.to_email,
        toName: gift.to_name,
        template: gift.template,
      },
      opts: { jobId: `gift-${gift.id}` },
    }))
    await giftDeliveryQueue.addBulk(jobs)

    // Flag the rows so the next scan skips them. Here `sent: true` means
    // "handed to the queue", not "delivered".
    const ids = pending.map(gift => gift.id)
    await GiftingsModel.update({ sent: true }, { where: { id: ids } })

    return { discovered: pending.length }
  },
})

// ── Step 2: Queue delivers with throttle + retries ─────
/**
 * Delivers gift emails with throttling and exponential-backoff retries.
 *
 * NOTE(review): `GiftPayload` and `GiftResult` are not declared in this
 * snippet — they must be defined or imported for the file to compile.
 */
export const giftDeliveryQueue = queue<GiftPayload, GiftResult>({
  name: 'gift-email-delivery',

  // Pacing: 5 parallel handlers, at most 20 dispatches per 2_000 ms
  throttle: { duration: 2_000, max: 20 },
  concurrency: 5,

  retries: {
    strategy: 'exponential',
    baseDelay: 5_000,
    max: 3,
  },

  handler: async (ctx) => {
    await sendGiftEmail(ctx.data)
    return { delivered: true }
  },

  hooks: {
    onFail: async (failedJob, _error) => {
      // Clear the flag so the discovery cron selects this gift again.
      // NOTE(review): confirm that re-enqueueing under the same `gift-<id>`
      // jobId is accepted after a failure.
      const { giftId } = failedJob.data
      await GiftingsModel.update({ sent: false }, { where: { id: giftId } })
    },
  },
})

Distributed Worker: Video Encoding

Publisher and consumer on separate servers:

api-server/triggers/video-queue.ts
import { queue } from 'oqronkit'

/** Payload of a job on the `video-encode` queue. */
interface VideoJob {
  /** Identifier of the video; the worker passes it to `uploadToCDN`. */
  videoId: string
  /** Location of the source file; the worker passes it to `download` and `transcode`. */
  s3Uri: string
  /** Target codec handed to `transcode`. */
  codec: 'h264' | 'hevc' | 'av1'
}

// Publisher — no handler, no polling, zero CPU.
// Only the queue name is configured here; jobs are consumed elsewhere by the
// worker registered for the same name ('video-encode').
export const videoQueue = queue<VideoJob, string>({
  name: 'video-encode',
})
api-server/routes.ts
/**
 * Enqueue an encoding job for an uploaded file and return its tracking id.
 *
 * Responds 400 when `filePath` or `fileHash` is missing. The hash is the dedup
 * key, so without this check every hash-less upload would share the jobId
 * `vid-undefined`.
 */
app.post('/api/upload', async (req, res) => {
  const { filePath, fileHash, premium } = req.body

  const isNonEmptyString = (value: unknown): value is string =>
    typeof value === 'string' && value.length > 0

  if (!isNonEmptyString(filePath) || !isNonEmptyString(fileHash)) {
    return res.status(400).json({ error: 'filePath and fileHash are required' })
  }

  const job = await videoQueue.add({
    videoId: `vid_${Date.now().toString(36)}`,
    s3Uri: filePath,
    codec: 'hevc',
  }, {
    jobId: `vid-${fileHash}`, // Dedup by file hash
    priority: premium ? 1 : 10, // Premium users get priority
  })
  res.json({ trackingId: job.id })
})
worker-server/triggers/video-worker.ts
import { worker } from 'oqronkit'

/**
 * Consumer for the `video-encode` topic: download, transcode, upload.
 * Resolves to the value returned by `uploadToCDN`.
 *
 * NOTE(review): `VideoJob` is not imported in this file's snippet.
 */
export const videoWorker = worker<VideoJob, string>({
  topic: 'video-encode',
  concurrency: 2,
  timeout: 300_000, // 5 minute timeout

  // Lock with a 60s TTL, renewed by a 5s heartbeat
  guaranteedWorker: true,
  lockTtlMs: 60_000,
  heartbeatMs: 5_000,

  handler: async (ctx) => {
    const { videoId, s3Uri, codec } = ctx.data

    ctx.progress(10, 'Downloading source')
    await download(s3Uri)

    // Stop before the expensive step when the job was cancelled meanwhile
    if (ctx.signal.aborted) {
      throw new Error('Cancelled')
    }

    ctx.progress(40, `Transcoding to ${codec}`)
    await transcode(s3Uri, codec)

    ctx.progress(90, 'Uploading to CDN')
    const cdnUrl = await uploadToCDN(videoId)

    ctx.progress(100, 'Done')
    return cdnUrl
  },
})

Scheduled Tasks

One-Shot: Run Once at a Specific Time

triggers/scheduled-tasks.ts
import { schedule } from 'oqronkit'

/**
 * One-shot schedule: runs `runVacuumAnalyze()` at the fixed `runAt` instant.
 * The date is hard-coded, so update it before reusing this example.
 */
export const maintenanceWindow = schedule({
  name: 'db-maintenance',
  runAt: new Date('2025-01-15T03:00:00Z'), // Run once at 3am UTC

  handler: async (ctx) => {
    await runVacuumAnalyze()
  },
})

Recurring: Daily Report

/**
 * Recurring schedule (daily, hour 9 / minute 0, timezone 'Asia/Kolkata') that
 * builds and emails the sales report for the region given in the payload.
 */
export const dailyReport = schedule<{ region: string }>({
  name: 'daily-sales-report',
  timezone: 'Asia/Kolkata',
  recurring: {
    frequency: 'daily',
    at: { hour: 9, minute: 0 },
  },
  payload: { region: 'IN' },

  handler: async (ctx) => {
    const { region } = ctx.payload
    const sales = await getSalesData(region, ctx.firedAt)

    // The PDF is generated first, then the report is emailed
    await generatePDF(sales)
    await emailReport(sales)
  },
})

Cron Expression

/** Weekly cleanup driven by a cron expression; resolves to `{ deletedCount }`. */
export const weeklyCleanup = cron({
  name: 'weekly-cleanup',
  expression: '0 2 * * SUN', // Sundays at 02:00

  overlap: 'skip',           // A still-running cleanup blocks the next fire
  missedFire: 'run-once',    // After downtime, catch up with a single run

  handler: async (ctx) => {
    const deletedCount = await cleanupOldRecords()
    return { deletedCount }
  },
})

Interval-Based

/** Every 30 seconds, probe external services and alert ops when unhealthy. */
export const healthCheck = cron({
  name: 'health-check',
  every: { seconds: 30 },

  handler: async (ctx) => {
    const status = await checkExternalServices()

    // Nothing to report while everything is healthy
    if (status.healthy) return

    await alertOpsTeam(status)
  },
})

Webhook Dispatch

triggers/webhooks.ts
import { webhook } from 'oqronkit'

/**
 * Webhook dispatcher for order events.
 *
 * Endpoints are loaded from active `WebhookSubscription` rows. The signing
 * secret is read from the `WEBHOOK_SECRET` environment variable.
 *
 * @throws Error from `security()` when `WEBHOOK_SECRET` is unset or empty.
 */
export const orderWebhooks = webhook<OrderEvent>({
  name: 'order-events',
  concurrency: 10,
  throttle: { max: 100, duration: 60_000 }, // 100 deliveries per minute
  timeout: 15_000,

  retries: {
    max: 5,
    strategy: 'exponential',
    baseDelay: 2_000,
  },

  security() {
    // Fail loudly instead of asserting the variable exists: a non-null
    // assertion would pass `undefined` through as the secret.
    const signingSecret = process.env.WEBHOOK_SECRET
    if (!signingSecret) {
      throw new Error('WEBHOOK_SECRET environment variable is not set')
    }
    return { signingSecret }
  },

  async endpoints() {
    // Load from database for dynamic endpoint management
    const subs = await WebhookSubscription.findAll({ where: { active: true } })
    return subs.map(s => ({
      name: s.name,
      url: s.url,
      events: s.events,
      headers: { 'x-tenant-id': s.tenantId },
    }))
  },
})

// Fire events from your application
/** Create an order, fire the `order.created` webhook event, return the order. */
app.post('/api/orders', async (req, res) => {
  const created = await createOrder(req.body)
  const { id: orderId, total, customerId: customer } = created

  // The response is sent only after fire() has resolved
  await orderWebhooks.fire('order.created', { orderId, total, customer })

  res.json(created)
})

Batch: Analytics Event Flushing

Buffer individual events and bulk-insert on flush:

triggers/analytics-batch.ts
import { batch } from 'oqronkit'

/** One buffered analytics event; batches of these are passed to `insertMany`. */
interface AnalyticsEvent {
  /** Event name; the `/api/track` route takes it from `req.body.event`. */
  event: string
  /** Id of the user; the `/api/track` route takes it from `req.user.id`. */
  userId: string
  /** Free-form event properties supplied by the caller. */
  properties: Record<string, unknown>
  /** The `/api/track` route sets this to `Date.now()` (epoch milliseconds). */
  timestamp: number
}

/**
 * Buffers analytics events and bulk-inserts them on flush.
 * Resolves to `{ flushed }`, the size of the flushed batch.
 */
export const analyticsBatch = batch<AnalyticsEvent>({
  name: 'analytics-flush',
  maxWaitMs: 10_000,      // Flush after 10 seconds…
  maxSize: 500,           // …or once 500 events are buffered

  handler: async (ctx) => {
    const flushed = ctx.batchSize

    ctx.log('info', `Flushing ${flushed} analytics events`)
    ctx.progress(10, 'Preparing insert')

    // A single bulk insert for the whole batch
    await db.analyticsEvents.insertMany(ctx.batch)

    ctx.progress(100, 'Inserted')
    return { flushed }
  },
})

// Usage in API routes:
/** Buffer one analytics event and acknowledge immediately. */
app.post('/api/track', async (req, res) => {
  const { event, properties } = req.body
  const tracked = {
    event,
    userId: req.user.id,
    properties,
    timestamp: Date.now(),
  }

  await analyticsBatch.add(tracked)

  // The item is only buffered at this point, so the response is instant
  res.json({ ok: true })
})

Batch: Multi-Tenant Log Aggregation

Use groupBy to isolate buffers per tenant:

triggers/log-batch.ts
import { batch } from 'oqronkit'

/** One log line submitted to the `tenant-logs` batch. */
interface LogEntry {
  /** Tenant the entry belongs to; used as the `groupBy` key. */
  tenantId: string
  /** Severity of the entry. */
  level: 'info' | 'warn' | 'error'
  /** Log message text. */
  message: string
  /** Optional structured metadata attached to the entry. */
  meta?: Record<string, unknown>
}

/**
 * Per-tenant log aggregation: entries are grouped by `tenantId` and each group
 * is flushed separately through `TenantLogStore.bulkInsert`.
 */
export const logBatch = batch<LogEntry>({
  name: 'tenant-logs',
  maxWaitMs: 5_000,
  maxSize: 200,

  // One buffer per tenant, hence one flush per tenant
  groupBy: (entry) => entry.tenantId,

  // Retries first, then the dead-letter hook, for critical log delivery
  deadLetter: {
    enabled: true,
    onDead: async (deadJob) => {
      await alertOps(`Tenant ${deadJob.data.groupKey} log flush permanently failed`)
    },
  },
  retries: { strategy: 'exponential', baseDelay: 2_000, max: 3 },

  handler: async (ctx) => {
    // ctx.groupKey is the tenant id (e.g. "acme" or "globex");
    // ctx.batch holds only that tenant's entries
    const tenant = ctx.groupKey
    await TenantLogStore.bulkInsert(tenant!, ctx.batch)
    return { tenant, flushed: ctx.batchSize }
  },
})

// All logs go to the same batch — groupBy separates them by tenantId.
// (Top-level `await` as written here requires an ES module context.)
await logBatch.add({ tenantId: 'acme', level: 'info', message: 'User login' })
await logBatch.add({ tenantId: 'globex', level: 'error', message: 'DB timeout' })
await logBatch.add({ tenantId: 'acme', level: 'warn', message: 'Slow query' })
// Result — acme buffer: 2 items, globex buffer: 1 item

Rate Limiting: Cron with External API

triggers/inventory-sync.ts
import { cron, rateLimit } from 'oqronkit'

// Rate limiter for the supplier API, using the 'sliding-window' algorithm.
// It has a single tier named 'global' whose key function always returns
// 'supplier', configured with max 100 per '1h' window.
const supplierLimiter = rateLimit.create({
  name: 'supplier-api',
  algorithm: 'sliding-window',
  tiers: [{ name: 'global', key: () => 'supplier', max: 100, window: '1h' }],
})

/**
 * Every 15 minutes, pull the supplier inventory and update the local copy,
 * gated by `supplierLimiter`. Resolves to `{ synced }`, the product count.
 */
export const inventorySync = cron({
  name: 'inventory-sync',
  rateLimiter: supplierLimiter,
  every: { minutes: 15 },

  handler: async (ctx) => {
    // Reached only when the rate limiter allows the fire.
    // A blocked fire is SKIPPED and nextRunAt moves on to the following
    // slot (e.g. a blocked :15 fire advances to :30).
    const supplierProducts = await fetchSupplierInventory()
    await updateLocalInventory(supplierProducts)

    return { synced: supplierProducts.length }
  },
})

Queue API Reference

queue.add(data, opts?)

Push a single job:

// `body` is a required field of EmailPayload, so the payload must include it
const job = await emailQueue.add(
  { to: 'user@example.com', subject: 'Hello', body: 'Welcome aboard!' },
  {
    jobId: 'custom-id',     // Idempotency key — prevents duplicates
    priority: 1,             // Lower = higher priority (default: 0)
    delay: 60_000,           // Delay execution by 60 seconds
  }
)

// Returns: OqronJob { id, status, data, createdAt, ... }

queue.addBulk(items)

Push multiple jobs:

// Each item carries a full EmailPayload (`body` is required); `opts` is optional
const jobs = await emailQueue.addBulk([
  { data: { to: 'a@test.com', subject: 'Hi', body: 'Hello A' }, opts: { priority: 1 } },
  { data: { to: 'b@test.com', subject: 'Hi', body: 'Hello B' } },
])

queue.getJob(id)

// Look up a single job by its id
const job = await emailQueue.getJob('job-123')
// Returns: OqronJob | null — check for null before reading fields

queue.getJobs(filter?)

// Filter by `status`; `limit` caps the number of jobs requested
const waiting = await emailQueue.getJobs({ status: 'waiting', limit: 20 })
const failed = await emailQueue.getJobs({ status: 'failed', limit: 50 })

queue.count(status?)

// Called without an argument, and with a status filter
const total = await emailQueue.count()
const failedCount = await emailQueue.count('failed')

queue.pause() / queue.resume()

// Pause, inspect the paused state, then resume
await emailQueue.pause()        // Stops claiming new jobs
const paused = await emailQueue.isPaused()
await emailQueue.resume()       // Resumes claiming

queue.drain()

// The returned promise is awaited before continuing
await emailQueue.drain()        // Wait for active jobs to finish, then pause

queue.obliterate()

// Destructive: see the inline note. The resolved value is kept in `removed`.
const removed = await emailQueue.obliterate() // Remove ALL jobs from storage

Handler Context API

Every handler receives a context object with these properties:

// Reference listing of the context members. `handler:` is a property of a
// queue/worker config object, so this fragment is illustrative, not runnable.
handler: async (ctx) => {
  // ── Identity ─────────────────────────────────
  ctx.id            // Job ID (string)
  ctx.name          // Queue name (string)
  ctx.data          // Typed payload (T)

  // ── Execution State ──────────────────────────
  ctx.attempt       // Current attempt number (1-based)
  ctx.maxAttempts   // Max attempts configured
  ctx.createdAt     // When job was created (Date)
  ctx.duration      // Live elapsed ms since handler started
  ctx.environment   // Environment isolation boundary
  ctx.project       // Project isolation boundary

  // ── Cancellation ─────────────────────────────
  ctx.signal        // AbortSignal — check for cancellation
  ctx.aborted       // Shorthand for signal.aborted

  // ── Progress ─────────────────────────────────
  ctx.progress(50, 'Processing')   // Update progress (0-100)
  ctx.getProgress()                 // Read current progress

  // ── Logging ──────────────────────────────────
  // ctx.log is shown both as a function and with per-level methods
  ctx.log('info', 'Message')        // Structured logging
  ctx.log.info('Message')           // Shorthand
  ctx.log.warn('Warning')
  ctx.log.error('Error')

  // ── Control ──────────────────────────────────
  ctx.discard()                     // Permanently fail without retry

  // ── Child Jobs ───────────────────────────────
  // Arguments: target queue name, then the child payload; resolves to the child id
  const childId = await ctx.spawnChild('other-queue', { key: 'value' })
}

Next Steps

On this page