Adapters

Storage, Broker, and Lock adapter architecture — built-in and custom

OqronKit's adapter system provides swappable persistence backends. The same job definitions work identically across in-memory, Redis, and PostgreSQL with no changes to job code; only the init config differs. Need RabbitMQ for your broker? DynamoDB for storage? Implement the interface and plug it in.

Three Adapter Interfaces

Every OqronKit deployment requires exactly three adapters:

| Interface | Responsibility | Core Methods |
| --- | --- | --- |
| IStorageEngine | Job records, schedules, history, buffers | save, get, list, delete, count, prune |
| IBrokerEngine | Job signaling, queue transport, claim/ack | publish, claim, ack, nack, pause, resume |
| ILockAdapter | Distributed mutual exclusion, leader election | acquire, release, renew, isOwner |
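
The method names in the table, combined with the call signatures used in the examples on this page, suggest roughly the following shapes. This is an inferred sketch, not OqronKit's actual type declarations: the `Sketch` type names, the parameter names, the optional markers, and the return types (for example `Promise<boolean>` for the lock methods) are all assumptions.

```typescript
// Inferred sketch only; the real declarations ship with the oqronkit package.
type Filter = Record<string, unknown>

interface StorageEngineSketch {
  save(namespace: string, id: string, data: unknown): Promise<void>
  get(namespace: string, id: string): Promise<unknown>
  list(namespace: string, filter?: Filter, opts?: unknown): Promise<unknown[]>
  delete(namespace: string, id: string): Promise<void>
  count(namespace: string, filter?: Filter): Promise<number>
  prune(namespace: string, beforeMs: number): Promise<number>
}

interface BrokerEngineSketch {
  publish(queue: string, jobId: string, delayMs?: number): Promise<void>
  claim(queue: string, consumerId: string, limit: number, lockTtlMs: number): Promise<string[]>
  ack(queue: string, jobId: string): Promise<void>
  nack(queue: string, jobId: string, delayMs?: number): Promise<void>
  pause(queue: string): Promise<void>
  resume(queue: string): Promise<void>
}

interface LockAdapterSketch {
  acquire(key: string, ownerId: string, ttlMs: number): Promise<boolean>
  release(key: string, ownerId: string): Promise<void>
  renew(key: string, ownerId: string, ttlMs: number): Promise<boolean>
  isOwner(key: string, ownerId: string): Promise<boolean>
}

// The core method names from the table, grouped per adapter
const CORE_METHODS: { storage: string[]; broker: string[]; lock: string[] } = {
  storage: ['save', 'get', 'list', 'delete', 'count', 'prune'],
  broker: ['publish', 'claim', 'ack', 'nack', 'pause', 'resume'],
  lock: ['acquire', 'release', 'renew', 'isOwner'],
}
```

Reading the three shapes side by side makes the split visible: storage deals in records, the broker deals only in job IDs, and the lock deals in keys and owners.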

Storage Modes

Set the mode config option to select an adapter combination:

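| Mode | Storage | Broker | Lock | Requires |
| --- | --- | --- | --- | --- |
| "default" | Memory | Memory | Memory | Nothing |
| "redis" | Redis | Redis | Redis | redis config |
| "db" | PostgreSQL | PostgreSQL | PostgreSQL | postgres config |
| "hybrid-db" | PostgreSQL | Redis | Redis | Both configs |
| "custom" | Your choice | Your choice | Your choice | adapters config |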

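The Storage Modes table can be read as a lookup from mode to backends. The sketch below only restates the table in code to show which connection configs each built-in mode implies; `MODE_TABLE` and `requiredConfigs` are hypothetical names, not OqronKit exports, and the library's internal resolution may differ.

```typescript
type Backend = 'memory' | 'redis' | 'postgres'
type BuiltInMode = 'default' | 'redis' | 'db' | 'hybrid-db'

interface AdapterPlan {
  storage: Backend
  broker: Backend
  lock: Backend
}

// Hypothetical lookup mirroring the Storage Modes table row by row
const MODE_TABLE: Record<BuiltInMode, AdapterPlan> = {
  default: { storage: 'memory', broker: 'memory', lock: 'memory' },
  redis: { storage: 'redis', broker: 'redis', lock: 'redis' },
  db: { storage: 'postgres', broker: 'postgres', lock: 'postgres' },
  'hybrid-db': { storage: 'postgres', broker: 'redis', lock: 'redis' },
}

// Returns the connection configs a mode needs (memory needs none)
function requiredConfigs(mode: BuiltInMode): Backend[] {
  const plan = MODE_TABLE[mode]
  const backends = new Set<Backend>([plan.storage, plan.broker, plan.lock])
  backends.delete('memory')
  return Array.from(backends)
}
```

For example, `requiredConfigs('hybrid-db')` yields both `postgres` and `redis`, matching the "Both configs" entry in the table.
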
In-Memory (Development)

Zero-dependency. Ships built-in. Perfect for local development and testing.

await OqronKit.init({
  config: {
    // mode: 'default' is implicit — no adapter config needed
    modules: [queueModule(), cronModule()],
  },
})

Redis (Production)

High-throughput adapter using Redis Sorted Sets for the broker and Redlock-style distributed locking.

await OqronKit.init({
  config: {
    mode: 'redis',
    redis: 'redis://localhost:6379',
    modules: [queueModule(), cronModule()],
  },
})

PostgreSQL (Production)

ACID-compliant adapter using JSONB + GIN indexes for storage and PostgreSQL Advisory Locks.

await OqronKit.init({
  config: {
    mode: 'db',
    postgres: {
      connectionString: 'postgresql://user:pass@localhost:5432/mydb',
      tablePrefix: 'oqron',  // optional, default: "oqron"
      poolSize: 10,           // optional, default: 10
    },
    modules: [queueModule(), cronModule()],
  },
})

Hybrid (PG + Redis)

PostgreSQL for durable storage, Redis for fast broker + lock. Best of both worlds.

await OqronKit.init({
  config: {
    mode: 'hybrid-db',
    postgres: { connectionString: 'postgresql://...' },
    redis: 'redis://localhost:6379',
    modules: [queueModule(), cronModule()],
  },
})

Custom Adapters

Use mode: "custom" to plug in your own adapter implementations.

Using Adapter Factory Functions

OqronKit provides factory functions to create individual adapters. Mix built-in adapters with your own:

import {
  OqronKit,
  createStorageAdapter,
  createBrokerAdapter,
  createLockAdapter,
} from 'oqronkit'

// Create individual adapters — mix and match freely
const storage = await createStorageAdapter({
  type: 'postgres',
  postgres: { connectionString: 'postgresql://...' },
})

const lock = await createLockAdapter({
  type: 'redis',
  redis: 'redis://localhost:6379',
})

await OqronKit.init({
  config: {
    mode: 'custom',
    adapters: {
      storage,                        // PostgreSQL
      broker: new MyRabbitBroker(),    // Your custom RabbitMQ adapter
      lock,                           // Redis
    },
    modules: [queueModule(), cronModule()],
  },
})

Using createAdapters (All at Once)

Create all three adapters from a single config:

import { OqronKit, createAdapters } from 'oqronkit'

const adapters = await createAdapters({
  mode: 'redis',
  redis: 'redis://localhost:6379',
})

await OqronKit.init({
  config: {
    mode: 'custom',
    adapters,
    modules: [queueModule(), cronModule()],
  },
})

// Later, on shutdown:
await adapters.close()

Implementing a Custom Adapter (Class-Based)

Implement the interface directly as a class:

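import type { Channel } from 'amqplib'
import type { IBrokerEngine, BrokerStrategy } from 'oqronkit'

class RabbitMQBroker implements IBrokerEngine {
  // The amqplib channel is created by the caller and injected here
  constructor(private readonly channel: Channel) {}

  async publish(queue: string, jobId: string, delay?: number): Promise<void> {
    // x-delay is only honored when the message is routed through a
    // delayed-message exchange (RabbitMQ plugin)
    this.channel.sendToQueue(queue, Buffer.from(jobId), {
      headers: delay ? { 'x-delay': delay } : undefined,
    })
  }

  async claim(
    queue: string,
    consumerId: string,
    limit: number,
    lockTtlMs: number,
    strategy?: BrokerStrategy,
  ): Promise<string[]> {
    // channel.get() fetches one message at a time and resolves to false
    // once the queue is empty, so loop until `limit` is reached
    const jobIds: string[] = []
    while (jobIds.length < limit) {
      const message = await this.channel.get(queue, { noAck: false })
      if (!message) break
      // A real implementation must also keep `message` so that ack/nack
      // can reference its delivery tag later
      jobIds.push(message.content.toString())
    }
    return jobIds
  }

  async ack(queue: string, jobId: string): Promise<void> { /* ... */ }
  async nack(queue: string, jobId: string, delay?: number): Promise<void> { /* ... */ }
  async pause(queue: string): Promise<void> { /* ... */ }
  async resume(queue: string): Promise<void> { /* ... */ }
  async extendLock(
    jobId: string,
    consumerId: string,
    ttlMs: number,
    queue?: string,
  ): Promise<void> { /* ... */ }
}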

Implementing a Custom Adapter (Factory Functions)

Use createStorage, createBroker, and createLock to build adapters from plain objects — no class boilerplate needed:

import { createStorage, createBroker, createLock } from 'oqronkit'

createStorage(impl)

Build a custom IStorageEngine:

const dynamoStorage = createStorage({
  async save(namespace, id, data) {
    await dynamo.putItem({
      TableName: namespace,
      Item: { id, data: JSON.stringify(data) },
    })
  },
  async get(namespace, id) {
    const result = await dynamo.getItem({ TableName: namespace, Key: { id } })
    return result.Item ? JSON.parse(result.Item.data) : null
  },
  async list(namespace, filter, opts) {
    const result = await dynamo.scan({ TableName: namespace })
    return result.Items?.map(i => JSON.parse(i.data)) ?? []
  },
  async delete(namespace, id) {
    await dynamo.deleteItem({ TableName: namespace, Key: { id } })
  },
  async prune(namespace, beforeMs) {
    // Delete records older than beforeMs
    return 0
  },
  async count(namespace, filter) {
    const result = await dynamo.scan({ TableName: namespace, Select: 'COUNT' })
    return result.Count ?? 0
  },
})
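
To make the contract of the six storage methods concrete, here is a minimal in-memory sketch with the same method names. It is not OqronKit's built-in memory adapter: `createMemoryStorageSketch` is a hypothetical name, the filter is assumed to mean strict equality on every listed field, and `prune` is assumed to compare `beforeMs` against the time a record was last saved.

```typescript
type Doc = Record<string, unknown>
type Row = { data: Doc; savedAt: number }

// Hypothetical in-memory engine; `now` is injectable so pruning can be tested
function createMemoryStorageSketch(now: () => number = Date.now) {
  const tables = new Map<string, Map<string, Row>>()

  const table = (namespace: string): Map<string, Row> => {
    let rows = tables.get(namespace)
    if (!rows) {
      rows = new Map<string, Row>()
      tables.set(namespace, rows)
    }
    return rows
  }

  // Assumed filter semantics: every filter field must strictly equal the record's field
  const select = (namespace: string, filter: Doc = {}): Doc[] =>
    Array.from(table(namespace).values())
      .map(row => row.data)
      .filter(data => Object.keys(filter).every(k => data[k] === filter[k]))

  return {
    async save(namespace: string, id: string, data: Doc): Promise<void> {
      table(namespace).set(id, { data, savedAt: now() })
    },
    async get(namespace: string, id: string): Promise<Doc | null> {
      const row = table(namespace).get(id)
      return row ? row.data : null
    },
    async list(namespace: string, filter?: Doc): Promise<Doc[]> {
      return select(namespace, filter)
    },
    async delete(namespace: string, id: string): Promise<void> {
      table(namespace).delete(id)
    },
    // Removes records saved before `beforeMs` and reports how many were removed
    async prune(namespace: string, beforeMs: number): Promise<number> {
      const rows = table(namespace)
      let removed = 0
      for (const [id, row] of Array.from(rows.entries())) {
        if (row.savedAt < beforeMs) {
          rows.delete(id)
          removed++
        }
      }
      return removed
    },
    async count(namespace: string, filter?: Doc): Promise<number> {
      return select(namespace, filter).length
    },
  }
}
```

The returned object has the shape that `createStorage(impl)` takes in the DynamoDB example, so a sketch like this can serve as a reference when deciding what `list`, `count`, and `prune` should return for a real backend.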

createBroker(impl)

Build a custom IBrokerEngine. claimBlocking is optional.

const sqsBroker = createBroker({
  async publish(brokerName, id, delayMs, priority) {
    await sqs.sendMessage({
      QueueUrl: queues[brokerName],
      MessageBody: id,
      DelaySeconds: delayMs ? Math.ceil(delayMs / 1000) : undefined,
    })
  },
  async claim(brokerName, consumerId, limit, lockTtlMs, strategy) {
    const result = await sqs.receiveMessage({
      QueueUrl: queues[brokerName],
      MaxNumberOfMessages: Math.min(limit, 10), // SQS accepts at most 10 per call
      VisibilityTimeout: Math.ceil(lockTtlMs / 1000),
    })
    return result.Messages?.map(m => m.Body!) ?? []
  },
  async extendLock(id, consumerId, lockTtlMs, brokerName) {
    // Extend SQS visibility timeout
  },
  async ack(brokerName, id) {
    // Delete message from SQS
  },
  async nack(brokerName, id, delayMs) {
    // Change visibility timeout for retry delay
  },
  async pause(brokerName) { /* disable polling */ },
  async resume(brokerName) { /* re-enable polling */ },
})
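
The SQS example relies on a visibility timeout: a claimed job is hidden for `lockTtlMs` and reappears if it is never acknowledged. The sketch below shows that lifecycle in memory so the roles of `claim`, `extendLock`, `ack`, and `nack` are easier to follow. It is an illustration under assumptions, not OqronKit's memory broker: `createMemoryBrokerSketch` is a hypothetical name, and the `priority` and `strategy` arguments are left out.

```typescript
type Entry = { id: string; visibleAt: number; lockedBy: string | null }

// Hypothetical in-memory broker; `now` is injectable so timing can be tested
function createMemoryBrokerSketch(now: () => number = Date.now) {
  const queues = new Map<string, Entry[]>()
  const paused = new Set<string>()

  const entries = (name: string): Entry[] => {
    let queue = queues.get(name)
    if (!queue) {
      queue = []
      queues.set(name, queue)
    }
    return queue
  }

  return {
    async publish(brokerName: string, id: string, delayMs = 0): Promise<void> {
      entries(brokerName).push({ id, visibleAt: now() + delayMs, lockedBy: null })
    },
    async claim(brokerName: string, consumerId: string, limit: number, lockTtlMs: number): Promise<string[]> {
      const claimed: string[] = []
      if (paused.has(brokerName)) return claimed
      for (const entry of entries(brokerName)) {
        if (claimed.length >= limit) break
        if (entry.visibleAt > now()) continue // delayed, or still locked by a consumer
        entry.lockedBy = consumerId
        entry.visibleAt = now() + lockTtlMs // hidden until the lock expires
        claimed.push(entry.id)
      }
      return claimed
    },
    async extendLock(id: string, consumerId: string, lockTtlMs: number, brokerName: string): Promise<void> {
      const entry = entries(brokerName).find(e => e.id === id && e.lockedBy === consumerId)
      if (entry) entry.visibleAt = now() + lockTtlMs
    },
    async ack(brokerName: string, id: string): Promise<void> {
      const queue = entries(brokerName)
      const index = queue.findIndex(e => e.id === id)
      if (index >= 0) queue.splice(index, 1) // done: remove permanently
    },
    async nack(brokerName: string, id: string, delayMs = 0): Promise<void> {
      const entry = entries(brokerName).find(e => e.id === id)
      if (entry) {
        entry.lockedBy = null
        entry.visibleAt = now() + delayMs // becomes claimable again after the delay
      }
    },
    async pause(brokerName: string): Promise<void> { paused.add(brokerName) },
    async resume(brokerName: string): Promise<void> { paused.delete(brokerName) },
  }
}
```

A job that is claimed but never acknowledged becomes claimable again once its lock expires, which is the behavior a long-running worker avoids by calling `extendLock` periodically.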

createLock(impl)

Build a custom ILockAdapter:

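// Consul ties a lock to a session, so track which session holds each key
const sessions = new Map()

const consulLock = createLock({
  async acquire(key, ownerId, ttlMs) {
    const session = await consul.session.create({ ttl: `${ttlMs}ms` })
    const acquired = await consul.kv.acquire({ key, session, value: ownerId })
    if (acquired) sessions.set(`${key}:${ownerId}`, session)
    return acquired
  },
  async renew(key, ownerId, ttlMs) {
    // Renew the session created in acquire, not the owner ID
    const session = sessions.get(`${key}:${ownerId}`)
    return session ? consul.session.renew(session) : false
  },
  async release(key, ownerId) {
    const session = sessions.get(`${key}:${ownerId}`)
    if (!session) return
    await consul.kv.release({ key, session })
    sessions.delete(`${key}:${ownerId}`)
  },
  async isOwner(key, ownerId) {
    const result = await consul.kv.get(key)
    return result?.Value === ownerId
  },
})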
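
The four lock methods are easiest to reason about as a lease with an expiry. Below is a minimal in-memory sketch of those semantics, assuming that `acquire` and `renew` report success as a boolean and that an expired lease counts as free. `createMemoryLockSketch` is a hypothetical name and this is not OqronKit's built-in memory lock.

```typescript
type Lease = { ownerId: string; expiresAt: number }

// Hypothetical in-memory lock; `now` is injectable so expiry can be tested
function createMemoryLockSketch(now: () => number = Date.now) {
  const leases = new Map<string, Lease>()

  // A lease only counts while it has not expired
  const live = (key: string): Lease | undefined => {
    const lease = leases.get(key)
    return lease && lease.expiresAt > now() ? lease : undefined
  }

  return {
    async acquire(key: string, ownerId: string, ttlMs: number): Promise<boolean> {
      const current = live(key)
      if (current && current.ownerId !== ownerId) return false // held by someone else
      leases.set(key, { ownerId, expiresAt: now() + ttlMs })
      return true
    },
    async renew(key: string, ownerId: string, ttlMs: number): Promise<boolean> {
      const current = live(key)
      if (!current || current.ownerId !== ownerId) return false // lost or never held
      current.expiresAt = now() + ttlMs
      return true
    },
    async release(key: string, ownerId: string): Promise<void> {
      const current = live(key)
      // Only the current owner may release; other callers are ignored
      if (current && current.ownerId === ownerId) leases.delete(key)
    },
    async isOwner(key: string, ownerId: string): Promise<boolean> {
      const current = live(key)
      return current !== undefined && current.ownerId === ownerId
    },
  }
}
```

Checking the owner in `renew` and `release` is the important design choice: a worker whose lease has expired must not be able to extend or drop a lock that another worker now holds.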

Putting It All Together

import { OqronKit, createStorage, createBroker, createLock } from 'oqronkit'

const storage = createStorage({ /* ... */ })
const broker = createBroker({ /* ... */ })
const lock = createLock({ /* ... */ })

await OqronKit.init({
  config: {
    mode: 'custom',
    adapters: { storage, broker, lock },
    modules: [queueModule(), cronModule()],
  },
})

All three factory functions validate at creation time that every required method is present. Missing a method throws immediately — no runtime surprises.
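
Creation-time validation of this kind can be sketched in a few lines. The helper below is an illustration of the idea, not OqronKit's actual check: `assertImplements` and the error message format are hypothetical, and the method list is taken from the createLock(impl) section.

```typescript
// Throws right away if any required method is missing or is not a function
function assertImplements(impl: object, required: readonly string[], label: string): void {
  const candidate = impl as Record<string, unknown>
  const missing = required.filter(name => typeof candidate[name] !== 'function')
  if (missing.length > 0) {
    throw new TypeError(`${label} is missing required method(s): ${missing.join(', ')}`)
  }
}

const LOCK_METHODS = ['acquire', 'renew', 'release', 'isOwner']

// An incomplete lock implementation: renew and isOwner are absent
const incompleteLock = {
  async acquire() { return true },
  async release() {},
}

let validationError = ''
try {
  assertImplements(incompleteLock, LOCK_METHODS, 'createLock(impl)')
} catch (err) {
  validationError = err instanceof Error ? err.message : String(err)
}
```

Failing at creation time means a typo in a method name surfaces during startup instead of the first time a worker tries to renew a lock.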

Adapter Factory Reference

Built-in Adapter Factories

These create adapters backed by OqronKit's built-in implementations (Memory, Redis, PostgreSQL):

createStorageAdapter(options)

| Option | Type | Values |
| --- | --- | --- |
| type | string | "memory", "postgres", "redis" |
| postgres | PostgresAdapterConfig | Required when type: "postgres" |
| redis | RedisLike | Required when type: "redis" |

createBrokerAdapter(options)

| Option | Type | Values |
| --- | --- | --- |
| type | string | "memory", "postgres", "redis" |
| postgres | PostgresAdapterConfig | Required when type: "postgres" |
| redis | RedisLike | Required when type: "redis" |

createLockAdapter(options)

| Option | Type | Values |
| --- | --- | --- |
| type | string | "memory", "postgres", "redis" |
| postgres | PostgresAdapterConfig | Required when type: "postgres" |
| redis | RedisLike | Required when type: "redis" |

createAdapters(options)

| Option | Type | Description |
| --- | --- | --- |
| mode | string | "default", "redis", "db", "hybrid-db", "custom" |
| Returns | OqronAdapters | { storage, broker, lock, close() } |

Custom Adapter Factories

These accept your own method implementations — no class needed:

| Function | Interface | Required Methods |
| --- | --- | --- |
| createStorage(impl) | IStorageEngine | save, get, list, delete, prune, count |
| createBroker(impl) | IBrokerEngine | publish, claim, extendLock, ack, nack, pause, resume |
| createLock(impl) | ILockAdapter | acquire, renew, release, isOwner |

createBroker also accepts an optional claimBlocking method for blocking-wait semantics.

Environment Isolation

All adapters are automatically wrapped with namespace isolation based on project and environment:

oqron:{project}:{environment}:{namespace}:{id}

This means:

  • A "production" worker never touches "development" data
  • Multiple projects on the same Redis/Postgres instance never collide
  • You don't need separate databases per environment

// Project A, production
await OqronKit.init({
  config: {
    mode: 'redis',
    redis: 'redis://shared-redis:6379',
    project: 'billing',
    environment: 'production',
  },
})

// Project B, staging — same Redis, fully isolated
await OqronKit.init({
  config: {
    mode: 'redis',
    redis: 'redis://shared-redis:6379',
    project: 'notifications',
    environment: 'staging',
  },
})
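
The key layout shown above can be sketched as a small prefixing wrapper. This only illustrates how two configurations can share one backend without colliding; `scopedKey` and `withScope` are hypothetical helpers, and a plain `Map` stands in for the shared Redis instance. The library's real wrapping logic may differ.

```typescript
interface Scope {
  project: string
  environment: string
}

// Builds a key in the documented oqron:{project}:{environment}:{namespace}:{id} layout
function scopedKey(scope: Scope, namespace: string, id: string): string {
  return ['oqron', scope.project, scope.environment, namespace, id].join(':')
}

// Hypothetical wrapper: every read and write goes through the scoped key
function withScope(scope: Scope, store: Map<string, unknown>) {
  return {
    save(namespace: string, id: string, data: unknown): void {
      store.set(scopedKey(scope, namespace, id), data)
    },
    get(namespace: string, id: string): unknown {
      const key = scopedKey(scope, namespace, id)
      return store.has(key) ? store.get(key) : null
    },
  }
}

// Two projects sharing one backing store
const shared = new Map<string, unknown>()
const billing = withScope({ project: 'billing', environment: 'production' }, shared)
const notifications = withScope({ project: 'notifications', environment: 'staging' }, shared)

billing.save('jobs', '42', { amount: 10 })
```

After the `save` call, the shared store holds the key `oqron:billing:production:jobs:42`, and `notifications.get('jobs', '42')` finds nothing because its keys carry a different prefix.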
