OqronKit

Batch

Accumulate items and flush as grouped payloads — by size or time window

The Batch module accumulates incoming items and flushes them as a single batch when either a count threshold (maxSize) or a time window (maxWaitMs) is reached — whichever comes first.

Use it for analytics ingestion, bulk database writes, log aggregation, webhook fan-out, or any workload where processing items one-by-one is wasteful.

Working example: apps/backend/src/triggers/batches.ts contains a complete Batch implementation covering analytics ingestion, grouped log aggregation, deduplication, and lifecycle hooks.

Basic Usage

triggers/analytics.ts
import { batch } from 'oqronkit'

const analytics = batch<AnalyticsEvent>({
  name: 'analytics-flush',
  maxSize: 500,       // Flush when 500 events accumulate
  maxWaitMs: 10_000,  // Or every 10 seconds, whichever comes first

  handler: async (ctx) => {
    await db.analyticsEvents.insertMany(ctx.batch)
    return { flushed: ctx.batchSize }
  },
})

// In your API routes — items are buffered, not processed immediately
await analytics.add({ event: 'page.view', userId: 'u_123', path: '/home' })
await analytics.add({ event: 'button.click', userId: 'u_456', path: '/pricing' })

Adding Items

// Single item
await analytics.add({ event: 'page.view', userId: 'u_123' })

// Bulk add — each item is individually buffered and deduped
await analytics.addBulk([
  { event: 'page.view', userId: 'u_1' },
  { event: 'page.view', userId: 'u_2' },
  { event: 'page.view', userId: 'u_3' },
])

Admin & Queue Management

The Batch module exposes runtime controls for force-flushing buffers, pausing and resuming processing, draining in-flight batch jobs (for example before a deployment), and inspecting buffer sizes.

// Force-flush immediately (bypasses maxSize/maxWaitMs)
await analytics.flush()

// Flush a specific group
await analytics.flush('tenant-acme')

// Pause/resume processing
await analytics.pause()
await analytics.resume()

// Gracefully drain: Waits for all active batch jobs to finish, then pauses.
await analytics.drain()

// Check buffer state
const pending = await analytics.getBufferSize()
const groupPending = await analytics.getBufferSize('tenant-acme')
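
As an illustration of how these controls might be combined during a deployment, here is a minimal sketch of a shutdown routine. It is written against a hypothetical BatchControls interface that lists only the methods shown above; the return types, the flush-then-drain order, and the in-memory stand-in are assumptions made so the sketch runs on its own, not confirmed OqronKit behavior.

```typescript
// Hypothetical structural type: only the control methods shown above.
// Return types are assumptions; check the real oqronkit typings.
interface BatchControls {
  flush(groupKey?: string): Promise<unknown>
  drain(): Promise<unknown>
  getBufferSize(groupKey?: string): Promise<number>
}

// Assumed shutdown order: flush buffered items first, then wait for
// in-flight batch jobs. Resolves to the number of items still buffered.
async function shutdownBatch(controls: BatchControls): Promise<number> {
  await controls.flush() // turn whatever is buffered into a batch job now
  await controls.drain() // wait for active batch jobs, then pause
  return controls.getBufferSize()
}

// In-memory stand-in so the sketch runs without OqronKit.
const calls: string[] = []
const fake: BatchControls = {
  flush: async () => { calls.push('flush') },
  drain: async () => { calls.push('drain') },
  getBufferSize: async () => 0,
}

const remaining: Promise<number> = shutdownBatch(fake)
```

In a real service, a routine like this would typically be called from a SIGTERM handler with the object returned by batch() in place of the stand-in.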

Flush Triggers

Buffers are flushed when any of the following conditions is met:

| Trigger | Condition | Use Case |
| --- | --- | --- |
| Size | buffer.length >= maxSize | High-volume ingestion: flush when full |
| Time | now - firstItemAt >= maxWaitMs | Low-volume: don't wait forever for a full batch |
| Manual | .flush() called | Admin actions, graceful shutdown |

OR logic (not AND): A buffer with 1 item will still flush after maxWaitMs elapses, even if it never reaches maxSize.
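
The size-or-time rule can be pictured as a small predicate. The sketch below is not OqronKit's internal code: BufferState, FlushLimits, and shouldFlush are hypothetical names, and treating an empty buffer as "never flush" is an assumption.

```typescript
// Hypothetical model of a buffer; names are illustrative, not OqronKit internals.
interface BufferState {
  length: number      // items currently buffered
  firstItemAt: number // epoch ms when the first item arrived
}

interface FlushLimits {
  maxSize: number
  maxWaitMs: number
}

// Returns which trigger fires, or null if the buffer should keep waiting.
function shouldFlush(
  buf: BufferState,
  limits: FlushLimits,
  now: number,
): 'size' | 'time' | null {
  if (buf.length === 0) return null                            // assumption: nothing to flush
  if (buf.length >= limits.maxSize) return 'size'              // count threshold reached
  if (now - buf.firstItemAt >= limits.maxWaitMs) return 'time' // time window elapsed
  return null
}

const limits: FlushLimits = { maxSize: 500, maxWaitMs: 10_000 }

// A full buffer flushes immediately, even though only 1s has passed.
const full = shouldFlush({ length: 500, firstItemAt: 0 }, limits, 1_000)
// A single item flushes once the window elapses.
const stale = shouldFlush({ length: 1, firstItemAt: 0 }, limits, 10_000)
// One millisecond earlier, the same buffer keeps waiting.
const waiting = shouldFlush({ length: 1, firstItemAt: 0 }, limits, 9_999)
```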

Group By

Route items into separate buffers by key. Each group flushes independently.

const tenantFlush = batch<{ tenantId: string; data: any }>({
  name: 'tenant-sync',
  maxSize: 100,
  maxWaitMs: 5_000,
  groupBy: (item) => item.tenantId,

  handler: async (ctx) => {
    // ctx.groupKey is the resolved group key ('acme', 'globex', etc.)
    await db.sync(ctx.groupKey!, ctx.batch)
    return { synced: ctx.batchSize }
  },
})

// These go into separate buffers
await tenantFlush.add({ tenantId: 'acme', data: { /* ... */ } })
await tenantFlush.add({ tenantId: 'globex', data: { /* ... */ } })
await tenantFlush.add({ tenantId: 'acme', data: { /* ... */ } }) // Same group as first
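
To make the routing concrete, the following sketch models per-group buffers with a plain Map. It only illustrates the idea of one buffer per group key; routeByGroup and TenantItem are hypothetical names, and OqronKit's actual storage layout is not described here.

```typescript
// Hypothetical in-memory model of per-group buffers; not OqronKit's implementation.
type TenantItem = { tenantId: string; data: unknown }

// Places each item into the buffer named by its group key.
function routeByGroup<T>(items: T[], groupBy: (item: T) => string): Map<string, T[]> {
  const buffers = new Map<string, T[]>()
  for (const item of items) {
    const key = groupBy(item)
    const buffer = buffers.get(key) ?? []
    buffer.push(item)
    buffers.set(key, buffer)
  }
  return buffers
}

const incoming: TenantItem[] = [
  { tenantId: 'acme', data: 1 },
  { tenantId: 'globex', data: 2 },
  { tenantId: 'acme', data: 3 }, // same group as the first item
]

const buffers = routeByGroup(incoming, (item) => item.tenantId)
const acmeSize = buffers.get('acme')?.length
const globexSize = buffers.get('globex')?.length
```

Because each group has its own buffer, a busy tenant can reach maxSize and flush while a quiet tenant's buffer keeps waiting for its own maxWaitMs window.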

Progress & Long-Running Batches

If a batch handler processes thousands of items and runs for several minutes, report progress back to the OqronKit engine with ctx.progress(). The reported progress is visible in the UI and also tells the Stall Detector that the job is still alive.

const massiveSync = batch<UserRecord>({
  name: 'massive-sync',
  maxSize: 10000,
  maxWaitMs: 60_000,
  timeout: 300_000, // 5 minutes

  handler: async (ctx) => {
    for (let i = 0; i < ctx.batchSize; i++) {
      await processUser(ctx.batch[i]);

      // Report progress after every 100 processed items and once at the end
      const done = i + 1;
      if (done % 100 === 0 || done === ctx.batchSize) {
        const pct = Math.floor((done / ctx.batchSize) * 100);
        await ctx.progress(pct, `Processed ${done}/${ctx.batchSize}`);
      }
    }
  }
});

Deduplication

Skip duplicate items within the same buffer window.

const events = batch<{ eventId: string; payload: any }>({
  name: 'dedup-events',
  maxSize: 200,
  maxWaitMs: 5_000,
  deduplicateBy: (item) => item.eventId, // Skip if same eventId already buffered

  handler: async (ctx) => {
    await processUnique(ctx.batch)
  },
})

Dedup keys are tracked per-buffer-window. Once a buffer flushes, the dedup set resets.
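
The per-window behavior can be sketched with a Set that is discarded on every flush. DedupBuffer is a hypothetical class written for illustration; it assumes a single in-memory buffer and says nothing about how OqronKit tracks keys in persistent storage.

```typescript
// Hypothetical sketch of a buffer with per-window dedup; names are illustrative.
class DedupBuffer<T> {
  private items: T[] = []
  private seen = new Set<string>()
  private keyOf: (item: T) => string

  constructor(keyOf: (item: T) => string) {
    this.keyOf = keyOf
  }

  // Returns true if the item was buffered, false if it was skipped as a duplicate.
  add(item: T): boolean {
    const key = this.keyOf(item)
    if (this.seen.has(key)) return false
    this.seen.add(key)
    this.items.push(item)
    return true
  }

  // Hands back the buffered items and starts a new window with an empty dedup set.
  flush(): T[] {
    const batch = this.items
    this.items = []
    this.seen = new Set<string>()
    return batch
  }
}

const buf = new DedupBuffer<{ eventId: string }>((e) => e.eventId)
const first = buf.add({ eventId: 'evt_1' })      // buffered
const duplicate = buf.add({ eventId: 'evt_1' })  // skipped: key already in this window
const flushed = buf.flush()                      // one item, dedup set reset
const afterFlush = buf.add({ eventId: 'evt_1' }) // buffered again: new window
```

The last call shows the practical consequence: deduplication only covers items that share a buffer window, so the handler should still tolerate the same key arriving in two different batches.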

Retry & Dead Letter Queue

const bulkWriter = batch<DbRow>({
  name: 'bulk-write',
  maxSize: 1000,
  maxWaitMs: 15_000,

  retries: {
    max: 3,
    strategy: 'exponential', // 'exponential' | 'fixed'
    baseDelay: 2_000,        // 2s → 4s → 8s
  },

  deadLetter: {
    enabled: true,
    onDead: async (job, error) => {
      await alertSlack(`Bulk write permanently failed: ${error.message}`)
    },
  },

  handler: async (ctx) => {
    if (ctx.attempt > 1) {
      ctx.log('warn', `Retry attempt ${ctx.attempt}/${ctx.maxAttempts}`)
    }
    await db.bulkInsert(ctx.batch)
  },
})
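
The delay comment in the example (2s → 4s → 8s) corresponds to plain doubling of baseDelay. The sketch below computes such a schedule; retryDelayMs and schedule are hypothetical helpers, and the assumption that no jitter or upper cap is applied is mine, not something the configuration above states.

```typescript
type Strategy = 'exponential' | 'fixed'

// Delay before retry number `retry` (1-based).
// Assumes plain doubling for 'exponential', with no jitter and no cap.
function retryDelayMs(strategy: Strategy, baseDelay: number, retry: number): number {
  return strategy === 'exponential' ? baseDelay * 2 ** (retry - 1) : baseDelay
}

// Full list of delays for a retry policy with `max` retries.
function schedule(strategy: Strategy, baseDelay: number, max: number): number[] {
  const delays: number[] = []
  for (let retry = 1; retry <= max; retry++) {
    delays.push(retryDelayMs(strategy, baseDelay, retry))
  }
  return delays
}

const exponential = schedule('exponential', 2_000, 3) // [2000, 4000, 8000]
const fixed = schedule('fixed', 2_000, 3)             // [2000, 2000, 2000]
```

Under these assumptions, a batch that fails every attempt with the configuration above waits 14 seconds in total across its three retries before the dead letter callback is invoked.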

Hooks

Intercept flush and execution lifecycle events.

const monitored = batch<LogEntry>({
  name: 'log-flush',
  maxSize: 500,
  maxWaitMs: 5_000,

  hooks: {
    // Filter or transform items before they become a batch job
    beforeFlush: (items, groupKey) => {
      return items.filter(item => item.level !== 'debug')
    },

    // Called after handler succeeds
    onSuccess: async (job, result) => {
      metrics.increment('batch.success', { name: 'log-flush' })
    },

    // Called after handler fails (before retry decision)
    onFail: async (job, error) => {
      metrics.increment('batch.error', { name: 'log-flush' })
    },
  },

  handler: async (ctx) => {
    await logStore.bulkWrite(ctx.batch)
  },
})

Persistent Buffer Semantics (Crash-Safety)

With the default persist: true, buffered items are written to your configured OqronContainer.storage rather than held only in process memory, so they survive a process crash.

If your Node.js process crashes (OOM, SIGKILL) in the middle of flushing a buffer:

  1. The Batch Lock remains active.
  2. The buffer data is preserved via a flush_marker.
  3. When the cluster restarts, OqronKit's Stall Detector discovers the abandoned lock, recovers the abandoned items using the marker, and re-publishes the batch as a new Job to the Broker.

Result: no buffered items are lost.
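
The recovery sequence can be illustrated with a toy model. Everything in the sketch below is hypothetical: ToyStorage, its fields, and the three functions are invented for illustration and do not reflect OqronKit's real storage schema or Stall Detector. The only point is to show why a held lock plus a marker is enough to rebuild an interrupted flush.

```typescript
// Toy model only: field and function names are hypothetical, not OqronKit internals.
interface ToyStorage {
  buffer: string[]             // items waiting to be flushed
  flushMarker: string[] | null // items claimed by an in-progress flush
  lockHeld: boolean            // stand-in for the Batch Lock
}

type PublishedJobs = string[][]

// Start a flush: take the lock and move the buffered items under the marker.
function beginFlush(storage: ToyStorage): void {
  storage.lockHeld = true
  storage.flushMarker = storage.buffer
  storage.buffer = []
}

// Finish a flush normally: publish the job, then clear marker and lock.
function completeFlush(storage: ToyStorage, jobs: PublishedJobs): void {
  if (storage.flushMarker !== null) jobs.push(storage.flushMarker)
  storage.flushMarker = null
  storage.lockHeld = false
}

// What a stall detector could do after a crash: a held lock plus a marker
// means a flush was interrupted, so the marked items are re-published.
function recoverAbandonedFlush(storage: ToyStorage, jobs: PublishedJobs): boolean {
  if (!storage.lockHeld || storage.flushMarker === null) return false
  jobs.push(storage.flushMarker)
  storage.flushMarker = null
  storage.lockHeld = false
  return true
}

// Simulate a crash between beginFlush and completeFlush.
const storage: ToyStorage = { buffer: ['a', 'b', 'c'], flushMarker: null, lockHeld: false }
const jobs: PublishedJobs = []
beginFlush(storage)
// The process dies here; storage survives because it lives outside the process.
const recovered = recoverAbandonedFlush(storage, jobs)
const recoveredAgain = recoverAbandonedFlush(storage, jobs) // nothing left to recover
```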

In-Memory Mode

Set persist: false for ultra-fast ephemeral buffering. Items are held in process memory instead of the storage adapter. Ideal for fire-and-forget analytics where you can tolerate data loss on crash.

const fastBuffer = batch<MetricPoint>({
  name: 'metrics-buffer',
  maxSize: 1000,
  maxWaitMs: 2_000,
  persist: false, // No storage writes for buffering

  handler: async (ctx) => {
    await metricsBackend.submitBatch(ctx.batch)
  },
})

Configuration Reference

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| name | string | required | Unique batch identifier |
| maxSize | number | required | Flush when this many items accumulate |
| maxWaitMs | number | required | Flush after this many ms since first item |
| handler | (ctx) => Promise<R> | required | Batch processing function |
| groupBy | (item: T) => string | undefined | Route items to named groups |
| deduplicateBy | (item: T) => string | undefined | Skip items with duplicate keys |
| persist | boolean | true | Buffer in storage adapter (true) or memory (false) |
| retries | RetryConfig | { max: 0, strategy: 'exponential', baseDelay: 1000 } | Retry policy |
| deadLetter.enabled | boolean | false | Enable dead letter queue |
| deadLetter.onDead | (job, error) => void | undefined | Called when all retries exhausted |
| throttle.max | number | undefined | Max flushes per window |
| throttle.duration | number | undefined | Throttle window in ms |
| timeout | number | undefined | Handler timeout in ms |
| tags | string[] | [] | Tags for filtering |
| hooks | BatchHooks | {} | Lifecycle hooks |
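
For readers who prefer types to tables, the options can be approximated as a TypeScript interface. The shapes below are reconstructed from the table and the earlier examples; BatchOptionsSketch is a hypothetical name, hooks are omitted for brevity, and the declarations actually exported by oqronkit may differ. The example object also shows the throttle, timeout, and tags options, which none of the earlier examples use.

```typescript
// Option shapes reconstructed from the table above; the real oqronkit typings may differ.
interface RetryConfig {
  max: number
  strategy: 'exponential' | 'fixed'
  baseDelay: number // ms
}

// Only the context fields needed for this sketch.
interface BatchContextSketch<T> {
  batch: T[]
  batchSize: number
  groupKey?: string
}

interface BatchOptionsSketch<T, R> {
  name: string
  maxSize: number
  maxWaitMs: number
  handler: (ctx: BatchContextSketch<T>) => Promise<R>
  groupBy?: (item: T) => string
  deduplicateBy?: (item: T) => string
  persist?: boolean
  retries?: RetryConfig
  deadLetter?: { enabled: boolean; onDead?: (job: unknown, error: Error) => void }
  throttle?: { max: number; duration: number }
  timeout?: number
  tags?: string[]
}

// Hypothetical configuration: at most 10 flushes per 60-second window.
const options: BatchOptionsSketch<{ id: string }, number> = {
  name: 'throttled-export',
  maxSize: 250,
  maxWaitMs: 5_000,
  throttle: { max: 10, duration: 60_000 },
  timeout: 30_000, // handler timeout in ms
  tags: ['export'],
  handler: async (ctx) => ctx.batchSize,
}
```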

Module Configuration

When initializing OqronKit, configure the batch module engine:

import { OqronKit, batchModule } from 'oqronkit'

await OqronKit.init({
  config: {
    modules: [
      batchModule({
        tickIntervalMs: 1_000,     // How often to check buffers (default: 1000)
        heartbeatMs: 5_000,        // How often to poll for batch jobs (default: 5000)
        concurrency: 5,            // Max parallel batch handlers (default: 5)
        leaderElection: true,      // Only leader scans buffers (default: true)
      }),
    ],
  },
})

Architecture

The Batch module uses a dual-loop architecture:

  1. Tick Loop (leader only): Scans all registered buffers every tickIntervalMs. If a buffer meets its flush condition, it creates a batch job with a deterministic UUID and publishes it to the broker.

  2. Poll Loop (all nodes): Claims batch jobs from the broker, extends heartbeats to prevent stall detection, and executes handlers. This distributes processing across all worker nodes.

┌─────────────┐     ┌──────────────┐     ┌──────────────┐
│  .add(item)  │────▶│   Buffer     │────▶│  Batch Job   │
│  .addBulk()  │     │  (Storage)   │     │  (Broker)    │
└─────────────┘     └──────────────┘     └──────────────┘
                      │ Tick Loop         │ Poll Loop
                      │ (Leader)          │ (All Nodes)
                      ▼                   ▼
                    maxSize OR          handler(ctx)
                    maxWaitMs           ──▶ success/retry
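
The division of labor between the two loops can be sketched as two plain functions sharing a queue. This is a simplified, synchronous toy: ToyBroker, ToyBuffer, tick, and poll are hypothetical names, the job ID format is only a stand-in for the deterministic UUID mentioned above, and heartbeats, retries, and storage are left out.

```typescript
interface Job { id: string; items: number[] }

// Minimal FIFO stand-in for the broker.
class ToyBroker {
  private queue: Job[] = []
  publish(job: Job): void { this.queue.push(job) }
  claim(): Job | undefined { return this.queue.shift() }
}

interface ToyBuffer {
  name: string
  items: number[]
  firstItemAt: number
  maxSize: number
  maxWaitMs: number
}

// Tick loop body: only the leader scans buffers and publishes jobs.
function tick(buffers: ToyBuffer[], broker: ToyBroker, now: number, isLeader: boolean): number {
  if (!isLeader) return 0
  let published = 0
  for (const buf of buffers) {
    const bySize = buf.items.length >= buf.maxSize
    const byTime = buf.items.length > 0 && now - buf.firstItemAt >= buf.maxWaitMs
    if (!bySize && !byTime) continue
    // Stand-in for a deterministic job ID: derived from buffer name and window start.
    broker.publish({ id: `${buf.name}:${buf.firstItemAt}`, items: buf.items })
    buf.items = []
    published++
  }
  return published
}

// Poll loop body: any node may claim a job and run the handler.
function poll(broker: ToyBroker, handler: (items: number[]) => void): boolean {
  const job = broker.claim()
  if (job === undefined) return false
  handler(job.items)
  return true
}

const broker = new ToyBroker()
const buffers: ToyBuffer[] = [
  { name: 'analytics-flush', items: [1, 2, 3], firstItemAt: 0, maxSize: 3, maxWaitMs: 10_000 },
]
const handled: number[][] = []

const followerPublished = tick(buffers, broker, 1_000, false) // followers never scan
const leaderPublished = tick(buffers, broker, 1_000, true)    // size threshold met
const workerRan = poll(broker, (items) => { handled.push(items) })
const queueEmpty = !poll(broker, () => {})
```

Keeping the scan on one node avoids two nodes flushing the same buffer, while leaving the claim step open to every node spreads handler execution across the cluster.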
