Adapters
Storage, Broker, and Lock adapter architecture — built-in and custom
OqronKit's adapter system provides swappable persistence backends. The same job definitions work identically across in-memory, Redis, and PostgreSQL backends; switching between them is a configuration change, not a change to job code. Need RabbitMQ for your broker or DynamoDB for storage? Implement the interface and plug it in.
Three Adapter Interfaces
Every OqronKit deployment requires exactly three adapters:
| Interface | Responsibility | Core Methods |
|---|---|---|
| IStorageEngine | Job records, schedules, history, buffers | save, get, list, delete, count, prune |
| IBrokerEngine | Job signaling, queue transport, claim/ack | publish, claim, ack, nack, pause, resume |
| ILockAdapter | Distributed mutual exclusion, leader election | acquire, release, renew, isOwner |
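To make the lock contract in the table concrete, here is a minimal in-memory model of the four ILockAdapter methods. It is a sketch under assumptions, not OqronKit's implementation: the class name `MemoryLockModel` is hypothetical, the methods are synchronous for clarity (the examples later on this page use async methods), and re-acquiring a lock you already hold is assumed to succeed.

```typescript
// Hypothetical model of TTL-based lock semantics; not OqronKit's own code.
type LockEntry = { ownerId: string; expiresAt: number };

class MemoryLockModel {
  private locks = new Map<string, LockEntry>();
  private now: () => number;

  // The clock is injectable so expiry can be exercised deterministically.
  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  // Succeeds when the key is free, expired, or already held by the same owner.
  acquire(key: string, ownerId: string, ttlMs: number): boolean {
    const entry = this.locks.get(key);
    if (entry && entry.expiresAt > this.now() && entry.ownerId !== ownerId) {
      return false;
    }
    this.locks.set(key, { ownerId, expiresAt: this.now() + ttlMs });
    return true;
  }

  // Extends the TTL only while the caller still holds an unexpired lock.
  renew(key: string, ownerId: string, ttlMs: number): boolean {
    if (!this.isOwner(key, ownerId)) return false;
    this.locks.set(key, { ownerId, expiresAt: this.now() + ttlMs });
    return true;
  }

  // Releases only when the caller is the current owner; otherwise a no-op.
  release(key: string, ownerId: string): void {
    if (this.isOwner(key, ownerId)) this.locks.delete(key);
  }

  isOwner(key: string, ownerId: string): boolean {
    const entry = this.locks.get(key);
    return !!entry && entry.ownerId === ownerId && entry.expiresAt > this.now();
  }
}
```

The ownership check in `release` and `renew` is the important design point: a worker whose lock has expired must not be able to release or extend a lock that another worker has since acquired.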
Storage Modes
Set the mode config option to select an adapter combination:
| Mode | Storage | Broker | Lock | Requires |
|---|---|---|---|---|
| "default" | Memory | Memory | Memory | Nothing |
| "redis" | Redis | Redis | Redis | redis config |
| "db" | PostgreSQL | PostgreSQL | PostgreSQL | postgres config |
| "hybrid-db" | PostgreSQL | Redis | Redis | Both configs |
| "custom" | Your choice | Your choice | Your choice | adapters config |
In-Memory (Development)
Zero-dependency. Ships built-in. Perfect for local development and testing.
await OqronKit.init({
  config: {
    // mode: 'default' is implicit, so no adapter config is needed
    modules: [queueModule(), cronModule()],
  },
})
Redis (Production)
High-throughput adapter using Redis Sorted Sets for the broker and Redlock-style distributed locking.
await OqronKit.init({
  config: {
    mode: 'redis',
    redis: 'redis://localhost:6379',
    modules: [queueModule(), cronModule()],
  },
})
PostgreSQL (Production)
ACID-compliant adapter using JSONB + GIN indexes for storage and PostgreSQL Advisory Locks.
await OqronKit.init({
  config: {
    mode: 'db',
    postgres: {
      connectionString: 'postgresql://user:pass@localhost:5432/mydb',
      tablePrefix: 'oqron', // optional, default: "oqron"
      poolSize: 10, // optional, default: 10
    },
    modules: [queueModule(), cronModule()],
  },
})
Hybrid (PG + Redis)
PostgreSQL for durable storage, Redis for fast broker + lock. Best of both worlds.
await OqronKit.init({
  config: {
    mode: 'hybrid-db',
    postgres: { connectionString: 'postgresql://...' },
    redis: 'redis://localhost:6379',
    modules: [queueModule(), cronModule()],
  },
})
Custom Adapters
Use mode: "custom" to plug in your own adapter implementations.
Using Adapter Factory Functions
OqronKit provides factory functions to create individual adapters. Mix built-in adapters with your own:
import {
  OqronKit,
  createStorageAdapter,
  createBrokerAdapter,
  createLockAdapter,
} from 'oqronkit'

// Create individual adapters and mix them freely
const storage = await createStorageAdapter({
  type: 'postgres',
  postgres: { connectionString: 'postgresql://...' },
})
const lock = await createLockAdapter({
  type: 'redis',
  redis: 'redis://localhost:6379',
})

await OqronKit.init({
  config: {
    mode: 'custom',
    adapters: {
      storage, // PostgreSQL
      broker: new MyRabbitBroker(), // Your custom RabbitMQ adapter
      lock, // Redis
    },
    modules: [queueModule(), cronModule()],
  },
})
Using createAdapters (All at Once)
Create all three adapters from a single config:
import { OqronKit, createAdapters } from 'oqronkit'

const adapters = await createAdapters({
  mode: 'redis',
  redis: 'redis://localhost:6379',
})

await OqronKit.init({
  config: {
    mode: 'custom',
    adapters,
    modules: [queueModule(), cronModule()],
  },
})

// Later, on shutdown:
await adapters.close()
Implementing a Custom Adapter (Class-Based)
Implement the interface directly as a class:
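// Assumes the amqplib client; the channel type below comes from that package
import type { Channel } from 'amqplib'
import type { IBrokerEngine, BrokerStrategy } from 'oqronkit'

class RabbitMQBroker implements IBrokerEngine {
  // An open amqplib channel, injected by the caller
  constructor(private readonly channel: Channel) {}

  async publish(queue: string, jobId: string, delay?: number): Promise<void> {
    // x-delay is only honored when RabbitMQ's delayed-message plugin is in use
    await this.channel.sendToQueue(queue, Buffer.from(jobId), {
      headers: delay ? { 'x-delay': delay } : undefined,
    })
  }

  async claim(
    queue: string,
    consumerId: string,
    limit: number,
    lockTtlMs: number,
    strategy?: BrokerStrategy,
  ): Promise<string[]> {
    // channel.get() returns one message at a time, or false when the queue is empty
    const jobIds: string[] = []
    while (jobIds.length < limit) {
      const message = await this.channel.get(queue, { noAck: false })
      if (!message) break
      jobIds.push(message.content.toString())
    }
    return jobIds
  }

  async ack(queue: string, jobId: string): Promise<void> { /* ... */ }
  async nack(queue: string, jobId: string, delay?: number): Promise<void> { /* ... */ }
  async pause(queue: string): Promise<void> { /* ... */ }
  async resume(queue: string): Promise<void> { /* ... */ }
  async extendLock(
    jobId: string,
    consumerId: string,
    ttlMs: number,
    queue?: string,
  ): Promise<void> { /* ... */ }
}
Implementing a Custom Adapter (Factory Functions)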
Use createStorage, createBroker, and createLock to build adapters from plain objects — no class boilerplate needed:
import { createStorage, createBroker, createLock } from 'oqronkit'
createStorage(impl)
Build a custom IStorageEngine:
const dynamoStorage = createStorage({
  async save(namespace, id, data) {
    await dynamo.putItem({
      TableName: namespace,
      Item: { id, data: JSON.stringify(data) },
    })
  },
  async get(namespace, id) {
    const result = await dynamo.getItem({ TableName: namespace, Key: { id } })
    return result.Item ? JSON.parse(result.Item.data) : null
  },
  async list(namespace, filter, opts) {
    const result = await dynamo.scan({ TableName: namespace })
    return result.Items?.map(i => JSON.parse(i.data)) ?? []
  },
  async delete(namespace, id) {
    await dynamo.deleteItem({ TableName: namespace, Key: { id } })
  },
  async prune(namespace, beforeMs) {
    // Delete records older than beforeMs
    return 0
  },
  async count(namespace, filter) {
    const result = await dynamo.scan({ TableName: namespace, Select: 'COUNT' })
    return result.Count ?? 0
  },
})
createBroker(impl)
Build a custom IBrokerEngine. claimBlocking is optional.
const sqsBroker = createBroker({
  async publish(brokerName, id, delayMs, priority) {
    await sqs.sendMessage({
      QueueUrl: queues[brokerName],
      MessageBody: id,
      DelaySeconds: delayMs ? Math.ceil(delayMs / 1000) : undefined,
    })
  },
  async claim(brokerName, consumerId, limit, lockTtlMs, strategy) {
    const result = await sqs.receiveMessage({
      QueueUrl: queues[brokerName],
      MaxNumberOfMessages: limit,
      VisibilityTimeout: Math.ceil(lockTtlMs / 1000),
    })
    return result.Messages?.map(m => m.Body!) ?? []
  },
  async extendLock(id, consumerId, lockTtlMs, brokerName) {
    // Extend SQS visibility timeout
  },
  async ack(brokerName, id) {
    // Delete message from SQS
  },
  async nack(brokerName, id, delayMs) {
    // Change visibility timeout for retry delay
  },
  async pause(brokerName) { /* disable polling */ },
  async resume(brokerName) { /* re-enable polling */ },
})
createLock(impl)
Build a custom ILockAdapter:
const consulLock = createLock({
  async acquire(key, ownerId, ttlMs) {
    const session = await consul.session.create({ ttl: `${ttlMs}ms` })
    return consul.kv.acquire({ key, session, value: ownerId })
  },
  async renew(key, ownerId, ttlMs) {
    return consul.session.renew(ownerId)
  },
  async release(key, ownerId) {
    await consul.kv.release({ key, session: ownerId })
  },
  async isOwner(key, ownerId) {
    const result = await consul.kv.get(key)
    return result?.Value === ownerId
  },
})
Putting It All Together
import { OqronKit, createStorage, createBroker, createLock } from 'oqronkit'

const storage = createStorage({ /* ... */ })
const broker = createBroker({ /* ... */ })
const lock = createLock({ /* ... */ })

await OqronKit.init({
  config: {
    mode: 'custom',
    adapters: { storage, broker, lock },
    modules: [queueModule(), cronModule()],
  },
})
All three factory functions validate at creation time that every required method is present. A missing method throws immediately, so the mistake surfaces when the adapter is built rather than when a job first runs.
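Creation-time validation of this kind can be sketched in a few lines. The helper below is an illustration of the idea only: `assertHasMethods` and `createLockSketch` are hypothetical names, and OqronKit's actual checks and error messages may differ.

```typescript
// Required method names for a lock adapter, as listed on this page.
const REQUIRED_LOCK_METHODS = ['acquire', 'renew', 'release', 'isOwner'] as const;

// Throws a TypeError naming every required method that is absent or not a function.
function assertHasMethods(
  impl: Record<string, unknown>,
  required: readonly string[],
  label: string,
): void {
  const missing = required.filter((name) => typeof impl[name] !== 'function');
  if (missing.length > 0) {
    throw new TypeError(`${label} is missing required method(s): ${missing.join(', ')}`);
  }
}

// Hypothetical stand-in for a factory: validate first, then hand the object back.
function createLockSketch<T extends Record<string, unknown>>(impl: T): T {
  assertHasMethods(impl, REQUIRED_LOCK_METHODS, 'createLock');
  return impl;
}
```

Reporting all missing methods in one error, rather than stopping at the first, saves a round trip when an adapter is only partly written.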
Adapter Factory Reference
Built-in Adapter Factories
These create adapters backed by OqronKit's built-in implementations (Memory, Redis, PostgreSQL):
createStorageAdapter(options)
| Option | Type | Values |
|---|---|---|
| type | string | "memory", "postgres", "redis" |
| postgres | PostgresAdapterConfig | Required when type: "postgres" |
| redis | RedisLike | Required when type: "redis" |
createBrokerAdapter(options)
| Option | Type | Values |
|---|---|---|
| type | string | "memory", "postgres", "redis" |
| postgres | PostgresAdapterConfig | Required when type: "postgres" |
| redis | RedisLike | Required when type: "redis" |
createLockAdapter(options)
| Option | Type | Values |
|---|---|---|
| type | string | "memory", "postgres", "redis" |
| postgres | PostgresAdapterConfig | Required when type: "postgres" |
| redis | RedisLike | Required when type: "redis" |
createAdapters(options)
| Option | Type | Description |
|---|---|---|
| mode | string | "default", "redis", "db", "hybrid-db", "custom" |
| Returns | OqronAdapters | { storage, broker, lock, close() } |
Custom Adapter Factories
These accept your own method implementations — no class needed:
| Function | Interface | Required Methods |
|---|---|---|
| createStorage(impl) | IStorageEngine | save, get, list, delete, prune, count |
| createBroker(impl) | IBrokerEngine | publish, claim, extendLock, ack, nack, pause, resume |
| createLock(impl) | ILockAdapter | acquire, renew, release, isOwner |
createBroker also accepts an optional claimBlocking method for blocking-wait semantics.
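Because claimBlocking is optional, code that consumes a broker has to check for it before choosing how to wait for work. The sketch below shows one way to make that decision. The `BrokerLike` type and `claimStrategy` function are hypothetical, and no particular claimBlocking signature is assumed.

```typescript
// Hypothetical minimal broker shape: only the two claim methods matter here.
type ClaimFn = (...args: unknown[]) => Promise<string[]>;
type BrokerLike = { claim: ClaimFn; claimBlocking?: ClaimFn };

// Picks blocking-wait semantics when the broker offers them, polling otherwise.
function claimStrategy(broker: BrokerLike): 'blocking' | 'polling' {
  return typeof broker.claimBlocking === 'function' ? 'blocking' : 'polling';
}
```

A broker without claimBlocking still works; the consumer falls back to calling claim on an interval.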
Environment Isolation
All adapters are automatically wrapped with namespace isolation based on project and environment:
oqron:{project}:{environment}:{namespace}:{id}
This means:
- A "production" worker never touches "development" data
- Multiple projects on the same Redis/Postgres instance never collide
- You don't need separate databases per environment
// Project A, production
await OqronKit.init({
  config: {
    mode: 'redis',
    redis: 'redis://shared-redis:6379',
    project: 'billing',
    environment: 'production',
  },
})

// Project B, staging: same Redis, fully isolated
await OqronKit.init({
  config: {
    mode: 'redis',
    redis: 'redis://shared-redis:6379',
    project: 'notifications',
    environment: 'staging',
  },
})
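The key format shown earlier in this section explains why the two deployments cannot collide: every key carries its project and environment as a prefix. The helper below is a sketch of that composition; `scopedKey` is a hypothetical name and the `jobs` namespace in the usage note is an example, not a documented OqronKit namespace.

```typescript
// Hypothetical helper that builds a key in the documented format:
// oqron:{project}:{environment}:{namespace}:{id}
type Scope = { project: string; environment: string };

function scopedKey(scope: Scope, namespace: string, id: string): string {
  return ['oqron', scope.project, scope.environment, namespace, id].join(':');
}
```

For example, `scopedKey({ project: 'billing', environment: 'production' }, 'jobs', '42')` yields `oqron:billing:production:jobs:42`, while the same namespace and ID under the notifications staging scope yields a different key.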