26 changes: 26 additions & 0 deletions packages/infra/docs/cluster-storage.md
@@ -24,3 +24,29 @@ The closest baseline in Effect is `SqlMessageStorage` + `SqlRunnerStorage`.

- Pick **`ClusterCosmos`** when your platform standard is Cosmos and you want to avoid introducing SQL just for cluster storage.
- Pick **SQL storage** when you already have strong SQL ops tooling and prefer table/migration based durability.

## Parity notes

The goal is API parity with Effect SQL cluster storage, with explicit notes where backend semantics differ.

- Reply uniqueness: Cosmos enforces one terminal `WithExit` reply per request and one `Chunk` per `(request, sequence)` via deterministic reply document ids. Duplicate writes fail with `PersistenceError`, matching SQL behavior.
- OCC: Cosmos uses `_etag` + `IfMatch` for lock updates and message read claims.
- Batching: Cosmos groups operations by partition key and issues transactional batches per partition, each capped at 100 operations.
- `withTransaction`: intentionally a no-op in Cosmos storage. Unlike SQL, Cosmos does not expose a general cross-operation transaction boundary that matches cluster storage semantics across partitions and mixed operations.
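
The reply-uniqueness rule above can be sketched without a Cosmos account. The snippet below is a minimal illustration, not the adapter's code: `ReplySketch` and `InMemoryReplies` are hypothetical stand-ins, and the key layout copies `replyDocId` from this change while leaving out the `cosmosId` step. The in-memory `create` plays the role of a Cosmos create that is rejected with a conflict when the id already exists in the partition.

```typescript
// Hypothetical, simplified reply shapes; the real `Reply.Encoded` carries more fields.
type ReplySketch =
  | { readonly _tag: "WithExit"; readonly requestId: string }
  | { readonly _tag: "Chunk"; readonly requestId: string; readonly sequence: number }

// Same key layout as `replyDocId` in this change, minus the `cosmosId` step.
const replyKey = (reply: ReplySketch): string =>
  reply._tag === "WithExit"
    ? `reply::with-exit::${reply.requestId}`
    : `reply::chunk::${reply.requestId}::${reply.sequence}`

// In-memory stand-in for a container: `create` refuses an id that already exists.
class InMemoryReplies {
  private readonly ids = new Set<string>()

  create(reply: ReplySketch): "created" | "conflict" {
    const id = replyKey(reply)
    if (this.ids.has(id)) return "conflict"
    this.ids.add(id)
    return "created"
  }
}

const replies = new InMemoryReplies()
const firstChunk = replies.create({ _tag: "Chunk", requestId: "r1", sequence: 0 })
const duplicateChunk = replies.create({ _tag: "Chunk", requestId: "r1", sequence: 0 })
const nextChunk = replies.create({ _tag: "Chunk", requestId: "r1", sequence: 1 })
const firstExit = replies.create({ _tag: "WithExit", requestId: "r1" })
const duplicateExit = replies.create({ _tag: "WithExit", requestId: "r1" })
```

Because the id depends only on the request id (and the sequence for chunks), the reply's own `id` no longer affects uniqueness. A second terminal reply with a fresh reply id still collides.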

### Why `withTransaction` stays a no-op

SQL storage can run arbitrary multi-step storage effects inside one DB transaction.
Cosmos only supports transactional scope in limited shapes, primarily operations within a single logical partition issued through its explicit batch APIs.
The cluster storage API expects a broader transaction abstraction, so this adapter keeps `withTransaction` as pass-through and relies on idempotency, OCC, and partition-scoped batches.
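
To make "partition-scoped batches" concrete, here is a minimal sketch of the grouping step, assuming a hypothetical `PendingOp` shape and a helper named `planBatches` (neither is taken from the adapter). It groups operations by partition key and splits each group into batches of at most 100 operations.

```typescript
// Hypothetical operation shape; the adapter's real operation type is not shown here.
interface PendingOp {
  readonly partitionKey: string
  readonly id: string
}

interface PlannedBatch {
  readonly partitionKey: string
  readonly ops: ReadonlyArray<PendingOp>
}

const MAX_BATCH_SIZE = 100

// Group operations by partition key, then split each group into batches of at most
// `maxBatchSize` operations. Every returned batch targets exactly one partition.
const planBatches = (
  ops: ReadonlyArray<PendingOp>,
  maxBatchSize: number = MAX_BATCH_SIZE
): Array<PlannedBatch> => {
  const byPartition = new Map<string, Array<PendingOp>>()
  for (const op of ops) {
    const group = byPartition.get(op.partitionKey)
    if (group === undefined) byPartition.set(op.partitionKey, [op])
    else group.push(op)
  }
  const batches: Array<PlannedBatch> = []
  byPartition.forEach((group, partitionKey) => {
    for (let i = 0; i < group.length; i += maxBatchSize) {
      batches.push({ partitionKey, ops: group.slice(i, i + maxBatchSize) })
    }
  })
  return batches
}

const sampleOps: Array<PendingOp> = [
  ...Array.from({ length: 250 }, (_, i) => ({ partitionKey: "message::a", id: `a-${i}` })),
  ...Array.from({ length: 3 }, (_, i) => ({ partitionKey: "message::b", id: `b-${i}` }))
]
const batchSizes = planBatches(sampleOps).map((b) => `${b.partitionKey}:${b.ops.length}`)
```

Atomicity holds only inside a single batch. In the sample, partition `message::a` needs three batches, so a failure after the first batch leaves that partition partially written, and the two partitions are never committed together.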

## Current Cosmos limitations

These limitations apply to fresh Cosmos containers too; they are adapter semantics, not migration concerns.

- Multi-step writes are not atomic across partitions. `saveReply`, `clearReplies`, `clearAddress`, and chunk acknowledgements can complete one Cosmos operation and fail before the remaining state changes are applied. SQL storage wraps the equivalent flows in a database transaction.
- `saveReply` writes the reply before patching message state. If the reply write succeeds but message patching fails, a retry can hit the deterministic reply id and fail with `PersistenceError` while `lastReplyId` / `processed` are still stale.
- Runner and message lease expiry use application clocks. SQL storage mostly evaluates lock expiry, heartbeats, and read lease age with database time, while Cosmos uses `Date.now()` / caller-provided time; clock skew between runners can affect lease timing.
- Shard locks are persisted `_etag`-guarded documents, not advisory locks. They survive process death until expiry or explicit release.
- `unprocessedMessagesById` matches both Cosmos document id and request id. This is needed for keyed request docs whose document id is derived from the primary key, but it is broader than SQL's `id` lookup and should be considered when adding ack / interrupt edge cases.
- Cosmos runner `machineId` values are derived from a hash of the runner address instead of a database-assigned id. This keeps re-registration stable, but collisions are possible, and Snowflake still only uses the low 10 bits of the value.
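
The `machineId` limitation can be illustrated with a small sketch. The hash below (32-bit FNV-1a) is an assumption chosen for illustration, since this document does not show the hash the adapter uses. `machineIdSketch`, `findCollision` and the runner addresses are hypothetical as well. The point is only that keeping the low 10 bits leaves 1024 possible values, so distinct addresses can map to the same id.

```typescript
// 32-bit FNV-1a over UTF-16 code units. An assumption for illustration only;
// the adapter's actual hash function is not shown in this document.
const fnv1a32 = (input: string): number => {
  let hash = 0x811c9dc5
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193) >>> 0
  }
  return hash >>> 0
}

// Keep only the low 10 bits, which leaves 1024 possible machine ids.
const machineIdSketch = (runnerAddress: string): number => fnv1a32(runnerAddress) & 0x3ff

// Walk hypothetical addresses until two distinct ones share a machine id.
// 1025 addresses cannot fit into 1024 slots, so the loop always finds a pair.
const findCollision = (): readonly [string, string] => {
  const seen = new Map<number, string>()
  for (let i = 0; i <= 1024; i++) {
    const address = `runner-${i}.internal:34431`
    const id = machineIdSketch(address)
    const previous = seen.get(id)
    if (previous !== undefined) return [previous, address]
    seen.set(id, address)
  }
  throw new Error("unreachable: 1025 addresses cannot fit in 1024 slots")
}

const [firstAddress, secondAddress] = findCollision()
```

The same address always yields the same id, which is what keeps re-registration stable. The price is that two runners can share an id, and a deployment that depends on distinct ids would need a check of its own.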
45 changes: 31 additions & 14 deletions packages/infra/src/ClusterCosmos.ts
@@ -104,6 +104,10 @@ const messagePartition = (shardId: string) => `message::${shardId}`
const messageDocId = (envelope: Envelope.Encoded, primaryKey: string | null) =>
cosmosId(primaryKey === null ? envelopeId(envelope) : `primary::${primaryKey}`)
const replyPartition = (requestId: string) => `reply::${requestId}`
const replyDocId = (reply: Reply.Encoded) =>
reply._tag === "WithExit"
? cosmosId(`reply::with-exit::${reply.requestId}`)
: cosmosId(`reply::chunk::${reply.requestId}::${reply.sequence}`)
const runnerDocId = (address: string) => cosmosId(`runner::${address}`)
const lockDocId = (shardId: string) => cosmosId(`lock::${shardId}`)
const tenMinutes = Duration.toMillis(Duration.minutes(10))
@@ -291,7 +295,7 @@ const envelopeFromDoc = (
const replyToDoc = (reply: Reply.Encoded): ReplyDoc =>
reply._tag === "WithExit"
? {
- id: cosmosId(reply.id),
+ id: replyDocId(reply),
_partitionKey: replyPartition(reply.requestId),
type: "reply",
rowid: reply.id,
@@ -302,7 +306,7 @@ const replyToDoc = (reply: Reply.Encoded): ReplyDoc =>
acked: false
}
: {
- id: cosmosId(reply.id),
+ id: replyDocId(reply),
_partitionKey: replyPartition(reply.requestId),
type: "reply",
rowid: reply.id,
@@ -396,17 +400,30 @@ export const makeMessageStorage = Effect.fnUntraced(function*(options?: {
)

const markReplyAcked = (requestId: string, replyId: string) =>
- Effect
- .tryPromise(() =>
- container.item(cosmosId(replyId), replyPartition(requestId)).patch<ReplyDoc>([
- { op: "set", path: "/acked", value: true }
- ])
- )
+ queryReplies(
+ "SELECT * FROM c WHERE c.type = 'reply' AND c.requestId = @requestId AND c.rowid = @replyId",
+ [
+ { name: "@requestId", value: requestId },
+ { name: "@replyId", value: replyId }
+ ]
+ )
.pipe(
- Effect.tap(annotateItem),
- Effect.asVoid,
- Effect.catchIf(isNotFound, () => Effect.void),
- Effect.catchIf(isPreconditionFailed, () => Effect.void)
+ Effect.flatMap((docs) => {
+ const doc = docs[0]
+ if (doc === undefined) return Effect.void
+ return Effect
+ .tryPromise(() =>
+ container.item(doc.id, replyPartition(requestId)).patch<ReplyDoc>([
+ { op: "set", path: "/acked", value: true }
+ ])
+ )
+ .pipe(
+ Effect.tap(annotateItem),
+ Effect.asVoid,
+ Effect.catchIf(isNotFound, () => Effect.void),
+ Effect.catchIf(isPreconditionFailed, () => Effect.void)
+ )
+ })
)

const claimMessageRead = (doc: MessageDoc, now: number) =>
@@ -538,8 +555,7 @@ export const makeMessageStorage = Effect.fnUntraced(function*(options?: {
.gen(function*() {
const doc = replyToDoc(reply)
yield* Effect.tryPromise(() => container.items.create(doc)).pipe(
- Effect.tap(annotateItem),
- Effect.catchIf(isConflict, () => Effect.void)
+ Effect.tap(annotateItem)
)
const messages = yield* queryMessages(
"SELECT * FROM c WHERE c.type = 'message' AND c.requestId = @requestId",
@@ -705,6 +721,7 @@ export const makeMessageStorage = Effect.fnUntraced(function*(options?: {
withTracerDisabled
),

// Cosmos does not expose a general cross-operation transaction matching SQL semantics.
withTransaction: (effect) => effect
})
})
28 changes: 28 additions & 0 deletions packages/infra/test/cluster-cosmos.test.ts
@@ -156,6 +156,34 @@ describe.skipIf(!cosmosUrl)("ClusterCosmos MessageStorage", () => {
yield* Fiber.await(fiber)
})
.pipe(Effect.provide(layerFor())))

it.effect("fails on duplicate WithExit for the same request", () =>
Effect
.gen(function*() {
const storage = yield* MessageStorage.MessageStorage
const request = yield* makeStreamRequest(`duplicate-with-exit/${testRunId}`)
yield* storage.saveRequest(request)
yield* storage.saveReply(yield* makeStreamReply(request))

const duplicateAttempt = storage.saveReply(yield* makeStreamReply(request))
const error = yield* Effect.flip(duplicateAttempt)
assert.strictEqual(error._tag, "PersistenceError")
})
.pipe(Effect.provide(layerFor())))

it.effect("fails on duplicate Chunk sequence for the same request", () =>
Effect
.gen(function*() {
const storage = yield* MessageStorage.MessageStorage
const request = yield* makeStreamRequest(`duplicate-chunk-sequence/${testRunId}`)
yield* storage.saveRequest(request)
yield* storage.saveReply(yield* makeChunkReply(request, 0))

const duplicateAttempt = storage.saveReply(yield* makeChunkReply(request, 0))
const error = yield* Effect.flip(duplicateAttempt)
assert.strictEqual(error._tag, "PersistenceError")
})
.pipe(Effect.provide(layerFor())))
})

describe.skipIf(!cosmosUrl)("ClusterCosmos RunnerStorage", () => {
110 changes: 110 additions & 0 deletions packages/infra/test/cluster-storage-parity-sqlite.test.ts
@@ -0,0 +1,110 @@
import { SqliteClient } from "@effect/sql-sqlite-node"
import { assert, describe, it } from "@effect/vitest"
import { Context, Effect, Exit, Layer, Option, Schema } from "effect"
import { HttpHeaders } from "effect-app/http"
import { EntityAddress, EntityId, EntityType, Envelope, Message, MessageStorage, Reply, ShardId, ShardingConfig, Snowflake, SqlMessageStorage } from "effect/unstable/cluster"
import { Rpc, RpcSchema } from "effect/unstable/rpc"

const testRunId = `${Date.now()}-${process.pid}-${Math.random().toString(16).slice(2)}`

describe("Cluster storage parity on sqlite", () => {
it.effect("fails on duplicate WithExit for the same request", () =>
Effect
.gen(function*() {
const storage = yield* MessageStorage.MessageStorage
const request = yield* makeStreamRequest(`sqlite-duplicate-with-exit/${testRunId}`)
yield* storage.saveRequest(request)
yield* storage.saveReply(yield* makeStreamReply(request))

const duplicateAttempt = storage.saveReply(yield* makeStreamReply(request))
const error = yield* Effect.flip(duplicateAttempt)
assert.strictEqual(error._tag, "PersistenceError")
})
.pipe(Effect.provide(messageStorageLayer())))

it.effect("fails on duplicate Chunk sequence for the same request", () =>
Effect
.gen(function*() {
const storage = yield* MessageStorage.MessageStorage
const request = yield* makeStreamRequest(`sqlite-duplicate-chunk-sequence/${testRunId}`)
yield* storage.saveRequest(request)
yield* storage.saveReply(yield* makeChunkReply(request, 0))

const duplicateAttempt = storage.saveReply(yield* makeChunkReply(request, 0))
const error = yield* Effect.flip(duplicateAttempt)
assert.strictEqual(error._tag, "PersistenceError")
})
.pipe(Effect.provide(messageStorageLayer())))
})

class StreamRpc extends Rpc.make("ClusterStorageParityStreamRpc", {
success: RpcSchema.Stream(Schema.Void, Schema.Never),
payload: {
id: Schema.String
},
primaryKey: (value) => value.id.toString()
}) {}

const messageStorageLayer = () =>
SqlMessageStorage.layerWith({ prefix: `cluster_parity_sqlite_${testRunId.replace(/-/g, "_")}` }).pipe(
Layer.provideMerge(Snowflake.layerGenerator),
Layer.provide(ShardingConfig.layerDefaults),
Layer.provide(SqliteClient.layer({ filename: ":memory:" }))
)

const makeStreamRequest = Effect.fnUntraced(function*(id: string) {
const snowflake = yield* Snowflake.Generator
return new Message.OutgoingRequest({
envelope: Envelope.makeRequest<typeof StreamRpc>({
requestId: snowflake.nextUnsafe(),
address: EntityAddress.make({
shardId: ShardId.make(`cluster-sqlite-parity-${testRunId}`, 1),
entityType: EntityType.make("parity-test"),
entityId: EntityId.make("1")
}),
tag: StreamRpc._tag,
payload: StreamRpc.payloadSchema.make({ id }),
traceId: "noop",
spanId: "noop",
sampled: false,
headers: HttpHeaders.empty
}),
annotations: StreamRpc.annotations,
context: Context.empty(),
rpc: StreamRpc,
lastReceivedReply: Option.none(),
respond() {
return Effect.void
}
})
})

const makeStreamReply = Effect.fnUntraced(function*(request: Message.OutgoingRequest<typeof StreamRpc>) {
const snowflake = yield* Snowflake.Generator
return new Reply.ReplyWithContext({
reply: new Reply.WithExit<typeof StreamRpc>({
id: snowflake.nextUnsafe(),
requestId: request.envelope.requestId,
exit: Exit.void
}),
context: request.context,
rpc: request.rpc
})
})

const makeChunkReply = Effect.fnUntraced(function*(
request: Message.OutgoingRequest<typeof StreamRpc>,
sequence: number
) {
const snowflake = yield* Snowflake.Generator
return new Reply.ReplyWithContext({
reply: new Reply.Chunk<typeof StreamRpc>({
id: snowflake.nextUnsafe(),
requestId: request.envelope.requestId,
sequence,
values: [undefined]
}),
context: request.context,
rpc: request.rpc
})
})