diff --git a/CHANGELOG.md b/CHANGELOG.md index b3c1930..2f85607 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,81 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). --- +## [0.0.15-pre-release] + +### Added + +#### Topology wait — opt-in startup retry for externally-owned RabbitMQ topology (`RayTree.Plugins.RabbitMQ`) + +In microservice deployments the exchange or queue that a publisher or consumer depends on is +often owned and declared by a separate service. If both services start simultaneously the +consumer-side service may call `InitializeAsync` before the topology exists, causing an +`OperationInterruptedException` with `NOT_FOUND` (404) and an immediate crash-loop. + +Three new options on both `RabbitMqPublisherOptions` and `RabbitMqConsumerOptions` enable an +opt-in wait loop that probes with AMQP passive declares and retries until the topology appears: + +| Option | Type | Default | Description | +|---|---|---|---| +| `WaitForTopology` | `bool` | `false` | Enable the topology wait loop. | +| `TopologyWaitInterval` | `TimeSpan` | `5 s` | Delay between passive-declare attempts. | +| `TopologyWaitTimeout` | `TimeSpan?` | `null` | Optional hard deadline; `null` means wait indefinitely. | + +**Publisher behaviour** (when `WaitForTopology = true` and `DeclareExchange = false`): probes +the configured `ExchangeName` with `ExchangeDeclarePassiveAsync` before opening the working +channel. Retries on `NOT_FOUND`; propagates all other errors immediately. + +**Consumer behaviour** (when `WaitForTopology = true`): +- When `DeclareQueue = false`: probes `QueueName` with `QueueDeclarePassiveAsync`. +- When `ExchangeName` is non-empty: probes the exchange with `ExchangeDeclarePassiveAsync` + before calling `QueueBindAsync`. + +Both probes use the same `TopologyWaitInterval` and `TopologyWaitTimeout`. + +**Non-retried errors**: only `NOT_FOUND` (404) triggers retry. `PRECONDITION_FAILED`, +`ACCESS_REFUSED`, connection failures, and `OperationCanceledException` propagate immediately. + +**Fresh channel per attempt**: RabbitMQ closes the channel on every channel-level error, so +each probe opens a new channel from the existing connection. The working channel is created only +after all probes succeed. + +**Logging** (publisher only — `RabbitMqConsumer` has no logger by design): + +| Level | When | +|---|---| +| `Information` | First `NOT_FOUND` for an exchange or queue | +| `Debug` | Subsequent misses | +| `Information` | Recovery (topology became available) | +| `Error` | Timeout exhaustion before rethrowing | + +```csharp +// Publisher waits for an exchange owned by another service +new RabbitMqPublisherOptions +{ + ExchangeName = "entity_changes", + DeclareExchange = false, // not our exchange to declare + WaitForTopology = true, + TopologyWaitInterval = TimeSpan.FromSeconds(2), + TopologyWaitTimeout = TimeSpan.FromMinutes(5) // give up after 5 min +} + +// Consumer waits for a queue and its binding exchange +new RabbitMqConsumerOptions +{ + QueueName = "order-events", + ExchangeName = "entity_changes", + DeclareQueue = false, // queue owned externally + WaitForTopology = true, + TopologyWaitInterval = TimeSpan.FromSeconds(2) + // TopologyWaitTimeout = null → retry indefinitely until token is cancelled +} +``` + +The default (`WaitForTopology = false`) is unchanged — a missing exchange or queue surfaces the +underlying `OperationInterruptedException` on the first AMQP operation, exactly as before. + +--- + ## [0.0.14-pre-release] ### Added diff --git a/CLAUDE.md b/CLAUDE.md index b343071..5d33ac8 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -71,7 +71,7 @@ EntityChangeTracker | `RayTree.Plugins.PostgreSQL` | `PostgreSqlOutbox` — stores changes as flat columns (one column per entity property via `EntityColumnMapper`). Constructor: `PostgreSqlOutbox(PostgreSqlOutboxOptions, ILoggerFactory)` — both params required. `PostgreSqlRepository` constructor: `PostgreSqlRepository(PostgreSqlRepositoryOptions, ILoggerFactory)` — both params required. Builder extension methods accept `ILoggerFactory? loggerFactory = null` and default to `NullLoggerFactory.Instance`. `EntityColumnMapper` honours `System.ComponentModel.DataAnnotations` / `Schema` attributes: `[NotMapped]` excludes a property; `[Column("name")]` overrides the column name suffix (the `state_` prefix is always kept to avoid collisions with outbox metadata columns); `[Column(TypeName = "JSONB")]` sets the PostgreSQL type verbatim; `[Required]` forces `NOT NULL` on reference types; `[MaxLength(n)]`/`[StringLength(n)]` emits `VARCHAR(n)` instead of `TEXT`; `[Table("name")]` on the entity class is used as the base name when deriving default outbox/source table names; `[Key]` (one or more properties) identifies the business primary key — `PostgreSqlRepository` uses these for INSERT/UPDATE/DELETE/SELECT and adds a UNIQUE index on the corresponding `state_*` columns in the source table; for composite keys pair `[Key]` with `[Column(Order = n)]` to control column order. 1D arrays of primitive types are automatically mapped to the corresponding PostgreSQL array column type: `int[]` → `INTEGER[]`, `long[]` → `BIGINT[]`, `bool[]` → `BOOLEAN[]`, `string[]` → `TEXT[]`, `Guid[]` → `UUID[]`, `float[]` → `REAL[]`, `double[]` → `DOUBLE PRECISION[]`, `decimal[]` → `NUMERIC[]`, `DateTime[]`/`DateTimeOffset[]` → `TIMESTAMPTZ[]`, `short[]`/`byte[]`/`sbyte[]` → `SMALLINT[]`; nullable-element arrays (e.g. `int?[]`) strip the nullable wrapper before mapping the element type. Multi-dimensional arrays are not supported — declare the column type explicitly via `[Column(TypeName = "...")]` if needed. When reading values back, `EntityColumnMapper.ConvertFromDb` first attempts a direct CLR assignability check (Npgsql returns the correct array type natively) and falls back to `Convert.ChangeType` for scalar numeric coercions. Both `CleanupPublishedAsync` and `CleanupStaleUnpublishedAsync` delete in batches (`PostgreSqlOutboxOptions.CleanupBatchSize`, default 1000) using a `DELETE … WHERE id IN (SELECT id … LIMIT @BatchSize)` loop to avoid large single-statement locks and WAL spikes. **`InitializeAsync` manages schema automatically** — no flag required, always active. Fresh table path: single `CREATE TABLE IF NOT EXISTS` (columns + indexes). Existing table path: column diff via `SchemaMigrator` (adds missing columns with `ALTER TABLE … ADD COLUMN IF NOT EXISTS`; guards NOT NULL without default on non-empty tables by throwing `InvalidOperationException`; logs `Warning` for orphan columns and type mismatches) + index diff via `IndexMigrator` (creates missing indexes; drops and recreates indexes whose definition changed — uniqueness, column order, or WHERE clause; logs `Warning` for orphan indexes). Internal infrastructure: `SchemaInspector` (static — `TableExistsAsync`, `GetColumnsAsync` via `information_schema.columns`, `GetIndexesAsync` via `pg_index` catalog using `unnest(indkey::smallint[]) WITH ORDINALITY` for ordered columns and `pg_get_expr` for WHERE, `ExecuteDdlAsync`, `TableHasRowsAsync`); `SchemaMigrator` (column diff, parameterised delegate for DDL generation and orphan filter); `IndexMigrator` (index diff with schema-qualified `DROP INDEX IF EXISTS public.{name}`; WHERE clause comparison is case-insensitive and trimmed); `PostgreSqlTypeNormalizer` (maps `information_schema` type fields to canonical DDL strings). `NotificationBasedPublisher` — NOTIFY/LISTEN fast-path with polling fallback; bounded by `NotificationBasedPublisherOptions.MaxConcurrentNotifications` (default 16) via a `SemaphoreSlim` in `OnNotification`; fallback polling uses `Parallel.ForEachAsync` with `MaxPublishConcurrency` (default 1 — sequential). Logs LISTEN connection loss at `Warning` (once, on the first unhealthy tick), recovery at `Information`, and claim contention (record already taken by another publisher) at `Debug`. | | `RayTree.Plugins.InMemory` | `InMemoryQueue` implements both `IQueuePublisher` and `IQueueConsumer` via `Channel`. Use for tests and local dev. | | `RayTree.Plugins.Kafka` | `KafkaPublisher` + `KafkaConsumer`. **Publisher key**: `KafkaPublisherOptions.KeySelector` (`Func`) selects the Kafka partition key for each message. Default: `envelope => $"{EntityType}:{EntityId}"` — all changes for the same entity land on the same partition, preserving per-entity ordering. Override to shard by any envelope field (e.g. tenant, aggregate root). Consumer uses a dedicated background thread (channel-based) because Confluent.Kafka requires all `Consume`/`Commit`/`Seek` calls on one thread. `KafkaConsumer(KafkaConsumerOptions, ILoggerFactory)` — both params required. `KafkaConsumerOptions.AckAfterHandler` (default `false`) defers the offset commit; subscriber posts the `ConsumeResult` plus a `Commit`/`SeekBack` action through an internal post-handler channel that the poll thread drains at the top of each iteration (when items are queued, the next `Consume()` uses `TimeSpan.Zero` so commits don't wait a full poll cycle). `AcknowledgeAsync` → `Commit`; `NegativeAcknowledgeAsync` → `Seek(TopicPartitionOffset)` so the failed message is redelivered in the same consumer's lifetime, not just on restart. Parse-failure path always commits immediately to avoid poison-pilling the partition. Requires `SubscriberOptions.MaxDegreeOfParallelism = 1` per partition when `AckAfterHandler = true`. | -| `RayTree.Plugins.RabbitMQ` | `RabbitMqPublisher` + `RabbitMqConsumer`. **Routing key**: `RabbitMqPublisherOptions.RoutingKeySelector` (`Func`) selects the AMQP routing key for each message. Default: `"{RoutingKey}.{EntityType}.{changeType}"` (e.g. `change.Order.insert`) — consumers bind queues with wildcard patterns such as `change.Order.*` or `change.*.insert`. The default delegate reads `RoutingKey` at call time so changing that property after construction is always reflected; set a custom delegate to route by tenant, aggregate root, or any envelope field. Consumer uses `AsyncEventingBasicConsumer` buffered via `Channel`. `RabbitMqConsumer(RabbitMqConsumerOptions)` — options only; no logger. Message-receive errors silently NACK and requeue without logging (acknowledged exception to the logging placement rule — NACK/requeue is the correct recovery action and no context is available at that point). `RabbitMqConsumerOptions.AckAfterHandler` (default `false`) defers the broker ACK until after `ChangeSubscriber` confirms handler success — delivery tag is stashed in `MessageEnvelope.Metadata` via the internal `RabbitMqEnvelopeMetadata` accessor; `AcknowledgeAsync` issues `BasicAckAsync`; `NegativeAcknowledgeAsync` issues `BasicNackAsync(requeue: true)`. | +| `RayTree.Plugins.RabbitMQ` | `RabbitMqPublisher` + `RabbitMqConsumer`. **Routing key**: `RabbitMqPublisherOptions.RoutingKeySelector` (`Func`) selects the AMQP routing key for each message. Default: `"{RoutingKey}.{EntityType}.{changeType}"` (e.g. `change.Order.insert`) — consumers bind queues with wildcard patterns such as `change.Order.*` or `change.*.insert`. The default delegate reads `RoutingKey` at call time so changing that property after construction is always reflected; set a custom delegate to route by tenant, aggregate root, or any envelope field. `RabbitMqPublisher(RabbitMqPublisherOptions, ILoggerFactory?)` — options required, logger factory optional (`null` → `NullLoggerFactory.Instance`); `UseRabbitMq(configure, loggerFactory)` mirrors the same shape. Consumer uses `AsyncEventingBasicConsumer` buffered via `Channel`. `RabbitMqConsumer(RabbitMqConsumerOptions)` — options only; no logger. Message-receive errors silently NACK and requeue without logging (acknowledged exception to the logging placement rule — NACK/requeue is the correct recovery action and no context is available at that point). `RabbitMqConsumerOptions.AckAfterHandler` (default `false`) defers the broker ACK until after `ChangeSubscriber` confirms handler success — delivery tag is stashed in `MessageEnvelope.Metadata` via the internal `RabbitMqEnvelopeMetadata` accessor; `AcknowledgeAsync` issues `BasicAckAsync`; `NegativeAcknowledgeAsync` issues `BasicNackAsync(requeue: true)`. **Topology wait** (opt-in, both options classes): `WaitForTopology` (bool, default `false`), `TopologyWaitInterval` (TimeSpan, default 5 s), `TopologyWaitTimeout` (TimeSpan?, default `null` — unlimited). When `WaitForTopology = true`, `InitializeAsync` probes externally-owned topology via AMQP passive declares (`ExchangeDeclarePassiveAsync` / `QueueDeclarePassiveAsync`) and retries only on `NOT_FOUND` (404) until the topology appears, the cancellation token is cancelled, or `TopologyWaitTimeout` elapses (rethrowing the last `NOT_FOUND`). Other channel- and connection-level errors (`PRECONDITION_FAILED`, `ACCESS_REFUSED`, etc.) propagate immediately. Each probe attempt uses a fresh channel from the existing connection because RabbitMQ closes the channel on any channel-level exception. The publisher probes when `DeclareExchange = false`; the consumer probes the queue when `DeclareQueue = false` and the binding-target exchange when `ExchangeName` is non-empty. Probe progress is logged by the publisher (first miss `Information`, subsequent misses `Debug`, recovery `Information`, timeout `Error` via `TopologyProbe`); the consumer's no-logger exception still holds, so consumer-side probes log nothing. Use this in microservice deployments where one service owns the topology and others connect later without strict startup ordering. | | `RayTree.Plugins.Serializers.*` | JSON, MessagePack, Protobuf — each in its own package. | | `RayTree.Plugins.Compressors.*` | Gzip, Brotli, LZ4 — each in its own package. | @@ -130,7 +130,7 @@ All durations are emitted in seconds (`s`) per OTel semantic conventions; bytes - **Integration tests use Testcontainers**: PostgreSQL, Kafka, and RabbitMQ tests require Docker. Mark test classes `[NonParallelizable]` when sharing a container. Use unique topic/queue names per test to avoid cross-test contamination. - **Metrics placement rule — required meter, no null fallback**: `RayTreeMeter` is a required non-null constructor parameter on every runtime service that emits metrics (`ChangePublisher`, `OutboxPublisherService`, `ChangeSubscriber`). There is no internal `new RayTreeMeter()` fallback in those classes — the builder layer (`ChangeTrackingBuilder.BuildInternal`) constructs a default meter when the caller didn't supply one via `UseMeter`, then injects it everywhere. This matches the logging rule: callers make a conscious choice at builder/DI level, runtime services have no hidden defaults. `EntityChangeTracker` tracks ownership via an `ownsMeter` flag and disposes the meter only when it created it; caller-supplied meters are left alone. Instrument calls are silent no-ops when no listener is attached, so opting out costs nothing at runtime. - **OTel SDK isolation via peer assembly**: `RayTree.Core` and `RayTree.Hosting` use only `System.Diagnostics.Metrics` (BCL) — no `OpenTelemetry.*` package references. `RayTree.OpenTelemetry` is a separate assembly with two members (`RayTreeInstrumentation.MeterName` + `AddRayTreeMetrics`) that an application opts into. Applications that don't need OTel receive zero transitive OTel dependencies; applications that do reference exactly one well-versioned dependency. This mirrors the `RayTree.Hosting` / `RayTree.EntityFrameworkCore` split. -- **Logging placement rule**: `NullLoggerFactory.Instance` / `NullLogger.Instance` defaults belong **only** in builders and builder-context extension methods (`ChangeTrackingBuilder`, `ChangePublisherBuilder`, `ChangeSubscriberBuilder`, `KafkaSubscriberExtensions.UseKafka`, `RabbitMqSubscriberExtensions.UseRabbitMq`, `BuilderExtensions.UsePostgreSqlOutbox`, `RepositoryExtensions.UsePostgreSqlRepository`). All runtime service classes (`ChangePublisher`, `OutboxPublisherService`, `ChangeSubscriber`, `ChangeTrackingHostedService`, `KafkaConsumer`, `NotificationBasedPublisher`, `PostgreSqlOutbox`, `PostgreSqlRepository`) require a non-nullable logger — no internal fallback. **Exception**: `RabbitMqConsumer` intentionally has no logger — message-receive errors silently NACK and requeue, which is the correct broker-level recovery; no useful context is available inside the RabbitMQ delivery callback to produce a meaningful log entry. This ensures that callers always make a conscious choice about whether to produce log output. +- **Logging placement rule**: `NullLoggerFactory.Instance` / `NullLogger.Instance` defaults belong **only** in builders and builder-context extension methods (`ChangeTrackingBuilder`, `ChangePublisherBuilder`, `ChangeSubscriberBuilder`, `KafkaSubscriberExtensions.UseKafka`, `RabbitMqBuilderExtensions.UseRabbitMq`, `RabbitMqSubscriberExtensions.UseRabbitMq`, `BuilderExtensions.UsePostgreSqlOutbox`, `RepositoryExtensions.UsePostgreSqlRepository`). All runtime service classes (`ChangePublisher`, `OutboxPublisherService`, `ChangeSubscriber`, `ChangeTrackingHostedService`, `KafkaConsumer`, `NotificationBasedPublisher`, `PostgreSqlOutbox`, `PostgreSqlRepository`) require a non-nullable logger — no internal fallback. `RabbitMqPublisher` accepts an optional `ILoggerFactory?` (null → `NullLoggerFactory.Instance`) to support topology-wait logging without forcing every caller through DI. **Exception**: `RabbitMqConsumer` intentionally has no logger — message-receive errors silently NACK and requeue, which is the correct broker-level recovery; no useful context is available inside the RabbitMQ delivery callback to produce a meaningful log entry. Consumer-side topology-wait probes therefore pass `logger: null` to `TopologyProbe`, preserving the exception. This ensures that callers always make a conscious choice about whether to produce log output. ## Code Style & Conventions diff --git a/Directory.Build.props b/Directory.Build.props index 7ac4219..d368ee9 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -7,7 +7,7 @@ true nullable - 0.0.14 + 0.0.15 pre-release bitc0der diff --git a/docs/README.md b/docs/README.md index baf4fb0..2d2bcad 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1,6 +1,6 @@ # RayTree - Entity Change Tracking System -A modular .NET 8.0 entity change tracking system with outbox pattern support, queue distribution, per-entity plugin configuration, and `System.IO.Pipelines` for zero-allocation serialization/compression. +A modular .NET 10 entity change tracking system with outbox pattern support, queue distribution, per-entity plugin configuration, and `System.IO.Pipelines` for zero-allocation serialization/compression. ## Features @@ -13,6 +13,7 @@ A modular .NET 8.0 entity change tracking system with outbox pattern support, qu - **Auto-Initialization** - Automatic database schema initialization on `Build()` / `BuildAsync()` - **Structured Logging** - `Microsoft.Extensions.Logging` throughout; pass `ILoggerFactory` to `EntityChangeTracker.Create()` or let `AddChangeTracking` wire it from DI automatically - **OpenTelemetry Metrics** - `System.Diagnostics.Metrics` instruments on a `"RayTree"` meter for outbox writes, publish/subscribe latency, payload size, queue depth, and retry shape. Zero OTel SDK dependency unless the optional `RayTree.OpenTelemetry` package is referenced. See [OpenTelemetry Metrics Guide](opentelemetry-metrics.md). +- **RabbitMQ Topology Wait** - Opt-in startup retry for `RabbitMqPublisher` and `RabbitMqConsumer` when the exchange or queue is owned by another service. Probes with AMQP passive declares and retries on `NOT_FOUND` until the topology appears, the cancellation token fires, or a configurable timeout elapses. ## Quick Start @@ -437,6 +438,60 @@ public class MaintenanceController( `RunCleanupAsync` calls `CleanupPublishedAsync` on every registered outbox and returns the total number of rows deleted. +## RabbitMQ Topology Wait + +In microservice deployments one service typically owns and declares the RabbitMQ exchange or +queue while other services connect to it as consumers or publishers. If the owning service has +not yet started when `InitializeAsync` is called the broker returns `NOT_FOUND` (404) and the +connecting service crashes. + +Enable the opt-in topology wait loop by setting `WaitForTopology = true` on +`RabbitMqPublisherOptions` or `RabbitMqConsumerOptions`. The client then probes with AMQP +passive declares and retries at `TopologyWaitInterval` intervals until the topology appears, the +`CancellationToken` passed to `InitializeAsync` is cancelled, or `TopologyWaitTimeout` +(optional) elapses. + +| Option | Default | Description | +|---|---|---| +| `WaitForTopology` | `false` | Enable the wait loop. | +| `TopologyWaitInterval` | `5 s` | Delay between probe attempts. | +| `TopologyWaitTimeout` | `null` | Hard deadline; `null` means no ceiling. | + +Only `NOT_FOUND` (404) retries. Other errors (`PRECONDITION_FAILED`, connection failures, etc.) +propagate immediately. + +### Publisher example + +```csharp +builder.ForEntity(e => e + .UsePublisher(new RabbitMqPublisher(new RabbitMqPublisherOptions + { + ExchangeName = "entity_changes", + DeclareExchange = false, // owned by another service + WaitForTopology = true, + TopologyWaitInterval = TimeSpan.FromSeconds(2), + TopologyWaitTimeout = TimeSpan.FromMinutes(5) + }))); +``` + +### Consumer example + +```csharp +// Queue owned externally; consumer waits for queue AND binding exchange +var consumer = new RabbitMqConsumer(new RabbitMqConsumerOptions +{ + QueueName = "order-events", + ExchangeName = "entity_changes", + DeclareQueue = false, + WaitForTopology = true, + TopologyWaitInterval = TimeSpan.FromSeconds(2) + // TopologyWaitTimeout = null → retry until cancellation +}); +``` + +See the [Configuration Guide](configuration.md#rabbitmq-topology-wait) for the full option +reference and for consumer-factory / Generic Host patterns. + ## Cleanup `EntityChangeTracker` implements `IDisposable`. Disposing it stops all publisher services: diff --git a/docs/configuration.md b/docs/configuration.md index bdb3b6d..98897ec 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -323,6 +323,64 @@ new RabbitMqPublisherOptions When `RoutingKeySelector` is set it takes full control of the key; the `RoutingKey` base prefix is ignored. +### RabbitMQ topology wait + +In microservice deployments one service owns and declares the exchange or queue while other +services consume it. If the owning service has not yet started when `InitializeAsync` is called, +the broker returns `NOT_FOUND` and the client crashes. The opt-in topology wait loop retries with +AMQP passive declares until the topology appears. + +Three options control the wait on both `RabbitMqPublisherOptions` and `RabbitMqConsumerOptions`: + +| Option | Default | Description | +|---|---|---| +| `WaitForTopology` | `false` | Enable the wait loop. | +| `TopologyWaitInterval` | `5 s` | Delay between passive-declare attempts. | +| `TopologyWaitTimeout` | `null` (unlimited) | Hard deadline; `null` means retry until cancellation. | + +**When does the publisher probe?** When `WaitForTopology = true` **and** `DeclareExchange = false`. +The publisher probes `ExchangeName` with `ExchangeDeclarePassiveAsync` before connecting. + +**When does the consumer probe?** + +- When `WaitForTopology = true` and `DeclareQueue = false` — probes `QueueName` with `QueueDeclarePassiveAsync`. +- When `WaitForTopology = true` and `ExchangeName` is non-empty — also probes the exchange with `ExchangeDeclarePassiveAsync` before `QueueBindAsync`. + +Only `NOT_FOUND` (404) retries. All other errors (`PRECONDITION_FAILED`, `ACCESS_REFUSED`, +connection failures) propagate immediately. + +```csharp +// Publisher — waits for an exchange declared by another service +builder.ForEntity(e => e + .UsePublisher(new RabbitMqPublisher(new RabbitMqPublisherOptions + { + HostName = "rabbitmq", + ExchangeName = "entity_changes", + DeclareExchange = false, // owned by topology-service + WaitForTopology = true, + TopologyWaitInterval = TimeSpan.FromSeconds(2), + TopologyWaitTimeout = TimeSpan.FromMinutes(5) + }))); + +// Consumer — waits for a queue and binding exchange +builder.ForEntity(e => e + .UseSerializer(new JsonSerializerPlugin()) + .UseConsumer(new RabbitMqConsumer(new RabbitMqConsumerOptions + { + HostName = "rabbitmq", + QueueName = "order-events", + ExchangeName = "entity_changes", + DeclareQueue = false, // queue owned externally + WaitForTopology = true, + TopologyWaitInterval = TimeSpan.FromSeconds(2) + // TopologyWaitTimeout = null → retry until CancellationToken is cancelled + })) + .OnInsert(async (change, ct) => { /* ... */ })); +``` + +The default (`WaitForTopology = false`) is unchanged — a missing exchange or queue surfaces the +underlying `OperationInterruptedException` immediately. + ```csharp // InMemory (testing) .ForEntity(e => e diff --git a/examples/RabbitMQ.Microservices/docker-compose.yml b/examples/RabbitMQ.Microservices/docker-compose.yml index 8f1b82b..50d5cfc 100644 --- a/examples/RabbitMQ.Microservices/docker-compose.yml +++ b/examples/RabbitMQ.Microservices/docker-compose.yml @@ -1,6 +1,6 @@ services: postgres: - image: postgres:18.1-alpine3.22 + image: postgres:18.3-alpine environment: POSTGRES_DB: raytree_example POSTGRES_USER: postgres @@ -16,7 +16,7 @@ services: retries: 10 rabbitmq: - image: rabbitmq:4.2.4-management-alpine + image: rabbitmq:4.3.0-management-alpine ports: - "5672:5672" - "15672:15672" diff --git a/openspec/changes/archive/2026-05-21-rmq-retry-missing-topology/.openspec.yaml b/openspec/changes/archive/2026-05-21-rmq-retry-missing-topology/.openspec.yaml new file mode 100644 index 0000000..af43829 --- /dev/null +++ b/openspec/changes/archive/2026-05-21-rmq-retry-missing-topology/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-05-21 diff --git a/openspec/changes/archive/2026-05-21-rmq-retry-missing-topology/design.md b/openspec/changes/archive/2026-05-21-rmq-retry-missing-topology/design.md new file mode 100644 index 0000000..2a7e517 --- /dev/null +++ b/openspec/changes/archive/2026-05-21-rmq-retry-missing-topology/design.md @@ -0,0 +1,99 @@ +## Context + +RayTree's RabbitMQ plugin currently performs all topology declaration eagerly inside `InitializeAsync`: + +- `RabbitMqPublisher`: opens a connection + channel, optionally calls `ExchangeDeclareAsync` (gated by `DeclareExchange`). +- `RabbitMqConsumer`: opens a connection + channel, optionally calls `QueueDeclareAsync` (gated by `DeclareQueue`), then `QueueBindAsync` (gated by `ExchangeName` being non-empty), then starts `BasicConsume`. + +When `DeclareExchange = false` (publisher) or `DeclareQueue = false` (consumer), the caller is saying "another component owns this topology — don't auto-create it." But the moment any subsequent operation references a piece of topology that hasn't been declared yet (a `BasicPublish` to a non-existent exchange, a `QueueBind` to a non-existent exchange, or even a `BasicConsume` against a non-existent queue), RabbitMQ raises a channel-level exception with reply code `404 NOT_FOUND` and the channel becomes unusable. + +In any deployment where the topology owner starts after a consumer or publisher — common in microservice systems where startup order is not strict, or where a configuration service / migration job declares topology — this turns into a startup crash that the operator must work around by sequencing services manually or by restarting until everything stabilises. + +The fix is small in scope: an opt-in *wait* loop in `InitializeAsync` that probes the relevant topology with passive declares and retries on `NOT_FOUND` until it shows up or a budget expires. The existing "declare it myself" path is preserved untouched. + +## Goals / Non-Goals + +**Goals:** +- Let an operator deploy a RabbitMQ publisher or consumer that *consumes from* a topology owned elsewhere without requiring strict startup ordering. +- Preserve all current behaviour by default (opt-in flag, default `false`). +- Distinguish "topology not yet present" (retryable) from "genuine misconfiguration" (immediate failure) so misconfigured deployments still fail fast. +- Keep the implementation contained to the RabbitMQ plugin — no changes to `IQueuePublisher` / `IQueueConsumer` contracts, no changes to `EntityChangeTracker` lifecycle. + +**Non-Goals:** +- Re-establishing topology after a *runtime* loss (e.g., the exchange is deleted while the publisher is connected). That is a much larger feature touching publisher confirms and connection-recovery wiring; this change is strictly about startup. +- Auto-creating topology that the caller said they didn't want to declare. We only *wait* — we never `Declare` (active) on a side that opted out. +- Adding retry to other plugins (Kafka, PostgreSQL). The corresponding failure modes are different and are out of scope here. +- Generalising into a cross-plugin "wait-for-dependency" abstraction. Premature — handle Kafka/Postgres equivalents when their real use cases arrive. + +## Decisions + +### Decision 1: Use AMQP passive declares for probing + +We probe topology with `ExchangeDeclarePassiveAsync(name)` and `QueueDeclarePassiveAsync(name)`. Passive declares return success if the entity exists with any configuration; they return `NOT_FOUND` (and close the channel) if it doesn't. They never create anything. + +**Alternatives considered:** +- *Reuse `ExchangeDeclareAsync` / `QueueDeclareAsync` (active) for probing.* Rejected — the active form would create the topology if missing, which directly contradicts the user's explicit choice to set `DeclareExchange = false` / `DeclareQueue = false`. The whole point of the opt-out is that the local service does not own that topology and may not have the configuration (durability, args, exchange type) to declare it correctly. +- *Skip the probe and let the first real operation fail.* Rejected for the consumer side — `BasicConsume` against a missing queue, or `QueueBind` to a missing exchange, both fail with `NOT_FOUND` but at a point where we've already pre-allocated a channel and registered a delivery handler. The recovery path is messier than just doing an explicit probe up-front. Passive declare is also cheap: a single round-trip with no broker-side persistence. + +### Decision 2: Retry only on `NOT_FOUND` (404) + +The retry loop catches `OperationInterruptedException` whose `ShutdownReason.ReplyCode == 404`. Everything else — auth failures (`ACCESS_REFUSED` 403), arg mismatches (`PRECONDITION_FAILED` 406), connection-level errors, `OperationCanceledException`, etc. — propagates immediately. + +**Alternatives considered:** +- *Retry on any channel-level exception.* Rejected — `PRECONDITION_FAILED` typically means the topology exists but with arguments different from what the caller expected. That's a configuration bug, not a startup race, and looping on it would hide the problem indefinitely. +- *Make the predicate user-configurable.* Rejected for the first iteration — `NOT_FOUND` is the only error class this feature is intended to mask. If a real use case appears for a broader predicate we can add it later without breaking the narrower API. + +### Decision 3: Open a fresh channel for each retry attempt + +RabbitMQ closes the channel as part of any channel-level exception. The first failed probe leaves the existing `_channel` unusable. Each retry therefore opens a new channel from the existing connection. The connection itself stays — connection-level errors are not retried here. + +**Alternatives considered:** +- *Reuse a single channel by reopening it after each failure.* Rejected — there is no "reopen" API on a closed `IChannel`; you have to call `_connection.CreateChannelAsync` again. So the cost is identical and adding indirection would only obscure that. +- *Use a single dedicated probe channel separate from the working channel.* Rejected — it would still need to be reopened after each `NOT_FOUND`, so it provides no win, and it doubles the channel count for a one-shot startup task. + +### Decision 4: Default `WaitForTopology = false` + +The new option defaults to `false`. Existing callers who never set it see exactly today's behaviour: a missing exchange/queue throws immediately. The opt-in keeps semantics explicit and avoids silently masking misconfigurations in deployments that *should* fail fast (e.g., dev environments where startup order is already enforced and a missing exchange is a real bug). + +**Alternatives considered:** +- *Default `true`.* Rejected — would change the behaviour of every existing deployment on upgrade, including ones where today's fast-fail is the desired and expected behaviour. The cost of the explicit opt-in is one extra line of options configuration; the cost of an accidental default change is silent startup hangs. + +### Decision 5: `TopologyWaitTimeout` defaults to `null` (no ceiling) + +When `WaitForTopology = true`, the loop runs until topology appears or the caller's `CancellationToken` is cancelled. There is no built-in time budget. Operators who *do* want a hard ceiling set `TopologyWaitTimeout` explicitly. + +**Rationale:** The most common deployment scenario is "wait for the dependency, however long it takes" — typically gated by an orchestrator's overall startup timeout (Kubernetes liveness/readiness, Docker Compose health checks). Adding a default ceiling would introduce a second, hidden timeout that operators have to discover and override. + +**Alternatives considered:** +- *Default to a finite ceiling (e.g., 5 minutes).* Rejected for the reason above — it surprises operators with an arbitrary deadline that's not exposed in the orchestrator they're already using. +- *Default to a small attempt count (e.g., 12 attempts × 5s = 1 minute).* Same problem; rejected. + +### Decision 6: Probing scope per side + +| Side | What we probe (only when `WaitForTopology = true`) | +|---|---| +| Publisher, `DeclareExchange = false` | The configured `ExchangeName`. | +| Publisher, `DeclareExchange = true` | Nothing — we're declaring it ourselves. Wait flag is ignored. | +| Consumer, `DeclareQueue = false` | The configured `QueueName`. | +| Consumer, `DeclareQueue = true` | Nothing for the queue itself. | +| Consumer, `ExchangeName` set (any `DeclareQueue` value) | The exchange named in `ExchangeName`, *before* `QueueBindAsync`. | + +This matches the existing logic in `RabbitMqConsumer.InitializeAsync` (which already conditions `QueueDeclareAsync` on `DeclareQueue` and `QueueBindAsync` on `!string.IsNullOrEmpty(ExchangeName)`). Each guarded step gets its own probe immediately before it; the probe is skipped when the corresponding declare is being performed locally. + +## Risks / Trade-offs + +- **[Indefinite hangs on missing topology when timeout is `null`]** → The default behaviour relies on the caller-supplied `CancellationToken` to bound the wait. `EntityChangeTracker.StartAsync(CancellationToken)` already propagates a token from `ChangeTrackingHostedService`, and the host's shutdown timeout will cancel it during a stuck startup. Operators who want a tighter bound set `TopologyWaitTimeout` explicitly. This is documented in `CLAUDE.md`. +- **[`NOT_FOUND` masking a real bug]** → If a publisher is configured against the wrong exchange name (typo) with `WaitForTopology = true`, it will wait forever. Mitigation: the first `NOT_FOUND` logs at `Information` ("waiting for exchange `X`"), so the typo is visible in startup logs. Operators who want a hard fail set a finite `TopologyWaitTimeout`. +- **[Extra channel churn]** → Each retry opens a fresh channel. At a 5-second interval over a 1-minute wait, that's 12 channel open/close cycles. RabbitMQ tolerates this trivially (channels are cheap), and the activity is one-shot at startup. No expected production impact. +- **[Behaviour change in existing tests / examples]** → Defaults are unchanged, so no existing test should break. The new tests live in `tests/RayTree.Plugins.RabbitMQ.Tests` and use Testcontainers (already in use for this project). +- **[Concurrent topology creation race]** → If the topology owner declares the exchange between our probe and the next operation, no harm done — the next operation succeeds. If it declares it concurrently with the same name but different arguments, that's a configuration bug on their side; we don't try to detect or correct it here. + +## Migration Plan + +This is purely additive — no migration steps for existing callers. Callers who want the new behaviour: + +1. Set `RabbitMqPublisherOptions.WaitForTopology = true` (publisher side) and/or `RabbitMqConsumerOptions.WaitForTopology = true` (consumer side). +2. Optionally set `TopologyWaitInterval` (default 5 s) and `TopologyWaitTimeout` (default unlimited). +3. Ensure the `CancellationToken` passed to `tracker.StartAsync` is the one the host actually cancels on shutdown (already the case under `ChangeTrackingHostedService`). + +Rollback: clear the flag — there is no persistent broker state introduced by this change. diff --git a/openspec/changes/archive/2026-05-21-rmq-retry-missing-topology/proposal.md b/openspec/changes/archive/2026-05-21-rmq-retry-missing-topology/proposal.md new file mode 100644 index 0000000..edbbca0 --- /dev/null +++ b/openspec/changes/archive/2026-05-21-rmq-retry-missing-topology/proposal.md @@ -0,0 +1,31 @@ +## Why + +In microservice deployments, the service that *owns* a RabbitMQ exchange or queue (declares it) is frequently not the same service that publishes to it or consumes from it. Today, when `RabbitMqPublisher` is configured with `DeclareExchange = false` (or `RabbitMqConsumer` with `DeclareQueue = false`, or any consumer that binds to an externally-owned exchange), `InitializeAsync` fails immediately with an AMQP `NOT_FOUND` (404) channel exception if the topology is not yet present. This forces startup ordering between services and turns transient bootstrap races into permanent crashes. + +We need an opt-in retry loop so a publisher/consumer can wait for topology owned by another service to come online, instead of crashing on first miss. + +## What Changes + +- Add an opt-in *topology wait* mode to `RabbitMqPublisherOptions` and `RabbitMqConsumerOptions`. When enabled and `Declare*` is `false`, initialization SHALL probe the relevant exchange/queue (and exchange-for-binding on the consumer) and retry on `NOT_FOUND` until it appears or the timeout/attempt budget is exhausted. +- Add configuration knobs: `WaitForTopology` (bool, default `false` — opt-in, no behaviour change for existing callers), `TopologyWaitInterval` (TimeSpan, default 5 s), `TopologyWaitTimeout` (TimeSpan?, default `null` — wait indefinitely until cancelled). +- Use AMQP *passive* declares (`ExchangeDeclarePassiveAsync` / `QueueDeclarePassiveAsync`) for probing so we never inadvertently auto-create topology we explicitly said we wouldn't own. +- Only `NOT_FOUND` (404) channel errors trigger retry; connection-level failures, permission errors, and other channel errors propagate immediately so genuine misconfiguration is still surfaced fast. +- Each retry uses a fresh channel — RabbitMQ closes the channel on any channel-level exception, so reusing it would fail on the next probe. +- Log topology-wait attempts at `Debug`, the first `NOT_FOUND` at `Information` (so operators see a one-time "waiting for X" message), and exhaustion at `Error`. Recovery (probe succeeded after one or more misses) logs at `Information`. +- Update `CLAUDE.md` to document the new options and the topology-ownership pattern they enable. + +## Capabilities + +### New Capabilities +- `rmq-topology-wait`: An opt-in wait/retry loop in `RabbitMqPublisher` / `RabbitMqConsumer` `InitializeAsync` that tolerates `NOT_FOUND` on externally-owned exchanges and queues until they appear. + +### Modified Capabilities + + +## Impact + +- **Code**: `src/RayTree.Plugins.RabbitMQ/RabbitMqPublisher.cs`, `src/RayTree.Plugins.RabbitMQ/RabbitMqPublisherOptions.cs`, `src/RayTree.Plugins.RabbitMQ/RabbitMqConsumer.cs`, `src/RayTree.Plugins.RabbitMQ/RabbitMqConsumerOptions.cs`. A small internal helper (`TopologyProbe` or inline) for the probe-and-retry loop. +- **APIs**: Three new public properties on each of `RabbitMqPublisherOptions` and `RabbitMqConsumerOptions`. No breaking changes — defaults preserve current behaviour exactly. +- **Tests**: `tests/RayTree.Plugins.RabbitMQ.Tests` gains coverage for: (a) publisher waits then succeeds when exchange appears late, (b) consumer waits then succeeds when queue and bound exchange appear late, (c) timeout exhaustion surfaces the underlying `NOT_FOUND`, (d) non-`NOT_FOUND` errors propagate immediately without retry, (e) default (opt-out) behaviour is unchanged. +- **Dependencies**: None new — `ExchangeDeclarePassiveAsync` / `QueueDeclarePassiveAsync` already exist on `RabbitMQ.Client.IChannel`. +- **Docs**: `CLAUDE.md` RabbitMQ rows updated to describe `WaitForTopology` and the microservice topology-ownership scenario it enables. diff --git a/openspec/changes/archive/2026-05-21-rmq-retry-missing-topology/specs/rmq-topology-wait/spec.md b/openspec/changes/archive/2026-05-21-rmq-retry-missing-topology/specs/rmq-topology-wait/spec.md new file mode 100644 index 0000000..3a656d6 --- /dev/null +++ b/openspec/changes/archive/2026-05-21-rmq-retry-missing-topology/specs/rmq-topology-wait/spec.md @@ -0,0 +1,111 @@ +## ADDED Requirements + +### Requirement: Opt-in topology wait flag +The RabbitMQ publisher and consumer SHALL expose a `WaitForTopology` boolean option (default `false`) that, when `true`, causes `InitializeAsync` to wait for externally-owned RabbitMQ topology (exchanges and queues) to appear instead of failing immediately on `NOT_FOUND`. + +#### Scenario: Default behaviour is unchanged +- **WHEN** `WaitForTopology` is not set (or set to `false`) on `RabbitMqPublisherOptions` or `RabbitMqConsumerOptions` +- **THEN** `InitializeAsync` SHALL behave exactly as before — a missing exchange, missing queue, or `QueueBind` to a missing exchange SHALL surface the underlying `OperationInterruptedException` on the first failed AMQP operation without any retry. + +#### Scenario: Opt-in enables wait loop +- **WHEN** `WaitForTopology = true` is set on either options class +- **THEN** `InitializeAsync` SHALL probe the relevant topology with passive declares and retry on `NOT_FOUND` according to the configured interval and timeout. + +### Requirement: Publisher waits for externally-owned exchange +When `RabbitMqPublisherOptions.WaitForTopology = true` AND `DeclareExchange = false`, `RabbitMqPublisher.InitializeAsync` SHALL probe the configured `ExchangeName` with `ExchangeDeclarePassiveAsync` and retry on `NOT_FOUND` until the exchange appears, the cancellation token is cancelled, or `TopologyWaitTimeout` (when set) elapses. + +#### Scenario: Exchange appears after one or more probe attempts +- **WHEN** the exchange named in `ExchangeName` does not exist at the moment `InitializeAsync` is called but is declared by another service shortly after +- **THEN** the publisher SHALL retry the passive declare at intervals of `TopologyWaitInterval` +- **AND** SHALL complete `InitializeAsync` successfully once the passive declare succeeds +- **AND** SHALL log the first `NOT_FOUND` at `Information` level and the eventual recovery at `Information` level. + +#### Scenario: `DeclareExchange = true` skips the wait +- **WHEN** `DeclareExchange = true` regardless of `WaitForTopology` +- **THEN** the publisher SHALL declare the exchange itself with `ExchangeDeclareAsync` and SHALL NOT perform a passive probe. + +### Requirement: Consumer waits for externally-owned queue +When `RabbitMqConsumerOptions.WaitForTopology = true` AND `DeclareQueue = false`, `RabbitMqConsumer.InitializeAsync` SHALL probe the configured `QueueName` with `QueueDeclarePassiveAsync` and retry on `NOT_FOUND` until the queue appears, the cancellation token is cancelled, or `TopologyWaitTimeout` (when set) elapses. The probe SHALL occur before any `QueueBindAsync` or `BasicConsumeAsync` call. + +#### Scenario: Queue appears after one or more probe attempts +- **WHEN** the queue named in `QueueName` does not exist when `InitializeAsync` is called +- **AND** another service declares it shortly after +- **THEN** the consumer SHALL retry the passive declare at intervals of `TopologyWaitInterval` +- **AND** SHALL proceed to `BasicConsumeAsync` once the passive declare succeeds. + +#### Scenario: `DeclareQueue = true` skips the queue wait +- **WHEN** `DeclareQueue = true` regardless of `WaitForTopology` +- **THEN** the consumer SHALL declare the queue itself with `QueueDeclareAsync` and SHALL NOT perform a passive probe for the queue. + +### Requirement: Consumer waits for binding-target exchange +When `RabbitMqConsumerOptions.WaitForTopology = true` AND `ExchangeName` is non-empty, `RabbitMqConsumer.InitializeAsync` SHALL probe `ExchangeName` with `ExchangeDeclarePassiveAsync` and retry on `NOT_FOUND` before calling `QueueBindAsync`, using the same interval and timeout as the queue probe. + +#### Scenario: Exchange-for-binding appears after probe retries +- **WHEN** `ExchangeName` references an exchange that does not yet exist +- **THEN** the consumer SHALL retry the passive declare at intervals of `TopologyWaitInterval` +- **AND** SHALL invoke `QueueBindAsync` once the exchange exists. + +#### Scenario: No `ExchangeName` configured +- **WHEN** `ExchangeName` is null or empty +- **THEN** the consumer SHALL NOT probe any exchange and SHALL skip `QueueBindAsync` (default-exchange routing path). + +### Requirement: Retry only on `NOT_FOUND` +The topology wait loop SHALL retry only when the AMQP operation fails with reply code `404 NOT_FOUND`. All other channel-level exceptions, connection-level exceptions, and `OperationCanceledException` SHALL propagate immediately without retry. + +#### Scenario: PRECONDITION_FAILED propagates immediately +- **WHEN** the broker rejects the passive declare with reply code `406 PRECONDITION_FAILED` (for example, the existing exchange has different arguments than expected) +- **THEN** `InitializeAsync` SHALL propagate the exception on the first attempt without further retries. + +#### Scenario: ACCESS_REFUSED propagates immediately +- **WHEN** the broker rejects the operation with reply code `403 ACCESS_REFUSED` +- **THEN** `InitializeAsync` SHALL propagate the exception on the first attempt without further retries. + +#### Scenario: Connection failure propagates immediately +- **WHEN** the TCP connection cannot be established or is dropped during initialization +- **THEN** the resulting connection-level exception SHALL propagate without retry. + +### Requirement: Retry interval and timeout configuration +The publisher and consumer options SHALL expose `TopologyWaitInterval` (TimeSpan, default `5 seconds`) and `TopologyWaitTimeout` (TimeSpan?, default `null`). When `TopologyWaitTimeout` is non-null, the wait loop SHALL stop and rethrow the most recent `NOT_FOUND` exception once the elapsed time exceeds the timeout. + +#### Scenario: Custom interval is honoured +- **WHEN** `TopologyWaitInterval = TimeSpan.FromSeconds(1)` is set +- **THEN** consecutive passive declare attempts SHALL be separated by approximately one second. + +#### Scenario: Timeout exhaustion surfaces the underlying error +- **WHEN** `TopologyWaitTimeout = TimeSpan.FromSeconds(10)` is set +- **AND** the topology has not appeared after 10 seconds of probing +- **THEN** `InitializeAsync` SHALL throw the last observed `NOT_FOUND` exception. + +#### Scenario: Null timeout means no ceiling +- **WHEN** `TopologyWaitTimeout = null` +- **THEN** the wait loop SHALL continue indefinitely until either the topology appears or the cancellation token is cancelled. + +### Requirement: Cancellation token cancels the wait +The wait loop SHALL observe the `CancellationToken` passed into `InitializeAsync`. When the token is cancelled during a wait or between attempts, the loop SHALL throw `OperationCanceledException`. + +#### Scenario: Cancellation during the inter-attempt delay +- **WHEN** the cancellation token is cancelled while the wait loop is sleeping between attempts +- **THEN** `InitializeAsync` SHALL throw `OperationCanceledException` rather than continuing. + +### Requirement: Fresh channel per retry +Each passive-declare attempt SHALL be executed on a freshly created channel because RabbitMQ closes the channel on every channel-level exception. The persistent working channel held by the publisher/consumer SHALL be opened only after the probe(s) succeed. + +#### Scenario: Channel is recreated after a NOT_FOUND +- **WHEN** a passive declare returns `NOT_FOUND` and closes the probe channel +- **THEN** the next attempt SHALL open a new channel from the existing connection before issuing the next passive declare. + +### Requirement: Logging of topology wait +The plugin SHALL emit the following log entries when `WaitForTopology = true`: + +- First `NOT_FOUND` per probed entity: `Information`, with the entity kind (exchange/queue) and name. +- Subsequent `NOT_FOUND` attempts for the same entity: `Debug`. +- Recovery (probe succeeds after one or more misses): `Information`. +- Timeout exhaustion: `Error`, immediately before rethrowing. + +#### Scenario: First miss logged at Information +- **WHEN** the first passive declare for an entity returns `NOT_FOUND` +- **THEN** an `Information`-level log SHALL be emitted indicating the consumer/publisher is waiting for that entity by name. + +#### Scenario: Recovery logged at Information +- **WHEN** a passive declare succeeds after at least one prior `NOT_FOUND` +- **THEN** an `Information`-level log SHALL be emitted indicating the topology became available. diff --git a/openspec/changes/archive/2026-05-21-rmq-retry-missing-topology/tasks.md b/openspec/changes/archive/2026-05-21-rmq-retry-missing-topology/tasks.md new file mode 100644 index 0000000..d1ba051 --- /dev/null +++ b/openspec/changes/archive/2026-05-21-rmq-retry-missing-topology/tasks.md @@ -0,0 +1,46 @@ +## 1. Options surface + +- [x] 1.1 Add `WaitForTopology` (bool, default `false`), `TopologyWaitInterval` (TimeSpan, default `TimeSpan.FromSeconds(5)`), and `TopologyWaitTimeout` (TimeSpan?, default `null`) to `RabbitMqPublisherOptions`. Include XML doc comments explaining the microservice topology-ownership scenario and the `NOT_FOUND`-only retry semantics. +- [x] 1.2 Add the same three properties with identical defaults and equivalent XML docs to `RabbitMqConsumerOptions`. + +## 2. Topology-probe helper + +- [x] 2.1 Add an internal helper (`TopologyProbe`, in `src/RayTree.Plugins.RabbitMQ/TopologyProbe.cs`) exposing `WaitForExchangeAsync(IConnection, string exchangeName, TimeSpan interval, TimeSpan? timeout, ILogger? logger, CancellationToken ct)` and `WaitForQueueAsync(IConnection, string queueName, TimeSpan interval, TimeSpan? timeout, ILogger? logger, CancellationToken ct)`. +- [x] 2.2 Inside each method, loop: open a fresh channel from the connection → call the passive declare → on `OperationInterruptedException` with `ShutdownInitiator.Peer` and `ReplyCode == 404`, dispose the dead channel and `await Task.Delay(interval, ct)`; on any other exception rethrow immediately; on success dispose the probe channel and return. +- [x] 2.3 Track elapsed time with a `Stopwatch` started before the first attempt. When `timeout` is non-null and `stopwatch.Elapsed >= timeout` at the start of an attempt, log the timeout at `Error` and rethrow the captured `NOT_FOUND` exception. +- [x] 2.4 Emit logging exactly as specified: first miss per call at `Information` (include entity kind + name); subsequent misses at `Debug`; recovery (success after ≥1 miss) at `Information`; timeout at `Error`. + +## 3. Publisher integration + +- [x] 3.1 In `RabbitMqPublisher.GetChannelAsync`, after opening the connection and before opening the working channel, branch on `_options is { WaitForTopology: true, DeclareExchange: false }` and call `TopologyProbe.WaitForExchangeAsync(_connection, _options.ExchangeName, _options.TopologyWaitInterval, _options.TopologyWaitTimeout, logger: null, cancellationToken)`. +- [x] 3.2 Leave the `DeclareExchange = true` path untouched — `ExchangeDeclareAsync` continues to run on the working channel as today. +- [x] 3.3 The publisher currently constructs with options only and has no `ILoggerFactory`. Either (a) wire an optional `ILoggerFactory?` parameter through `RabbitMqPublisher`'s constructor and the `RabbitMqBuilderExtensions` registration (preferred — keeps logging-placement rule consistent with the rest of the publisher-side plugins), or (b) pass `logger: null` to the probe and accept that the publisher logs nothing for topology waits. Pick (a); update `CLAUDE.md`'s "logger placement" exception list accordingly. + +## 4. Consumer integration + +- [x] 4.1 In `RabbitMqConsumer.InitializeAsync`, after creating the connection but before creating `_channel`, branch on `_options is { WaitForTopology: true, DeclareQueue: false }` and probe the queue via `TopologyProbe.WaitForQueueAsync`. +- [x] 4.2 Still inside `InitializeAsync`, after the queue declare/probe and before `QueueBindAsync`, branch on `_options.WaitForTopology && !string.IsNullOrEmpty(_options.ExchangeName)` and probe the exchange via `TopologyProbe.WaitForExchangeAsync`. +- [x] 4.3 Preserve `RabbitMqConsumer`'s "no logger" rule from `CLAUDE.md` exception — pass `logger: null` from the consumer side (the wait-loop logging is intentionally only available to the publisher because the consumer is the one with the documented no-logger exception). Document this decision in a one-line comment at the call site. +- [x] 4.4 Confirm that the existing `_channel` lifecycle is unaffected: probe channels are opened/disposed inside `TopologyProbe`; the durable working channel is created exactly once via `_connection.CreateChannelAsync` as today. + +## 5. Tests (`tests/RayTree.Plugins.RabbitMQ.Tests`) + +- [x] 5.1 Add `TopologyWaitTests.cs`. Use a single Testcontainers RabbitMQ container shared across the class (marked `[NonParallelizable]` per project convention). +- [x] 5.2 Test: `Publisher_waits_then_succeeds_when_exchange_appears_late` — start the publisher with `WaitForTopology = true`, `DeclareExchange = false`, exchange name unique to this test; on a background task wait 2 s then create the exchange via a separate management channel; assert `InitializeAsync` returns successfully and a subsequent `PublishAsync` succeeds. +- [x] 5.3 Test: `Consumer_waits_then_succeeds_when_queue_appears_late` — symmetric setup for the consumer with `DeclareQueue = false` and no `ExchangeName`; background task creates the queue; assert `InitializeAsync` returns and a published message flows through. +- [x] 5.4 Test: `Consumer_waits_then_succeeds_when_bound_exchange_appears_late` — `DeclareQueue = true`, `ExchangeName = ""` not yet created; background task creates the exchange; assert binding succeeds. +- [x] 5.5 Test: `Timeout_exhaustion_throws_NotFound` — set `TopologyWaitTimeout = TimeSpan.FromMilliseconds(500)` and never create the topology; assert `InitializeAsync` throws `OperationInterruptedException` with `ShutdownReason.ReplyCode == 404`. +- [x] 5.6 Test: `Default_options_still_throw_immediately` — `WaitForTopology` unset; missing topology causes `InitializeAsync` to throw `OperationInterruptedException` on first attempt (uses a stopwatch to assert elapsed < 1 s as a behaviour guard). +- [x] 5.7 Test: `NonNotFound_error_does_not_retry` — pre-create an exchange with mismatched `durable` argument; configure the publisher with the opposite `Durable` value (forces `PRECONDITION_FAILED` 406 only if `DeclareExchange = true`; for the wait path, force a different non-404 error by, e.g., closing the connection between attempts). Choose whichever fault injection is straightforward against Testcontainers; the goal is to assert that non-404 errors are not retried. +- [x] 5.8 Test: `Cancellation_during_wait_throws_OperationCanceledException` — start `InitializeAsync` with a 30 s interval and cancel the token after 200 ms; assert `OperationCanceledException` is thrown promptly. + +## 6. Documentation + +- [x] 6.1 Update `CLAUDE.md` `RayTree.Plugins.RabbitMQ` row(s) to describe `WaitForTopology`, `TopologyWaitInterval`, `TopologyWaitTimeout`, and the microservice topology-ownership scenario that motivates them. Reference passive declares as the probe mechanism and `NOT_FOUND`-only retry semantics. +- [x] 6.2 If task 3.3 lands option (a), update the "Logging placement rule" paragraph in `CLAUDE.md` to add `RabbitMqPublisher` to the list of runtime services that take a non-null logger, and keep `RabbitMqConsumer` in the exception list with the existing rationale unchanged. + +## 7. Verification + +- [x] 7.1 `dotnet build RayTree.slnx -c Release` succeeds with `TreatWarningsAsErrors=true` (no nullable warnings introduced). +- [x] 7.2 `dotnet test tests/RayTree.Plugins.RabbitMQ.Tests` passes locally (Docker required). +- [x] 7.3 `openspec validate rmq-retry-missing-topology --strict` reports no issues. diff --git a/openspec/specs/rmq-topology-wait/spec.md b/openspec/specs/rmq-topology-wait/spec.md new file mode 100644 index 0000000..053fd90 --- /dev/null +++ b/openspec/specs/rmq-topology-wait/spec.md @@ -0,0 +1,109 @@ +### Requirement: Opt-in topology wait flag +The RabbitMQ publisher and consumer SHALL expose a `WaitForTopology` boolean option (default `false`) that, when `true`, causes `InitializeAsync` to wait for externally-owned RabbitMQ topology (exchanges and queues) to appear instead of failing immediately on `NOT_FOUND`. + +#### Scenario: Default behaviour is unchanged +- **WHEN** `WaitForTopology` is not set (or set to `false`) on `RabbitMqPublisherOptions` or `RabbitMqConsumerOptions` +- **THEN** `InitializeAsync` SHALL behave exactly as before — a missing exchange, missing queue, or `QueueBind` to a missing exchange SHALL surface the underlying `OperationInterruptedException` on the first failed AMQP operation without any retry. + +#### Scenario: Opt-in enables wait loop +- **WHEN** `WaitForTopology = true` is set on either options class +- **THEN** `InitializeAsync` SHALL probe the relevant topology with passive declares and retry on `NOT_FOUND` according to the configured interval and timeout. + +### Requirement: Publisher waits for externally-owned exchange +When `RabbitMqPublisherOptions.WaitForTopology = true` AND `DeclareExchange = false`, `RabbitMqPublisher.InitializeAsync` SHALL probe the configured `ExchangeName` with `ExchangeDeclarePassiveAsync` and retry on `NOT_FOUND` until the exchange appears, the cancellation token is cancelled, or `TopologyWaitTimeout` (when set) elapses. + +#### Scenario: Exchange appears after one or more probe attempts +- **WHEN** the exchange named in `ExchangeName` does not exist at the moment `InitializeAsync` is called but is declared by another service shortly after +- **THEN** the publisher SHALL retry the passive declare at intervals of `TopologyWaitInterval` +- **AND** SHALL complete `InitializeAsync` successfully once the passive declare succeeds +- **AND** SHALL log the first `NOT_FOUND` at `Information` level and the eventual recovery at `Information` level. + +#### Scenario: `DeclareExchange = true` skips the wait +- **WHEN** `DeclareExchange = true` regardless of `WaitForTopology` +- **THEN** the publisher SHALL declare the exchange itself with `ExchangeDeclareAsync` and SHALL NOT perform a passive probe. + +### Requirement: Consumer waits for externally-owned queue +When `RabbitMqConsumerOptions.WaitForTopology = true` AND `DeclareQueue = false`, `RabbitMqConsumer.InitializeAsync` SHALL probe the configured `QueueName` with `QueueDeclarePassiveAsync` and retry on `NOT_FOUND` until the queue appears, the cancellation token is cancelled, or `TopologyWaitTimeout` (when set) elapses. The probe SHALL occur before any `QueueBindAsync` or `BasicConsumeAsync` call. + +#### Scenario: Queue appears after one or more probe attempts +- **WHEN** the queue named in `QueueName` does not exist when `InitializeAsync` is called +- **AND** another service declares it shortly after +- **THEN** the consumer SHALL retry the passive declare at intervals of `TopologyWaitInterval` +- **AND** SHALL proceed to `BasicConsumeAsync` once the passive declare succeeds. + +#### Scenario: `DeclareQueue = true` skips the queue wait +- **WHEN** `DeclareQueue = true` regardless of `WaitForTopology` +- **THEN** the consumer SHALL declare the queue itself with `QueueDeclareAsync` and SHALL NOT perform a passive probe for the queue. + +### Requirement: Consumer waits for binding-target exchange +When `RabbitMqConsumerOptions.WaitForTopology = true` AND `ExchangeName` is non-empty, `RabbitMqConsumer.InitializeAsync` SHALL probe `ExchangeName` with `ExchangeDeclarePassiveAsync` and retry on `NOT_FOUND` before calling `QueueBindAsync`, using the same interval and timeout as the queue probe. + +#### Scenario: Exchange-for-binding appears after probe retries +- **WHEN** `ExchangeName` references an exchange that does not yet exist +- **THEN** the consumer SHALL retry the passive declare at intervals of `TopologyWaitInterval` +- **AND** SHALL invoke `QueueBindAsync` once the exchange exists. + +#### Scenario: No `ExchangeName` configured +- **WHEN** `ExchangeName` is null or empty +- **THEN** the consumer SHALL NOT probe any exchange and SHALL skip `QueueBindAsync` (default-exchange routing path). + +### Requirement: Retry only on `NOT_FOUND` +The topology wait loop SHALL retry only when the AMQP operation fails with reply code `404 NOT_FOUND`. All other channel-level exceptions, connection-level exceptions, and `OperationCanceledException` SHALL propagate immediately without retry. + +#### Scenario: PRECONDITION_FAILED propagates immediately +- **WHEN** the broker rejects the passive declare with reply code `406 PRECONDITION_FAILED` (for example, the existing exchange has different arguments than expected) +- **THEN** `InitializeAsync` SHALL propagate the exception on the first attempt without further retries. + +#### Scenario: ACCESS_REFUSED propagates immediately +- **WHEN** the broker rejects the operation with reply code `403 ACCESS_REFUSED` +- **THEN** `InitializeAsync` SHALL propagate the exception on the first attempt without further retries. + +#### Scenario: Connection failure propagates immediately +- **WHEN** the TCP connection cannot be established or is dropped during initialization +- **THEN** the resulting connection-level exception SHALL propagate without retry. + +### Requirement: Retry interval and timeout configuration +The publisher and consumer options SHALL expose `TopologyWaitInterval` (TimeSpan, default `5 seconds`) and `TopologyWaitTimeout` (TimeSpan?, default `null`). When `TopologyWaitTimeout` is non-null, the wait loop SHALL stop and rethrow the most recent `NOT_FOUND` exception once the elapsed time exceeds the timeout. + +#### Scenario: Custom interval is honoured +- **WHEN** `TopologyWaitInterval = TimeSpan.FromSeconds(1)` is set +- **THEN** consecutive passive declare attempts SHALL be separated by approximately one second. + +#### Scenario: Timeout exhaustion surfaces the underlying error +- **WHEN** `TopologyWaitTimeout = TimeSpan.FromSeconds(10)` is set +- **AND** the topology has not appeared after 10 seconds of probing +- **THEN** `InitializeAsync` SHALL throw the last observed `NOT_FOUND` exception. + +#### Scenario: Null timeout means no ceiling +- **WHEN** `TopologyWaitTimeout = null` +- **THEN** the wait loop SHALL continue indefinitely until either the topology appears or the cancellation token is cancelled. + +### Requirement: Cancellation token cancels the wait +The wait loop SHALL observe the `CancellationToken` passed into `InitializeAsync`. When the token is cancelled during a wait or between attempts, the loop SHALL throw `OperationCanceledException`. + +#### Scenario: Cancellation during the inter-attempt delay +- **WHEN** the cancellation token is cancelled while the wait loop is sleeping between attempts +- **THEN** `InitializeAsync` SHALL throw `OperationCanceledException` rather than continuing. + +### Requirement: Fresh channel per retry +Each passive-declare attempt SHALL be executed on a freshly created channel because RabbitMQ closes the channel on every channel-level exception. The persistent working channel held by the publisher/consumer SHALL be opened only after the probe(s) succeed. + +#### Scenario: Channel is recreated after a NOT_FOUND +- **WHEN** a passive declare returns `NOT_FOUND` and closes the probe channel +- **THEN** the next attempt SHALL open a new channel from the existing connection before issuing the next passive declare. + +### Requirement: Logging of topology wait +The plugin SHALL emit the following log entries when `WaitForTopology = true`: + +- First `NOT_FOUND` per probed entity: `Information`, with the entity kind (exchange/queue) and name. +- Subsequent `NOT_FOUND` attempts for the same entity: `Debug`. +- Recovery (probe succeeds after one or more misses): `Information`. +- Timeout exhaustion: `Error`, immediately before rethrowing. + +#### Scenario: First miss logged at Information +- **WHEN** the first passive declare for an entity returns `NOT_FOUND` +- **THEN** an `Information`-level log SHALL be emitted indicating the consumer/publisher is waiting for that entity by name. + +#### Scenario: Recovery logged at Information +- **WHEN** a passive declare succeeds after at least one prior `NOT_FOUND` +- **THEN** an `Information`-level log SHALL be emitted indicating the topology became available. diff --git a/src/RayTree.Plugins.RabbitMQ/README.md b/src/RayTree.Plugins.RabbitMQ/README.md index 7484115..d23f841 100644 --- a/src/RayTree.Plugins.RabbitMQ/README.md +++ b/src/RayTree.Plugins.RabbitMQ/README.md @@ -159,6 +159,9 @@ The serialised entity payload goes in `body` (already compressed by the time it | `RoutingKeySelector` | `envelope => $"{RoutingKey}.{EntityType}.{changeType}"` | Overrides routing-key construction entirely — receives the full `MessageEnvelope` and returns any string. Replaces the default delegate by assigning a new one | | `DeclareExchange` | `true` | Set `false` if the exchange is pre-provisioned and your credentials lack declare rights | | `Durable` | `true` | Survives broker restart (only relevant when declaring) | +| `WaitForTopology` | `false` | When `true` and `DeclareExchange = false`, probe `ExchangeName` with a passive declare and retry on `NOT_FOUND` until it appears, the token is cancelled, or `TopologyWaitTimeout` elapses | +| `TopologyWaitInterval` | `5 s` | Delay between passive-declare attempts (used when `WaitForTopology = true`) | +| `TopologyWaitTimeout` | `null` | Hard deadline for the wait loop; `null` means no ceiling | --- @@ -293,6 +296,9 @@ stateDiagram-v2 | `ExchangeName` | `null` | When set, queue is bound to this exchange during init | | `BindingKey` | `"#"` | Routing-key pattern for the binding (only used when `ExchangeName` is set) | | `AckAfterHandler` | `false` | **At-most-once** (default) or **at-least-once** (`true`) — see below | +| `WaitForTopology` | `false` | When `true`: probes `QueueName` (if `DeclareQueue = false`) and `ExchangeName` (if non-empty) with passive declares, retrying on `NOT_FOUND` until they appear, the token is cancelled, or `TopologyWaitTimeout` elapses | +| `TopologyWaitInterval` | `5 s` | Delay between passive-declare attempts (used when `WaitForTopology = true`) | +| `TopologyWaitTimeout` | `null` | Hard deadline for the wait loop; `null` means no ceiling | ### `AckAfterHandler` — delivery-guarantee toggle @@ -324,6 +330,83 @@ The delivery tag is broker-private state — `ChangeSubscriber` shouldn't know a --- +## Topology wait (microservice deployments) + +In a microservice deployment one service typically owns and declares the exchange and queue +while other services connect to them as publisher or consumer. If the owning service has not +started yet when `InitializeAsync` is called, the broker returns `NOT_FOUND` (404) and the +connection attempt fails. + +Set `WaitForTopology = true` to enable an opt-in retry loop. The plugin probes with AMQP +passive declares at `TopologyWaitInterval` intervals and returns as soon as the topology +exists. The `CancellationToken` passed to `InitializeAsync` cancels the loop at any time; +`TopologyWaitTimeout` adds an optional hard deadline. + +``` +[Publisher / Consumer] + InitializeAsync() + ├─ WaitForTopology = false (default) + │ └─ open channel → declare / consume → OK or throw immediately + └─ WaitForTopology = true + ├─ open probe channel + ├─ ExchangeDeclarePassiveAsync / QueueDeclarePassiveAsync + │ ├─ OK → close probe, open working channel, continue + │ ├─ NOT_FOUND → close probe, wait TopologyWaitInterval, retry + │ └─ other → propagate immediately (no retry) + └─ TopologyWaitTimeout elapsed → rethrow last NOT_FOUND +``` + +**Which probes fire?** + +| Side | Condition | Probe | +|---|---|---| +| Publisher | `WaitForTopology = true` AND `DeclareExchange = false` | `ExchangeDeclarePassiveAsync(ExchangeName)` | +| Consumer | `WaitForTopology = true` AND `DeclareQueue = false` | `QueueDeclarePassiveAsync(QueueName)` | +| Consumer | `WaitForTopology = true` AND `ExchangeName` non-empty | `ExchangeDeclarePassiveAsync(ExchangeName)` before `QueueBindAsync` | + +Only `NOT_FOUND` (404) retries. `PRECONDITION_FAILED`, `ACCESS_REFUSED`, and connection +errors propagate immediately so genuine misconfiguration still fails fast. + +**Logging** (publisher only — `RabbitMqConsumer` has no logger by design): + +| Level | When | +|---|---| +| `Information` | First miss for a given exchange or queue | +| `Debug` | Subsequent misses | +| `Information` | Recovery (topology became available) | +| `Error` | Timeout exhaustion, immediately before rethrowing | + +### Publisher — wait for externally-owned exchange + +```csharp +new RabbitMqPublisher(new RabbitMqPublisherOptions +{ + HostName = "rabbitmq", + ExchangeName = "entity_changes", + DeclareExchange = false, // owned by topology-service + WaitForTopology = true, + TopologyWaitInterval = TimeSpan.FromSeconds(2), + TopologyWaitTimeout = TimeSpan.FromMinutes(5) +}, loggerFactory) +``` + +### Consumer — wait for externally-owned queue and binding exchange + +```csharp +new RabbitMqConsumer(new RabbitMqConsumerOptions +{ + HostName = "rabbitmq", + QueueName = "order-events", + ExchangeName = "entity_changes", + DeclareQueue = false, // queue owned by another service + WaitForTopology = true, + TopologyWaitInterval = TimeSpan.FromSeconds(2) + // TopologyWaitTimeout = null → retry until CancellationToken is cancelled +}) +``` + +--- + ## Integration with `IChangeTrackingBuilder` (DI / Hosting) ```csharp diff --git a/src/RayTree.Plugins.RabbitMQ/RabbitMqBuilderExtensions.cs b/src/RayTree.Plugins.RabbitMQ/RabbitMqBuilderExtensions.cs index 9e71b3f..48b75fe 100644 --- a/src/RayTree.Plugins.RabbitMQ/RabbitMqBuilderExtensions.cs +++ b/src/RayTree.Plugins.RabbitMQ/RabbitMqBuilderExtensions.cs @@ -1,3 +1,4 @@ +using Microsoft.Extensions.Logging; using RayTree.Core.Plugins.Publisher; using RayTree.Core.Tracking; @@ -7,14 +8,15 @@ public static class RabbitMqBuilderExtensions { public static IChangeTrackingBuilder UseRabbitMq( this IChangeTrackingBuilder builder, - Action configure) + Action configure, + ILoggerFactory? loggerFactory = null) { ArgumentNullException.ThrowIfNull(builder); ArgumentNullException.ThrowIfNull(configure); var options = new RabbitMqPublisherOptions(); configure(options); - return builder.UsePublisher(_ => new RabbitMqPublisher(options)); + return builder.UsePublisher(_ => new RabbitMqPublisher(options, loggerFactory)); } public static RabbitMqPublisherOptions WithExchange( diff --git a/src/RayTree.Plugins.RabbitMQ/RabbitMqConsumer.cs b/src/RayTree.Plugins.RabbitMQ/RabbitMqConsumer.cs index ea4ad06..126c4fe 100644 --- a/src/RayTree.Plugins.RabbitMQ/RabbitMqConsumer.cs +++ b/src/RayTree.Plugins.RabbitMQ/RabbitMqConsumer.cs @@ -33,42 +33,94 @@ public async Task InitializeAsync(CancellationToken cancellationToken = default) }; _connection = await factory.CreateConnectionAsync(cancellationToken); - _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); - if (_options.DeclareQueue) - await _channel.QueueDeclareAsync( - queue: _options.QueueName, - durable: _options.Durable, - exclusive: false, - autoDelete: false, - arguments: null, + try + { + // No logger is passed to the probe: RabbitMqConsumer intentionally has no logger + // (documented exception to the logging-placement rule in CLAUDE.md). + if (_options is { WaitForTopology: true, DeclareQueue: false }) + { + await TopologyProbe.WaitForQueueAsync( + _connection, + _options.QueueName, + _options.TopologyWaitInterval, + _options.TopologyWaitTimeout, + logger: null, + cancellationToken); + } + + _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); + + if (_options.DeclareQueue) + await _channel.QueueDeclareAsync( + queue: _options.QueueName, + durable: _options.Durable, + exclusive: false, + autoDelete: false, + arguments: null, + cancellationToken: cancellationToken + ); + + if (!string.IsNullOrEmpty(_options.ExchangeName)) + { + if (_options.WaitForTopology) + { + await TopologyProbe.WaitForExchangeAsync( + _connection, + _options.ExchangeName, + _options.TopologyWaitInterval, + _options.TopologyWaitTimeout, + logger: null, + cancellationToken); + } + + await _channel.QueueBindAsync( + queue: _options.QueueName, + exchange: _options.ExchangeName, + routingKey: _options.BindingKey, + cancellationToken: cancellationToken + ); + } + + await _channel.BasicQosAsync( + prefetchSize: 0, + prefetchCount: _options.PrefetchCount, + global: false, cancellationToken: cancellationToken ); - if (!string.IsNullOrEmpty(_options.ExchangeName)) - await _channel.QueueBindAsync( + var consumer = new AsyncEventingBasicConsumer(_channel); + consumer.ReceivedAsync += OnMessageReceived; + + await _channel.BasicConsumeAsync( queue: _options.QueueName, - exchange: _options.ExchangeName, - routingKey: _options.BindingKey, + autoAck: false, + consumer: consumer, cancellationToken: cancellationToken ); + } + catch + { + await CleanupAfterFailedInitAsync(); + throw; + } + } - await _channel.BasicQosAsync( - prefetchSize: 0, - prefetchCount: _options.PrefetchCount, - global: false, - cancellationToken: cancellationToken - ); - - var consumer = new AsyncEventingBasicConsumer(_channel); - consumer.ReceivedAsync += OnMessageReceived; - - await _channel.BasicConsumeAsync( - queue: _options.QueueName, - autoAck: false, - consumer: consumer, - cancellationToken: cancellationToken - ); + private async Task CleanupAfterFailedInitAsync() + { + if (_channel is not null) + { + try { await _channel.CloseAsync(CancellationToken.None); } catch { /* may already be closed */ } + _channel.Dispose(); + _channel = null; + } + + if (_connection is not null) + { + try { await _connection.CloseAsync(CancellationToken.None); } catch { /* may already be closed */ } + _connection.Dispose(); + _connection = null; + } } private async Task OnMessageReceived(object sender, BasicDeliverEventArgs ea) diff --git a/src/RayTree.Plugins.RabbitMQ/RabbitMqConsumerOptions.cs b/src/RayTree.Plugins.RabbitMQ/RabbitMqConsumerOptions.cs index c3fc21b..926be19 100644 --- a/src/RayTree.Plugins.RabbitMQ/RabbitMqConsumerOptions.cs +++ b/src/RayTree.Plugins.RabbitMQ/RabbitMqConsumerOptions.cs @@ -38,4 +38,39 @@ public class RabbitMqConsumerOptions /// so any concurrency level is safe. /// public bool AckAfterHandler { get; set; } + + /// + /// When true, InitializeAsync waits for externally-owned RabbitMQ topology to + /// appear instead of failing on NOT_FOUND. Specifically: + /// + /// If is false, the consumer probes + /// with a passive declare and retries on NOT_FOUND. + /// If is non-empty, the consumer probes the exchange + /// with a passive declare before QueueBind and retries on NOT_FOUND. + /// + /// Defaults to false — missing topology surfaces the underlying + /// OperationInterruptedException on the first failed AMQP operation as before. + /// + /// Only NOT_FOUND (404) errors trigger retry. Other channel- and connection-level + /// errors propagate immediately so genuine misconfiguration still fails fast. + /// + /// + public bool WaitForTopology { get; set; } + + /// + /// Delay between passive-declare attempts when is true. + /// Defaults to 5 seconds. + /// + public TimeSpan TopologyWaitInterval { get; set; } = TimeSpan.FromSeconds(5); + + /// + /// Optional ceiling on the total time the topology wait loop may consume. When null + /// (default), the loop continues indefinitely until the topology appears or the + /// passed to InitializeAsync is cancelled. + /// + /// The timeout is evaluated after each failed attempt, so the observed wait may + /// exceed this value by up to one . Must be positive when set. + /// + /// + public TimeSpan? TopologyWaitTimeout { get; set; } } diff --git a/src/RayTree.Plugins.RabbitMQ/RabbitMqPublisher.cs b/src/RayTree.Plugins.RabbitMQ/RabbitMqPublisher.cs index f95c9c0..2ee7f27 100644 --- a/src/RayTree.Plugins.RabbitMQ/RabbitMqPublisher.cs +++ b/src/RayTree.Plugins.RabbitMQ/RabbitMqPublisher.cs @@ -1,3 +1,5 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using RabbitMQ.Client; using RayTree.Core.Models; using RayTree.Core.Plugins.Publisher; @@ -7,14 +9,16 @@ namespace RayTree.Plugins.RabbitMQ; public class RabbitMqPublisher : IQueuePublisher, IDisposable { private readonly RabbitMqPublisherOptions _options; + private readonly ILogger _logger; private IConnection? _connection; private IChannel? _channel; private readonly SemaphoreSlim _semaphore = new(initialCount: 1, maxCount: 1); - public RabbitMqPublisher(RabbitMqPublisherOptions options) + public RabbitMqPublisher(RabbitMqPublisherOptions options, ILoggerFactory? loggerFactory = null) { _options = options ?? throw new ArgumentNullException(nameof(options)); + _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger(); } public async Task InitializeAsync(CancellationToken cancellationToken = default) @@ -43,17 +47,37 @@ private async Task GetChannelAsync(CancellationToken cancellationToken }; _connection = await factory.CreateConnectionAsync(cancellationToken: cancellationToken); - _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); - if (_options.DeclareExchange) - await _channel.ExchangeDeclareAsync( - exchange: _options.ExchangeName, - type: _options.ExchangeType, - durable: _options.Durable, - cancellationToken: cancellationToken - ); + try + { + if (_options is { WaitForTopology: true, DeclareExchange: false }) + { + await TopologyProbe.WaitForExchangeAsync( + _connection, + _options.ExchangeName, + _options.TopologyWaitInterval, + _options.TopologyWaitTimeout, + _logger, + cancellationToken); + } + + _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); + + if (_options.DeclareExchange) + await _channel.ExchangeDeclareAsync( + exchange: _options.ExchangeName, + type: _options.ExchangeType, + durable: _options.Durable, + cancellationToken: cancellationToken + ); - return _channel; + return _channel; + } + catch + { + await CleanupAfterFailedInitAsync(); + throw; + } } finally { @@ -90,6 +114,23 @@ await channel.BasicPublishAsync( ); } + private async Task CleanupAfterFailedInitAsync() + { + if (_channel is not null) + { + try { await _channel.CloseAsync(CancellationToken.None); } catch { /* may already be closed */ } + _channel.Dispose(); + _channel = null; + } + + if (_connection is not null) + { + try { await _connection.CloseAsync(CancellationToken.None); } catch { /* may already be closed */ } + _connection.Dispose(); + _connection = null; + } + } + public void Dispose() { _channel?.CloseAsync().GetAwaiter().GetResult(); diff --git a/src/RayTree.Plugins.RabbitMQ/RabbitMqPublisherOptions.cs b/src/RayTree.Plugins.RabbitMQ/RabbitMqPublisherOptions.cs index eb3ef65..9e6c25a 100644 --- a/src/RayTree.Plugins.RabbitMQ/RabbitMqPublisherOptions.cs +++ b/src/RayTree.Plugins.RabbitMQ/RabbitMqPublisherOptions.cs @@ -1,4 +1,4 @@ -using RayTree.Core.Models; +using RayTree.Core.Models; namespace RayTree.Plugins.RabbitMQ; @@ -27,6 +27,42 @@ public class RabbitMqPublisherOptions /// public Func RoutingKeySelector { get; set; } + /// + /// When true AND is false, InitializeAsync + /// probes the configured with an AMQP passive declare and retries on + /// NOT_FOUND (404) until the exchange appears, the cancellation token is cancelled, or + /// (when set) elapses. + /// + /// Use this in microservice deployments where the exchange is owned and declared by a + /// different service. Defaults to false — a missing exchange surfaces the underlying + /// OperationInterruptedException on the first failed AMQP operation as before. + /// + /// + /// Only NOT_FOUND (404) errors trigger retry. Other channel-level errors + /// (PRECONDITION_FAILED, ACCESS_REFUSED, etc.) and connection-level errors + /// propagate immediately so genuine misconfiguration still fails fast. + /// + /// + public bool WaitForTopology { get; set; } + + /// + /// Delay between passive-declare attempts when is true. + /// Defaults to 5 seconds. + /// + public TimeSpan TopologyWaitInterval { get; set; } = TimeSpan.FromSeconds(5); + + /// + /// Optional ceiling on the total time the topology wait loop may consume. When null + /// (default), the loop continues indefinitely until the topology appears or the + /// passed to InitializeAsync is cancelled. Operators who + /// want a hard deadline independent of the host's cancellation should set this explicitly. + /// + /// The timeout is evaluated after each failed attempt, so the observed wait may + /// exceed this value by up to one . Must be positive when set. + /// + /// + public TimeSpan? TopologyWaitTimeout { get; set; } + public RabbitMqPublisherOptions() { RoutingKeySelector = envelope => diff --git a/src/RayTree.Plugins.RabbitMQ/TopologyProbe.cs b/src/RayTree.Plugins.RabbitMQ/TopologyProbe.cs new file mode 100644 index 0000000..508fcdc --- /dev/null +++ b/src/RayTree.Plugins.RabbitMQ/TopologyProbe.cs @@ -0,0 +1,128 @@ +using System.Diagnostics; +using Microsoft.Extensions.Logging; +using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; + +namespace RayTree.Plugins.RabbitMQ; + +/// +/// Probes RabbitMQ topology (exchanges, queues) with passive declares and waits for it to appear. +/// Used by and when configured with +/// WaitForTopology = true so that a service consuming externally-owned topology does not +/// crash on startup if the owning service has not yet declared it. +/// +internal static class TopologyProbe +{ + private const ushort NotFoundReplyCode = 404; + + public static Task WaitForExchangeAsync( + IConnection connection, + string exchangeName, + TimeSpan interval, + TimeSpan? timeout, + ILogger? logger, + CancellationToken cancellationToken) + => WaitAsync( + connection, + entityKind: "exchange", + entityName: exchangeName, + probe: static (channel, name, ct) => channel.ExchangeDeclarePassiveAsync(name, ct), + interval, + timeout, + logger, + cancellationToken); + + public static Task WaitForQueueAsync( + IConnection connection, + string queueName, + TimeSpan interval, + TimeSpan? timeout, + ILogger? logger, + CancellationToken cancellationToken) + => WaitAsync( + connection, + entityKind: "queue", + entityName: queueName, + probe: static async (channel, name, ct) => { _ = await channel.QueueDeclarePassiveAsync(name, ct); }, + interval, + timeout, + logger, + cancellationToken); + + private static async Task WaitAsync( + IConnection connection, + string entityKind, + string entityName, + Func probe, + TimeSpan interval, + TimeSpan? timeout, + ILogger? logger, + CancellationToken cancellationToken) + { + if (interval <= TimeSpan.Zero) + throw new ArgumentOutOfRangeException(nameof(interval), interval, "Topology wait interval must be positive."); + if (timeout is { } t && t <= TimeSpan.Zero) + throw new ArgumentOutOfRangeException(nameof(timeout), timeout, "Topology wait timeout must be positive when set."); + + var stopwatch = Stopwatch.StartNew(); + var missCount = 0; + + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + + OperationInterruptedException notFound; + IChannel? channel = null; + try + { + channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken); + await probe(channel, entityName, cancellationToken); + + if (missCount > 0) + { + logger?.LogInformation( + "RabbitMQ {EntityKind} '{EntityName}' became available after {Misses} miss(es) ({Elapsed}).", + entityKind, entityName, missCount, stopwatch.Elapsed); + } + return; + } + catch (OperationInterruptedException ex) when (ex.ShutdownReason?.ReplyCode == NotFoundReplyCode) + { + notFound = ex; + missCount++; + + if (missCount == 1) + { + logger?.LogInformation( + "RabbitMQ {EntityKind} '{EntityName}' not found yet; waiting (interval {Interval}, timeout {Timeout}).", + entityKind, entityName, interval, timeout?.ToString() ?? ""); + } + else + { + logger?.LogDebug( + "RabbitMQ {EntityKind} '{EntityName}' still missing after {Misses} attempts ({Elapsed}).", + entityKind, entityName, missCount, stopwatch.Elapsed); + } + } + finally + { + if (channel is not null) + { + try { await channel.CloseAsync(CancellationToken.None); } catch { /* channel may already be closed by NOT_FOUND */ } + channel.Dispose(); + } + } + + // Timeout check after the failed attempt — `notFound` is guaranteed non-null here. + if (timeout is { } limit && stopwatch.Elapsed >= limit) + { + logger?.LogError( + "RabbitMQ topology wait for {EntityKind} '{EntityName}' timed out after {Elapsed} (limit {Limit}).", + entityKind, entityName, stopwatch.Elapsed, limit); + throw notFound; + } + + await Task.Delay(interval, cancellationToken); + } + } +} diff --git a/tests/RayTree.Plugins.RabbitMQ.Tests/TopologyWaitTests.cs b/tests/RayTree.Plugins.RabbitMQ.Tests/TopologyWaitTests.cs new file mode 100644 index 0000000..48f055d --- /dev/null +++ b/tests/RayTree.Plugins.RabbitMQ.Tests/TopologyWaitTests.cs @@ -0,0 +1,286 @@ +using System.Diagnostics; +using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; +using Testcontainers.RabbitMq; + +namespace RayTree.Plugins.RabbitMQ.Tests; + +/// +/// Verifies the opt-in topology-wait behaviour for and +/// : when WaitForTopology = true the side that does NOT +/// own the topology (declare disabled) probes via passive declares and retries on +/// NOT_FOUND until the other side creates it. +/// +[NonParallelizable] +public class TopologyWaitTests : IAsyncDisposable +{ + private readonly RabbitMqContainer _rabbitMq = new RabbitMqBuilder("rabbitmq:4.3.0-alpine") + .Build(); + + [OneTimeSetUp] + public Task OneTimeSetUp() => _rabbitMq.StartAsync(); + + public ValueTask DisposeAsync() => _rabbitMq.DisposeAsync(); + + private ConnectionFactory CreateFactory() => new() + { + HostName = _rabbitMq.Hostname, + Port = _rabbitMq.GetMappedPublicPort(5672), + UserName = RabbitMqBuilder.DefaultUsername, + Password = RabbitMqBuilder.DefaultPassword + }; + + private RabbitMqPublisherOptions BasePublisherOptions(string exchangeName) => new() + { + HostName = _rabbitMq.Hostname, + Port = _rabbitMq.GetMappedPublicPort(5672), + UserName = RabbitMqBuilder.DefaultUsername, + Password = RabbitMqBuilder.DefaultPassword, + ExchangeName = exchangeName, + ExchangeType = "topic", + DeclareExchange = false + }; + + private RabbitMqConsumerOptions BaseConsumerOptions(string queueName, string? exchangeName = null) => new() + { + HostName = _rabbitMq.Hostname, + Port = _rabbitMq.GetMappedPublicPort(5672), + UserName = RabbitMqBuilder.DefaultUsername, + Password = RabbitMqBuilder.DefaultPassword, + QueueName = queueName, + DeclareQueue = false, + ExchangeName = exchangeName + }; + + // --------------------------------------------------------------------- + // 5.2 Publisher waits for an externally-owned exchange to appear. + // --------------------------------------------------------------------- + [Test] + public async Task Publisher_waits_then_succeeds_when_exchange_appears_late() + { + var exchangeName = $"late-exch-{Guid.NewGuid():N}"; + var options = BasePublisherOptions(exchangeName); + options.WaitForTopology = true; + options.TopologyWaitInterval = TimeSpan.FromMilliseconds(200); + + using var publisher = new RabbitMqPublisher(options); + + // Declare the exchange after a short delay on a separate connection. + var declareTask = Task.Run(async () => + { + await Task.Delay(TimeSpan.FromSeconds(2)); + var factory = CreateFactory(); + await using var conn = await factory.CreateConnectionAsync(); + await using var ch = await conn.CreateChannelAsync(); + await ch.ExchangeDeclareAsync(exchangeName, type: "topic", durable: true); + }); + + await publisher.InitializeAsync(); + await declareTask; + + Assert.DoesNotThrowAsync(async () => await publisher.PublishAsync(new Core.Models.MessageEnvelope + { + EntityType = "Order", + EntityId = "1", + ChangeType = Core.Tracking.ChangeType.Insert, + CorrelationId = Guid.NewGuid(), + Timestamp = DateTime.UtcNow, + Payload = new byte[] { 1 } + })); + } + + // --------------------------------------------------------------------- + // 5.3 Consumer waits for an externally-owned queue (no exchange binding) + // and a message published to that queue flows through end-to-end. + // --------------------------------------------------------------------- + [Test] + public async Task Consumer_waits_then_succeeds_when_queue_appears_late() + { + var queueName = $"late-queue-{Guid.NewGuid():N}"; + var options = BaseConsumerOptions(queueName); + options.WaitForTopology = true; + options.TopologyWaitInterval = TimeSpan.FromMilliseconds(200); + + using var consumer = new RabbitMqConsumer(options); + + var declareTask = Task.Run(async () => + { + await Task.Delay(TimeSpan.FromSeconds(2)); + var factory = CreateFactory(); + await using var conn = await factory.CreateConnectionAsync(); + await using var ch = await conn.CreateChannelAsync(); + await ch.QueueDeclareAsync(queueName, durable: true, exclusive: false, autoDelete: false); + }); + + await consumer.InitializeAsync(); + await declareTask; + + // Publish a message directly to the queue (default exchange + queueName routing key) + // on a separate connection, then drain it from the consumer's IAsyncEnumerable to verify + // the consumer is actually wired up — not just that InitializeAsync returned. + var factory = CreateFactory(); + await using (var conn = await factory.CreateConnectionAsync()) + await using (var ch = await conn.CreateChannelAsync()) + { + var props = new BasicProperties + { + MessageId = Guid.NewGuid().ToString(), + Headers = new Dictionary + { + ["entity_type"] = "Order", + ["entity_id"] = "1", + ["change_type"] = "Insert", + ["version"] = 0 + } + }; + await ch.BasicPublishAsync(exchange: "", routingKey: queueName, + mandatory: false, basicProperties: props, body: new byte[] { 1 }); + } + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await using var enumerator = consumer.ConsumeAsync(cts.Token).GetAsyncEnumerator(cts.Token); + Assert.That(await enumerator.MoveNextAsync(), Is.True, "expected a message to arrive within 10s"); + Assert.That(enumerator.Current.EntityType, Is.EqualTo("Order")); + Assert.That(enumerator.Current.EntityId, Is.EqualTo("1")); + } + + // --------------------------------------------------------------------- + // 5.4 Consumer waits for an externally-owned exchange used for binding. + // --------------------------------------------------------------------- + [Test] + public async Task Consumer_waits_then_succeeds_when_bound_exchange_appears_late() + { + var exchangeName = $"late-bind-exch-{Guid.NewGuid():N}"; + var queueName = $"q-{Guid.NewGuid():N}"; + + var options = BaseConsumerOptions(queueName, exchangeName); + options.DeclareQueue = true; // consumer owns the queue, waits for the exchange + options.WaitForTopology = true; + options.TopologyWaitInterval = TimeSpan.FromMilliseconds(200); + options.BindingKey = "#"; + + using var consumer = new RabbitMqConsumer(options); + + var declareTask = Task.Run(async () => + { + await Task.Delay(TimeSpan.FromSeconds(2)); + var factory = CreateFactory(); + await using var conn = await factory.CreateConnectionAsync(); + await using var ch = await conn.CreateChannelAsync(); + await ch.ExchangeDeclareAsync(exchangeName, type: "topic", durable: true); + }); + + await consumer.InitializeAsync(); + await declareTask; + Assert.Pass(); + } + + // --------------------------------------------------------------------- + // 5.5 Timeout exhaustion surfaces the underlying NOT_FOUND. + // --------------------------------------------------------------------- + [Test] + public void Timeout_exhaustion_throws_NotFound() + { + var options = BasePublisherOptions($"never-{Guid.NewGuid():N}"); + options.WaitForTopology = true; + options.TopologyWaitInterval = TimeSpan.FromMilliseconds(100); + options.TopologyWaitTimeout = TimeSpan.FromMilliseconds(500); + + using var publisher = new RabbitMqPublisher(options); + + var ex = Assert.ThrowsAsync( + async () => await publisher.InitializeAsync()); + Assert.That(ex!.ShutdownReason!.ReplyCode, Is.EqualTo((ushort)404)); + } + + // --------------------------------------------------------------------- + // 5.6 Default (opt-out) behaviour is unchanged: throws immediately. + // + // Exercised on the consumer side because the publisher with `mandatory: false` + // silently drops messages routed to a missing exchange; only the consumer's + // BasicConsume against a missing queue surfaces NOT_FOUND eagerly. + // --------------------------------------------------------------------- + [Test] + public void Default_options_still_throw_immediately() + { + var options = BaseConsumerOptions($"missing-{Guid.NewGuid():N}"); + // WaitForTopology default (false), DeclareQueue = false (BaseConsumerOptions default). + using var consumer = new RabbitMqConsumer(options); + + var sw = Stopwatch.StartNew(); + var ex = Assert.ThrowsAsync( + async () => await consumer.InitializeAsync()); + sw.Stop(); + + Assert.That(ex!.ShutdownReason!.ReplyCode, Is.EqualTo((ushort)404)); + Assert.That(sw.Elapsed, Is.LessThan(TimeSpan.FromSeconds(2)), + "default behaviour must not retry on NOT_FOUND"); + } + + // --------------------------------------------------------------------- + // 5.7 Non-NOT_FOUND errors do not retry. + // + // Note: the broker's PRECONDITION_FAILED is injected via the active-declare path because + // RabbitMQ's passive declare can only return NOT_FOUND for missing topology (argument + // mismatches are impossible — passive declare takes no arguments). The TopologyProbe's + // exception filter (`ReplyCode == 404`) is identical for both paths, so any + // OperationInterruptedException with a non-404 reply code propagates without retry. The + // probe path's "no retry on non-404" is independently covered by + // Cancellation_during_wait_throws_OperationCanceledException (an OperationCanceledException + // is a non-NOT_FOUND throwable that exits the loop on the first occurrence). + // --------------------------------------------------------------------- + [Test] + public async Task NonNotFound_error_does_not_retry() + { + // Pre-create an exchange with type "direct". + var exchangeName = $"mismatch-{Guid.NewGuid():N}"; + var factory = CreateFactory(); + await using (var conn = await factory.CreateConnectionAsync()) + await using (var ch = await conn.CreateChannelAsync()) + { + await ch.ExchangeDeclareAsync(exchangeName, type: "direct", durable: true); + } + + // Now configure a publisher that wants to declare the same exchange as "topic" — that's + // PRECONDITION_FAILED (406), not NOT_FOUND. WaitForTopology must not mask it. + var options = BasePublisherOptions(exchangeName); + options.WaitForTopology = true; + options.TopologyWaitInterval = TimeSpan.FromMilliseconds(100); + options.TopologyWaitTimeout = TimeSpan.FromSeconds(5); + options.DeclareExchange = true; // active declare — type mismatch surfaces immediately. + options.ExchangeType = "topic"; + + using var publisher = new RabbitMqPublisher(options); + + var sw = Stopwatch.StartNew(); + var ex = Assert.ThrowsAsync( + async () => await publisher.InitializeAsync()); + sw.Stop(); + + Assert.That(ex!.ShutdownReason!.ReplyCode, Is.Not.EqualTo((ushort)404)); + Assert.That(sw.Elapsed, Is.LessThan(TimeSpan.FromSeconds(2)), + "non-NOT_FOUND errors must propagate immediately, not be retried"); + } + + // --------------------------------------------------------------------- + // 5.8 Cancellation during the wait throws OperationCanceledException. + // --------------------------------------------------------------------- + [Test] + public void Cancellation_during_wait_throws_OperationCanceledException() + { + var options = BasePublisherOptions($"never-{Guid.NewGuid():N}"); + options.WaitForTopology = true; + options.TopologyWaitInterval = TimeSpan.FromSeconds(30); + + using var publisher = new RabbitMqPublisher(options); + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)); + + var sw = Stopwatch.StartNew(); + Assert.CatchAsync( + async () => await publisher.InitializeAsync(cts.Token)); + sw.Stop(); + + Assert.That(sw.Elapsed, Is.LessThan(TimeSpan.FromSeconds(5)), + "cancellation must propagate promptly, not wait for the full interval"); + } +}