OqronKit

Ingest

Event-driven stateful functions with step primitives

The Ingest module provides event-driven, stateful functions inspired by Inngest. Each function is composed of steps: durable, retryable units of work that survive crashes and restarts.

Roadmap Module: Ingest is a high-priority v1.x Enterprise module. This page documents the planned API.

Planned API

Define an Ingest Function

import { ingest } from 'oqronkit'
// `stripe`, `db`, and `sendEmail` are application-provided clients, assumed to be in scope.

const onUserSignup = ingest({
  name: 'user/signup',
  event: 'user.created',

  handler: async ({ event, step }) => {
    // Step 1: Create account in billing system
    const customer = await step.run('create-stripe-customer', async () => {
      return await stripe.customers.create({
        email: event.data.email,
        name: event.data.name,
      })
    })

    // Step 2: Wait 1 hour before sending welcome email
    await step.sleep('wait-for-onboarding', '1h')

    // Step 3: Send welcome email
    await step.run('send-welcome-email', async () => {
      await sendEmail({
        to: event.data.email,
        template: 'welcome',
        vars: { name: event.data.name, customerId: customer.id },
      })
    })

    // Step 4: Wait 3 days, then check engagement
    await step.sleep('wait-for-engagement', '3d')

    // Step 5: Check if user has completed onboarding
    const profile = await step.run('check-onboarding', async () => {
      return await db.users.findById(event.data.userId)
    })

    if (!profile.onboardingComplete) {
      await step.run('send-nudge-email', async () => {
        await sendEmail({
          to: event.data.email,
          template: 'onboarding-nudge',
        })
      })
    }

    return { customerId: customer.id, onboarded: profile.onboardingComplete }
  },
})

Send Events

import { send } from 'oqronkit'

// From your API route
app.post('/api/signup', async (req, res) => {
  const user = await db.users.create(req.body)

  // Fire the event — all matching ingest functions execute
  await send({
    name: 'user.created',
    data: {
      userId: user.id,
      email: user.email,
      name: user.name,
    },
  })

  res.json({ userId: user.id })
})
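
The comment in the route above says that every ingest function whose event matches the sent name is executed. A toy in-memory registry makes that one-to-many relationship concrete. It assumes exact string matching on the event name; `register` and `matchingFunctions` are hypothetical names rather than OqronKit exports, and real dispatch would be asynchronous and durable.

```typescript
// Toy registry (illustrative only): event name -> names of subscribed functions.
const registry = new Map<string, string[]>()

function register(fnName: string, eventName: string): void {
  const existing = registry.get(eventName) ?? []
  registry.set(eventName, [...existing, fnName])
}

// Assumption: a function matches only when its event name is identical.
function matchingFunctions(eventName: string): string[] {
  return registry.get(eventName) ?? []
}

register('user/signup', 'user.created')
register('user/sync-crm', 'user.created') // hypothetical second subscriber
register('order/process', 'order.created')
```

In this sketch, sending `user.created` would select both `user/signup` and `user/sync-crm`, while an event with no subscribers selects nothing.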

Step Primitives

| Primitive | Purpose | Example |
| --- | --- | --- |
| step.run(id, fn) | Execute a durable, retryable unit of work | DB writes, API calls |
| step.sleep(id, duration) | Pause execution for a duration | '1h', '3d', '30s' |
| step.sleepUntil(id, date) | Pause until a specific timestamp | new Date('2026-12-25') |
| step.invoke(id, fnRef, data) | Call another ingest function and wait | Sub-workflows |
| step.waitForEvent(id, opts) | Pause until a matching event arrives | Approval flows |
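
The duration strings accepted by step.sleep are shown only by example ('1h', '3d', '30s'). As a rough sketch of how such strings could be converted to milliseconds, assuming that only the units s, m, h, and d are supported (the page does not list the full grammar, and `parseDuration` is a hypothetical helper, not part of the OqronKit API):

```typescript
// Hypothetical helper, not part of OqronKit. The supported units are an assumption.
const UNIT_MS: Record<string, number> = {
  s: 1000,
  m: 60000,
  h: 3600000,
  d: 86400000,
}

// Converts strings such as '30s', '1h', or '3d' into milliseconds.
function parseDuration(input: string): number {
  const match = /^(\d+)([smhd])$/.exec(input.trim())
  if (!match) {
    throw new Error(`Unsupported duration: ${input}`)
  }
  const [, amount, unit] = match
  return Number(amount) * UNIT_MS[unit]
}
```

Rejecting unknown formats with an error, rather than silently defaulting, keeps a typo such as '1hr' from turning into an unintended sleep length.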

How Steps Work

Each step is individually durable:

Function starts
  → step.run('create-stripe-customer')         ✅ saved to storage
  → step.sleep('wait-for-onboarding', '1h')    ⏸️ function suspends
  ... 1 hour passes ...
  → function resumes
  → step.run('send-welcome-email')             ✅ saved to storage
  → step.sleep('wait-for-engagement', '3d')    ⏸️ function suspends
  ... 3 days pass ...
  → function resumes
  → step.run('check-onboarding')               ✅ saved to storage
  → function completes

If the process crashes between steps, OqronKit resumes from the last completed step: results of steps that already finished are read back from storage rather than re-executed.
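
The replay behaviour described above can be illustrated with a small in-memory sketch. It assumes that step results are stored by step id and that a resumed function re-runs its handler from the top; both are assumptions for illustration, since the storage model is not specified on this page. The names `createStepRunner` and `store` are hypothetical, and the sketch is synchronous for brevity, whereas real steps would be async.

```typescript
// Illustrative only: a memoizing step runner backed by an in-memory Map.
type StepStore = Map<string, unknown>

function createStepRunner(store: StepStore) {
  return {
    run<T>(id: string, fn: () => T): T {
      if (store.has(id)) {
        // Completed in an earlier attempt: return the saved result.
        return store.get(id) as T
      }
      const result = fn()
      store.set(id, result) // save the result before the next step starts
      return result
    },
  }
}

// Simulate a crash after the first step, then a resume with the same store.
const store: StepStore = new Map()
let customerCalls = 0

function handler(crashAfterFirstStep: boolean): string {
  const step = createStepRunner(store)
  const customerId = step.run('create-customer', () => {
    customerCalls += 1
    return 'cus_123'
  })
  if (crashAfterFirstStep) throw new Error('simulated crash')
  return step.run('send-email', () => `welcome sent to ${customerId}`)
}

try {
  handler(true)
} catch {
  // The first attempt crashed after 'create-customer' was saved.
}
const outcome = handler(false) // resumes; 'create-customer' is not re-executed
```

Because results are keyed by step id, this model only works if each id is unique within a function run, which is one reason to derive ids from data when calling steps in a loop.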

Fan-out

const processOrder = ingest({
  name: 'order/process',
  event: 'order.created',

  handler: async ({ event, step }) => {
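    // Fan-out: invoke three other ingest functions in parallel.
    // reserveInventoryFn, chargePaymentFn, and sendConfirmationFn are assumed to be defined elsewhere with ingest().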
    const [inventory, payment, notification] = await Promise.all([
      step.invoke('reserve-inventory', reserveInventoryFn, event.data),
      step.invoke('charge-payment', chargePaymentFn, event.data),
      step.invoke('send-confirmation', sendConfirmationFn, event.data),
    ])

    return { inventory, payment, notification }
  },
})

Wait for External Events

const approvalFlow = ingest({
  name: 'expense/approval',
  event: 'expense.submitted',

  handler: async ({ event, step }) => {
    // Notify manager
    await step.run('notify-manager', async () => {
      await sendSlack(event.data.managerId, `Expense $${event.data.amount} needs approval`)
    })

    // Wait up to 48 hours for approval event
    const approval = await step.waitForEvent('wait-for-approval', {
      event: 'expense.approved',
      match: 'data.expenseId',  // Match on expenseId field
      timeout: '48h',
    })

    // A falsy result means the 48h timeout elapsed with no matching event
    if (!approval) {
      await step.run('auto-escalate', async () => {
        await escalateToVP(event.data)
      })
      return { status: 'escalated' }
    }

    await step.run('process-reimbursement', async () => {
      await reimburse(event.data)
    })

    return { status: 'approved', approvedBy: approval.data.approver }
  },
})
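
How match: 'data.expenseId' pairs the triggering event with the incoming one is not spelled out here. One plausible reading, sketched below as an assumption, is that the same dotted path is read from both events and the two values are compared for equality. `getPath` and `eventsMatch` are hypothetical helpers, not OqronKit exports.

```typescript
interface IngestEvent {
  name: string
  data: Record<string, unknown>
}

// Read a dotted path such as 'data.expenseId' from an object.
function getPath(source: unknown, path: string): unknown {
  let current: unknown = source
  for (const key of path.split('.')) {
    if (typeof current !== 'object' || current === null) return undefined
    current = (current as Record<string, unknown>)[key]
  }
  return current
}

// Assumed semantics: the waiting function resumes only when the incoming event
// has the expected name and the same value at `path` as the triggering event.
function eventsMatch(
  trigger: IngestEvent,
  incoming: IngestEvent,
  expectedName: string,
  path: string,
): boolean {
  const expected = getPath(trigger, path)
  return (
    incoming.name === expectedName &&
    expected !== undefined &&
    getPath(incoming, path) === expected
  )
}

// Sample events with made-up values.
const submitted: IngestEvent = { name: 'expense.submitted', data: { expenseId: 'exp_1', amount: 120 } }
const approved: IngestEvent = { name: 'expense.approved', data: { expenseId: 'exp_1', approver: 'dana' } }
const unrelated: IngestEvent = { name: 'expense.approved', data: { expenseId: 'exp_2', approver: 'lee' } }
```

Under this reading, `approved` would resume the flow started by `submitted`, while `unrelated` would be ignored because its expenseId differs.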

Configuration

const fn = ingest({
  name: 'heavy-processing',
  event: 'file.uploaded',

  // Function-level config
  concurrency: 10,          // Max parallel executions
  retries: 3,               // Max retries per step
  timeout: '30m',           // Max total function duration

  // Batching: accumulate events before processing
  batch: {
    maxSize: 100,
    timeout: '5s',
  },

  // Rate limiting
  rateLimit: {
    key: 'event.data.userId',
    limit: 10,
    period: '1m',
  },

  handler: async ({ events, step }) => {
    // With batching, `events` is an array
    for (const event of events) {
      await step.run(`process-${event.data.fileId}`, async () => {
        await processFile(event.data)
      })
    }
  },
})
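
The rateLimit block above reads as "at most 10 runs per user per minute". A minimal fixed-window sketch of that idea follows. Whether OqronKit would use a fixed window, a sliding window, or a token bucket is not stated, so the algorithm and the `FixedWindowLimiter` name should be treated as assumptions.

```typescript
// Illustrative fixed-window limiter keyed by an arbitrary string (e.g. a userId).
// Assumes limit >= 1.
class FixedWindowLimiter {
  private readonly limit: number
  private readonly periodMs: number
  private readonly windows = new Map<string, { start: number; count: number }>()

  constructor(limit: number, periodMs: number) {
    this.limit = limit
    this.periodMs = periodMs
  }

  // Returns true if a run for `key` is allowed at time `now` (milliseconds).
  tryAcquire(key: string, now: number = Date.now()): boolean {
    const current = this.windows.get(key)
    if (!current || now - current.start >= this.periodMs) {
      // No window yet, or the previous window expired: start a new one.
      this.windows.set(key, { start: now, count: 1 })
      return true
    }
    if (current.count < this.limit) {
      current.count += 1
      return true
    }
    return false
  }
}

// Mirrors the config above: 10 runs per key per minute.
const limiter = new FixedWindowLimiter(10, 60000)
```

Passing `now` explicitly keeps the limiter deterministic in tests; each key gets its own window, so one busy user does not consume another user's quota.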
