Batch
Accumulate items and flush as grouped payloads — by size or time window
The Batch module accumulates incoming items and flushes them as a single batch when either a count threshold (maxSize) or a time window (maxWaitMs) is reached — whichever comes first.
Use it for analytics ingestion, bulk database writes, log aggregation, webhook fan-out, or any workload where processing items one-by-one is wasteful.
Working example → See a complete, production-grade Batch implementation with analytics ingestion, grouped log aggregation, deduplication, and lifecycle hooks: apps/backend/src/triggers/batches.ts
Basic Usage
import { batch } from 'oqronkit'
const analytics = batch<AnalyticsEvent>({
  name: 'analytics-flush',
  maxSize: 500, // Flush when 500 events accumulate
  maxWaitMs: 10_000, // Or every 10 seconds, whichever comes first
  handler: async (ctx) => {
    await db.analyticsEvents.insertMany(ctx.batch)
    return { flushed: ctx.batchSize }
  },
})
// In your API routes: items are buffered, not processed immediately
await analytics.add({ event: 'page.view', userId: 'u_123', path: '/home' })
await analytics.add({ event: 'button.click', userId: 'u_456', path: '/pricing' })
Adding Items
// Single item
await analytics.add({ event: 'page.view', userId: 'u_123' })
// Bulk add: each item is individually buffered and deduped
await analytics.addBulk([
  { event: 'page.view', userId: 'u_1' },
  { event: 'page.view', userId: 'u_2' },
  { event: 'page.view', userId: 'u_3' },
])
Admin & Queue Management
The Batch module exposes runtime controls that let you force a flush, pause and resume processing, or gracefully drain queues during deployments.
// Force-flush immediately (bypasses maxSize/maxWaitMs)
await analytics.flush()
// Flush a specific group
await analytics.flush('tenant-acme')
// Pause/resume processing
await analytics.pause()
await analytics.resume()
// Gracefully drain: Waits for all active batch jobs to finish, then pauses.
await analytics.drain()
// Check buffer state
const pending = await analytics.getBufferSize()
const groupPending = await analytics.getBufferSize('tenant-acme')
Flush Triggers
A buffer is flushed as soon as any one of the following triggers fires:
| Trigger | Condition | Use Case |
|---|---|---|
| Size | buffer.length >= maxSize | High-volume ingestion — flush when full |
| Time | now - firstItemAt >= maxWaitMs | Low-volume — don't wait forever for a full batch |
| Manual | .flush() called | Admin actions, graceful shutdown |
OR logic (not AND): A buffer with 1 item will still flush after maxWaitMs elapses, even if it never reaches maxSize.
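The size and time rows of the table can be read as a single predicate. The following is a minimal sketch of that check, not OqronKit's internal code: `BufferState`, `FlushConfig`, and `flushReason` are hypothetical names, and treating an empty buffer as "never flush" is an assumption.

```typescript
// Hypothetical shapes for illustration; not OqronKit's internal types.
interface BufferState {
  length: number;      // items currently buffered
  firstItemAt: number; // epoch ms when the first item arrived
}

interface FlushConfig {
  maxSize: number;
  maxWaitMs: number;
}

type FlushReason = 'size' | 'time' | null;

// Returns which trigger fired, or null if the buffer should keep waiting.
function flushReason(buf: BufferState, cfg: FlushConfig, now: number): FlushReason {
  if (buf.length === 0) return null;                         // assumption: empty buffers never flush
  if (buf.length >= cfg.maxSize) return 'size';              // size trigger
  if (now - buf.firstItemAt >= cfg.maxWaitMs) return 'time'; // time trigger
  return null;
}

const cfg: FlushConfig = { maxSize: 500, maxWaitMs: 10_000 };
const t0 = 1_000_000;

const full = flushReason({ length: 500, firstItemAt: t0 }, cfg, t0 + 50);        // 'size'
const stale = flushReason({ length: 1, firstItemAt: t0 }, cfg, t0 + 10_000);     // 'time'
const waiting = flushReason({ length: 1, firstItemAt: t0 }, cfg, t0 + 9_999);    // null
```

The `stale` case is the single-item buffer described above: it never reaches `maxSize`, yet it still flushes once `maxWaitMs` has elapsed.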
Group By
Route items into separate buffers by key. Each group flushes independently.
const tenantFlush = batch<{ tenantId: string; data: any }>({
  name: 'tenant-sync',
  maxSize: 100,
  maxWaitMs: 5_000,
  groupBy: (item) => item.tenantId,
  handler: async (ctx) => {
    // ctx.groupKey is the resolved group key ('acme', 'globex', etc.)
    await db.sync(ctx.groupKey!, ctx.batch)
    return { synced: ctx.batchSize }
  },
})
// These go into separate buffers
await tenantFlush.add({ tenantId: 'acme', data: { /* ... */ } })
await tenantFlush.add({ tenantId: 'globex', data: { /* ... */ } })
await tenantFlush.add({ tenantId: 'acme', data: { /* ... */ } }) // Same group as first
Progress & Long-Running Batches
If your batch handler is processing thousands of items and takes several minutes, you can report progress back to the OqronKit engine. This is visible in the UI and also helps the Stall Detector know your job isn't frozen.
const massiveSync = batch<UserRecord>({
  name: 'massive-sync',
  maxSize: 10000,
  maxWaitMs: 60_000,
  timeout: 300_000, // 5 minutes
  handler: async (ctx) => {
    for (let i = 0; i < ctx.batchSize; i++) {
      await processUser(ctx.batch[i]);
      // Update progress every 100 items
      if (i % 100 === 0) {
        const pct = Math.floor((i / ctx.batchSize) * 100);
        await ctx.progress(pct, `Processed ${i}/${ctx.batchSize}`);
      }
    }
  }
});
Deduplication
Skip duplicate items within the same buffer window.
const events = batch<{ eventId: string; payload: any }>({
  name: 'dedup-events',
  maxSize: 200,
  maxWaitMs: 5_000,
  deduplicateBy: (item) => item.eventId, // Skip if same eventId already buffered
  handler: async (ctx) => {
    await processUnique(ctx.batch)
  },
})
Dedup keys are tracked per-buffer-window. Once a buffer flushes, the dedup set resets.
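To make the per-window behaviour concrete, here is a small in-memory sketch. `DedupBuffer` is a hypothetical class written for this illustration and is not part of OqronKit; it only models the idea that a key is rejected while it sits in the current buffer and accepted again after a flush.

```typescript
// Hypothetical in-memory buffer; not OqronKit's implementation.
class DedupBuffer<T> {
  private items: T[] = [];
  private seen = new Set<string>();
  private readonly keyOf: (item: T) => string;

  constructor(keyOf: (item: T) => string) {
    this.keyOf = keyOf;
  }

  // Returns true if the item was buffered, false if it was skipped as a duplicate.
  add(item: T): boolean {
    const key = this.keyOf(item);
    if (this.seen.has(key)) return false;
    this.seen.add(key);
    this.items.push(item);
    return true;
  }

  // Hands back the buffered items and resets both the buffer and the dedup set.
  flush(): T[] {
    const out = this.items;
    this.items = [];
    this.seen = new Set<string>();
    return out;
  }
}

const buf = new DedupBuffer<{ eventId: string }>((e) => e.eventId);
const first = buf.add({ eventId: 'evt_1' }); // buffered
const dup = buf.add({ eventId: 'evt_1' });   // skipped: same key in this window
const flushed = buf.flush();                 // window ends, dedup set resets
const again = buf.add({ eventId: 'evt_1' }); // buffered again in the new window
```

One consequence worth noting: deduplication of this kind does not protect against the same `eventId` arriving in two different windows, so a handler that needs stronger guarantees would still have to be idempotent.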
Retry & Dead Letter Queue
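The configuration example in this section annotates `baseDelay: 2_000` with the schedule 2s → 4s → 8s. The sketch below shows one formula that reproduces that schedule. It is an assumption about how the `'exponential'` and `'fixed'` strategies behave, not a statement of OqronKit's implementation; `retryDelay` and `schedule` are hypothetical helpers.

```typescript
type Strategy = 'exponential' | 'fixed';

// Delay in ms before retry number `retry` (1-based).
// Assumed formulas: fixed -> baseDelay; exponential -> baseDelay * 2^(retry - 1).
function retryDelay(strategy: Strategy, baseDelay: number, retry: number): number {
  if (retry < 1) throw new RangeError('retry must be >= 1');
  return strategy === 'fixed' ? baseDelay : baseDelay * 2 ** (retry - 1);
}

// Full list of delays for a policy with `max` retries.
function schedule(strategy: Strategy, baseDelay: number, max: number): number[] {
  return Array.from({ length: max }, (_, i) => retryDelay(strategy, baseDelay, i + 1));
}

const exponential = schedule('exponential', 2_000, 3); // [2000, 4000, 8000]
const fixed = schedule('fixed', 2_000, 3);             // [2000, 2000, 2000]
```

Under this assumed formula, a policy with `max: 3` and `baseDelay: 2_000` spends 14 seconds in total waiting between attempts before the job is handed to the dead letter hook.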
const bulkWriter = batch<DbRow>({
  name: 'bulk-write',
  maxSize: 1000,
  maxWaitMs: 15_000,
  retries: {
    max: 3,
    strategy: 'exponential', // 'exponential' | 'fixed'
    baseDelay: 2_000, // 2s → 4s → 8s
  },
  deadLetter: {
    enabled: true,
    onDead: async (job, error) => {
      await alertSlack(`Bulk write permanently failed: ${error.message}`)
    },
  },
  handler: async (ctx) => {
    if (ctx.attempt > 1) {
      ctx.log('warn', `Retry attempt ${ctx.attempt}/${ctx.maxAttempts}`)
    }
    await db.bulkInsert(ctx.batch)
  },
})
Hooks
Intercept flush and execution lifecycle events.
const monitored = batch<LogEntry>({
  name: 'log-flush',
  maxSize: 500,
  maxWaitMs: 5_000,
  hooks: {
    // Filter or transform items before they become a batch job
    beforeFlush: (items, groupKey) => {
      return items.filter(item => item.level !== 'debug')
    },
    // Called after handler succeeds
    onSuccess: async (job, result) => {
      metrics.increment('batch.success', { name: 'log-flush' })
    },
    // Called after handler fails (before retry decision)
    onFail: async (job, error) => {
      metrics.increment('batch.error', { name: 'log-flush' })
    },
  },
  handler: async (ctx) => {
    await logStore.bulkWrite(ctx.batch)
  },
})
Persistent Buffer Semantics (Crash-Safety)
By default (persist: true), buffered items are stored in your configured OqronContainer.storage rather than in process memory, so they survive a process crash.
If your Node.js process crashes (OOM, SIGKILL) in the middle of flushing a buffer:
- The Batch Lock remains active.
- The buffer data is preserved via a flush_marker.
- When the cluster restarts, OqronKit's Stall Detector discovers the abandoned lock, recovers the abandoned items using the marker, and instantly re-publishes the batch as a new Job to the Broker.
Result: Zero data loss.
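The recovery sequence above can be modelled in a few lines. This is a simplified, in-memory sketch of the idea only: `FakeStore`, `FlushMarker`, `beginFlush`, `completeFlush`, and `recover` are hypothetical names, and the staleness threshold is an assumption. OqronKit's actual storage layout and lock handling are not shown in this document.

```typescript
// Hypothetical storage model; none of these names are taken from OqronKit.
interface FlushMarker<T> {
  batchName: string;
  items: T[];       // snapshot of the items being flushed
  lockedAt: number; // when the flush lock was taken (ms)
}

class FakeStore<T> {
  buffer: T[] = [];
  marker: FlushMarker<T> | null = null;
  published: T[][] = [];
}

// Step 1 of a flush: move the buffer into a marker before publishing.
function beginFlush<T>(s: FakeStore<T>, batchName: string, now: number): void {
  s.marker = { batchName, items: s.buffer, lockedAt: now };
  s.buffer = [];
}

// Step 2: publish the batch, then clear the marker.
function completeFlush<T>(s: FakeStore<T>): void {
  if (!s.marker) return;
  s.published.push(s.marker.items);
  s.marker = null;
}

// What a stall detector could do on restart: a marker older than
// `staleAfterMs` means a flush was interrupted, so re-publish its items.
function recover<T>(s: FakeStore<T>, now: number, staleAfterMs: number): boolean {
  if (!s.marker || now - s.marker.lockedAt < staleAfterMs) return false;
  completeFlush(s);
  return true;
}

const store = new FakeStore<string>();
store.buffer.push('a', 'b', 'c');
beginFlush(store, 'analytics-flush', 0);
// ... the process crashes here, before completeFlush runs ...
const recovered = recover(store, 30_000, 10_000); // true: marker is stale, batch re-published
```

The key design point is ordering: the items are copied into the marker before anything is published, so at every moment they exist in either the buffer, the marker, or the broker.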
In-Memory Mode
Set persist: false for ultra-fast ephemeral buffering. Items are held in process memory instead of the storage adapter. Ideal for fire-and-forget analytics where you can tolerate data loss on crash.
const fastBuffer = batch<MetricPoint>({
  name: 'metrics-buffer',
  maxSize: 1000,
  maxWaitMs: 2_000,
  persist: false, // No storage writes for buffering
  handler: async (ctx) => {
    await metricsBackend.submitBatch(ctx.batch)
  },
})
Configuration Reference
| Option | Type | Default | Description |
|---|---|---|---|
| name | string | required | Unique batch identifier |
| maxSize | number | required | Flush when this many items accumulate |
| maxWaitMs | number | required | Flush after this many ms since first item |
| handler | (ctx) => Promise<R> | required | Batch processing function |
| groupBy | (item: T) => string | undefined | Route items to named groups |
| deduplicateBy | (item: T) => string | undefined | Skip items with duplicate keys |
| persist | boolean | true | Buffer in storage adapter (true) or memory (false) |
| retries | RetryConfig | { max: 0, strategy: 'exponential', baseDelay: 1000 } | Retry policy |
| deadLetter.enabled | boolean | false | Enable dead letter queue |
| deadLetter.onDead | (job, error) => void | undefined | Called when all retries exhausted |
| throttle.max | number | undefined | Max flushes per window |
| throttle.duration | number | undefined | Throttle window in ms |
| timeout | number | undefined | Handler timeout in ms |
| tags | string[] | [] | Tags for filtering |
| hooks | BatchHooks | {} | Lifecycle hooks |
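The `throttle.max` and `throttle.duration` rows describe a limit of "at most N flushes per window". One way to picture that is a fixed-window counter, sketched below. `FlushThrottle` is a hypothetical class for illustration; whether OqronKit uses a fixed window, a sliding window, or something else is not stated here, so treat the windowing behaviour as an assumption.

```typescript
// Hypothetical fixed-window limiter; OqronKit's actual throttle may differ.
class FlushThrottle {
  private windowStart = 0;
  private count = 0;
  private readonly max: number;
  private readonly duration: number;

  constructor(max: number, duration: number) {
    this.max = max;           // corresponds to throttle.max
    this.duration = duration; // corresponds to throttle.duration (ms)
  }

  // Returns true if a flush may run at time `now` (ms), and records it.
  tryAcquire(now: number): boolean {
    if (now - this.windowStart >= this.duration) {
      this.windowStart = now; // start a new window
      this.count = 0;
    }
    if (this.count >= this.max) return false; // over the limit: defer the flush
    this.count += 1;
    return true;
  }
}

const throttle = new FlushThrottle(2, 1_000); // at most 2 flushes per second
const results = [0, 100, 200, 1_000].map((t) => throttle.tryAcquire(t));
// results: [true, true, false, true]
```

The third attempt is refused because two flushes already ran in the first window; the fourth succeeds because a new window starts at 1000 ms.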
Module Configuration
When initializing OqronKit, configure the batch module engine:
import { OqronKit, batchModule } from 'oqronkit'
await OqronKit.init({
  config: {
    modules: [
      batchModule({
        tickIntervalMs: 1_000, // How often to check buffers (default: 1000)
        heartbeatMs: 5_000, // How often to poll for batch jobs (default: 5000)
        concurrency: 5, // Max parallel batch handlers (default: 5)
        leaderElection: true, // Only leader scans buffers (default: true)
      }),
    ],
  },
})
Architecture
The Batch module uses a dual-loop architecture:
- Tick Loop (leader only): Scans all registered buffers every tickIntervalMs. If a buffer meets its flush condition, it creates a batch job with a deterministic UUID and publishes it to the broker.
- Poll Loop (all nodes): Claims batch jobs from the broker, extends heartbeats to prevent stall detection, and executes handlers. This distributes processing across all worker nodes.
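The two loops can be sketched as a pair of functions over an in-memory queue. This is a toy model under stated assumptions, not OqronKit code: `GroupBuffer`, `BatchJob`, `tick`, and `poll` are hypothetical names, the broker is a plain array, and the job id is a simple deterministic string rather than a real UUID. Leader election and heartbeats are left out.

```typescript
// Hypothetical in-memory model of the two loops; none of these names are OqronKit APIs.
interface GroupBuffer<T> { items: T[]; firstItemAt: number }
interface BatchJob<T> { id: string; groupKey: string; items: T[] }

const MAX_SIZE = 2;
const MAX_WAIT_MS = 1_000;

// Tick loop (leader): turn every ready buffer into a job on the broker queue.
function tick<T>(buffers: Map<string, GroupBuffer<T>>, broker: BatchJob<T>[], now: number): void {
  for (const [groupKey, buf] of buffers) {
    const ready = buf.items.length >= MAX_SIZE || now - buf.firstItemAt >= MAX_WAIT_MS;
    if (!ready) continue;
    // Deterministic id: the same buffer window always yields the same id,
    // so a repeated publish can be recognised as a duplicate.
    const id = `${groupKey}:${buf.firstItemAt}`;
    broker.push({ id, groupKey, items: buf.items });
    buffers.delete(groupKey);
  }
}

// Poll loop (any node): claim one job and run the handler on it.
function poll<T, R>(broker: BatchJob<T>[], handler: (job: BatchJob<T>) => R): R | undefined {
  const job = broker.shift();
  return job ? handler(job) : undefined;
}

const buffers = new Map<string, GroupBuffer<string>>([
  ['acme', { items: ['a1', 'a2'], firstItemAt: 0 }], // full: size trigger
  ['globex', { items: ['g1'], firstItemAt: 0 }],     // not ready at t=500
]);
const broker: BatchJob<string>[] = [];

tick(buffers, broker, 500);
const processed = poll(broker, (job) => `${job.id} -> ${job.items.length} items`);
```

Separating the loops this way means only one node decides when a buffer becomes a job, while any node can do the actual work.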
┌─────────────┐     ┌──────────────┐     ┌──────────────┐
│ .add(item)  │────▶│    Buffer    │────▶│  Batch Job   │
│ .addBulk()  │     │  (Storage)   │     │   (Broker)   │
└─────────────┘     └──────────────┘     └──────────────┘
                           │ Tick Loop          │ Poll Loop
                           │ (Leader)           │ (All Nodes)
                           ▼                    ▼
                      maxSize OR          handler(ctx)
                      maxWaitMs           ──▶ success/retry
Next Steps
- Task Queue — For immediate, per-job processing
- Distributed Worker — Decouple publishers from processors
- Crash Safety — How heartbeat locks protect your jobs