
Workflow (DAG)

Multi-step workflows as Directed Acyclic Graphs — parallel fan-out/fan-in, conditional branching, step output piping, and sub-workflow composition


The Workflow module models multi-step processes as Directed Acyclic Graphs (DAGs). Steps may run in parallel or sequence, pass output to downstream steps, branch conditionally, and compose sub-workflows. All step state is persisted — workflows can be paused, inspected, and resumed after a crash.

When to use Workflow vs Queue: Use a Queue when you have independent jobs. Use a Workflow when you have multiple steps that depend on each other — when step B needs the output of step A, or when steps C and D should run in parallel after B completes.

Basic Usage

triggers/onboarding.ts
import { workflow } from 'oqronkit'
// mailer, workspace, and analytics are your own application services (not part of OqronKit)

interface OnboardInput { userId: string; email: string; plan: 'free' | 'pro' }
interface OnboardOutput { workspaceId: string; subscriptionId: string }

export const onboarding = workflow<OnboardInput, OnboardOutput>({
  name: 'user-onboarding',
  description: 'Full onboarding pipeline after signup',

  steps: [
    {
      id: 'send-welcome-email',
      handler: async (ctx) => {
        await mailer.sendWelcome(ctx.input.email)
        return { sent: true }
      },
    },
    {
      id: 'provision-workspace',
      dependsOn: ['send-welcome-email'],
      handler: async (ctx) => {
        const ws = await workspace.create({ userId: ctx.input.userId })
        return { workspaceId: ws.id }
      },
    },
    {
      id: 'finalize',
      dependsOn: ['provision-workspace'],
      handler: async (ctx) => {
        const wsId = ctx.stepOutput<{ workspaceId: string }>('provision-workspace').workspaceId
        await analytics.track('user.onboarded', { userId: ctx.input.userId })
        return { workspaceId: wsId, subscriptionId: 'free' }
      },
    },
  ],
})

Triggering & Monitoring

import { onboarding } from './triggers/onboarding'

// Start a new run
const run = await onboarding.trigger({
  userId: 'u_123',
  email: 'tiger@example.com',
  plan: 'pro',
})

console.log(run.id)      // UUID
console.log(run.status)  // 'running'

// Check status at any time
const status = await onboarding.getRunStatus(run.id)
console.log(status.steps)
// {
//   'send-welcome-email':  { status: 'completed', durationMs: 342 }
//   'provision-workspace': { status: 'running', attempt: 1 }
//   'finalize':            { status: 'pending' }
// }

// Wait for completion (blocks until terminal)
const result = await onboarding.waitForRun(run.id, {
  timeout: 60_000,     // max wait time (default: 60s)
  pollInterval: 250,   // check interval (default: 250ms)
})
console.log(result.output.workspaceId) // 'ws_abc'

// Cancel a run that is still in progress (cascades to all in-flight steps)
await onboarding.cancelRun(run.id)

// List runs with optional filter
const runs = await onboarding.listRuns({ status: 'failed', limit: 10 })

Parallel Fan-Out / Fan-In

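Steps whose dependencies are all satisfied run in parallel. Sibling steps that depend only on the same parent start together once that parent completes (fan-out), and a step that depends on several parents waits for all of them to complete (fan-in).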

const etl = workflow({
  name: 'etl-pipeline',
  steps: [
    // Root step — runs first
    { id: 'extract', handler: async (ctx) => { /* ... */ } },

    // Steps 2 & 3 — fan-out: both depend only on 'extract', run in parallel
    {
      id: 'transform-users',
      dependsOn: ['extract'],
      handler: async (ctx) => { /* ... */ },
    },
    {
      id: 'transform-orders',
      dependsOn: ['extract'],
      handler: async (ctx) => { /* ... */ },
    },

    // Step 4 — fan-in: waits for BOTH transforms to complete
    {
      id: 'load',
      dependsOn: ['transform-users', 'transform-orders'],
      handler: async (ctx) => {
        // Type the outputs so they can be spread into one array
        const users = ctx.stepOutput<unknown[]>('transform-users')
        const orders = ctx.stepOutput<unknown[]>('transform-orders')
        await db.bulkInsert([...users, ...orders])
      },
    },
  ],
})
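The scheduling rule can be illustrated with a small in-memory sketch. It is not OqronKit's engine, which persists step state and dispatches step jobs through a broker; the StepDef type and runDag function are hypothetical. Each step starts once all of its dependsOn parents have finished, so siblings run concurrently and a fan-in step waits for every parent.

```typescript
// Hypothetical, simplified step shape for illustration only.
interface StepDef {
  id: string
  dependsOn?: string[]
  handler: (outputs: Map<string, unknown>) => Promise<unknown>
}

// Runs a DAG in memory: each step starts as soon as all of its parents have
// resolved, so siblings (fan-out) run concurrently and a step with several
// parents (fan-in) waits for every one of them.
// Assumes the DAG is already validated (unique ids, no cycles).
async function runDag(steps: StepDef[]): Promise<Map<string, unknown>> {
  const outputs = new Map<string, unknown>()
  const started = new Map<string, Promise<void>>()
  const byId = new Map(steps.map((s) => [s.id, s] as const))

  const start = (id: string): Promise<void> => {
    const existing = started.get(id)
    if (existing) return existing // memoized: each step runs exactly once
    const step = byId.get(id)
    if (!step) throw new Error(`Unknown step: ${id}`)
    const p = Promise.all((step.dependsOn ?? []).map(start)).then(async () => {
      outputs.set(id, await step.handler(outputs))
    })
    started.set(id, p)
    return p
  }

  await Promise.all(steps.map((s) => start(s.id)))
  return outputs
}
```

Memoizing each step's promise is what makes fan-in work here: the load step awaits the same promises the two transform steps already created, so no step runs twice.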

Step Output Piping

Each step's return value is persisted and accessible to downstream steps via ctx.stepOutput(stepId). This is how data flows through the DAG.

steps: [
  {
    id: 'fetch-data',
    handler: async (ctx) => {
      const data = await api.fetch(ctx.input.url)
      return { records: data.records, count: data.records.length }
    },
  },
  {
    id: 'process',
    dependsOn: ['fetch-data'],
    handler: async (ctx) => {
      // Type-safe access to the upstream step's output
      const upstream = ctx.stepOutput<{ records: any[]; count: number }>('fetch-data')
      ctx.log.info(`Processing ${upstream.count} records`)
      return await transform(upstream.records)
    },
  },
]

When a workflow has multiple sink steps (steps that no other step lists in dependsOn), the workflow's output is a map of { [sinkStepId]: output }. When there is exactly one sink, the output is that step's return value directly.
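A minimal sketch of that rule, using hypothetical helper names (sinkIds, workflowOutput) rather than anything exported by OqronKit:

```typescript
// Hypothetical helper illustrating the documented output rule; not OqronKit source.
interface StepLike {
  id: string
  dependsOn?: string[]
}

// A sink is a step that no other step lists in its dependsOn.
function sinkIds(steps: StepLike[]): string[] {
  const hasDependents = new Set<string>()
  for (const s of steps) for (const dep of s.dependsOn ?? []) hasDependents.add(dep)
  return steps.filter((s) => !hasDependents.has(s.id)).map((s) => s.id)
}

// One sink: the workflow output is that step's return value.
// Several sinks: the output is a map of { [sinkStepId]: output }.
function workflowOutput(steps: StepLike[], outputs: Record<string, unknown>): unknown {
  const sinks = sinkIds(steps)
  if (sinks.length === 1) return outputs[sinks[0]]
  const result: Record<string, unknown> = {}
  for (const id of sinks) result[id] = outputs[id]
  return result
}
```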

Conditional Steps

Steps can be dynamically skipped at runtime using condition. Skipped steps are treated as completed — their dependents proceed normally.

{
  id: 'create-subscription',
  dependsOn: ['send-welcome-email'],
  condition: (ctx) => ctx.input.plan === 'pro',  // skip for free plan
  handler: async (ctx) => {
    const sub = await billing.createSubscription(ctx.input.userId)
    return { subscriptionId: sub.id }
  },
}

A skipped step gets the status "skipped" rather than "completed", but it satisfies dependsOn the same way. In downstream steps, ctx.stepOutput('create-subscription') returns undefined when the step was skipped, so handlers that read it must handle the missing value.
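The skip check can be sketched as follows. ConditionalStep and runConditional are hypothetical names; the real engine evaluates condition inside its own step wrapper and persists the resulting status.

```typescript
// Hypothetical sketch of the skip rule; not OqronKit source.
type StepResult =
  | { status: 'completed'; output: unknown }
  | { status: 'skipped'; output: undefined }

interface ConditionalStep<I> {
  id: string
  condition?: (ctx: { input: I }) => boolean
  handler: (ctx: { input: I }) => Promise<unknown>
}

// The condition is evaluated first; a false result means the handler never runs.
// Either result lets dependents proceed.
async function runConditional<I>(step: ConditionalStep<I>, input: I): Promise<StepResult> {
  const ctx = { input }
  if (step.condition && !step.condition(ctx)) {
    return { status: 'skipped', output: undefined }
  }
  return { status: 'completed', output: await step.handler(ctx) }
}
```

A downstream handler can then read the optional output defensively, for example ctx.stepOutput<{ subscriptionId: string }>('create-subscription')?.subscriptionId.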

Sub-Workflow Composition

A step can delegate to an entire other workflow instead of running a handler. The child workflow's output becomes the step's output.

const billingWorkflow = workflow({
  name: 'billing',
  steps: [
    { id: 'create-invoice', handler: async (ctx) => { /* ... */ } },
    { id: 'charge-card', dependsOn: ['create-invoice'], handler: async (ctx) => { /* ... */ } },
  ],
})

const onboarding = workflow({
  name: 'user-onboarding',
  steps: [
    { id: 'setup', handler: async (ctx) => { /* ... */ } },
    {
      id: 'billing',
      dependsOn: ['setup'],
      workflow: billingWorkflow,                        // Run as sub-workflow
      input: (ctx) => ({ userId: ctx.input.userId }),   // Map parent input → child input
    },
    {
      id: 'finalize',
      dependsOn: ['billing'],
      handler: async (ctx) => {
        // Output from the sub-workflow
        const billing = ctx.stepOutput<Record<string, unknown>>('billing')
        return { completed: true, ...billing }
      },
    },
  ],
})

Sub-workflows have a max nesting depth of 20 to prevent runaway recursion. A workflow step that references its own workflow is rejected at definition time.
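The depth limit and the self-reference guard can both be expressed as a depth-first walk over sub-workflow references. This is a sketch under assumptions: WorkflowDefLike and checkNesting are hypothetical, the root workflow counts as depth 0, and the walk also rejects indirect cycles (A contains B, which contains A), which may go beyond what OqronKit checks at definition time.

```typescript
// Hypothetical shape: a workflow definition whose steps may embed child workflows.
interface WorkflowDefLike {
  name: string
  steps: { id: string; workflow?: WorkflowDefLike }[]
}

const MAX_DEPTH = 20 // the documented nesting limit

// Depth-first walk over sub-workflow references. `path` holds the names of
// the enclosing workflows, so path.length is the current nesting depth.
function checkNesting(def: WorkflowDefLike, path: string[] = []): void {
  // Checked first, so a recursive definition can never recurse forever.
  if (path.includes(def.name)) {
    throw new Error(`Recursive sub-workflow: ${[...path, def.name].join(' -> ')}`)
  }
  if (path.length > MAX_DEPTH) {
    throw new Error(`Sub-workflow nesting exceeds ${MAX_DEPTH}`)
  }
  for (const step of def.steps) {
    if (step.workflow) checkNesting(step.workflow, [...path, def.name])
  }
}
```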

Retry Policies

OqronKit supports two retry scopes, selected per workflow with retryPolicy (the module-level retryPolicy option sets the default):

| Policy | Behavior |
| --- | --- |
| "step" (default) | Failed steps retry individually (up to retries per step). Other steps proceed. |
| "workflow" | Any step failure halts the entire DAG. The coordinator re-runs the whole workflow from scratch, up to maxStepRetries + 1 total attempts. |

Step-Level Retries

const resilient = workflow({
  name: 'resilient-pipeline',
  retryPolicy: 'step',  // default

  steps: [
    {
      id: 'flaky-api-call',
      retries: 3,        // This step retries up to 3 times
      timeout: 10_000,   // Per-attempt timeout
      handler: async (ctx) => {
        ctx.log.info(`Attempt ${ctx.attempt}`)
        return await externalApi.call(ctx.input)
      },
    },
    {
      id: 'always-works',
      dependsOn: ['flaky-api-call'],
      handler: async (ctx) => { /* ... */ },
    },
  ],
})
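Per-step retries combined with a per-attempt timeout can be sketched like this. runWithRetries is a hypothetical helper; it assumes retries counts retries after the first attempt (so retries: 3 allows up to four attempts) and that a timeout aborts the attempt's AbortSignal, mirroring ctx.signal.

```typescript
// Hypothetical sketch of per-step retries with a per-attempt timeout.
// A step runs at most retries + 1 times; `attempt` is 1-based like ctx.attempt.
async function runWithRetries<T>(
  handler: (attempt: number, signal: AbortSignal) => Promise<T>,
  retries: number,
  timeoutMs: number,
): Promise<T> {
  let lastError: unknown
  for (let attempt = 1; attempt <= retries + 1; attempt++) {
    const controller = new AbortController()
    let timer: ReturnType<typeof setTimeout> | undefined
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(() => {
        controller.abort() // lets a cooperative handler stop early
        reject(new Error(`Step timed out after ${timeoutMs}ms`))
      }, timeoutMs)
    })
    try {
      return await Promise.race([handler(attempt, controller.signal), timeout])
    } catch (err) {
      lastError = err // a timeout counts as an ordinary failure
    } finally {
      clearTimeout(timer)
    }
  }
  throw lastError
}
```

Racing the handler against a timer means a handler that ignores its signal keeps running in the background after its attempt is abandoned; checking signal.aborted inside long-running work avoids that.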

Workflow-Level Retries

const critical = workflow({
  name: 'critical-pipeline',
  retryPolicy: 'workflow',  // Re-run entire DAG on any step failure

  steps: [
    { id: 'step-a', handler: async (ctx) => { /* ... */ } },
    { id: 'step-b', dependsOn: ['step-a'], handler: async (ctx) => { /* ... */ } },
  ],
})

With retryPolicy: "workflow", each step gets 0 retries (fail fast). The coordinator tick detects the failed run and rebuilds the entire DAG from scratch, incrementing run.attempt. This continues until maxStepRetries + 1 total attempts are exhausted; with the default maxStepRetries of 2, that is three attempts in total.

Manual Retry

Retry a specific failed step (and its transitive descendants):

// Retry just the failed step — downstream steps re-run too
await onboarding.retryStep(run.id, 'provision-workspace')

// Retry the entire workflow from scratch
const newRun = await onboarding.retryRun(run.id)
console.log(newRun.attempt)  // 2

retryStep resets the target step and all downstream dependents, then re-publishes the step to the broker. retryRun tears down all existing step jobs and rebuilds the entire DAG from roots.
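Which steps retryStep resets is a graph question: the target plus everything reachable by following dependsOn edges in reverse. A breadth-first sketch, with the hypothetical helper stepsToReset:

```typescript
// Hypothetical helper: the target step plus every step that depends on it,
// directly or transitively. Not OqronKit source.
interface DepStep {
  id: string
  dependsOn?: string[]
}

function stepsToReset(steps: DepStep[], target: string): string[] {
  // Build parent -> children edges from each step's dependsOn list.
  const children = new Map<string, string[]>()
  for (const s of steps) {
    for (const parent of s.dependsOn ?? []) {
      const list = children.get(parent) ?? []
      list.push(s.id)
      children.set(parent, list)
    }
  }
  // Breadth-first walk from the target visits each downstream step once.
  const seen = new Set<string>([target])
  const queue = [target]
  while (queue.length > 0) {
    const id = queue.shift()!
    for (const child of children.get(id) ?? []) {
      if (!seen.has(child)) {
        seen.add(child)
        queue.push(child)
      }
    }
  }
  return Array.from(seen)
}
```

Against the ETL example, retrying 'transform-users' resets 'transform-users' and 'load', while 'extract' and 'transform-orders' are left untouched.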

Lifecycle Hooks

const monitored = workflow({
  name: 'monitored-flow',
  steps: [ /* ... */ ],

  hooks: {
    onStepStart: (ctx, stepId) => {
      ctx.log.info(`Step started: ${stepId}`)
    },
    onStepComplete: (ctx, stepId) => {
      ctx.log.info(`Step completed: ${stepId}`)
      metrics.increment('workflow.step.success', { step: stepId })
    },
    onStepFail: (ctx, stepId, error) => {
      ctx.log.error(`Step failed: ${stepId} — ${error.message}`)
      alertOps(stepId, error)
    },
    onComplete: (ctx, output) => {
      ctx.log.info('Workflow complete', output)
    },
    onFail: (ctx, error) => {
      ctx.log.error('Workflow failed', error)
      alertSlack(`Workflow "${ctx.runId}" failed: ${error.message}`)
    },
  },
})

Timeouts

Timeouts can be set at the step level, workflow level, or globally via module config:

const timed = workflow({
  name: 'time-sensitive',
  timeout: 300_000,    // Entire workflow must finish in 5 minutes

  steps: [
    {
      id: 'fast-step',
      timeout: 10_000, // This step must finish in 10 seconds
      handler: async (ctx) => { /* ... */ },
    },
    {
      id: 'slow-step',
      dependsOn: ['fast-step'],
      timeout: 120_000, // 2 minutes for this step
      handler: async (ctx) => { /* ... */ },
    },
  ],
})

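When a step times out, it fails like any other error and follows the retry policy. A step's timeout overrides the module's stepTimeout default, and a workflow's timeout overrides workflowTimeout. When a workflow times out, the coordinator cancels all in-flight steps and marks the run as failed with "Workflow timed out after Xms".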
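Timeout resolution can be sketched as below. It assumes the module defaults (stepTimeout: 120_000, workflowTimeout: 300_000) apply whenever a step or workflow sets no timeout of its own. The function names are hypothetical, and it is an assumption whether X in the error message is the configured limit or the elapsed time; this sketch uses the limit.

```typescript
// Hypothetical sketch of timeout resolution and the workflow deadline check.
interface TimeoutDefaults {
  stepTimeout: number
  workflowTimeout: number
}

const MODULE_DEFAULTS: TimeoutDefaults = { stepTimeout: 120_000, workflowTimeout: 300_000 }

// A step's own timeout wins; otherwise the module default applies.
function effectiveStepTimeout(step: { timeout?: number }, d = MODULE_DEFAULTS): number {
  return step.timeout ?? d.stepTimeout
}

// Evaluated on each coordinator tick: returns the failure message once the
// run has exceeded its workflow timeout, or null while it still has time.
function workflowTimeoutError(
  startedAt: number,
  now: number,
  wf: { timeout?: number },
  d = MODULE_DEFAULTS,
): string | null {
  const limit = wf.timeout ?? d.workflowTimeout
  return now - startedAt > limit ? `Workflow timed out after ${limit}ms` : null
}
```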

Step Context Reference

Every step handler receives a WorkflowStepContext:

interface WorkflowStepContext<TInput> {
  input: TInput               // Original trigger input
  stepId: string              // This step's id
  runId: string               // The workflow run id
  attempt: number             // Current attempt (1-based)
  signal: AbortSignal         // Fired on cancel/timeout

  log: WorkflowLogFn          // ctx.log.info('msg') or ctx.log('info', 'msg')
  progress: (pct: number, label?: string) => void  // Report progress (0–100)
  stepOutput: <T>(id: string) => T  // Read output of a completed step (undefined if it was skipped)
}
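The log field supports two call styles. One way to type a function that is also an object with per-level methods is an intersection type built with Object.assign. This is a hypothetical sketch: the Level union (including debug and warn) and createStepLog are assumptions, not OqronKit's WorkflowLogFn.

```typescript
// Hypothetical logger supporting both log('info', 'msg') and log.info('msg').
type Level = 'debug' | 'info' | 'warn' | 'error'

type LogFn = ((level: Level, msg: string) => void) & Record<Level, (msg: string) => void>

function createStepLog(stepId: string, sink: (line: string) => void): LogFn {
  const base = (level: Level, msg: string) => sink(`[${level}] [${stepId}] ${msg}`)
  // Object.assign attaches one method per level to the callable function.
  return Object.assign(base, {
    debug: (msg: string) => base('debug', msg),
    info: (msg: string) => base('info', msg),
    warn: (msg: string) => base('warn', msg),
    error: (msg: string) => base('error', msg),
  })
}
```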

Run Status Model

A workflow run progresses through these statuses:

| Status | Description |
| --- | --- |
| "pending" | Created but waiting for a concurrency slot (maxConcurrentWorkflows) |
| "running" | Active: steps are being executed |
| "completed" | All steps completed or skipped successfully |
| "failed" | A step failed and retries are exhausted |
| "cancelled" | Explicitly cancelled via cancelRun() |
| "paused" | The workflow module is disabled |

Each step within a run has its own status:

| Step Status | Description |
| --- | --- |
| "pending" | Waiting for upstream dependencies |
| "ready" | Dependencies met, queued for execution |
| "running" | Handler is actively executing |
| "completed" | Handler returned successfully |
| "failed" | Handler threw, retries exhausted |
| "skipped" | condition returned false |

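For a run that is actively executing, the two tables relate in a simple way. The sketch below derives step readiness and run status from step statuses. isReady and deriveRunStatus are hypothetical; the pending, cancelled, and paused run states are left out because they come from concurrency limits or explicit actions rather than from step outcomes.

```typescript
// Hypothetical sketch relating the two status tables; not OqronKit source.
type StepStatus = 'pending' | 'ready' | 'running' | 'completed' | 'failed' | 'skipped'
type ActiveRunStatus = 'running' | 'completed' | 'failed'

// A parent satisfies its dependents once it has completed or been skipped.
const satisfiesDependents = (s: StepStatus): boolean => s === 'completed' || s === 'skipped'

// A pending step may move to "ready" when every parent is satisfied.
function isReady(dependsOn: string[], steps: Record<string, StepStatus>): boolean {
  return dependsOn.every((id) => satisfiesDependents(steps[id]))
}

// Derives the status of an active run from its step statuses.
function deriveRunStatus(steps: Record<string, StepStatus>): ActiveRunStatus {
  const all = Object.values(steps)
  if (all.some((s) => s === 'failed')) return 'failed' // "failed" means retries exhausted
  if (all.every(satisfiesDependents)) return 'completed'
  return 'running'
}
```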
DAG Validation

The DAG is validated at definition time (when workflow() is called). Validation checks:

  • Non-empty step list
  • Unique step ids
  • Exactly one of handler or workflow per step
  • No self-dependencies
  • No references to unknown step ids
  • No circular dependencies (Kahn's algorithm topological sort)
  • No self-referencing sub-workflows (infinite recursion guard)

If any check fails, an error is thrown immediately — before the application starts processing.
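The checks above can be sketched as a single function, with Kahn's algorithm doing the cycle detection: steps with no unmet parents are removed repeatedly, and any step that never reaches zero unmet parents sits on or behind a cycle. StepSpec and validateDag are simplified, hypothetical stand-ins, and the sub-workflow recursion guard is omitted.

```typescript
// Hypothetical, simplified step shape; not OqronKit's WorkflowStepDef.
interface StepSpec {
  id: string
  dependsOn?: string[]
  handler?: unknown
  workflow?: unknown
}

// Throws on the first failed check; otherwise returns a topological order.
function validateDag(steps: StepSpec[]): string[] {
  if (steps.length === 0) throw new Error('Workflow must have at least one step')
  const ids = new Set<string>()
  for (const s of steps) {
    if (ids.has(s.id)) throw new Error(`Duplicate step id: ${s.id}`)
    ids.add(s.id)
    if ((s.handler === undefined) === (s.workflow === undefined)) {
      throw new Error(`Step ${s.id} needs exactly one of handler or workflow`)
    }
  }
  // In-degree = number of unmet parents; children = reverse edges.
  const inDegree = new Map<string, number>()
  const children = new Map<string, string[]>()
  for (const s of steps) {
    inDegree.set(s.id, (s.dependsOn ?? []).length)
    for (const dep of s.dependsOn ?? []) {
      if (dep === s.id) throw new Error(`Step ${s.id} depends on itself`)
      if (!ids.has(dep)) throw new Error(`Step ${s.id} depends on unknown step ${dep}`)
      children.set(dep, [...(children.get(dep) ?? []), s.id])
    }
  }
  // Kahn's algorithm: repeatedly remove steps that have no remaining parents.
  const queue = steps.filter((s) => inDegree.get(s.id) === 0).map((s) => s.id)
  const order: string[] = []
  while (queue.length > 0) {
    const id = queue.shift()!
    order.push(id)
    for (const child of children.get(id) ?? []) {
      const left = inDegree.get(child)! - 1
      inDegree.set(child, left)
      if (left === 0) queue.push(child)
    }
  }
  // Steps never removed are part of (or downstream of) a cycle.
  if (order.length !== steps.length) throw new Error('Circular dependency detected')
  return order
}
```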

Crash Safety

The Workflow module is designed to recover from process crashes:

  1. Persisted step state: Every step transition (pending → running → completed) is persisted atomically to storage under a per-run distributed lock.
  2. Heartbeat locks: Running steps use the engine's heartbeat lock system. If a process crashes mid-step, the lock expires and the stall detector reclaims the job.
  3. DAG-driven promotion: Step promotion is event-driven via the engine's DependencyResolver.notifyChildren(). When a step job completes, its children are automatically promoted from waiting-children to waiting.
  4. Coordinator recovery: On startup, the coordinator tick scans for runs in "running" status and enforces timeouts or promotes pending steps.
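The heartbeat-lock idea from point 2 can be modeled in memory. This sketch is hypothetical: a real implementation has to perform acquire and renew as atomic compare-and-set operations in shared storage, and the StepLock shape and function names are assumptions.

```typescript
// Hypothetical in-memory model of a heartbeat lock. The owner renews the lock
// while its step runs; if the process dies, renewals stop, the lock expires
// after the TTL, and another worker may take the job over.
interface StepLock {
  owner: string
  expiresAt: number
}

// Succeeds if the lock is free, expired, or already held by this owner.
function acquire(lock: StepLock | undefined, owner: string, now: number, ttlMs: number): StepLock | undefined {
  if (lock && lock.expiresAt > now && lock.owner !== owner) return undefined
  return { owner, expiresAt: now + ttlMs }
}

// Called on every heartbeat by the current owner to push the expiry forward.
function renew(lock: StepLock, owner: string, now: number, ttlMs: number): StepLock {
  if (lock.owner !== owner) throw new Error('Lock owned by another worker')
  return { owner, expiresAt: now + ttlMs }
}

// What a stall detector would check before reclaiming the job.
const isStalled = (lock: StepLock, now: number): boolean => lock.expiresAt <= now
```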

Configuration Reference

workflow() Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| name | string | required | Unique workflow identifier |
| description | string | undefined | Human-readable description |
| timeout | number | 300_000 | Whole-workflow timeout in ms |
| retryPolicy | "step" or "workflow" | "step" | Retry scope |
| tags | string[] | [] | Tags for filtering |
| steps | WorkflowStepDef[] | required | The DAG steps |
| hooks | WorkflowHooks | undefined | Lifecycle hooks |

Step Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| id | string | required | Unique step identifier within the workflow |
| description | string | undefined | Human-readable step description |
| dependsOn | string[] | [] | Step ids that must complete (or be skipped) first |
| condition | (ctx) => boolean | undefined | Dynamic skip condition |
| timeout | number | Module default (stepTimeout) | Per-step execution timeout in ms |
| retries | number | Module default (maxStepRetries) | Per-step max retry count |
| handler | (ctx) => Promise<T> | (none) | Step execution function (mutually exclusive with workflow) |
| workflow | IWorkflow | (none) | Sub-workflow to run as this step (mutually exclusive with handler) |
| input | (ctx) => any | ctx.input | Map parent context to sub-workflow input |

Module Configuration

Configure the workflow engine when initializing OqronKit:

import { OqronKit, workflowModule } from 'oqronkit'

await OqronKit.init({
  config: {
    modules: [
      workflowModule({
        maxStepRetries: 2,          // Default step retries (default: 2)
        stepTimeout: 120_000,       // Default per-step timeout (default: 120s)
        workflowTimeout: 300_000,   // Default workflow timeout (default: 5min)
        retryPolicy: 'step',        // Default retry scope (default: 'step')
        tickIntervalMs: 2_000,      // Coordinator tick interval (default: 2s)
        heartbeatMs: 5_000,         // Step poll interval (default: 5s)
        lockTtlMs: 30_000,          // Lock TTL for crash recovery (default: 30s)
        leaderElection: true,       // Only leader coordinates (default: true)
        maxConcurrentWorkflows: 50, // Global concurrency cap (default: unlimited)
      }),
    ],
  },
})
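How these options combine with their documented defaults can be sketched as a plain object spread. The option names and default values come from the example above; the resolveOptions function, the use of Infinity for "unlimited", and the lockTtlMs > heartbeatMs check are illustrative assumptions, not documented OqronKit behavior.

```typescript
// Hypothetical option type mirroring the workflowModule() example above.
interface WorkflowModuleOptions {
  maxStepRetries?: number
  stepTimeout?: number
  workflowTimeout?: number
  retryPolicy?: 'step' | 'workflow'
  tickIntervalMs?: number
  heartbeatMs?: number
  lockTtlMs?: number
  leaderElection?: boolean
  maxConcurrentWorkflows?: number
}

const DEFAULTS = {
  maxStepRetries: 2,
  stepTimeout: 120_000,
  workflowTimeout: 300_000,
  retryPolicy: 'step' as const,
  tickIntervalMs: 2_000,
  heartbeatMs: 5_000,
  lockTtlMs: 30_000,
  leaderElection: true,
  maxConcurrentWorkflows: Infinity, // stands in for "unlimited"
}

function resolveOptions(opts: WorkflowModuleOptions = {}): Required<WorkflowModuleOptions> {
  const merged = { ...DEFAULTS, ...opts }
  // Illustrative sanity check: a lock that expires before the next heartbeat
  // would be reclaimed while its step is still healthy.
  if (merged.lockTtlMs <= merged.heartbeatMs) {
    throw new Error('lockTtlMs must be greater than heartbeatMs')
  }
  return merged
}
```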

Architecture

The Workflow engine uses a dual-loop architecture (same pattern as Batch):

  1. Coordinator Tick (leader only): Enforces workflow timeouts, re-runs failed workflows under retryPolicy: "workflow", and promotes "pending" runs when concurrency slots free up.

  2. Poll Loop (all nodes): Claims step jobs from workflow:{name} broker topics and executes them through the shared executeJob infrastructure. DAG promotion is event-driven — when a step completes, DependencyResolver.notifyChildren() automatically promotes dependent steps.

┌──────────────┐     ┌──────────────┐     ┌───────────────┐
│  .trigger()  │────▶│  WorkflowRun │────▶│  Step Jobs    │
│              │     │  (Storage)   │     │  (Broker)     │
└──────────────┘     └──────────────┘     └───────────────┘
                      │ Coordinator       │ Poll Loop
                      │ (Leader)          │ (All Nodes)
                      ▼                   ▼
                    timeout +           stepWrapper()
                    retry +             ──▶ condition check
                    promote             ──▶ handler / sub-workflow
                                        ──▶ persist output
                                        ──▶ notifyChildren()
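The "promote" branch of the coordinator tick reduces to counting free concurrency slots. A hypothetical sketch, assuming pending runs are promoted oldest first; RunRecord and promotePending are illustrative names:

```typescript
// Hypothetical run record; the real WorkflowRun stored by OqronKit has more fields.
interface RunRecord {
  id: string
  status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled' | 'paused'
  createdAt: number
}

// Returns the ids of pending runs that fit into the free concurrency slots.
// The caller would then mark them "running" in storage.
function promotePending(runs: RunRecord[], maxConcurrent: number): string[] {
  const running = runs.filter((r) => r.status === 'running').length
  const free = Math.max(0, maxConcurrent - running)
  return runs
    .filter((r) => r.status === 'pending')
    .sort((a, b) => a.createdAt - b.createdAt) // oldest first (assumed ordering)
    .slice(0, free)
    .map((r) => r.id)
}
```

Because the coordinator tick runs on the leader only, a single node makes these promotion decisions.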

Next Steps

  • Task Queue — Simple background processing for independent jobs
  • Saga — Distributed transactions with compensation chains
  • Crash Safety — How heartbeat locks protect your jobs
