Workflow (DAG)
Multi-step workflows as Directed Acyclic Graphs — parallel fan-out/fan-in, conditional branching, step output piping, and sub-workflow composition
The Workflow module models multi-step processes as Directed Acyclic Graphs (DAGs). Steps may run in parallel or sequence, pass output to downstream steps, branch conditionally, and compose sub-workflows. All step state is persisted — workflows can be paused, inspected, and resumed after a crash.
When to use Workflow vs Queue: Use a Queue when you have independent jobs. Use a Workflow when you have multiple steps that depend on each other — when step B needs the output of step A, or when steps C and D should run in parallel after B completes.
Basic Usage
import { workflow } from 'oqronkit'
interface OnboardInput { userId: string; email: string; plan: 'free' | 'pro' }
interface OnboardOutput { workspaceId: string; subscriptionId: string }
export const onboarding = workflow<OnboardInput, OnboardOutput>({
name: 'user-onboarding',
description: 'Full onboarding pipeline after signup',
steps: [
{
id: 'send-welcome-email',
handler: async (ctx) => {
await mailer.sendWelcome(ctx.input.email)
return { sent: true }
},
},
{
id: 'provision-workspace',
dependsOn: ['send-welcome-email'],
handler: async (ctx) => {
const ws = await workspace.create({ userId: ctx.input.userId })
return { workspaceId: ws.id }
},
},
{
id: 'finalize',
dependsOn: ['provision-workspace'],
handler: async (ctx) => {
const wsId = ctx.stepOutput<{ workspaceId: string }>('provision-workspace').workspaceId
await analytics.track('user.onboarded', { userId: ctx.input.userId })
return { workspaceId: wsId, subscriptionId: 'free' }
},
},
],
})
Triggering & Monitoring
import { onboarding } from './triggers/onboarding'
// Start a new run
const run = await onboarding.trigger({
userId: 'u_123',
email: 'tiger@example.com',
plan: 'pro',
})
console.log(run.id) // UUID
console.log(run.status) // 'running'
// Check status at any time
const status = await onboarding.getRunStatus(run.id)
console.log(status.steps)
// {
// 'send-welcome-email': { status: 'completed', durationMs: 342 }
// 'provision-workspace': { status: 'running', attempt: 1 }
// 'finalize': { status: 'pending' }
// }
// Wait for completion (blocks until terminal)
const result = await onboarding.waitForRun(run.id, {
timeout: 60_000, // max wait time (default: 60s)
pollInterval: 250, // check interval (default: 250ms)
})
console.log(result.output.workspaceId) // 'ws_abc'
// Cancel a running workflow (cascades to all in-flight steps)
await onboarding.cancelRun(run.id)
// List runs with optional filter
const runs = await onboarding.listRuns({ status: 'failed', limit: 10 })
Parallel Fan-Out / Fan-In
Steps that share the same dependsOn parent run in parallel. Steps that depend on multiple parents wait for all of them (fan-in).
const etl = workflow({
name: 'etl-pipeline',
steps: [
// Root step — runs first
{ id: 'extract', handler: async (ctx) => { /* ... */ } },
// Steps 2 & 3 — fan-out: both depend only on 'extract', run in parallel
{
id: 'transform-users',
dependsOn: ['extract'],
handler: async (ctx) => { /* ... */ },
},
{
id: 'transform-orders',
dependsOn: ['extract'],
handler: async (ctx) => { /* ... */ },
},
// Step 4 — fan-in: waits for BOTH transforms to complete
{
id: 'load',
dependsOn: ['transform-users', 'transform-orders'],
handler: async (ctx) => {
const users = ctx.stepOutput('transform-users')
const orders = ctx.stepOutput('transform-orders')
await db.bulkInsert([...users, ...orders])
},
},
],
})
Step Output Piping
Each step's return value is persisted and accessible to downstream steps via ctx.stepOutput(stepId). This is how data flows through the DAG.
steps: [
{
id: 'fetch-data',
handler: async (ctx) => {
const data = await api.fetch(ctx.input.url)
return { records: data.records, count: data.records.length }
},
},
{
id: 'process',
dependsOn: ['fetch-data'],
handler: async (ctx) => {
// Type-safe access to the upstream step's output
const upstream = ctx.stepOutput<{ records: any[]; count: number }>('fetch-data')
ctx.log.info(`Processing ${upstream.count} records`)
return await transform(upstream.records)
},
},
]
When a workflow has multiple sink steps (steps nobody depends on), the workflow's output is a map of { [sinkStepId]: output }. When there's exactly one sink, output is that step's return value directly.
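The sink rule can be sketched in a few lines. This is a minimal illustration only, not the library's implementation: the StepDef shape, findSinkIds, and resolveWorkflowOutput are hypothetical names introduced here, and the outputs record stands in for the persisted step outputs.

```typescript
// Simplified step shape for illustration; not the library's actual type.
interface StepDef {
  id: string
  dependsOn?: string[]
}

// Sink steps are those that no other step lists in dependsOn.
function findSinkIds(steps: StepDef[]): string[] {
  const hasDependents = new Set<string>()
  for (const step of steps) {
    for (const parent of step.dependsOn ?? []) hasDependents.add(parent)
  }
  return steps.filter((s) => !hasDependents.has(s.id)).map((s) => s.id)
}

// One sink: return its output directly. Several sinks: return a map keyed by step id.
function resolveWorkflowOutput(
  steps: StepDef[],
  outputs: Record<string, unknown>,
): unknown {
  const sinks = findSinkIds(steps)
  const [only] = sinks
  if (sinks.length === 1 && only !== undefined) return outputs[only]
  const result: Record<string, unknown> = {}
  for (const id of sinks) result[id] = outputs[id]
  return result
}
```

With a linear chain the last step is the only sink, so callers get its return value unchanged. With a fan-out that never fans back in, callers get one entry per leaf step.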
Conditional Steps
Steps can be dynamically skipped at runtime using condition. Skipped steps are treated as completed — their dependents proceed normally.
{
id: 'create-subscription',
dependsOn: ['send-welcome-email'],
condition: (ctx) => ctx.input.plan === 'pro', // skip for free plan
handler: async (ctx) => {
const sub = await billing.createSubscription(ctx.input.userId)
return { subscriptionId: sub.id }
},
}
When the condition returns false, the step is marked "skipped" and any downstream steps that dependsOn it proceed as if it completed. ctx.stepOutput('create-subscription') returns undefined in downstream steps when the step was skipped.
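Because a skipped step yields undefined, downstream handlers should guard before reading its fields. The sketch below shows one way to do that. SkipAwareContext, fakeContext, and resolveSubscriptionId are hypothetical helpers written for this example, and falling back to 'free' is an assumption borrowed from the onboarding example, not library behavior.

```typescript
// Hypothetical, trimmed-down context: only the stepOutput accessor is modelled.
interface SkipAwareContext {
  stepOutput<T>(stepId: string): T | undefined
}

// Test double that serves outputs from a plain record; a missing key models a skipped step.
function fakeContext(outputs: Record<string, unknown>): SkipAwareContext {
  return {
    stepOutput<T>(stepId: string): T | undefined {
      return outputs[stepId] as T | undefined
    },
  }
}

// Reads the optional upstream output and falls back when the step was skipped.
function resolveSubscriptionId(ctx: SkipAwareContext): string {
  const sub = ctx.stepOutput<{ subscriptionId: string }>('create-subscription')
  return sub?.subscriptionId ?? 'free'
}
```

The same optional-chaining pattern applies to any step that sits downstream of a conditional step.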
Sub-Workflow Composition
A step can delegate to an entire other workflow instead of running a handler. The child workflow's output becomes the step's output.
const billingWorkflow = workflow({
name: 'billing',
steps: [
{ id: 'create-invoice', handler: async (ctx) => { /* ... */ } },
{ id: 'charge-card', dependsOn: ['create-invoice'], handler: async (ctx) => { /* ... */ } },
],
})
const onboarding = workflow({
name: 'user-onboarding',
steps: [
{ id: 'setup', handler: async (ctx) => { /* ... */ } },
{
id: 'billing',
dependsOn: ['setup'],
workflow: billingWorkflow, // Run as sub-workflow
input: (ctx) => ({ userId: ctx.input.userId }), // Map parent input → child input
},
{
id: 'finalize',
dependsOn: ['billing'],
handler: async (ctx) => {
// Output from the sub-workflow
const billing = ctx.stepOutput('billing')
return { completed: true, ...billing }
},
},
],
})
Sub-workflows have a max nesting depth of 20 to prevent runaway recursion. A workflow step that references its own workflow is rejected at definition time.
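A depth guard of this kind might look like the sketch below. It is an illustration under stated assumptions: WorkflowDef, MAX_NESTING_DEPTH, and nestingDepth are hypothetical names, depth is counted as the number of sub-workflow hops from the root, and this page does not say whether the library enforces the limit at definition time or at run time.

```typescript
// Simplified definition shape for illustration; not the library's actual type.
interface WorkflowDef {
  name: string
  steps: Array<{ id: string; workflow?: WorkflowDef }>
}

const MAX_NESTING_DEPTH = 20

// Returns the deepest chain of nested sub-workflows below def.
// A workflow without sub-workflow steps has depth 0.
function nestingDepth(def: WorkflowDef, depth = 0): number {
  if (depth > MAX_NESTING_DEPTH) {
    throw new Error(`Sub-workflow nesting exceeds ${MAX_NESTING_DEPTH} levels at "${def.name}"`)
  }
  let deepest = depth
  for (const step of def.steps) {
    if (step.workflow) {
      deepest = Math.max(deepest, nestingDepth(step.workflow, depth + 1))
    }
  }
  return deepest
}
```

Under this counting, the onboarding workflow above has depth 1 because its billing step delegates to billingWorkflow, which has no sub-workflows of its own.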
Retry Policies
OqronKit supports two retry scopes, configured per-workflow:
| Policy | Behavior |
|---|---|
| "step" (default) | Failed steps retry individually (up to retries per step). Other steps proceed. |
| "workflow" | Any step failure halts the entire DAG. The coordinator re-runs the whole workflow from scratch (up to maxStepRetries + 1 total attempts). |
Step-Level Retries
const resilient = workflow({
name: 'resilient-pipeline',
retryPolicy: 'step', // default
steps: [
{
id: 'flaky-api-call',
retries: 3, // This step retries up to 3 times
timeout: 10_000, // Per-attempt timeout
handler: async (ctx) => {
ctx.log.info(`Attempt ${ctx.attempt}`)
return await externalApi.call(ctx.input)
},
},
{
id: 'always-works',
dependsOn: ['flaky-api-call'],
handler: async (ctx) => { /* ... */ },
},
],
})
Workflow-Level Retries
const critical = workflow({
name: 'critical-pipeline',
retryPolicy: 'workflow', // Re-run entire DAG on any step failure
steps: [
{ id: 'step-a', handler: async (ctx) => { /* ... */ } },
{ id: 'step-b', dependsOn: ['step-a'], handler: async (ctx) => { /* ... */ } },
],
})
With retryPolicy: "workflow", each step gets 0 retries (fail fast). The coordinator tick detects the failed run and re-materializes the entire DAG fresh, incrementing run.attempt. This continues until maxStepRetries + 1 total attempts are exhausted.
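The attempt arithmetic is easy to get off by one, so here is a small model of it. RunState, shouldRerun, and countAttempts are hypothetical names for this sketch. It only reproduces the counting rule stated above (1-based run.attempt, at most maxStepRetries + 1 attempts) and does not touch storage or the broker.

```typescript
// Minimal stand-in for a persisted run record.
interface RunState {
  attempt: number // 1-based, as in run.attempt
  status: 'running' | 'failed' | 'completed'
}

// A failed run is re-run while attempt <= maxStepRetries,
// which yields maxStepRetries + 1 attempts in total.
function shouldRerun(run: RunState, maxStepRetries: number): boolean {
  return run.status === 'failed' && run.attempt <= maxStepRetries
}

// Simulates a workflow that fails on every attempt and reports how many attempts were made.
function countAttempts(maxStepRetries: number): number {
  const run: RunState = { attempt: 1, status: 'failed' }
  while (shouldRerun(run, maxStepRetries)) {
    run.attempt += 1 // models the coordinator rebuilding the DAG and bumping run.attempt
  }
  return run.attempt
}
```

With the default maxStepRetries of 2, a workflow that always fails is attempted three times before the run stays failed.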
Manual Retry
Retry a specific failed step (and its transitive descendants):
// Retry just the failed step — downstream steps re-run too
await onboarding.retryStep(run.id, 'provision-workspace')
// Retry the entire workflow from scratch
const newRun = await onboarding.retryRun(run.id)
console.log(newRun.attempt) // 2
retryStep resets the target step and all downstream dependents, then re-publishes the step to the broker. retryRun tears down all existing step jobs and rebuilds the entire DAG from roots.
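To see which steps a retryStep call would reset, the set of transitive descendants can be computed from dependsOn alone. The following is a sketch of that graph walk, assuming a simplified StepDef shape. stepsToReset is a hypothetical helper, not a library function.

```typescript
// Simplified step shape for illustration; not the library's actual type.
interface StepDef {
  id: string
  dependsOn?: string[]
}

// Returns the target step plus every step reachable from it, in definition order.
function stepsToReset(steps: StepDef[], targetId: string): string[] {
  // Invert dependsOn into a parent -> children adjacency map.
  const children = new Map<string, string[]>()
  for (const step of steps) {
    for (const parent of step.dependsOn ?? []) {
      const list = children.get(parent) ?? []
      list.push(step.id)
      children.set(parent, list)
    }
  }
  // Breadth-first walk from the target collects all transitive descendants.
  const seen = new Set<string>([targetId])
  const queue: string[] = [targetId]
  while (queue.length > 0) {
    const current = queue.shift() as string
    for (const child of children.get(current) ?? []) {
      if (!seen.has(child)) {
        seen.add(child)
        queue.push(child)
      }
    }
  }
  return steps.filter((s) => seen.has(s.id)).map((s) => s.id)
}
```

For the onboarding workflow, retrying provision-workspace resets provision-workspace and finalize, while send-welcome-email keeps its completed state and persisted output.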
Lifecycle Hooks
const monitored = workflow({
name: 'monitored-flow',
steps: [ /* ... */ ],
hooks: {
onStepStart: (ctx, stepId) => {
ctx.log.info(`Step started: ${stepId}`)
},
onStepComplete: (ctx, stepId) => {
ctx.log.info(`Step completed: ${stepId}`)
metrics.increment('workflow.step.success', { step: stepId })
},
onStepFail: (ctx, stepId, error) => {
ctx.log.error(`Step failed: ${stepId} — ${error.message}`)
alertOps(stepId, error)
},
onComplete: (ctx, output) => {
ctx.log.info('Workflow complete', output)
},
onFail: (ctx, error) => {
ctx.log.error('Workflow failed', error)
alertSlack(`Workflow "${ctx.runId}" failed: ${error.message}`)
},
},
})
Timeouts
Timeouts can be set at the step level, workflow level, or globally via module config:
const timed = workflow({
name: 'time-sensitive',
timeout: 300_000, // Entire workflow must finish in 5 minutes
steps: [
{
id: 'fast-step',
timeout: 10_000, // This step must finish in 10 seconds
handler: async (ctx) => { /* ... */ },
},
{
id: 'slow-step',
dependsOn: ['fast-step'],
timeout: 120_000, // 2 minutes for this step
handler: async (ctx) => { /* ... */ },
},
],
})
When a step times out, it fails like any other error and follows the retry policy. When a workflow times out, the coordinator cancels all in-flight steps and marks the run as failed with "Workflow timed out after Xms".
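The sketch below models how the three timeout levels could combine. It rests on assumptions not confirmed by this page: that a step-level timeout takes precedence over the module default, and that X in the failure message is the configured timeout rather than the elapsed time. ModuleDefaults, effectiveStepTimeout, and workflowTimeoutError are hypothetical names.

```typescript
// Module-level defaults, named after the workflowModule() options.
interface ModuleDefaults {
  stepTimeout: number
  workflowTimeout: number
}

// Assumed precedence: a step's own timeout wins; otherwise the module default applies.
function effectiveStepTimeout(
  stepTimeout: number | undefined,
  defaults: ModuleDefaults,
): number {
  return stepTimeout ?? defaults.stepTimeout
}

// Deadline check a coordinator tick might perform for one run.
// Returns the failure message when the run is overdue, or null otherwise.
function workflowTimeoutError(
  startedAt: number,
  now: number,
  timeoutMs: number,
): string | null {
  return now - startedAt > timeoutMs ? `Workflow timed out after ${timeoutMs}ms` : null
}
```

Since the coordinator only runs on a tick (tickIntervalMs), a check like this would presumably fire shortly after the deadline rather than exactly at it.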
Step Context Reference
Every step handler receives a WorkflowStepContext:
interface WorkflowStepContext<TInput> {
input: TInput // Original trigger input
stepId: string // This step's id
runId: string // The workflow run id
attempt: number // Current attempt (1-based)
signal: AbortSignal // Fired on cancel/timeout
log: WorkflowLogFn // ctx.log.info('msg') or ctx.log('info', 'msg')
progress: (pct, label?) => void // Report progress (0–100)
stepOutput: <T>(id) => T // Read output of a completed step
}
Run Status Model
A workflow run progresses through these statuses:
| Status | Description |
|---|---|
| "pending" | Created but waiting for a concurrency slot (maxConcurrentWorkflows) |
| "running" | Active; steps are being executed |
| "completed" | All steps completed or skipped successfully |
| "failed" | A step failed and retries are exhausted |
| "cancelled" | Explicitly cancelled via cancelRun() |
| "paused" | Module is disabled |
Each step within a run has its own status:
| Step Status | Description |
|---|---|
| "pending" | Waiting for upstream dependencies |
| "ready" | Dependencies met, queued for execution |
| "running" | Handler is actively executing |
| "completed" | Handler returned successfully |
| "failed" | Handler threw, retries exhausted |
| "skipped" | condition returned false |
DAG Validation
The DAG is validated at definition time (when workflow() is called). Validation checks:
- Non-empty step list
- Unique step ids
- Exactly one of handler or workflow per step
- No self-dependencies
- No references to unknown step ids
- No circular dependencies (Kahn's algorithm topological sort)
- No self-referencing sub-workflows (infinite recursion guard)
If any check fails, an error is thrown immediately — before the application starts processing.
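Several of these checks can be sketched with Kahn's algorithm, which repeatedly removes steps that have no unresolved parents and reports a cycle if any steps are left over. This is an illustrative version only: validateDag and the simplified StepDef shape are assumptions, the error messages are invented, and the handler/workflow and sub-workflow checks are omitted.

```typescript
// Simplified step shape for illustration; not the library's actual type.
interface StepDef {
  id: string
  dependsOn?: string[]
}

// Validates the graph and returns one valid topological order of step ids.
function validateDag(steps: StepDef[]): string[] {
  if (steps.length === 0) throw new Error('Workflow has no steps')
  const ids = new Set<string>()
  for (const s of steps) {
    if (ids.has(s.id)) throw new Error(`Duplicate step id: ${s.id}`)
    ids.add(s.id)
  }
  const inDegree = new Map<string, number>()
  const children = new Map<string, string[]>()
  for (const s of steps) {
    inDegree.set(s.id, 0)
    children.set(s.id, [])
  }
  for (const s of steps) {
    for (const dep of s.dependsOn ?? []) {
      if (dep === s.id) throw new Error(`Step "${s.id}" depends on itself`)
      if (!ids.has(dep)) throw new Error(`Step "${s.id}" depends on unknown step "${dep}"`)
      children.get(dep)!.push(s.id)
      inDegree.set(s.id, inDegree.get(s.id)! + 1)
    }
  }
  // Kahn's algorithm: start from roots, then release children as their parents are emitted.
  const queue = steps.filter((s) => inDegree.get(s.id) === 0).map((s) => s.id)
  const order: string[] = []
  while (queue.length > 0) {
    const id = queue.shift()!
    order.push(id)
    for (const child of children.get(id)!) {
      const remaining = inDegree.get(child)! - 1
      inDegree.set(child, remaining)
      if (remaining === 0) queue.push(child)
    }
  }
  // Steps inside a cycle never reach in-degree 0, so they are missing from the order.
  if (order.length !== steps.length) throw new Error('Circular dependency detected')
  return order
}
```

Running the check when the workflow is defined means a mistyped dependsOn id surfaces at startup instead of leaving a run stuck in "pending".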
Crash Safety
The Workflow module is fully crash-safe:
- Persisted step state: Every step transition (pending → running → completed) is persisted atomically to storage under a per-run distributed lock.
- Heartbeat locks: Running steps use the engine's heartbeat lock system. If a process crashes mid-step, the lock expires and the stall detector reclaims the job.
- DAG-driven promotion: Step promotion is event-driven via the engine's DependencyResolver.notifyChildren(). When a step job completes, its children are automatically promoted from waiting-children to waiting.
- Coordinator recovery: On startup, the coordinator tick scans for runs in "running" status and enforces timeouts or promotes pending steps.
Configuration Reference
workflow() Options
| Option | Type | Default | Description |
|---|---|---|---|
| name | string | required | Unique workflow identifier |
| description | string | undefined | Human-readable description |
| timeout | number | 300_000 | Whole-workflow timeout in ms |
| retryPolicy | "step" \| "workflow" | "step" | Retry scope |
| tags | string[] | [] | Tags for filtering |
| steps | WorkflowStepDef[] | required | The DAG steps |
| hooks | WorkflowHooks | undefined | Lifecycle hooks |
Step Options
| Option | Type | Default | Description |
|---|---|---|---|
| id | string | required | Unique step identifier within the workflow |
| description | string | undefined | Human-readable step description |
| dependsOn | string[] | [] | Step ids that must complete first |
| condition | (ctx) => boolean | undefined | Dynamic skip condition |
| timeout | number | Module default | Per-step execution timeout in ms |
| retries | number | Module default | Per-step max retry count |
| handler | (ctx) => Promise<T> | n/a | Step execution function (mutually exclusive with workflow) |
| workflow | IWorkflow | n/a | Sub-workflow to run as this step (mutually exclusive with handler) |
| input | (ctx) => any | ctx.input | Map parent context to sub-workflow input |
Module Configuration
Configure the workflow engine when initializing OqronKit:
import { OqronKit, workflowModule } from 'oqronkit'
await OqronKit.init({
config: {
modules: [
workflowModule({
maxStepRetries: 2, // Default step retries (default: 2)
stepTimeout: 120_000, // Default per-step timeout (default: 120s)
workflowTimeout: 300_000, // Default workflow timeout (default: 5min)
retryPolicy: 'step', // Default retry scope (default: 'step')
tickIntervalMs: 2_000, // Coordinator tick interval (default: 2s)
heartbeatMs: 5_000, // Step poll interval (default: 5s)
lockTtlMs: 30_000, // Lock TTL for crash recovery (default: 30s)
leaderElection: true, // Only leader coordinates (default: true)
maxConcurrentWorkflows: 50, // Global concurrency cap (default: unlimited)
}),
],
},
})
Architecture
The Workflow engine uses a dual-loop architecture (same pattern as Batch):
- Coordinator Tick (leader only): Enforces workflow timeouts, re-runs failed workflows under retryPolicy: "workflow", and promotes "pending" runs when concurrency slots free up.
- Poll Loop (all nodes): Claims step jobs from workflow:{name} broker topics and executes them through the shared executeJob infrastructure. DAG promotion is event-driven: when a step completes, DependencyResolver.notifyChildren() automatically promotes dependent steps.
┌─────────────┐ ┌──────────────┐ ┌───────────────┐
│ .trigger() │────▶│ WorkflowRun │────▶│ Step Jobs │
│ │ │ (Storage) │ │ (Broker) │
└─────────────┘ └──────────────┘ └───────────────┘
│ Coordinator │ Poll Loop
│ (Leader) │ (All Nodes)
▼ ▼
timeout + stepWrapper()
retry + ──▶ condition check
promote ──▶ handler / sub-workflow
──▶ persist output
──▶ notifyChildren()
Next Steps
- Task Queue — Simple background processing for independent jobs
- Saga — Distributed transactions with compensation chains
- Crash Safety — How heartbeat locks protect your jobs