Pipeline
Streaming ETL with backpressure and stage-based processing
The Pipeline module enables streaming ETL (Extract, Transform, Load) workflows with built-in backpressure control. Each pipeline is composed of sequential stages that process data flowing through them.
Roadmap Module — Pipeline is part of the v1.x Enterprise release. This page documents the planned API.
Planned API
import { pipeline } from 'oqronkit'
const etlPipeline = pipeline<RawEvent, EnrichedEvent>({
name: 'events-etl',
stages: [
{
name: 'extract',
concurrency: 5,
handler: async (ctx) => {
// Pull raw events from source
const raw = await fetchFromKafka(ctx.data.topic)
return raw.map(normalize)
},
},
{
name: 'transform',
concurrency: 10,
handler: async (ctx) => {
// Enrich with user data, geo-lookup, etc.
const enriched = await enrichEvent(ctx.data)
return enriched
},
},
{
name: 'load',
concurrency: 3,
handler: async (ctx) => {
// Write to data warehouse
await insertIntoClickHouse(ctx.data)
return { written: true }
},
},
],
// Backpressure: pause upstream stages when downstream is overwhelmed
backpressure: {
maxBufferSize: 1000,
strategy: 'pause-upstream', // or 'drop-oldest' | 'reject'
},
// Error handling per stage
onStageError: async (stage, error, ctx) => {
ctx.log('error', `Stage ${stage} failed`, { error: error.message })
// Return 'retry' | 'skip' | 'abort'
return 'retry'
},
})
Use Cases
- Event Processing — Normalize, enrich, and store raw event streams
- Data Migration — Multi-step transformations with checkpoints
- Log Processing — Parse, filter, and aggregate log entries
- File Processing — Read, transform, and write large file batches
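The log-processing use case, for example, comes down to staged functions wired together. A minimal sketch in plain TypeScript (the `parseLine`, `isError`, and `countByLevel` names are illustrative, not part of the planned API; the Pipeline module would run such handlers as stages with concurrency and backpressure):

```typescript
// Staged log processing: parse → filter → aggregate.
interface LogEntry {
  level: string
  message: string
}

// Stage 1: parse raw lines of the form "LEVEL message"
const parseLine = (line: string): LogEntry => {
  const [level, ...rest] = line.split(' ')
  return { level, message: rest.join(' ') }
}

// Stage 2: keep only error entries
const isError = (entry: LogEntry): boolean => entry.level === 'ERROR'

// Stage 3: aggregate counts per level
const countByLevel = (entries: LogEntry[]): Record<string, number> =>
  entries.reduce<Record<string, number>>((acc, e) => {
    acc[e.level] = (acc[e.level] ?? 0) + 1
    return acc
  }, {})

const lines = ['INFO started', 'ERROR disk full', 'ERROR timeout']
const errors = lines.map(parseLine).filter(isError)
console.log(countByLevel(errors)) // { ERROR: 2 }
```

Running the stages as a pipeline rather than plain function calls is what adds per-stage concurrency, retries, and backpressure on top of this shape.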
Backpressure Strategies
| Strategy | Behavior |
|---|---|
| 'pause-upstream' | Pauses earlier stages when the downstream buffer is full |
| 'drop-oldest' | Drops the oldest buffered items to make room |
| 'reject' | Rejects new items and surfaces an error to the producer |
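The semantics of the three strategies can be sketched as a bounded buffer. This is an illustration of the behavior described above, not the library's implementation; the `BoundedBuffer` class and its `paused` flag are assumptions for the sketch:

```typescript
type Strategy = 'pause-upstream' | 'drop-oldest' | 'reject'

// A bounded buffer that applies one backpressure strategy when full.
class BoundedBuffer<T> {
  private items: T[] = []
  paused = false // 'pause-upstream' signals producers via this flag

  constructor(private maxSize: number, private strategy: Strategy) {}

  // Returns true if the item was accepted.
  push(item: T): boolean {
    if (this.items.length < this.maxSize) {
      this.items.push(item)
      return true
    }
    if (this.strategy === 'drop-oldest') {
      this.items.shift() // discard the oldest item to make room
      this.items.push(item)
      return true
    }
    if (this.strategy === 'reject') {
      throw new Error('buffer full') // surface error to producer
    }
    // 'pause-upstream': producer should stop until the buffer drains
    this.paused = true
    return false
  }

  // Consume one item; unpause once there is room again.
  shift(): T | undefined {
    const item = this.items.shift()
    if (this.items.length < this.maxSize) this.paused = false
    return item
  }

  get size(): number {
    return this.items.length
  }
}
```

With `'drop-oldest'`, pushing a third item into a buffer of size 2 evicts the first; with `'pause-upstream'`, the push is refused and `paused` stays set until a consumer drains the buffer.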
Stage Composition
// Each stage can have independent concurrency, retries, and timeouts
stages: [
{
name: 'validate',
concurrency: 20, // High concurrency for CPU-light validation
timeout: 5_000,
retries: { max: 0 }, // No retries — invalid data is dropped
},
{
name: 'process',
concurrency: 3, // Low concurrency for heavy compute
timeout: 60_000,
retries: { max: 2, strategy: 'exponential', baseDelay: 1000 },
guaranteedWorker: true,
},
]
Monitoring
// Get pipeline metrics
const metrics = await etlPipeline.getMetrics()
// {
// stages: {
// extract: { processed: 142000, failed: 12, avgLatencyMs: 8 },
// transform: { processed: 141988, failed: 3, avgLatencyMs: 45 },
// load: { processed: 141985, failed: 0, avgLatencyMs: 120 },
// },
// buffer: { current: 342, max: 1000, pressure: 0.34 },
// }
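The `buffer.pressure` field is simply `current / max`, which lends itself to a threshold-based alerting check. A sketch (the `isUnderPressure` helper and the 0.8 threshold are illustrative, not part of the planned API):

```typescript
// Derive buffer pressure from metrics and flag when a threshold
// is crossed. The shape mirrors the buffer field of getMetrics().
interface BufferMetrics {
  current: number
  max: number
}

const pressure = (buffer: BufferMetrics): number =>
  buffer.current / buffer.max

const isUnderPressure = (buffer: BufferMetrics, threshold = 0.8): boolean =>
  pressure(buffer) >= threshold

const buffer = { current: 342, max: 1000 }
console.log(pressure(buffer)) // 0.342
console.log(isUnderPressure(buffer)) // false
```

Polling `getMetrics()` on an interval and feeding `buffer` through a check like this is one straightforward way to drive alerts or autoscaling off a pipeline.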