Ingest
Event-driven stateful functions with step primitives
The Ingest module provides event-driven, stateful functions inspired by Inngest. Each function is composed of steps: durable, retryable units of work whose results survive crashes and restarts.
Roadmap Module — Ingest is a high-priority v1.x Enterprise module. This page documents the planned API.
Planned API
Define an Ingest Function
import { ingest } from 'oqronkit'

const onUserSignup = ingest({
  name: 'user/signup',
  event: 'user.created',
  handler: async ({ event, step }) => {
    // Step 1: Create account in billing system
    const customer = await step.run('create-stripe-customer', async () => {
      return await stripe.customers.create({
        email: event.data.email,
        name: event.data.name,
      })
    })

    // Step 2: Wait 1 hour before sending welcome email
    await step.sleep('wait-for-onboarding', '1h')

    // Step 3: Send welcome email
    await step.run('send-welcome-email', async () => {
      await sendEmail({
        to: event.data.email,
        template: 'welcome',
        vars: { name: event.data.name, customerId: customer.id },
      })
    })

    // Step 4: Wait 3 days, then check engagement
    await step.sleep('wait-for-engagement', '3d')

    // Step 5: Check if user has completed onboarding
    const profile = await step.run('check-onboarding', async () => {
      return await db.users.findById(event.data.userId)
    })

    if (!profile.onboardingComplete) {
      await step.run('send-nudge-email', async () => {
        await sendEmail({
          to: event.data.email,
          template: 'onboarding-nudge',
        })
      })
    }

    return { customerId: customer.id, onboarded: profile.onboardingComplete }
  },
})

Send Events
import { send } from 'oqronkit'

// From your API route
app.post('/api/signup', async (req, res) => {
  const user = await db.users.create(req.body)

  // Fire the event; all matching ingest functions execute
  await send({
    name: 'user.created',
    data: {
      userId: user.id,
      email: user.email,
      name: user.name,
    },
  })

  res.json({ userId: user.id })
})

Step Primitives
| Primitive | Purpose | Example |
|---|---|---|
| `step.run(id, fn)` | Execute a durable, retryable unit of work | DB writes, API calls |
| `step.sleep(id, duration)` | Pause execution for a duration | `'1h'`, `'3d'`, `'30s'` |
| `step.sleepUntil(id, date)` | Pause until a specific timestamp | `new Date('2026-12-25')` |
| `step.invoke(id, fnRef, data)` | Call another ingest function and wait | Sub-workflows |
| `step.waitForEvent(id, opts)` | Pause until a matching event arrives | Approval flows |
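
This page does not spell out the grammar of the duration strings passed to `step.sleep` (`'1h'`, `'3d'`, `'30s'`). As a rough sketch, assuming a duration is a single whole number followed by one of the units `s`, `m`, `h`, or `d`, a parser could look like the following. `parseDuration` and `resumeAt` are hypothetical helpers for illustration, not part of the planned OqronKit API.

```typescript
// Hypothetical helpers: a guess at how duration strings such as '30s' or '3d'
// might be interpreted. The real OqronKit grammar may differ.
const UNIT_MS: Record<string, number> = {
  s: 1000,
  m: 60 * 1000,
  h: 60 * 60 * 1000,
  d: 24 * 60 * 60 * 1000,
};

// Converts "<integer><unit>" into milliseconds, rejecting anything else.
function parseDuration(input: string): number {
  const match = /^(\d+)([smhd])$/.exec(input.trim());
  if (match === null) {
    throw new Error(`Unsupported duration: "${input}"`);
  }
  const unitMs = UNIT_MS[match[2] ?? ''];
  if (unitMs === undefined) {
    throw new Error(`Unsupported unit in duration: "${input}"`);
  }
  return Number(match[1]) * unitMs;
}

// Computes the wake-up time a sleep step would need to persist.
function resumeAt(duration: string, now: Date): Date {
  return new Date(now.getTime() + parseDuration(duration));
}
```

Under these assumptions, `step.sleep(id, '1h')` behaves like `step.sleepUntil(id, resumeAt('1h', new Date()))`: the function only has to store a wake-up timestamp, which is what lets it suspend without holding a process open.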
How Steps Work
Each step is individually durable:
Function starts
→ step.run('create-customer') ✅ saved to storage
→ step.sleep('wait-1h') ⏸️ function suspends
... 1 hour passes ...
→ function resumes
→ step.run('send-email') ✅ saved to storage
→ step.sleep('wait-3d') ⏸️ function suspends
... 3 days pass ...
→ function resumes
→ step.run('check-profile') ✅ saved to storage
→ function completes
If the process crashes between steps, OqronKit resumes from the last completed step: previous results are replayed from storage, not re-executed.
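
The replay behavior can be illustrated with a small sketch. It assumes step results are stored under their step id, and it uses an in-memory `Map` as a stand-in for durable storage. `createStepRunner`, `StepStore`, and `demo` are hypothetical names for illustration; this is not OqronKit's implementation.

```typescript
// Hypothetical stand-in for durable storage, keyed by step id.
type StepStore = Map<string, unknown>;

// Builds a `step.run`-like function that memoizes results by step id.
function createStepRunner(store: StepStore) {
  return async function run<T>(id: string, fn: () => Promise<T>): Promise<T> {
    if (store.has(id)) {
      // Completed in an earlier attempt: replay the saved result, skip fn.
      return store.get(id) as T;
    }
    const result = await fn();
    store.set(id, result); // save the result before the next step starts
    return result;
  };
}

// Demo: attempt 1 crashes after the first step, attempt 2 resumes after it.
async function demo(): Promise<{ calls: string[]; result: string }> {
  const store: StepStore = new Map();
  const calls: string[] = []; // records which step bodies actually executed

  const handler = async (crashAfterFirstStep: boolean): Promise<string> => {
    const run = createStepRunner(store);
    const customerId = await run('create-customer', async () => {
      calls.push('create-customer');
      return 'cus_123';
    });
    if (crashAfterFirstStep) {
      throw new Error('simulated crash');
    }
    return run('send-email', async () => {
      calls.push('send-email');
      return `sent to ${customerId}`;
    });
  };

  await handler(true).catch(() => undefined); // attempt 1: crashes mid-function
  const result = await handler(false); // attempt 2: replays step 1 from the store
  return { calls, result };
}
```

In the demo, the body of `create-customer` runs once even though the handler runs twice. The sketch also shows why step ids need to be stable and unique within a function: the id is the only key that ties a saved result back to its step.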
Fan-out
const processOrder = ingest({
  name: 'order/process',
  event: 'order.created',
  handler: async ({ event, step }) => {
    // Fan-out: invoke multiple functions in parallel
    const [inventory, payment, notification] = await Promise.all([
      step.invoke('reserve-inventory', reserveInventoryFn, event.data),
      step.invoke('charge-payment', chargePaymentFn, event.data),
      step.invoke('send-confirmation', sendConfirmationFn, event.data),
    ])

    return { inventory, payment, notification }
  },
})

Wait for External Events
const approvalFlow = ingest({
  name: 'expense/approval',
  event: 'expense.submitted',
  handler: async ({ event, step }) => {
    // Notify manager
    await step.run('notify-manager', async () => {
      await sendSlack(event.data.managerId, `Expense $${event.data.amount} needs approval`)
    })

    // Wait up to 48 hours for approval event
    const approval = await step.waitForEvent('wait-for-approval', {
      event: 'expense.approved',
      match: 'data.expenseId', // Match on expenseId field
      timeout: '48h',
    })

    if (!approval) {
      await step.run('auto-escalate', async () => {
        await escalateToVP(event.data)
      })
      return { status: 'escalated' }
    }

    await step.run('process-reimbursement', async () => {
      await reimburse(event.data)
    })

    return { status: 'approved', approvedBy: approval.data.approver }
  },
})

Configuration
const fn = ingest({
  name: 'heavy-processing',
  event: 'file.uploaded',

  // Function-level config
  concurrency: 10, // Max parallel executions
  retries: 3, // Max retries per step
  timeout: '30m', // Max total function duration

  // Batching: accumulate events before processing
  batch: {
    maxSize: 100,
    timeout: '5s',
  },

  // Rate limiting
  rateLimit: {
    key: 'event.data.userId',
    limit: 10,
    period: '1m',
  },

  handler: async ({ events, step }) => {
    // With batching, `events` is an array
    for (const event of events) {
      await step.run(`process-${event.data.fileId}`, async () => {
        await processFile(event.data)
      })
    }
  },
})

Next Steps
- Scheduler — Time-based job scheduling
- Pipeline — Streaming ETL with backpressure
- Crash Safety — How steps survive process crashes