OqronKit

Pipeline

Streaming ETL with backpressure and stage-based processing

The Pipeline module enables streaming ETL (Extract, Transform, Load) workflows with built-in backpressure control. Each pipeline is composed of sequential stages that process data flowing through them.

Roadmap Module — Pipeline is part of the v1.x Enterprise release. This page documents the planned API.

Planned API

import { pipeline } from 'oqronkit'

const etlPipeline = pipeline<RawEvent, EnrichedEvent>({
  name: 'events-etl',

  stages: [
    {
      name: 'extract',
      concurrency: 5,
      handler: async (ctx) => {
        // Pull raw events from source
        const raw = await fetchFromKafka(ctx.data.topic)
        return raw.map(normalize)
      },
    },
    {
      name: 'transform',
      concurrency: 10,
      handler: async (ctx) => {
        // Enrich with user data, geo-lookup, etc.
        const enriched = await enrichEvent(ctx.data)
        return enriched
      },
    },
    {
      name: 'load',
      concurrency: 3,
      handler: async (ctx) => {
        // Write to data warehouse
        await insertIntoClickHouse(ctx.data)
        return { written: true }
      },
    },
  ],

  // Backpressure: pause upstream stages when downstream is overwhelmed
  backpressure: {
    maxBufferSize: 1000,
    strategy: 'pause-upstream', // or 'drop-oldest' | 'reject'
  },

  // Error handling per stage
  onStageError: async (stage, error, ctx) => {
    ctx.log('error', `Stage ${stage} failed`, { error: error.message })
    // Return 'retry' | 'skip' | 'abort'
    return 'retry'
  },
})
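Conceptually, each stage's output becomes the next stage's input. A minimal, library-independent sketch of that sequential composition (`Handler` and `runStages` are illustrative names, not part of the planned OqronKit API):

```typescript
// Illustrative only: a generic async stage chain, not the OqronKit API.
type Handler<I, O> = (input: I) => Promise<O>;

// Feed the output of each stage into the next, in order.
async function runStages(input: unknown, handlers: Handler<any, any>[]): Promise<unknown> {
  let data: unknown = input;
  for (const handler of handlers) {
    data = await handler(data);
  }
  return data;
}

// Example: a two-stage chain that doubles, then labels.
const result = runStages(21, [
  async (n: number) => n * 2,
  async (n: number) => `value=${n}`,
]); // resolves to "value=42"
```

In the real module, each stage additionally runs with its own concurrency limit and the buffer between stages is bounded by the backpressure settings.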

Use Cases

  • Event Processing — Normalize, enrich, and store raw event streams
  • Data Migration — Multi-step transformations with checkpoints
  • Log Processing — Parse, filter, and aggregate log entries
  • File Processing — Read, transform, and write large file batches

Backpressure Strategies

Strategy           Behavior
'pause-upstream'   Pauses earlier stages when the downstream buffer is full
'drop-oldest'      Drops the oldest buffered items to make room
'reject'           Rejects new items and surfaces an error to the producer
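The three strategies can be modeled with a small bounded buffer. This is a sketch of the semantics only, not OqronKit internals; `BoundedBuffer` and its methods are hypothetical names:

```typescript
type Strategy = 'pause-upstream' | 'drop-oldest' | 'reject';

// Illustrative sketch of the three backpressure strategies.
class BoundedBuffer<T> {
  private items: T[] = [];

  constructor(private maxSize: number, private strategy: Strategy) {}

  // Returns true when the item was buffered; returns false to signal the
  // producer to pause ('pause-upstream'); throws to surface the error ('reject').
  push(item: T): boolean {
    if (this.items.length < this.maxSize) {
      this.items.push(item);
      return true;
    }
    switch (this.strategy) {
      case 'drop-oldest':
        this.items.shift(); // evict the oldest item to make room
        this.items.push(item);
        return true;
      case 'pause-upstream':
        return false; // caller should stop producing until the buffer drains
      case 'reject':
        throw new Error('buffer full: item rejected');
    }
  }

  shift(): T | undefined {
    return this.items.shift();
  }

  get size(): number {
    return this.items.length;
  }
}
```

Note that 'drop-oldest' trades completeness for liveness: the producer never blocks, but buffered items can be silently lost.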

Stage Composition

// Each stage can have independent concurrency, retries, and timeouts
stages: [
  {
    name: 'validate',
    concurrency: 20,     // High concurrency for CPU-light validation
    timeout: 5_000,
    retries: { max: 0 }, // No retries — invalid data is dropped
  },
  {
    name: 'process',
    concurrency: 3,      // Low concurrency for heavy compute
    timeout: 60_000,
    retries: { max: 2, strategy: 'exponential', baseDelay: 1000 },
    guaranteedWorker: true,
  },
]
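With the 'exponential' strategy configured above, retry delays would typically grow as baseDelay * 2^attempt. A small sketch of that schedule (`backoffDelays` is an illustrative helper, not part of the planned API):

```typescript
// Illustrative: the delay schedule an exponential retry strategy implies,
// i.e. baseDelay * 2^attempt for attempts 0..max-1.
function backoffDelays(max: number, baseDelay: number): number[] {
  return Array.from({ length: max }, (_, attempt) => baseDelay * 2 ** attempt);
}

// For the 'process' stage above ({ max: 2, baseDelay: 1000 }):
backoffDelays(2, 1000); // → [1000, 2000]
```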

Monitoring

// Get pipeline metrics
const metrics = await etlPipeline.getMetrics()
// {
//   stages: {
//     extract: { processed: 142000, failed: 12, avgLatencyMs: 8 },
//     transform: { processed: 141988, failed: 3, avgLatencyMs: 45 },
//     load: { processed: 141985, failed: 0, avgLatencyMs: 120 },
//   },
//   buffer: { current: 342, max: 1000, pressure: 0.34 },
// }
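Derived figures follow directly from these raw counters: buffer pressure is current / max, and a per-stage success rate is processed / (processed + failed). A sketch of those calculations (the helper names are illustrative, not part of the planned API):

```typescript
interface StageMetrics {
  processed: number;
  failed: number;
  avgLatencyMs: number;
}

// Fraction of items a stage handled successfully.
function successRate(m: StageMetrics): number {
  const total = m.processed + m.failed;
  return total === 0 ? 1 : m.processed / total;
}

// Buffer fill ratio; values near 1.0 mean backpressure is about to engage.
function bufferPressure(current: number, max: number): number {
  return max === 0 ? 0 : current / max;
}

bufferPressure(342, 1000); // → 0.342 (shown above rounded to 0.34)
```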

Next Steps

  • Batch — Accumulate items before processing
  • Ingest — Event-driven stateful functions