A SQLite-backed event and work engine for building durable, restart-safe workflows.
If you need to reliably turn events into background work (without losing consistency during crashes/retries), this package gives you the core runtime.
Use @torkbot/sledge when you want:
- durable event append + background work orchestration,
- retries and restart recovery by default,
- strong runtime validation at I/O boundaries,
- a small API surface you can adapt to your own schema and storage layout.
What it provides:
- Event log (`events` table)
- Durable work queue (`work` table)
- Transactional flow: append event -> project -> materialize work in one transaction
- Lease-based execution for queue handlers
- Idempotent producer retries via `dedupeKey`
- Signals for short-lived work created inside queue handlers

Typical use cases:
- Webhook ingestion with producer idempotency (`dedupeKey`) and reliable downstream processing
- Notification pipelines (email/push/Slack) with retries and dead-letter outcomes
- Long-running tool/API jobs that survive worker restarts
- Outbox-style orchestration without split-brain between writes and job enqueue
- Client-side materialization (browser/mobile/worker) by tailing events and resuming with an opaque cursor
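The all-or-nothing transactional flow is easiest to see in a small, self-contained in-memory sketch. This is not how Sledge is implemented. `InMemoryStore`, `appendUserCreated` and the row shapes are hypothetical, and the copy-and-swap `transaction` method only stands in for a real SQLite transaction. What the sketch does show is the property described in the list: the event, its projection write and its follow-up work either commit together or not at all.

```typescript
interface EventRow {
  eventId: number;
  name: string;
  payload: unknown;
}

interface WorkRow {
  queue: string;
  payload: unknown;
  sourceEventId: number;
}

// Hypothetical in-memory stand-ins for the event log, a projection table, and the work queue.
class InMemoryStore {
  events: EventRow[] = [];
  users = new Map<string, string>();
  work: WorkRow[] = [];

  // Run `fn` against a copy of the state and publish the copy only if `fn`
  // returns normally, so a failure leaves no partial writes behind.
  transaction(fn: (tx: InMemoryStore) => void): void {
    const tx = new InMemoryStore();
    tx.events = [...this.events];
    tx.users = new Map(this.users);
    tx.work = [...this.work];
    fn(tx); // if this throws, `this` is never modified
    this.events = tx.events;
    this.users = tx.users;
    this.work = tx.work;
  }
}

// Append -> project -> materialize, all inside one transaction.
function appendUserCreated(
  store: InMemoryStore,
  userId: string,
  email: string,
  failProjection = false, // simulate a projection error
): void {
  store.transaction((tx) => {
    const eventId = tx.events.length + 1;
    tx.events.push({ eventId, name: "user.created", payload: { userId, email } });
    if (failProjection) throw new Error("projection failed");
    tx.users.set(userId, email);
    tx.work.push({ queue: "welcome-email.send", payload: { userId, email }, sourceEventId: eventId });
  });
}

const store = new InMemoryStore();
appendUserCreated(store, "u_1", "a@example.com");
try {
  appendUserCreated(store, "u_2", "b@example.com", true);
} catch {
  // The failed append rolled back: no event, no projection row, no work.
}
```

The failed append never reaches the committed state. As a result, there is no window in which work exists for an event that was never recorded, or the reverse. That gap is the split-brain outbox-style designs avoid.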
```ts
import { Type } from "typebox";
import {
  bindLedgerModel,
  defineLedgerModel,
  registerLedgerModel,
} from "@torkbot/sledge/ledger";
import { createBetterSqliteLedger } from "@torkbot/sledge/better-sqlite3-ledger";
import {
  NodeRuntimeScheduler,
  SystemRuntimeClock,
} from "@torkbot/sledge/runtime/node-runtime";

// Sledge schemas should describe JSON-compatible values. TypeBox codecs that
// transform between distinct runtime and storage domains are not supported.
const definedModel = defineLedgerModel({
  events: {
    "user.created": Type.Object({
      userId: Type.String(),
      email: Type.String(),
    }),
  },
  queues: {
    "welcome-email.send": Type.Object({
      userId: Type.String(),
      email: Type.String(),
    }),
  },
  indexers: {
    upsertUser: Type.Object({
      userId: Type.String(),
      email: Type.String(),
    }),
  },
  queries: {
    userById: {
      params: Type.Object({ userId: Type.String() }),
      result: Type.Union([
        Type.Null(),
        Type.Object({
          userId: Type.String(),
          email: Type.String(),
        }),
      ]),
    },
  },
});

const registeredModel = registerLedgerModel(definedModel, {
  events: {
    "user.created": async ({ event, actions }) => {
      await actions.index("upsertUser", {
        userId: event.payload.userId,
        email: event.payload.email,
      });
      actions.enqueue(
        "welcome-email.send",
        {
          userId: event.payload.userId,
          email: event.payload.email,
        },
        { workKey: `welcome-email:${event.payload.userId}` },
      );
    },
  },
  queues: {
    "welcome-email.send": async ({ work }) => {
      // call provider here
      console.log("sending welcome email", work.payload.email);
    },
  },
});

const clock = new SystemRuntimeClock();
const scheduler = new NodeRuntimeScheduler();

{
  await using ledger = createBetterSqliteLedger({
    databaseUrl: "./app.sqlite",
    boundModel: bindLedgerModel(registeredModel, {
      indexers: {
        upsertUser: async (scope, input) => {
          // Write to your own projection table(s). This example assumes your
          // application has already created the users table.
          await scope
            .prepare(
              `INSERT INTO users (user_id, email)
               VALUES (?, ?)
               ON CONFLICT(user_id) DO UPDATE SET email = excluded.email`,
            )
            .run(input.userId, input.email);
        },
      },
      queries: {
        userById: async (scope, params) => {
          // Read from your own projection table(s)
          const row = await scope
            .prepare(
              `SELECT user_id, email
               FROM users
               WHERE user_id = ?`,
            )
            .get(params.userId);
          if (row === undefined) {
            return null;
          }
          return {
            userId: row.user_id,
            email: row.email,
          };
        },
      },
    }),
    timing: {
      clock,
    },
  });

  await using workers = await ledger.startWorkers({
    scheduler,
  });

  const event = await ledger.emit("user.created", {
    userId: "u_123",
    email: "alice@example.com",
  });
  console.log(event.eventId);
}
// Leaving the block disposes `workers` and then `ledger`. Disposing the ledger
// closes the database connections Sledge opened, so no separate close call is needed.
```

You define contracts, not implementation details:
- `events`: facts appended to the event stream
- `signals`: short-lived records emitted from queue handlers
- `queues`: durable work payloads
- `signalQueues`: work payloads materialized from signals
- `indexers`: projection write contracts
- `queries`: projection read contracts
You attach orchestration handlers keyed by event/signal/queue names.
Event handlers can index, enqueue, and query.
You provide concrete implementations for indexers and queries. Sledge passes a storage scope as the first argument and the typed input/params as the second argument.
Query implementations are projection reads. They are not serialized behind ledger mutations, so a long-running query cannot block unrelated durable writes. Keep query implementations bounded to read-side state; avoid application orchestration, attachment/network/model I/O, and re-entering ledger-backed write paths from inside a query.
You choose a backend adapter and open the durable ledger:
- `createBetterSqliteLedger(...)`
- `createTursoLedger(...)`
Adapters take a databaseUrl and Sledge owns the database connections it opens.
Plain :memory: URLs are rejected because they cannot support more than one
connection. Use a real database file for local SQLite, or a backend-specific
shared-memory URL only where the driver can still provide the required
multi-connection read/write semantics. The better-sqlite3 adapter verifies
that the opened database actually enters WAL journal mode and rejects databases
that cannot.
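The two rules can be sketched as a pair of small checks. Both helpers are hypothetical and only illustrate the reasoning; the adapters perform their own validation. With better-sqlite3, the mode string passed to `checkJournalMode` is the value returned by `db.pragma("journal_mode = WAL", { simple: true })`.

```typescript
type Check = { ok: true } | { ok: false; reason: string };

// Hypothetical pre-flight check for a databaseUrl.
function checkDatabaseUrl(databaseUrl: string): Check {
  if (databaseUrl === ":memory:") {
    // Every SQLite connection that opens ":memory:" gets its own private
    // database, so a second connection would never see the first one's writes.
    return { ok: false, reason: "plain :memory: cannot be shared across connections" };
  }
  return { ok: true };
}

// SQLite answers `PRAGMA journal_mode = WAL` with the mode actually in effect.
// Anything other than "wal" (for example "memory") means WAL was not enabled.
function checkJournalMode(reportedMode: string): Check {
  return reportedMode.toLowerCase() === "wal"
    ? { ok: true }
    : { ok: false, reason: `journal mode is ${reportedMode}, not WAL` };
}
```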
Sledge assumes a single process owns writes to a ledger database. Its
multi-connection support is for Sledge-managed read/write scopes inside that
runtime, not for competing writer processes. Cross-process consumers should use
durable event streams (tailEvents / resumeEvents) and work inspection APIs.
onSignal(...) is a live, process-local notification hook; signals are
transient and are not a durable distributed pub/sub channel.
The runtime exposes:
- `emit(eventName, payload, options?)`: append an event and return its durable envelope
- `query(queryName, params)`: run a model query implementation
- `cancelWork({ ref, reason? })`: durably mark non-terminal keyed work as cancelled
- `queryWork({ workId })`: inspect one durable work item by storage id
- `listWork({ queueName?, sourceEventId?, states?, limit? })`: inspect durable work items
- `tailEvents({ last, signal })`: read recent durable events and follow new ones
- `resumeEvents({ cursor, signal })`: continue an event stream from an opaque cursor
- `onSignal(signalName, observer)`: subscribe to live process-local signal notifications
- `startWorkers(options)`: start queue dispatch for this ledger handle
- `close()`: close the ledger runtime and the database connections owned by it
Opening a ledger is passive: it initializes storage and can emit, query, tail, resume, and observe signals, but it does not claim or process queue work.
Sledge manages read and write scopes internally. Ambient ledger.query(...)
calls receive an ambient read scope. Event projection actions.query(...) and
actions.index(...) receive the same transaction-local scope as the event
append and work materialization transaction.
Start workers explicitly in the process that owns queue execution:
```ts
await using workers = await ledger.startWorkers({
  scheduler: new NodeRuntimeScheduler(),
  leaseMs: 1_000,
  defaultRetryDelayMs: 1_000,
  maxInFlight: 16,
  terminalWorkRetentionMs: 7 * 24 * 60 * 60 * 1_000, // 7 days
});
```

Sledge stores durable work rows for queued, leased, delayed-retry, dead-lettered,
and cancelled work. Successful work is deleted when it acks. Retained terminal
work (dead and cancelled) is pruned when workers start, according to
terminalWorkRetentionMs.
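The retention rule can be sketched as a pure filter, under some assumptions. The `WorkRowSketch` shape and its `finishedAtMs` field are hypothetical. The state names come from the `listWork` example in this README plus the dead and cancelled states described here. There is no "succeeded" state because acked work is deleted rather than retained.

```typescript
type WorkStateSketch = "pending" | "delayed" | "leased" | "dead" | "cancelled";

interface WorkRowSketch {
  workId: number;
  state: WorkStateSketch;
  // Hypothetical: when the row reached a terminal state (ms since epoch), else null.
  finishedAtMs: number | null;
}

const TERMINAL = new Set<WorkStateSketch>(["dead", "cancelled"]);

// Keep every non-terminal row; keep terminal rows only while they are younger
// than the retention window.
function pruneTerminalWork(
  rows: WorkRowSketch[],
  nowMs: number,
  retentionMs: number,
): WorkRowSketch[] {
  return rows.filter(
    (row) =>
      !TERMINAL.has(row.state) ||
      row.finishedAtMs === null ||
      nowMs - row.finishedAtMs < retentionMs,
  );
}
```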
Event and signal handlers can assign a workKey when they enqueue work. Sledge
combines that key with the source event id, queue name, and signal flag into a
stable WorkRef. Use refs for cancellation; workId remains a storage-local
inspection id.
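One way to picture a stable reference is as a deterministic serialization of those four inputs. The `WorkRefSketch` shape and `workRefId` function are hypothetical and are not Sledge's `WorkRef` type. The sketch shows that the same inputs always yield the same value across restarts, unlike a storage-assigned `workId`. Serializing a JSON array, rather than joining fields with a delimiter, also keeps inputs such as `"e1:q" + "x"` and `"e1" + "q:x"` distinct.

```typescript
// Hypothetical shape built from the four inputs named above.
interface WorkRefSketch {
  sourceEventId: string;
  queueName: string;
  signal: boolean;
  workKey: string;
}

// Deterministic: equal inputs always produce equal strings, and JSON array
// encoding prevents collisions between fields that contain separators.
function workRefId(ref: WorkRefSketch): string {
  return JSON.stringify([ref.sourceEventId, ref.queueName, ref.signal, ref.workKey]);
}
```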
```ts
// Inside an event handler:
actions.enqueue(
  "welcome-email.send",
  { userId: event.payload.userId, email: event.payload.email },
  { workKey: `welcome-email:${event.payload.userId}` },
);
```

```ts
// Later, from code that holds the ledger:
const currentWork = await ledger.listWork({
  states: ["pending", "delayed", "leased"],
  limit: 100,
});
const target = currentWork.find((work) => work.ref !== null);
if (target === undefined || target.ref === null) {
  throw new Error("no keyed work to cancel");
}
const result = await ledger.cancelWork({
  ref: target.ref,
  reason: "user requested cancellation",
});
if (result.status === "cancelled") {
  console.log(result.work.state); // "cancelled"
}
```

Cancellation is terminal. Once cancelWork(...) succeeds for a non-terminal
work item, Sledge will not dispatch that work again, including after process
restart. If the work is currently leased by this ledger's active worker handle,
Sledge also aborts that lease's AbortSignal as a live-delivery fast path.
queryWork(...) and listWork(...) read only committed work state; they do not
expose rows staged by in-flight event materialization that may later roll back.
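The cancellation rules can be sketched with an in-memory work item. `ItemSketch`, `cancel`, `isDispatchable` and the `"already-terminal"` status are hypothetical. Only the behavior follows the text: cancellation is terminal, terminal items are left alone, and a live lease's `AbortSignal` is aborted as a fast path. Durability across restarts is outside the scope of an in-memory sketch.

```typescript
type ItemStateSketch = "pending" | "leased" | "dead" | "cancelled";

interface ItemSketch {
  state: ItemStateSketch;
  // Present only while a worker in this process holds the lease.
  lease?: AbortController;
}

// Terminal items are untouched; otherwise flip to "cancelled" and abort any
// live lease so the running handler can stop early.
function cancel(item: ItemSketch): "cancelled" | "already-terminal" {
  if (item.state === "dead" || item.state === "cancelled") return "already-terminal";
  item.state = "cancelled";
  item.lease?.abort(new Error("work cancelled"));
  delete item.lease;
  return "cancelled";
}

// A dispatcher in this sketch only hands out pending work, never cancelled work.
function isDispatchable(item: ItemSketch): boolean {
  return item.state === "pending";
}
```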
Queue and signal queue handlers implicitly ack on normal return.
- Return (or resolve) => ack
- Throw => retry using the default retry delay
- Use `control.retry(error, { retryAtMs? })` for explicit retry timing
- Durable queue handlers may call `control.deadLetter(error)`
- Signal queue handlers do not support dead-letter
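These outcome rules can be summarized as a small decision function. The sketch rests on a few assumptions. `ControlSketch` and `settle` are hypothetical, and handlers are synchronous for brevity. Rejecting `deadLetter` in a signal queue by throwing, which then falls through to an ordinary retry, is this sketch's choice and not documented Sledge behavior.

```typescript
type Outcome =
  | { kind: "ack" }
  | { kind: "retry"; retryAtMs: number; error: unknown }
  | { kind: "dead"; error: unknown };

interface Timing {
  nowMs: number;
  defaultRetryDelayMs: number;
}

// Hypothetical control object: handlers record an explicit decision on it.
class ControlSketch {
  decision: Outcome | null = null;
  private readonly timing: Timing;
  private readonly durable: boolean;

  constructor(timing: Timing, durable: boolean) {
    this.timing = timing;
    this.durable = durable;
  }

  retry(error: unknown, options: { retryAtMs?: number } = {}): void {
    const retryAtMs = options.retryAtMs ?? this.timing.nowMs + this.timing.defaultRetryDelayMs;
    this.decision = { kind: "retry", retryAtMs, error };
  }

  deadLetter(error: unknown): void {
    if (!this.durable) throw new Error("signal queue handlers do not support dead-letter");
    this.decision = { kind: "dead", error };
  }
}

// Map one handler run to its outcome.
function settle(handler: (control: ControlSketch) => void, timing: Timing, durable: boolean): Outcome {
  const control = new ControlSketch(timing, durable);
  try {
    handler(control);
  } catch (error) {
    // An uncaught throw retries after the default delay.
    return { kind: "retry", retryAtMs: timing.nowMs + timing.defaultRetryDelayMs, error };
  }
  // Normal return acks unless the handler recorded an explicit decision.
  return control.decision ?? { kind: "ack" };
}
```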
Use a signal when a handler needs to create short-lived follow-up work.
For example, a response handler might need to publish a “response started” notice. If the process crashes before the notice is published, Sledge should retry it. After it is published, the notice does not need to stay in event history.
Signals are only emitted from normal queue handlers with actions.emitSignal(...). A signal can create signalQueues, but it cannot write indexes.
```ts
const definedModel = defineLedgerModel({
  events: {
    "response.requested": Type.Object({
      responseId: Type.String(),
    }),
  },
  signals: {
    "response.notice": Type.Object({
      responseId: Type.String(),
      message: Type.String(),
    }),
  },
  queues: {
    "response.generate": Type.Object({
      responseId: Type.String(),
    }),
  },
  signalQueues: {
    "response.notice.publish": Type.Object({
      responseId: Type.String(),
      message: Type.String(),
    }),
  },
  indexers: {},
  queries: {},
});

const model = registerLedgerModel(definedModel, {
  events: {
    "response.requested": ({ event, actions }) => {
      actions.enqueue("response.generate", {
        responseId: event.payload.responseId,
      });
    },
  },
  queues: {
    "response.generate": async ({ work, actions }) => {
      actions.emitSignal("response.notice", {
        responseId: work.payload.responseId,
        message: "response started",
      });
    },
  },
  signals: {
    "response.notice": ({ event, actions }) => {
      actions.enqueueSignal("response.notice.publish", {
        responseId: event.payload.responseId,
        message: event.payload.message,
      });
    },
  },
  signalQueues: {
    "response.notice.publish": async ({ work }) => {
      // `notifier` stands in for your own publishing client.
      await notifier.publish(work.payload);
    },
  },
});
```

Sledge keeps a signal while its work is pending or retrying. When the work acks, Sledge deletes the signal and the completed signal work in the same transaction. dedupeKey values are shared across durable events and signals, so they must be globally unique while any matching row exists.
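The cleanup rule can be sketched with two in-memory tables. `SignalTables` and its row shapes are hypothetical. The sketch also assumes that when one signal has materialized several work items, the signal row is deleted together with the last of them. The text does not spell out that case.

```typescript
interface SignalRow {
  signalId: number;
  name: string;
  payload: unknown;
}

interface SignalWorkRow {
  workId: number;
  signalId: number;
  state: "pending" | "retrying";
}

// Hypothetical in-memory tables for signals and the work they materialized.
class SignalTables {
  signals = new Map<number, SignalRow>();
  work = new Map<number, SignalWorkRow>();

  // Ack one signal work item. Everything happens in one synchronous step,
  // standing in for a single database transaction.
  ack(workId: number): void {
    const row = this.work.get(workId);
    if (row === undefined) return;
    this.work.delete(workId);
    const stillOutstanding = Array.from(this.work.values()).some(
      (w) => w.signalId === row.signalId,
    );
    // Assumption: the signal goes away with its last outstanding work item.
    if (!stillOutstanding) this.signals.delete(row.signalId);
  }
}
```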
You can also watch signals live:
```ts
using notices = ledger.onSignal("response.notice", (signal) => {
  // `socket` stands in for your own transport, e.g. a WebSocket.
  socket.send(signal.payload.message);
});
```

This is a live notification only. It has no cursor, and Sledge does not retry observer callbacks. For work that must be retried before cleanup, use a signal queue (signalQueues).
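A tiny process-local hub makes the difference between a live observer and a signal queue concrete. `LiveHub` is hypothetical and is not `ledger.onSignal`. It only reproduces the documented semantics: no cursor, nothing buffered for late subscribers, and no retry when an observer throws.

```typescript
type Observer<T> = (payload: T) => void;

// Hypothetical process-local hub with fire-and-forget delivery.
class LiveHub<T> {
  private observers = new Set<Observer<T>>();

  subscribe(observer: Observer<T>): () => void {
    this.observers.add(observer);
    return () => {
      this.observers.delete(observer);
    };
  }

  publish(payload: T): void {
    for (const observer of this.observers) {
      try {
        observer(payload);
      } catch {
        // Deliberately dropped: a failing observer is not retried.
      }
    }
  }
}

const hub = new LiveHub<string>();
const received: string[] = [];
hub.publish("missed"); // nobody is subscribed yet, so this is lost
const unsubscribe = hub.subscribe((message) => {
  received.push(message);
});
hub.subscribe(() => {
  throw new Error("observer failed"); // swallowed, never retried
});
hub.publish("response started");
unsubscribe();
hub.publish("after unsubscribe"); // not delivered to the removed observer
```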
Use dedupeKey in emit(...) for producer retries. emit(...) returns the
winning durable event envelope. With a duplicate key, the existing event envelope
is returned and downstream materialization is not replayed.
```ts
const event = await ledger.emit(
  "user.created",
  { userId: "u_123", email: "alice@example.com" },
  { dedupeKey: "provider-event:abc-123" },
);
console.log(event.eventId);
```

Same key => same durable event winner, with no duplicate downstream materialization.
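First-writer-wins dedupe can be sketched with an in-memory map from key to winning envelope. `DedupingLog` is hypothetical. One consequence is visible in the sketch: a retry that reuses a key gets the original envelope back even if its payload differs. Derive the key from the producer's own identifier (as with `provider-event:abc-123`), not from a value generated per attempt.

```typescript
interface Envelope {
  eventId: number;
  name: string;
  payload: unknown;
  dedupeKey: string | null;
}

// Hypothetical in-memory log showing first-writer-wins dedupe.
class DedupingLog {
  readonly events: Envelope[] = [];
  materializations = 0; // how many times downstream work was materialized
  private readonly byKey = new Map<string, Envelope>();

  emit(name: string, payload: unknown, options: { dedupeKey?: string } = {}): Envelope {
    const key = options.dedupeKey ?? null;
    if (key !== null) {
      const winner = this.byKey.get(key);
      // Duplicate key: hand back the winning envelope; append and materialize nothing.
      if (winner !== undefined) return winner;
    }
    const envelope: Envelope = { eventId: this.events.length + 1, name, payload, dedupeKey: key };
    this.events.push(envelope);
    if (key !== null) this.byKey.set(key, envelope);
    this.materializations += 1;
    return envelope;
  }
}
```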
You can materialize ledger events outside the process (for example, to browser state) with two APIs:
- `tailEvents({ last, signal })`: `tail -f -n <last>` semantics (read the most recent `last` events, then follow new ones)
- `resumeEvents({ cursor, signal })`: continue from a previously persisted opaque cursor

```ts
const controller = new AbortController();
for await (const item of ledger.tailEvents({
  last: 100,
  signal: controller.signal,
})) {
  const event = item.event;
  const cursor = item.cursor;
  // apply event to external read model
  // persist cursor for reconnect/resume
}

// Later (e.g. on reconnect), resume from the last cursor you persisted:
const resumeController = new AbortController();
for await (const item of ledger.resumeEvents({
  cursor: persistedCursor,
  signal: resumeController.signal,
})) {
  // continues exactly after the persisted cursor
}
```

Cursor values are opaque by contract. Persist and reuse them as-is; do not parse or construct them.
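A client-side consumer can keep its read model and its cursor in step by saving both together after each event. `materialize` and `ReadModelStore` are hypothetical helpers and not part of Sledge. The stream argument only assumes items with `event` and `cursor` fields, matching the tail and resume loops in this section. Use `tailEvents` when no cursor has been saved yet and `resumeEvents` otherwise.

```typescript
interface StreamItem<E> {
  event: E;
  cursor: string;
}

// Hypothetical durable client store, e.g. backed by IndexedDB or a file.
interface ReadModelStore<S> {
  load(): { state: S; cursor: string | null };
  save(state: S, cursor: string): void;
}

// Fold events into a read model, saving state and cursor together so the saved
// state always matches the saved cursor and a restart resumes from that point.
async function materialize<E, S>(
  stream: AsyncIterable<StreamItem<E>>,
  store: ReadModelStore<S>,
  apply: (state: S, event: E) => S,
): Promise<void> {
  let { state } = store.load();
  for await (const item of stream) {
    state = apply(state, item.event);
    store.save(state, item.cursor); // the cursor string is stored as-is, never parsed
  }
}
```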
Long-running handlers are automatically lease-renewed for the full handler duration.
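Automatic renewal is commonly implemented as a heartbeat that runs for as long as the handler does. The helper is a hypothetical sketch of that idea, not Sledge's internals. `withLeaseRenewal` and the `renew` callback are assumptions. Renewing at half the lease duration is an arbitrary but common choice that leaves margin before the lease expires.

```typescript
// Hypothetical helper: keep renewing a lease while `handler` runs.
async function withLeaseRenewal<T>(
  leaseMs: number,
  renew: () => void,
  handler: () => Promise<T>,
): Promise<T> {
  // Renew at half the lease duration so each renewal lands before expiry.
  const timer = setInterval(renew, Math.max(1, Math.floor(leaseMs / 2)));
  try {
    return await handler();
  } finally {
    // Stop renewing as soon as the handler settles, whether it resolved or threw.
    clearInterval(timer);
  }
}
```

A production version would also make `renew` asynchronous and abort the handler's signal if a renewal fails.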
Use lease.signal for cooperative cancellation on shutdown/restart:
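```ts
const registeredModel = registerLedgerModel(definedModel, {
  // ...other handler groups
  queues: {
    "some.queue": async ({ lease }) => {
      while (!lease.signal.aborted) {
        // Do one bounded step of long-running async work, then loop to
        // re-check lease.signal. Break once the work is complete.
        break;
      }
    },
  },
});
```

Available options when starting workers:
- `scheduler`: the runtime scheduler (e.g. `NodeRuntimeScheduler`)
- `leaseMs`: lease duration for claimed work
- `defaultRetryDelayMs`: delay before retrying a handler that throws
- `maxInFlight`: maximum number of work items handled concurrently
- `terminalWorkRetentionMs`: how long dead and cancelled work is retained before it is pruned at worker startup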
Start simple; tune only when you observe contention/throughput issues.
Package entry points:
- `@torkbot/sledge/ledger`
- `@torkbot/sledge/database-ledger-engine`
- `@torkbot/sledge/better-sqlite3-ledger`
- `@torkbot/sledge/turso-ledger`
- `@torkbot/sledge/runtime/contracts`
- `@torkbot/sledge/runtime/node-runtime`
- `@torkbot/sledge/runtime/virtual-runtime`
Development commands:
```sh
node --run typecheck
node --run test
node --run build
node --run lint
```

Publishing notes:
- The package is published as compiled JavaScript in `dist/` (with `.d.ts` types).
- Source remains strict TypeScript in `src/`.
- `prepublishOnly` runs `node --run build` automatically.
- Publishing uses GitHub Actions OIDC trusted publishing (`.github/workflows/release.yml`), so no long-lived npm token is required.
- Configure npm trusted publishing for `@torkbot/sledge` to trust this repository/workflow before the first publish.
- The Node version is pinned via `engines.node` because runtime code uses explicit resource management (`using` / `await using`).