diff --git a/packages/infra/docs/cluster-storage.md b/packages/infra/docs/cluster-storage.md index cbe4d3a6e..88bad7fcc 100644 --- a/packages/infra/docs/cluster-storage.md +++ b/packages/infra/docs/cluster-storage.md @@ -24,3 +24,29 @@ The closest baseline in Effect is `SqlMessageStorage` + `SqlRunnerStorage`. - Pick **`ClusterCosmos`** when your platform standard is Cosmos and you want to avoid introducing SQL just for cluster storage. - Pick **SQL storage** when you already have strong SQL ops tooling and prefer table/migration based durability. + +## Parity notes + +The goal is API parity with Effect SQL cluster storage, with explicit notes where backend semantics differ. + +- Reply uniqueness: Cosmos enforces one terminal `WithExit` reply per request and one `Chunk` per `(request, sequence)` via deterministic reply document ids. Duplicate writes fail with `PersistenceError`, matching SQL behavior. +- OCC: Cosmos uses `_etag` + `IfMatch` for lock updates and message read claims. +- Batching: Cosmos groups operations by partition key and uses transactional batch per partition (chunked at 100 operations). +- `withTransaction`: intentionally a no-op in Cosmos storage. Unlike SQL, Cosmos does not expose a general cross-operation transaction boundary that matches cluster storage semantics across partitions and mixed operations. + +### Why `withTransaction` stays a no-op + +SQL storage can run arbitrary multi-step storage effects inside one DB transaction. +Cosmos only supports transactional scope in limited shapes (primarily same logical partition and explicit batch APIs). +The cluster storage API expects a broader transaction abstraction, so this adapter keeps `withTransaction` as pass-through and relies on idempotency, OCC, and partition-scoped batches. + +## Current Cosmos limitations + +These limitations apply to fresh Cosmos containers too; they are adapter semantics, not migration concerns. + +- Multi-step writes are not atomic across partitions. 
`saveReply`, `clearReplies`, `clearAddress`, and chunk acknowledgements can complete one Cosmos operation and fail before the remaining state changes are applied. SQL storage wraps the equivalent flows in a database transaction. +- `saveReply` writes the reply before patching message state. If the reply write succeeds but message patching fails, a retry can hit the deterministic reply id and fail with `PersistenceError` while `lastReplyId` / `processed` are still stale. +- Runner and message lease expiry use application clocks. SQL storage mostly evaluates lock expiry, heartbeats, and read lease age with database time, while Cosmos uses `Date.now()` / caller-provided time; clock skew between runners can affect lease timing. +- Shard locks are persisted `_etag`-guarded documents, not advisory locks. They survive process death until expiry or explicit release. +- `unprocessedMessagesById` matches both the Cosmos document id and the request id. This is needed for keyed request docs whose document id is derived from the primary key, but it is broader than SQL's `id` lookup and should be considered when adding ack / interrupt edge cases. +- Cosmos runner `machineId` values are derived from a hash of the runner address instead of a database-assigned id. This keeps re-registration stable, but collisions are possible and Snowflake still only uses the low 10 bits. diff --git a/packages/infra/src/ClusterCosmos.ts b/packages/infra/src/ClusterCosmos.ts index 8936b6603..ecbdcb4a1 100644 --- a/packages/infra/src/ClusterCosmos.ts +++ b/packages/infra/src/ClusterCosmos.ts @@ -104,6 +104,10 @@ const messagePartition = (shardId: string) => `message::${shardId}` const messageDocId = (envelope: Envelope.Encoded, primaryKey: string | null) => cosmosId(primaryKey === null ? envelopeId(envelope) : `primary::${primaryKey}`) const replyPartition = (requestId: string) => `reply::${requestId}` +const replyDocId = (reply: Reply.Encoded) => + reply._tag === "WithExit" + ?
cosmosId(`reply::with-exit::${reply.requestId}`) + : cosmosId(`reply::chunk::${reply.requestId}::${reply.sequence}`) const runnerDocId = (address: string) => cosmosId(`runner::${address}`) const lockDocId = (shardId: string) => cosmosId(`lock::${shardId}`) const tenMinutes = Duration.toMillis(Duration.minutes(10)) @@ -291,7 +295,7 @@ const envelopeFromDoc = ( const replyToDoc = (reply: Reply.Encoded): ReplyDoc => reply._tag === "WithExit" ? { - id: cosmosId(reply.id), + id: replyDocId(reply), _partitionKey: replyPartition(reply.requestId), type: "reply", rowid: reply.id, @@ -302,7 +306,7 @@ const replyToDoc = (reply: Reply.Encoded): ReplyDoc => acked: false } : { - id: cosmosId(reply.id), + id: replyDocId(reply), _partitionKey: replyPartition(reply.requestId), type: "reply", rowid: reply.id, @@ -396,17 +400,30 @@ export const makeMessageStorage = Effect.fnUntraced(function*(options?: { ) const markReplyAcked = (requestId: string, replyId: string) => - Effect - .tryPromise(() => - container.item(cosmosId(replyId), replyPartition(requestId)).patch([ - { op: "set", path: "/acked", value: true } - ]) - ) + queryReplies( + "SELECT * FROM c WHERE c.type = 'reply' AND c.requestId = @requestId AND c.rowid = @replyId", + [ + { name: "@requestId", value: requestId }, + { name: "@replyId", value: replyId } + ] + ) .pipe( - Effect.tap(annotateItem), - Effect.asVoid, - Effect.catchIf(isNotFound, () => Effect.void), - Effect.catchIf(isPreconditionFailed, () => Effect.void) + Effect.flatMap((docs) => { + const doc = docs[0] + if (doc === undefined) return Effect.void + return Effect + .tryPromise(() => + container.item(doc.id, replyPartition(requestId)).patch([ + { op: "set", path: "/acked", value: true } + ]) + ) + .pipe( + Effect.tap(annotateItem), + Effect.asVoid, + Effect.catchIf(isNotFound, () => Effect.void), + Effect.catchIf(isPreconditionFailed, () => Effect.void) + ) + }) ) const claimMessageRead = (doc: MessageDoc, now: number) => @@ -538,8 +555,7 @@ export const 
makeMessageStorage = Effect.fnUntraced(function*(options?: { .gen(function*() { const doc = replyToDoc(reply) yield* Effect.tryPromise(() => container.items.create(doc)).pipe( - Effect.tap(annotateItem), - Effect.catchIf(isConflict, () => Effect.void) + Effect.tap(annotateItem) ) const messages = yield* queryMessages( "SELECT * FROM c WHERE c.type = 'message' AND c.requestId = @requestId", @@ -705,6 +721,7 @@ export const makeMessageStorage = Effect.fnUntraced(function*(options?: { withTracerDisabled ), + // Cosmos does not expose a general cross-operation transaction matching SQL semantics. withTransaction: (effect) => effect }) }) diff --git a/packages/infra/test/cluster-cosmos.test.ts b/packages/infra/test/cluster-cosmos.test.ts index 2e62b9cd3..081c134b6 100644 --- a/packages/infra/test/cluster-cosmos.test.ts +++ b/packages/infra/test/cluster-cosmos.test.ts @@ -156,6 +156,34 @@ describe.skipIf(!cosmosUrl)("ClusterCosmos MessageStorage", () => { yield* Fiber.await(fiber) }) .pipe(Effect.provide(layerFor()))) + + it.effect("fails on duplicate WithExit for the same request", () => + Effect + .gen(function*() { + const storage = yield* MessageStorage.MessageStorage + const request = yield* makeStreamRequest(`duplicate-with-exit/${testRunId}`) + yield* storage.saveRequest(request) + yield* storage.saveReply(yield* makeStreamReply(request)) + + const duplicateAttempt = storage.saveReply(yield* makeStreamReply(request)) + const error = yield* Effect.flip(duplicateAttempt) + assert.strictEqual(error._tag, "PersistenceError") + }) + .pipe(Effect.provide(layerFor()))) + + it.effect("fails on duplicate Chunk sequence for the same request", () => + Effect + .gen(function*() { + const storage = yield* MessageStorage.MessageStorage + const request = yield* makeStreamRequest(`duplicate-chunk-sequence/${testRunId}`) + yield* storage.saveRequest(request) + yield* storage.saveReply(yield* makeChunkReply(request, 0)) + + const duplicateAttempt = storage.saveReply(yield* 
makeChunkReply(request, 0)) + const error = yield* Effect.flip(duplicateAttempt) + assert.strictEqual(error._tag, "PersistenceError") + }) + .pipe(Effect.provide(layerFor()))) }) describe.skipIf(!cosmosUrl)("ClusterCosmos RunnerStorage", () => { diff --git a/packages/infra/test/cluster-storage-parity-sqlite.test.ts b/packages/infra/test/cluster-storage-parity-sqlite.test.ts new file mode 100644 index 000000000..b0487351e --- /dev/null +++ b/packages/infra/test/cluster-storage-parity-sqlite.test.ts @@ -0,0 +1,110 @@ +import { SqliteClient } from "@effect/sql-sqlite-node" +import { assert, describe, it } from "@effect/vitest" +import { Context, Effect, Exit, Layer, Option, Schema } from "effect" +import { HttpHeaders } from "effect-app/http" +import { EntityAddress, EntityId, EntityType, Envelope, Message, MessageStorage, Reply, ShardId, ShardingConfig, Snowflake, SqlMessageStorage } from "effect/unstable/cluster" +import { Rpc, RpcSchema } from "effect/unstable/rpc" + +const testRunId = `${Date.now()}-${process.pid}-${Math.random().toString(16).slice(2)}` + +describe("Cluster storage parity on sqlite", () => { + it.effect("fails on duplicate WithExit for the same request", () => + Effect + .gen(function*() { + const storage = yield* MessageStorage.MessageStorage + const request = yield* makeStreamRequest(`sqlite-duplicate-with-exit/${testRunId}`) + yield* storage.saveRequest(request) + yield* storage.saveReply(yield* makeStreamReply(request)) + + const duplicateAttempt = storage.saveReply(yield* makeStreamReply(request)) + const error = yield* Effect.flip(duplicateAttempt) + assert.strictEqual(error._tag, "PersistenceError") + }) + .pipe(Effect.provide(messageStorageLayer()))) + + it.effect("fails on duplicate Chunk sequence for the same request", () => + Effect + .gen(function*() { + const storage = yield* MessageStorage.MessageStorage + const request = yield* makeStreamRequest(`sqlite-duplicate-chunk-sequence/${testRunId}`) + yield* storage.saveRequest(request) + 
yield* storage.saveReply(yield* makeChunkReply(request, 0)) + + const duplicateAttempt = storage.saveReply(yield* makeChunkReply(request, 0)) + const error = yield* Effect.flip(duplicateAttempt) + assert.strictEqual(error._tag, "PersistenceError") + }) + .pipe(Effect.provide(messageStorageLayer()))) +}) + +class StreamRpc extends Rpc.make("ClusterStorageParityStreamRpc", { + success: RpcSchema.Stream(Schema.Void, Schema.Never), + payload: { + id: Schema.String + }, + primaryKey: (value) => value.id.toString() +}) {} + +const messageStorageLayer = () => + SqlMessageStorage.layerWith({ prefix: `cluster_parity_sqlite_${testRunId.replace(/-/g, "_")}` }).pipe( + Layer.provideMerge(Snowflake.layerGenerator), + Layer.provide(ShardingConfig.layerDefaults), + Layer.provide(SqliteClient.layer({ filename: ":memory:" })) + ) + +const makeStreamRequest = Effect.fnUntraced(function*(id: string) { + const snowflake = yield* Snowflake.Generator + return new Message.OutgoingRequest({ + envelope: Envelope.makeRequest({ + requestId: snowflake.nextUnsafe(), + address: EntityAddress.make({ + shardId: ShardId.make(`cluster-sqlite-parity-${testRunId}`, 1), + entityType: EntityType.make("parity-test"), + entityId: EntityId.make("1") + }), + tag: StreamRpc._tag, + payload: StreamRpc.payloadSchema.make({ id }), + traceId: "noop", + spanId: "noop", + sampled: false, + headers: HttpHeaders.empty + }), + annotations: StreamRpc.annotations, + context: Context.empty(), + rpc: StreamRpc, + lastReceivedReply: Option.none(), + respond() { + return Effect.void + } + }) +}) + +const makeStreamReply = Effect.fnUntraced(function*(request: Message.OutgoingRequest) { + const snowflake = yield* Snowflake.Generator + return new Reply.ReplyWithContext({ + reply: new Reply.WithExit({ + id: snowflake.nextUnsafe(), + requestId: request.envelope.requestId, + exit: Exit.void + }), + context: request.context, + rpc: request.rpc + }) +}) + +const makeChunkReply = Effect.fnUntraced(function*( + request: 
Message.OutgoingRequest, + sequence: number +) { + const snowflake = yield* Snowflake.Generator + return new Reply.ReplyWithContext({ + reply: new Reply.Chunk({ + id: snowflake.nextUnsafe(), + requestId: request.envelope.requestId, + sequence, + values: [undefined] + }), + context: request.context, + rpc: request.rpc + }) +})