Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,81 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

---

## [0.0.15-pre-release]

### Added

#### Topology wait — opt-in startup retry for externally-owned RabbitMQ topology (`RayTree.Plugins.RabbitMQ`)

In microservice deployments the exchange or queue that a publisher or consumer depends on is
often owned and declared by a separate service. If both services start simultaneously the
consumer-side service may call `InitializeAsync` before the topology exists, causing an
`OperationInterruptedException` with `NOT_FOUND` (404) and an immediate crash-loop.

Three new options on both `RabbitMqPublisherOptions` and `RabbitMqConsumerOptions` enable an
opt-in wait loop that probes with AMQP passive declares and retries until the topology appears:

| Option | Type | Default | Description |
|---|---|---|---|
| `WaitForTopology` | `bool` | `false` | Enable the topology wait loop. |
| `TopologyWaitInterval` | `TimeSpan` | `5 s` | Delay between passive-declare attempts. |
| `TopologyWaitTimeout` | `TimeSpan?` | `null` | Optional hard deadline; `null` means wait indefinitely. |

**Publisher behaviour** (when `WaitForTopology = true` and `DeclareExchange = false`): probes
the configured `ExchangeName` with `ExchangeDeclarePassiveAsync` before opening the working
channel. Retries on `NOT_FOUND`; propagates all other errors immediately.

**Consumer behaviour** (when `WaitForTopology = true`):
- When `DeclareQueue = false`: probes `QueueName` with `QueueDeclarePassiveAsync`.
- When `ExchangeName` is non-empty: probes the exchange with `ExchangeDeclarePassiveAsync`
before calling `QueueBindAsync`.

Both probes use the same `TopologyWaitInterval` and `TopologyWaitTimeout`.

**Non-retried errors**: only `NOT_FOUND` (404) triggers retry. `PRECONDITION_FAILED`,
`ACCESS_REFUSED`, connection failures, and `OperationCanceledException` propagate immediately.

**Fresh channel per attempt**: RabbitMQ closes the channel on every channel-level error, so
each probe opens a new channel from the existing connection. The working channel is created only
after all probes succeed.

**Logging** (publisher only — `RabbitMqConsumer` has no logger by design):

| Level | When |
|---|---|
| `Information` | First `NOT_FOUND` for an exchange or queue |
| `Debug` | Subsequent misses |
| `Information` | Recovery (topology became available) |
| `Error` | Timeout exhaustion before rethrowing |

```csharp
// Publisher waits for an exchange owned by another service
new RabbitMqPublisherOptions
{
ExchangeName = "entity_changes",
DeclareExchange = false, // not our exchange to declare
WaitForTopology = true,
TopologyWaitInterval = TimeSpan.FromSeconds(2),
TopologyWaitTimeout = TimeSpan.FromMinutes(5) // give up after 5 min
}

// Consumer waits for a queue and its binding exchange
new RabbitMqConsumerOptions
{
QueueName = "order-events",
ExchangeName = "entity_changes",
DeclareQueue = false, // queue owned externally
WaitForTopology = true,
TopologyWaitInterval = TimeSpan.FromSeconds(2)
// TopologyWaitTimeout = null → retry indefinitely until token is cancelled
}
```

The default (`WaitForTopology = false`) is unchanged — a missing exchange or queue surfaces the
underlying `OperationInterruptedException` on the first AMQP operation, exactly as before.

---

## [0.0.14-pre-release]

### Added
Expand Down
4 changes: 2 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ EntityChangeTracker
| `RayTree.Plugins.PostgreSQL` | `PostgreSqlOutbox<TEntity>` — stores changes as flat columns (one column per entity property via `EntityColumnMapper`). Constructor: `PostgreSqlOutbox<TEntity>(PostgreSqlOutboxOptions, ILoggerFactory)` — both params required. `PostgreSqlRepository<TEntity>` constructor: `PostgreSqlRepository<TEntity>(PostgreSqlRepositoryOptions, ILoggerFactory)` — both params required. Builder extension methods accept `ILoggerFactory? loggerFactory = null` and default to `NullLoggerFactory.Instance`. `EntityColumnMapper` honours `System.ComponentModel.DataAnnotations` / `Schema` attributes: `[NotMapped]` excludes a property; `[Column("name")]` overrides the column name suffix (the `state_` prefix is always kept to avoid collisions with outbox metadata columns); `[Column(TypeName = "JSONB")]` sets the PostgreSQL type verbatim; `[Required]` forces `NOT NULL` on reference types; `[MaxLength(n)]`/`[StringLength(n)]` emits `VARCHAR(n)` instead of `TEXT`; `[Table("name")]` on the entity class is used as the base name when deriving default outbox/source table names; `[Key]` (one or more properties) identifies the business primary key — `PostgreSqlRepository` uses these for INSERT/UPDATE/DELETE/SELECT and adds a UNIQUE index on the corresponding `state_*` columns in the source table; for composite keys pair `[Key]` with `[Column(Order = n)]` to control column order. 1D arrays of primitive types are automatically mapped to the corresponding PostgreSQL array column type: `int[]` → `INTEGER[]`, `long[]` → `BIGINT[]`, `bool[]` → `BOOLEAN[]`, `string[]` → `TEXT[]`, `Guid[]` → `UUID[]`, `float[]` → `REAL[]`, `double[]` → `DOUBLE PRECISION[]`, `decimal[]` → `NUMERIC[]`, `DateTime[]`/`DateTimeOffset[]` → `TIMESTAMPTZ[]`, `short[]`/`byte[]`/`sbyte[]` → `SMALLINT[]`; nullable-element arrays (e.g. `int?[]`) strip the nullable wrapper before mapping the element type. Multi-dimensional arrays are not supported — declare the column type explicitly via `[Column(TypeName = "...")]` if needed. When reading values back, `EntityColumnMapper.ConvertFromDb` first attempts a direct CLR assignability check (Npgsql returns the correct array type natively) and falls back to `Convert.ChangeType` for scalar numeric coercions. Both `CleanupPublishedAsync` and `CleanupStaleUnpublishedAsync` delete in batches (`PostgreSqlOutboxOptions.CleanupBatchSize`, default 1000) using a `DELETE … WHERE id IN (SELECT id … LIMIT @BatchSize)` loop to avoid large single-statement locks and WAL spikes. **`InitializeAsync` manages schema automatically** — no flag required, always active. Fresh table path: single `CREATE TABLE IF NOT EXISTS` (columns + indexes). Existing table path: column diff via `SchemaMigrator` (adds missing columns with `ALTER TABLE … ADD COLUMN IF NOT EXISTS`; guards NOT NULL without default on non-empty tables by throwing `InvalidOperationException`; logs `Warning` for orphan columns and type mismatches) + index diff via `IndexMigrator` (creates missing indexes; drops and recreates indexes whose definition changed — uniqueness, column order, or WHERE clause; logs `Warning` for orphan indexes). Internal infrastructure: `SchemaInspector` (static — `TableExistsAsync`, `GetColumnsAsync` via `information_schema.columns`, `GetIndexesAsync` via `pg_index` catalog using `unnest(indkey::smallint[]) WITH ORDINALITY` for ordered columns and `pg_get_expr` for WHERE, `ExecuteDdlAsync`, `TableHasRowsAsync`); `SchemaMigrator` (column diff, parameterised delegate for DDL generation and orphan filter); `IndexMigrator` (index diff with schema-qualified `DROP INDEX IF EXISTS public.{name}`; WHERE clause comparison is case-insensitive and trimmed); `PostgreSqlTypeNormalizer` (maps `information_schema` type fields to canonical DDL strings). `NotificationBasedPublisher` — NOTIFY/LISTEN fast-path with polling fallback; bounded by `NotificationBasedPublisherOptions.MaxConcurrentNotifications` (default 16) via a `SemaphoreSlim` in `OnNotification`; fallback polling uses `Parallel.ForEachAsync` with `MaxPublishConcurrency` (default 1 — sequential). Logs LISTEN connection loss at `Warning` (once, on the first unhealthy tick), recovery at `Information`, and claim contention (record already taken by another publisher) at `Debug`. |
| `RayTree.Plugins.InMemory` | `InMemoryQueue` implements both `IQueuePublisher` and `IQueueConsumer` via `Channel<MessageEnvelope>`. Use for tests and local dev. |
| `RayTree.Plugins.Kafka` | `KafkaPublisher` + `KafkaConsumer`. **Publisher key**: `KafkaPublisherOptions.KeySelector` (`Func<MessageEnvelope, string>`) selects the Kafka partition key for each message. Default: `envelope => $"{EntityType}:{EntityId}"` — all changes for the same entity land on the same partition, preserving per-entity ordering. Override to shard by any envelope field (e.g. tenant, aggregate root). Consumer uses a dedicated background thread (channel-based) because Confluent.Kafka requires all `Consume`/`Commit`/`Seek` calls on one thread. `KafkaConsumer(KafkaConsumerOptions, ILoggerFactory)` — both params required. `KafkaConsumerOptions.AckAfterHandler` (default `false`) defers the offset commit; subscriber posts the `ConsumeResult` plus a `Commit`/`SeekBack` action through an internal post-handler channel that the poll thread drains at the top of each iteration (when items are queued, the next `Consume()` uses `TimeSpan.Zero` so commits don't wait a full poll cycle). `AcknowledgeAsync` → `Commit`; `NegativeAcknowledgeAsync` → `Seek(TopicPartitionOffset)` so the failed message is redelivered in the same consumer's lifetime, not just on restart. Parse-failure path always commits immediately to avoid poison-pilling the partition. Requires `SubscriberOptions.MaxDegreeOfParallelism = 1` per partition when `AckAfterHandler = true`. |
| `RayTree.Plugins.RabbitMQ` | `RabbitMqPublisher` + `RabbitMqConsumer`. **Routing key**: `RabbitMqPublisherOptions.RoutingKeySelector` (`Func<MessageEnvelope, string>`) selects the AMQP routing key for each message. Default: `"{RoutingKey}.{EntityType}.{changeType}"` (e.g. `change.Order.insert`) — consumers bind queues with wildcard patterns such as `change.Order.*` or `change.*.insert`. The default delegate reads `RoutingKey` at call time so changing that property after construction is always reflected; set a custom delegate to route by tenant, aggregate root, or any envelope field. Consumer uses `AsyncEventingBasicConsumer` buffered via `Channel<MessageEnvelope>`. `RabbitMqConsumer(RabbitMqConsumerOptions)` — options only; no logger. Message-receive errors silently NACK and requeue without logging (acknowledged exception to the logging placement rule — NACK/requeue is the correct recovery action and no context is available at that point). `RabbitMqConsumerOptions.AckAfterHandler` (default `false`) defers the broker ACK until after `ChangeSubscriber` confirms handler success — delivery tag is stashed in `MessageEnvelope.Metadata` via the internal `RabbitMqEnvelopeMetadata` accessor; `AcknowledgeAsync` issues `BasicAckAsync`; `NegativeAcknowledgeAsync` issues `BasicNackAsync(requeue: true)`. |
| `RayTree.Plugins.RabbitMQ` | `RabbitMqPublisher` + `RabbitMqConsumer`. **Routing key**: `RabbitMqPublisherOptions.RoutingKeySelector` (`Func<MessageEnvelope, string>`) selects the AMQP routing key for each message. Default: `"{RoutingKey}.{EntityType}.{changeType}"` (e.g. `change.Order.insert`) — consumers bind queues with wildcard patterns such as `change.Order.*` or `change.*.insert`. The default delegate reads `RoutingKey` at call time so changing that property after construction is always reflected; set a custom delegate to route by tenant, aggregate root, or any envelope field. `RabbitMqPublisher(RabbitMqPublisherOptions, ILoggerFactory?)` — options required, logger factory optional (`null` → `NullLoggerFactory.Instance`); `UseRabbitMq(configure, loggerFactory)` mirrors the same shape. Consumer uses `AsyncEventingBasicConsumer` buffered via `Channel<MessageEnvelope>`. `RabbitMqConsumer(RabbitMqConsumerOptions)` — options only; no logger. Message-receive errors silently NACK and requeue without logging (acknowledged exception to the logging placement rule — NACK/requeue is the correct recovery action and no context is available at that point). `RabbitMqConsumerOptions.AckAfterHandler` (default `false`) defers the broker ACK until after `ChangeSubscriber` confirms handler success — delivery tag is stashed in `MessageEnvelope.Metadata` via the internal `RabbitMqEnvelopeMetadata` accessor; `AcknowledgeAsync` issues `BasicAckAsync`; `NegativeAcknowledgeAsync` issues `BasicNackAsync(requeue: true)`. **Topology wait** (opt-in, both options classes): `WaitForTopology` (bool, default `false`), `TopologyWaitInterval` (TimeSpan, default 5 s), `TopologyWaitTimeout` (TimeSpan?, default `null` — unlimited). When `WaitForTopology = true`, `InitializeAsync` probes externally-owned topology via AMQP passive declares (`ExchangeDeclarePassiveAsync` / `QueueDeclarePassiveAsync`) and retries only on `NOT_FOUND` (404) until the topology appears, the cancellation token is cancelled, or `TopologyWaitTimeout` elapses (rethrowing the last `NOT_FOUND`). Other channel- and connection-level errors (`PRECONDITION_FAILED`, `ACCESS_REFUSED`, etc.) propagate immediately. Each probe attempt uses a fresh channel from the existing connection because RabbitMQ closes the channel on any channel-level exception. The publisher probes when `DeclareExchange = false`; the consumer probes the queue when `DeclareQueue = false` and the binding-target exchange when `ExchangeName` is non-empty. Probe progress is logged by the publisher (first miss `Information`, subsequent misses `Debug`, recovery `Information`, timeout `Error` via `TopologyProbe`); the consumer's no-logger exception still holds, so consumer-side probes log nothing. Use this in microservice deployments where one service owns the topology and others connect later without strict startup ordering. |
| `RayTree.Plugins.Serializers.*` | JSON, MessagePack, Protobuf — each in its own package. |
| `RayTree.Plugins.Compressors.*` | Gzip, Brotli, LZ4 — each in its own package. |

Expand Down Expand Up @@ -130,7 +130,7 @@ All durations are emitted in seconds (`s`) per OTel semantic conventions; bytes
- **Integration tests use Testcontainers**: PostgreSQL, Kafka, and RabbitMQ tests require Docker. Mark test classes `[NonParallelizable]` when sharing a container. Use unique topic/queue names per test to avoid cross-test contamination.
- **Metrics placement rule — required meter, no null fallback**: `RayTreeMeter` is a required non-null constructor parameter on every runtime service that emits metrics (`ChangePublisher`, `OutboxPublisherService`, `ChangeSubscriber`). There is no internal `new RayTreeMeter()` fallback in those classes — the builder layer (`ChangeTrackingBuilder.BuildInternal`) constructs a default meter when the caller didn't supply one via `UseMeter`, then injects it everywhere. This matches the logging rule: callers make a conscious choice at builder/DI level, runtime services have no hidden defaults. `EntityChangeTracker` tracks ownership via an `ownsMeter` flag and disposes the meter only when it created it; caller-supplied meters are left alone. Instrument calls are silent no-ops when no listener is attached, so opting out costs nothing at runtime.
- **OTel SDK isolation via peer assembly**: `RayTree.Core` and `RayTree.Hosting` use only `System.Diagnostics.Metrics` (BCL) — no `OpenTelemetry.*` package references. `RayTree.OpenTelemetry` is a separate assembly with two members (`RayTreeInstrumentation.MeterName` + `AddRayTreeMetrics`) that an application opts into. Applications that don't need OTel receive zero transitive OTel dependencies; applications that do reference exactly one well-versioned dependency. This mirrors the `RayTree.Hosting` / `RayTree.EntityFrameworkCore` split.
- **Logging placement rule**: `NullLoggerFactory.Instance` / `NullLogger<T>.Instance` defaults belong **only** in builders and builder-context extension methods (`ChangeTrackingBuilder`, `ChangePublisherBuilder`, `ChangeSubscriberBuilder`, `KafkaSubscriberExtensions.UseKafka`, `RabbitMqSubscriberExtensions.UseRabbitMq`, `BuilderExtensions.UsePostgreSqlOutbox`, `RepositoryExtensions.UsePostgreSqlRepository`). All runtime service classes (`ChangePublisher`, `OutboxPublisherService`, `ChangeSubscriber`, `ChangeTrackingHostedService`, `KafkaConsumer`, `NotificationBasedPublisher`, `PostgreSqlOutbox<TEntity>`, `PostgreSqlRepository<TEntity>`) require a non-nullable logger — no internal fallback. **Exception**: `RabbitMqConsumer` intentionally has no logger — message-receive errors silently NACK and requeue, which is the correct broker-level recovery; no useful context is available inside the RabbitMQ delivery callback to produce a meaningful log entry. This ensures that callers always make a conscious choice about whether to produce log output.
- **Logging placement rule**: `NullLoggerFactory.Instance` / `NullLogger<T>.Instance` defaults belong **only** in builders and builder-context extension methods (`ChangeTrackingBuilder`, `ChangePublisherBuilder`, `ChangeSubscriberBuilder`, `KafkaSubscriberExtensions.UseKafka`, `RabbitMqBuilderExtensions.UseRabbitMq`, `RabbitMqSubscriberExtensions.UseRabbitMq`, `BuilderExtensions.UsePostgreSqlOutbox`, `RepositoryExtensions.UsePostgreSqlRepository`). All runtime service classes (`ChangePublisher`, `OutboxPublisherService`, `ChangeSubscriber`, `ChangeTrackingHostedService`, `KafkaConsumer`, `NotificationBasedPublisher`, `PostgreSqlOutbox<TEntity>`, `PostgreSqlRepository<TEntity>`) require a non-nullable logger — no internal fallback. `RabbitMqPublisher` accepts an optional `ILoggerFactory?` (null → `NullLoggerFactory.Instance`) to support topology-wait logging without forcing every caller through DI. **Exception**: `RabbitMqConsumer` intentionally has no logger — message-receive errors silently NACK and requeue, which is the correct broker-level recovery; no useful context is available inside the RabbitMQ delivery callback to produce a meaningful log entry. Consumer-side topology-wait probes therefore pass `logger: null` to `TopologyProbe`, preserving the exception. This ensures that callers always make a conscious choice about whether to produce log output.

## Code Style & Conventions

Expand Down
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<WarningsAsErrors>nullable</WarningsAsErrors>

<VersionPrefix>0.0.14</VersionPrefix>
<VersionPrefix>0.0.15</VersionPrefix>
<VersionSuffix>pre-release</VersionSuffix>

<Authors>bitc0der</Authors>
Expand Down
Loading
Loading