Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,68 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

---

## [0.0.17-pre-release]

### Added

#### Optional `WaitForTopic` retry for Kafka publisher and consumer (`RayTree.Plugins.Kafka`)

Mirrors the existing RabbitMQ `WaitForTopology` feature for Kafka. When `WaitForTopic = true`
is set on either `KafkaPublisherOptions` or `KafkaConsumerOptions`, `InitializeAsync` probes
the broker via `IAdminClient.GetMetadata` and retries while the response indicates the topic
is not yet available — empty `Topics` collection, per-topic `UnknownTopicOrPart`, per-topic
`LeaderNotAvailable` (cluster bootstrap / leader election), or a transient transport-level
`KafkaException` (`Local_Transport`, `Local_AllBrokersDown`, `Local_Resolve`, `Local_TimedOut`
— the broker-not-yet-reachable startup race). Other broker errors (authorization, fatal
librdkafka errors) propagate immediately. New options on both classes: `WaitForTopic` (bool,
default `false`), `TopicWaitInterval` (TimeSpan, default 5 s), `TopicWaitTimeout` (TimeSpan?,
default `null`). Both Kafka builder extensions (`UseKafka` on publisher and `UseKafka<TEntity>`
on subscriber) now accept an optional `ILoggerFactory?` parameter so probe logs reach the host
logging infrastructure when using the documented fluent API.

The probe is implemented in a new internal `KafkaTopicProbe` helper. Per-call metadata timeout
is a fixed ~1 s decoupled from `TopicWaitInterval` so cancellation latency and shutdown
thread-pool occupancy are bounded regardless of how long the interval is set.

### Changed — BINARY-BREAKING

#### `KafkaPublisher` constructor adds optional `ILoggerFactory?` parameter

`public KafkaPublisher(KafkaPublisherOptions options)` → `public KafkaPublisher(KafkaPublisherOptions options, ILoggerFactory? loggerFactory = null)`.

This is **source-compatible** (existing `new KafkaPublisher(options)` call-sites continue to
compile) but **binary-breaking** (adding an optional parameter to a public constructor
changes the constructor's binary signature). Downstream applications consuming
`RayTree.Plugins.Kafka` as a published NuGet must **recompile** against this version —
binaries built against the older signature will hit `MissingMethodException` at runtime.

### Changed

- `KafkaPublisher` now uses two `SemaphoreSlim` instances (one for the one-shot topic probe
gated by a `volatile bool _probeCompleted` flag, one for the very short producer-build
critical section) instead of the previous `lock`. Splitting the two means concurrent
`PublishAsync` callers do NOT serialize behind a multi-second topic-wait probe — they
contend only on the microsecond-long builder lock. The probe runs inside the lazy
`GetProducerAsync` path so it covers both `InitializeAsync` and direct `PublishAsync`.
- `KafkaPublisher.Dispose` is now idempotent (`volatile bool _disposed` guard) and its
internal `SafeRelease` swallows `ObjectDisposedException` from in-flight `Release()`
calls during a Dispose-during-init race, so host shutdown no longer produces a noisy
crash log.
- `KafkaConsumer.InitializeAsync` is now genuinely `async Task` instead of returning a
pre-completed `Task` so the probe can be awaited safely under any captured
`SynchronizationContext`. A `cancellationToken.ThrowIfCancellationRequested()` check
between the probe and the native `IConsumer` allocation prevents handle leaks when
cancellation arrives during a slow probe.

### Changed — `RayTree.Core`

- `ChangeSubscriber.InitializeAsync` now initializes all registered consumers in parallel
via `Task.WhenAll` rather than sequentially. A single consumer with a slow init (e.g.
Kafka `WaitForTopic` against a missing topic) no longer blocks unrelated consumers from
subscribing.

---

## [0.0.16-pre-release]

### Added
Expand Down
6 changes: 3 additions & 3 deletions CLAUDE.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<WarningsAsErrors>nullable</WarningsAsErrors>

<VersionPrefix>0.0.16</VersionPrefix>
<VersionPrefix>0.0.17</VersionPrefix>
<VersionSuffix>pre-release</VersionSuffix>

<Authors>bitc0der</Authors>
Expand Down
16 changes: 8 additions & 8 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="10.0.8" />
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="10.0.8" />
<PackageVersion Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="10.0.8" />
<PackageVersion Include="Microsoft.Extensions.Logging" Version="10.0.8" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.8" />
<!-- EF -->
<PackageVersion Include="Microsoft.EntityFrameworkCore" Version="10.0.8" />
Expand All @@ -22,22 +23,21 @@
<PackageVersion Include="Confluent.Kafka" Version="2.14.0" />
<!-- Serialization -->
<PackageVersion Include="protobuf-net" Version="3.2.56" />
<PackageVersion Include="MessagePack" Version="3.1.4" />
<PackageVersion Include="MessagePack" Version="3.1.6" />
<!-- Compression -->
<PackageVersion Include="K4os.Compression.LZ4" Version="1.3.8" />
<!-- Telemetry -->
<PackageVersion Include="OpenTelemetry" Version="1.15.3" />
<PackageVersion Include="OpenTelemetry.Api" Version="1.15.3" />
<!-- Testing -->
<PackageVersion Include="NUnit" Version="4.6.0" />
<PackageVersion Include="NUnit" Version="4.6.1" />
<PackageVersion Include="NUnit.Analyzers" Version="4.13.0" />
<PackageVersion Include="NUnit3TestAdapter" Version="6.2.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="18.5.1" />
<PackageVersion Include="Moq" Version="4.20.72" />
<PackageVersion Include="Testcontainers" Version="4.11.0" />
<PackageVersion Include="Testcontainers.PostgreSql" Version="4.11.0" />
<PackageVersion Include="Testcontainers.RabbitMq" Version="4.11.0" />
<PackageVersion Include="Testcontainers.Kafka" Version="4.11.0" />
<PackageVersion Include="Testcontainers.Redis" Version="4.11.0" />
<PackageVersion Include="Testcontainers.PostgreSql" Version="4.12.0" />
<PackageVersion Include="Testcontainers.RabbitMq" Version="4.12.0" />
<PackageVersion Include="Testcontainers.Kafka" Version="4.12.0" />
<PackageVersion Include="Testcontainers.Redis" Version="4.12.0" />
</ItemGroup>
</Project>
</Project>
56 changes: 56 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,62 @@ var consumer = new RabbitMqConsumer(new RabbitMqConsumerOptions
See the [Configuration Guide](configuration.md#rabbitmq-topology-wait) for the full option
reference and for consumer-factory / Generic Host patterns.

## Kafka Topic Wait

The Kafka analogue of `WaitForTopology` for the same microservice startup-ordering case.
Enable `WaitForTopic = true` on `KafkaPublisherOptions` or `KafkaConsumerOptions` and
`InitializeAsync` probes `IAdminClient.GetMetadata` until the topic is reported available.

| Option | Default | Description |
|---|---|---|
| `WaitForTopic` | `false` | Enable the wait loop. |
| `TopicWaitInterval` | `5 s` | Delay between probe attempts. |
| `TopicWaitTimeout` | `null` | Hard deadline; `null` means no ceiling. |

The probe retries on: empty `Topics` collection, per-topic `UnknownTopicOrPart`, per-topic
`LeaderNotAvailable` (cluster bootstrap / leader election), and transient transport-level
`KafkaException`s (`Local_Transport`, `Local_AllBrokersDown`, `Local_Resolve`, `Local_TimedOut`
— covers the broker-not-yet-reachable startup race). All other broker errors and fatal
librdkafka errors propagate immediately.

### Publisher example

```csharp
builder.ForEntity<Order>(e => e
.UsePublisher(new KafkaPublisher(new KafkaPublisherOptions
{
BootstrapServers = "kafka:9092",
Topic = "orders.events",
WaitForTopic = true,
TopicWaitInterval = TimeSpan.FromSeconds(2),
TopicWaitTimeout = TimeSpan.FromMinutes(5)
}, loggerFactory))); // pass loggerFactory so probe logs are observable
```

### Consumer example

```csharp
builder.ForEntity<Order>(e => e
.UseSerializer(new JsonSerializerPlugin())
.UseKafka(o =>
{
o.BootstrapServers = "kafka:9092";
o.Topic = "orders.events";
o.GroupId = "orders-service";
o.WaitForTopic = true;
o.TopicWaitInterval = TimeSpan.FromSeconds(2);
// TopicWaitTimeout = null → retry until CancellationToken is cancelled
}, loggerFactory)
.OnInsert(async (change, ct) => { /* ... */ }));
```

> **Auto-create caveat:** brokers with `auto.create.topics.enable=true` (the default on many
> images) create the topic in response to the probe itself, defeating the wait. Set the
> broker option to `false` in deployments that depend on this feature.

See the [Configuration Guide](configuration.md#kafka-topic-wait) for the full caveats list
(sync `Build()` + `null` timeout interaction, logger-plumbing tips).

## Examples

The `examples/` directory contains two complete runnable microservice demos showing the full outbox-to-broker pipeline end to end. Both are standalone solutions (not part of `RayTree.slnx`) and start with a single `docker compose up --build`:
Expand Down
68 changes: 68 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,74 @@ builder.ForEntity<Order>(e => e
The default (`WaitForTopology = false`) is unchanged — a missing exchange or queue surfaces the
underlying `OperationInterruptedException` immediately.

### Kafka topic wait

The Kafka analogue of `WaitForTopology` for deployments where the topic owner (often a
dedicated "schema-owner" pod) comes up after the publisher / consumer that depends on it.
When `WaitForTopic = true`, `InitializeAsync` probes the broker via
`IAdminClient.GetMetadata` and retries until the topic is reported available.

Three options on both `KafkaPublisherOptions` and `KafkaConsumerOptions`:

| Option | Default | Description |
|---|---|---|
| `WaitForTopic` | `false` | Enable the wait loop. |
| `TopicWaitInterval` | `5 s` | Delay between metadata probe attempts. |
| `TopicWaitTimeout` | `null` (unlimited) | Hard deadline; `null` means retry until cancellation. |

**Retryable responses** — the probe retries on any of:

- Empty `Metadata.Topics` collection (some broker versions return no entry rather than a placeholder).
- Per-topic `ErrorCode.UnknownTopicOrPart`.
- Per-topic `ErrorCode.LeaderNotAvailable` (transient during cluster bootstrap and partition-leader election).
- Transient transport-level `KafkaException`s: `Local_Transport` (connection refused / socket closed), `Local_AllBrokersDown`, `Local_Resolve` (DNS failure), `Local_TimedOut`. This covers the common startup race where the broker pod has not yet finished starting.

All other broker errors, fatal `KafkaException`s (`Error.IsFatal == true`), and `OperationCanceledException` propagate immediately.

**Probe placement**

- The publisher probes inside the lazy producer-init path so both `InitializeAsync` and `PublishAsync` benefit (the probe runs at most once per `KafkaPublisher` lifetime, then a `volatile bool` flag short-circuits subsequent calls).
- The consumer probes before allocating the native `IConsumer` handle and re-checks cancellation immediately after, so a Ctrl+C during a slow probe never leaks a librdkafka handle.

**Cancellation latency.** Each `GetMetadata` call is bounded by a small fixed timeout (~1 s),
decoupled from `TopicWaitInterval`. This keeps shutdown-thread-pinning small regardless of how
long you set the interval to.

```csharp
// Publisher — waits for an externally-owned topic
builder.ForEntity<Order>(e => e
.UsePublisher(new KafkaPublisher(new KafkaPublisherOptions
{
BootstrapServers = "kafka:9092",
Topic = "orders.events",
WaitForTopic = true,
TopicWaitInterval = TimeSpan.FromSeconds(2),
TopicWaitTimeout = TimeSpan.FromMinutes(5)
}, loggerFactory))); // pass loggerFactory so probe progress is observable

// Consumer — waits for the same topic
builder.ForEntity<Order>(e => e
.UseSerializer(new JsonSerializerPlugin())
.UseKafka(o =>
{
o.BootstrapServers = "kafka:9092";
o.Topic = "orders.events";
o.GroupId = "orders-service";
o.WaitForTopic = true;
o.TopicWaitInterval = TimeSpan.FromSeconds(2);
// TopicWaitTimeout = null → retry until CancellationToken is cancelled
}, loggerFactory)
.OnInsert(async (change, ct) => { /* ... */ }));
```

**Caveats**

- *Auto-create.* Brokers with `auto.create.topics.enable=true` (the default on many distributions, including the stock `confluentinc/cp-kafka` image) will *create* the topic in response to the metadata probe itself, defeating the wait. Set the broker option to `false` in deployments that depend on this feature.
- *Sync `Build()` + `TopicWaitTimeout = null`.* The synchronous `ChangeTrackingBuilder.Build()` overload (which `AddChangeTracking` uses) does not plumb a cancellation token through. With an unbounded wait, host startup will block indefinitely with no SIGTERM escape. Either set a non-null timeout, or use `BuildAsync(cancellationToken)` with the host's `ApplicationStopping` token.
- *Logger plumbing.* The publisher's `ILoggerFactory?` parameter and the subscriber-side `UseKafka(configure, loggerFactory)` overload both default to silent (`NullLoggerFactory.Instance`). Pass the host's logger factory explicitly when using `WaitForTopic` so the first-miss / recovery `Information` logs are visible — otherwise a stuck startup is silently invisible.

The default (`WaitForTopic = false`) is unchanged — a missing topic surfaces as `UnknownTopicOrPart` on the first `ProduceAsync` (publisher) or as silent no-message returns from `Consume` (consumer).

```csharp
// InMemory (testing)
.ForEntity<Order>(e => e
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
schema: spec-driven
created: 2026-05-23
Loading
Loading