Webhook

Fan-out event distribution with HMAC signing and glob matching

OqronKit's Webhook module handles fan-out event distribution with HMAC signing, glob pattern matching, circuit breakers, and a dead letter queue.

Working example → See a complete Webhook implementation with multi-endpoint dispatch, glob event matching, HMAC signing, and circuit breakers: apps/backend/src/triggers/webhooks.ts

How It Works

The Webhook module uses the same poll-based architecture as the Queue engine, but adds fan-out routing, HMAC signing, circuit breakers, and per-endpoint rate limiting.

Delivery Lifecycle

.fire(eventName, payload)

  ├─ 1. Match event against endpoint globs
  │    "order.payment.completed" matches:
  │      ✓ "order.**"  (AcmeCorp)
  │      ✗ "user.signup" (AcmeCorp)
  │      ✗ "user.*.activated" (Analytics)

  ├─ 2. For EACH matched endpoint:
  │    ├─ Create delivery job record in Storage
  │    │   (url, method, headers, body, endpointName)
  │    └─ Publish job ID to Broker

  │  ┌──── Poll Loop (per dispatcher) ───────────────────┐
  │  │                                                    │
  ├──┤ 3. Claim jobs from Broker                          │
  │  │    (respects concurrency + throttle gate)          │
  │  └────────────────────────────────────────────────────┘

  ├─ 4. Acquire heartbeat lock
  ├─ 5. Resolve endpoint config (static or dynamic)
  ├─ 6. Sign payload with HMAC (if security configured)
  │    ├─ Compute signature: HMAC-SHA256(body, secret)
  │    └─ Attach X-Oqron-Signature + X-Oqron-Timestamp

  ├─ 7. Check outbound rate limiter (per-endpoint)
  │    Blocked? → Re-queue with delay

  ├─ 8. Check circuit breaker (per-endpoint)
  │    Open? → Re-queue with 15s delay

  ├─ 9. HTTP delivery (POST/PUT with timeout)
  │    ├─ Uses AbortSignal for cancellation
  │    └─ Respects Retry-After header

  ├─ 2xx SUCCESS:
  │    ├─ Record circuit breaker success
  │    ├─ Mark job "completed", run onSuccess hook
  │    └─ Ack in Broker

  └─ 4xx/5xx FAILURE:
       ├─ Record circuit breaker failure
       ├─ Retryable status? (5xx, 429, 408)
       │   YES → Calculate backoff (exponential/fixed)
       │         Respect Retry-After header if present
       │         Nack to Broker with delay
       │   NO  → Hard fail (e.g., 401, 404)
       └─ All retries exhausted? → DLQ + onDead hook
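The failure branch of the lifecycle can be sketched as a small decision function. This is an illustration only, not OqronKit's internal code: the names isRetryableStatus, backoffDelay, parseRetryAfter, and decideOnFailure are hypothetical, and the doubling formula (baseDelay × 2^retriesUsed, no jitter) is an assumption about what "exponential" means here.

```typescript
// Hypothetical sketch of the failure branch; not OqronKit's actual implementation.
type BackoffStrategy = "exponential" | "fixed";

interface RetryPolicySketch {
  max: number;          // maximum number of retries
  strategy: BackoffStrategy;
  baseDelay: number;    // milliseconds
}

type FailureDecision =
  | { action: "retry"; delayMs: number }
  | { action: "hard-fail" }
  | { action: "dead-letter" };

// Retryable statuses as listed in the lifecycle: 5xx, 429, 408.
function isRetryableStatus(status: number): boolean {
  return status >= 500 || status === 429 || status === 408;
}

// Assumed formula: fixed returns baseDelay, exponential doubles per retry used.
function backoffDelay(policy: RetryPolicySketch, retriesUsed: number): number {
  return policy.strategy === "exponential"
    ? policy.baseDelay * 2 ** retriesUsed
    : policy.baseDelay;
}

// Retry-After is either delta-seconds or an HTTP date; returns milliseconds or null.
function parseRetryAfter(header: string | null, nowMs: number): number | null {
  if (!header) return null;
  const seconds = Number(header);
  if (Number.isFinite(seconds)) return Math.max(0, seconds * 1000);
  const date = Date.parse(header);
  return Number.isNaN(date) ? null : Math.max(0, date - nowMs);
}

function decideOnFailure(
  status: number,
  retriesUsed: number,
  policy: RetryPolicySketch,
  retryAfter: string | null,
  nowMs: number = Date.now(),
): FailureDecision {
  if (!isRetryableStatus(status)) return { action: "hard-fail" };
  if (retriesUsed >= policy.max) return { action: "dead-letter" };
  // A server-provided Retry-After hint takes precedence over the computed backoff.
  const hinted = parseRetryAfter(retryAfter, nowMs);
  return { action: "retry", delayMs: hinted ?? backoffDelay(policy, retriesUsed) };
}
```

With the retries block from Basic Usage (max: 3, baseDelay: 2000, exponential), this sketch would schedule retries after 2 s, 4 s, and 8 s, then dead-letter the delivery on the next retryable failure.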

Fan-Out Pattern

A single .fire() call can create multiple delivery jobs, one per matched endpoint. Each delivery is an independent job with its own retry count and DLQ entry, and it is gated by the circuit breaker of its target endpoint.

fire("order.payment.completed", { orderId: "123" })

  ├─ → Job 1: POST https://api.acme.com/webhooks  (matched "order.**")
  ├─ → Job 2: POST https://data.internal/ingest    (matched "order.*")
  └─ → Job 3: POST https://partner.com/hooks       (matched "*")

Circuit Breaker States

State      Behavior
Closed     Normal delivery; failures are counted
Open       All deliveries are re-queued; the endpoint is presumed down
Half-Open  One probe delivery is allowed; success resets the breaker to Closed

The circuit breaker is per-endpoint and shared across the cluster when using Redis/Postgres adapters.
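The three states can be modelled with a small in-memory state machine. The sketch below is not OqronKit's implementation: the class name CircuitBreakerSketch and its methods are hypothetical, state lives in process memory rather than in a shared Redis/Postgres adapter, and re-opening the breaker when the half-open probe fails is an assumption.

```typescript
// Hypothetical in-memory sketch of the Closed / Open / Half-Open cycle.
type BreakerState = "closed" | "open" | "half-open";

class CircuitBreakerSketch {
  private readonly failureThreshold: number;
  private readonly resetTimeout: number; // milliseconds
  private state: BreakerState = "closed";
  private failures = 0;
  private openedAt = 0;
  private probeInFlight = false;

  constructor(failureThreshold: number, resetTimeout: number) {
    this.failureThreshold = failureThreshold;
    this.resetTimeout = resetTimeout;
  }

  // Returns true if a delivery may be attempted at time `now` (ms).
  canAttempt(now: number): boolean {
    if (this.state === "open" && now - this.openedAt >= this.resetTimeout) {
      // The cool-down has elapsed: allow a single probe.
      this.state = "half-open";
      this.probeInFlight = false;
    }
    if (this.state === "closed") return true;
    if (this.state === "half-open" && !this.probeInFlight) {
      this.probeInFlight = true;
      return true;
    }
    return false; // open, or a probe is already in flight
  }

  recordSuccess(): void {
    this.failures = 0;
    this.state = "closed";
    this.probeInFlight = false;
  }

  recordFailure(now: number): void {
    this.failures += 1;
    // Assumption: a failed probe re-opens the breaker immediately.
    if (this.state === "half-open" || this.failures >= this.failureThreshold) {
      this.state = "open";
      this.openedAt = now;
      this.probeInFlight = false;
    }
  }

  current(): BreakerState {
    return this.state;
  }
}
```

A dispatcher following the lifecycle above would call canAttempt() before step 9 and re-queue the job when it returns false, then call recordSuccess() or recordFailure() once the HTTP response arrives.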

Basic Usage

triggers/webhooks.ts
import { webhook } from 'oqronkit'

export const platformWebhooks = webhook({
  name: 'platform-events',
  concurrency: 10,

  retries: {
    max: 3,
    strategy: 'exponential',
    baseDelay: 2000,
  },

  security() {
    return {
      signingSecret: process.env.WEBHOOK_SECRET!,
    }
  },

  async endpoints() {
    return [
      {
        name: 'AcmeCorp Integration',
        url: 'https://api.acme.com/v1/webhooks/receive',
        events: ['order.**', 'user.signup'], // Glob matching
        security: {
          signingAlgorithm: 'sha256',
          signingSecret: process.env.ACME_SECRET!,
          signingHeader: 'x-acme-signature',
        },
      },
      {
        name: 'Internal Analytics',
        url: 'https://data.internal.svc/ingest',
        events: ['user.*.activated', 'system.health'],
      },
    ]
  },
})

Fire Events

// Matches AcmeCorp (order.**) — ignored by Internal Analytics
platformWebhooks.fire('order.payment.completed', {
  orderId: 'ord_123',
  amount: 450.0,
  currency: 'USD',
})

// Matches AcmeCorp (user.signup)
platformWebhooks.fire('user.signup', {
  userId: 'usr_abc',
  email: 'newuser@example.com',
})

// Matches Internal Analytics (user.*.activated)
platformWebhooks.fire('user.onboarding.activated', {
  userId: 'usr_abc',
  timestamp: Date.now(),
})
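To make the matching in the comments above concrete, here is a minimal glob matcher sketch. It assumes that event names are dot-separated, that `*` matches exactly one segment, and that `**` matches one or more segments. Those semantics are inferred from the examples on this page; OqronKit's own matcher may differ, for instance in how a bare `*` is treated. The names globToRegExp and matchEndpoints are hypothetical.

```typescript
// Hypothetical sketch of glob-based event routing; semantics are assumed, see above.
interface EndpointSketch {
  name: string;
  events: string[];
}

// Translate a dot-separated glob into an anchored regular expression.
function globToRegExp(pattern: string): RegExp {
  const parts = pattern.split(".").map((segment) => {
    if (segment === "**") return "[^.]+(?:\\.[^.]+)*"; // one or more segments
    if (segment === "*") return "[^.]+";               // exactly one segment
    return segment.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); // literal segment
  });
  return new RegExp(`^${parts.join("\\.")}$`);
}

// Return the names of all endpoints with at least one matching pattern.
function matchEndpoints(eventName: string, endpoints: EndpointSketch[]): string[] {
  return endpoints
    .filter((endpoint) =>
      endpoint.events.some((pattern) => globToRegExp(pattern).test(eventName)),
    )
    .map((endpoint) => endpoint.name);
}

// The two endpoints from Basic Usage, reduced to the fields the matcher needs.
const exampleEndpoints: EndpointSketch[] = [
  { name: "AcmeCorp Integration", events: ["order.**", "user.signup"] },
  { name: "Internal Analytics", events: ["user.*.activated", "system.health"] },
];

console.log(matchEndpoints("order.payment.completed", exampleEndpoints));
console.log(matchEndpoints("user.onboarding.activated", exampleEndpoints));
```

Under these assumptions the first call selects only AcmeCorp Integration and the second selects only Internal Analytics, which agrees with the comments in the Fire Events snippet.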

Dynamic CRUD

import { createWebhook, updateWebhook, deleteWebhook, pauseWebhook, resumeWebhook, resendWebhook } from 'oqronkit'

// Create at runtime
await createWebhook({ /* ... */ })

// Pause/Resume
await pauseWebhook('webhook-id')
await resumeWebhook('webhook-id')

// Resend a failed delivery
await resendWebhook('delivery-id')

v0.0.2 improvements:

  • Abort signal support: cancelling an active webhook job via cancelJob() now immediately aborts the in-flight HTTP request instead of waiting for the full timeout.
  • Resend clears delay: resendWebhook() now clears runAt and opts.delay on the cloned job, so the delivery is re-dispatched immediately instead of inheriting the original schedule.

Signature Verification

import { verifyWebhookSignature, signWebhookPayload } from 'oqronkit'

// Verify incoming webhook (consumer side)
const isValid = verifyWebhookSignature(payload, signature, secret, 'sha256')

// Sign outgoing payload (producer side)
const sig = signWebhookPayload(payload, secret, 'sha256')
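For receivers that cannot depend on OqronKit, the same check can be approximated with Node's built-in crypto module. This is a sketch under assumptions: it treats the signature as the hex-encoded HMAC-SHA256 of the raw request body, which matches step 6 of the lifecycle but may not match the library's exact encoding or header format. The function names signBody and verifyBody are hypothetical.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Assumed format: hex-encoded HMAC-SHA256 over the raw body string.
function signBody(body: string, secret: string): string {
  return createHmac("sha256", secret).update(body, "utf8").digest("hex");
}

// Compare in constant time so the check does not leak how many bytes matched.
function verifyBody(body: string, signature: string, secret: string): boolean {
  const expected = Buffer.from(signBody(body, secret), "hex");
  const received = Buffer.from(signature, "hex");
  // timingSafeEqual throws on length mismatch, so reject those first.
  if (received.length !== expected.length) return false;
  return timingSafeEqual(received, expected);
}

// Usage: verify against the raw body exactly as received, before JSON parsing.
const rawBody = JSON.stringify({ orderId: "ord_123", amount: 450.0 });
const exampleSignature = signBody(rawBody, "example-secret");
console.log(verifyBody(rawBody, exampleSignature, "example-secret"));
```

Verifying the raw bytes rather than a re-serialized object matters because JSON serialization is not guaranteed to reproduce the sender's byte sequence.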

Circuit Breaker

import { createCircuitBreaker } from 'oqronkit'

const breaker = createCircuitBreaker({
  failureThreshold: 5,
  resetTimeout: 30_000,
})

Throttle

Cap the delivery rate to avoid overwhelming endpoints:

const rateLimitedWebhooks = webhook({
  name: 'api-events',
  concurrency: 5,
  throttle: { max: 100, duration: 60_000 }, // 100 deliveries per minute

  endpoints: [
    { name: 'Partner API', url: 'https://partner.com/webhooks', events: ['*'] },
  ],
})
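One way to picture a { max, duration } throttle is a fixed-window counter, sketched below. Whether OqronKit uses a fixed window, a sliding window, or a token bucket is not stated on this page, so treat the algorithm and the name ThrottleGateSketch as assumptions made for illustration.

```typescript
// Hypothetical fixed-window throttle: at most `max` acquisitions per `duration` ms.
class ThrottleGateSketch {
  private readonly max: number;
  private readonly duration: number;
  private windowStart = Number.NEGATIVE_INFINITY;
  private count = 0;

  constructor(max: number, duration: number) {
    this.max = max;
    this.duration = duration;
  }

  // Returns true if a delivery may start at time `now` (ms), false if throttled.
  tryAcquire(now: number): boolean {
    if (now - this.windowStart >= this.duration) {
      // Start a new window and reset the counter.
      this.windowStart = now;
      this.count = 0;
    }
    if (this.count >= this.max) return false;
    this.count += 1;
    return true;
  }
}

// Mirrors throttle: { max: 100, duration: 60_000 } from the example above.
const gate = new ThrottleGateSketch(100, 60_000);
console.log(gate.tryAcquire(Date.now()));
```

A fixed window is the simplest variant, but it permits short bursts of up to twice max across a window boundary; a sliding window or token bucket smooths that out at the cost of more bookkeeping.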

Next Steps

  • Rate Limiter — Protect webhook dispatch with rate limits
  • Task Queue — Use queues for reliable background processing
