diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..7d4dd59 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,24 @@ +# Exclude build artefacts so they are not sent to the Docker daemon as part of the build context. +# Without this file, local bin/ and obj/ directories (which include large native Confluent.Kafka +# and Npgsql binaries) inflate the context to several GB and make every build slow. + +# .NET build output +**/bin/ +**/obj/ + +# IDE and editor directories +.vs/ +.idea/ +.vscode/ + +# Git metadata +.git/ + +# Local user overrides +*.user +*.suo +*.DotSettings.user + +# OS noise +.DS_Store +Thumbs.db diff --git a/docs/README.md b/docs/README.md index 2d2bcad..6912034 100644 --- a/docs/README.md +++ b/docs/README.md @@ -492,6 +492,23 @@ var consumer = new RabbitMqConsumer(new RabbitMqConsumerOptions See the [Configuration Guide](configuration.md#rabbitmq-topology-wait) for the full option reference and for consumer-factory / Generic Host patterns. +## Examples + +The `examples/` directory contains two complete runnable microservice demos showing the full outbox-to-broker pipeline end to end. Both are standalone solutions (not part of `RayTree.slnx`) and start with a single `docker compose up --build`: + +| Example | Broker | Key concepts demonstrated | +|---|---|---| +| [`examples/RabbitMQ.Microservices/`](../examples/RabbitMQ.Microservices/README.md) | RabbitMQ 4 (topic exchange) | Outbox → publish → consume, `WaitForTopology`, at-most-once vs at-least-once, management UI | +| [`examples/Kafka.Microservices/`](../examples/Kafka.Microservices/README.md) | Apache Kafka 3.9 (KRaft, no Zookeeper) | Partition-key ordering, `FromEarliest` consumer replay, consumer-group scaling, 3-partition routing | + +Both examples share the same structure: +- `OrderService` — `PostgreSqlRepository` + `PostgreSqlOutbox` + broker publisher + `OrderSimulator` background service +- `NotificationService` — broker consumer + `OnInsert`/`OnUpdate`/`OnDelete` handlers logging via `ILogger` +- Multi-stage Dockerfiles with csproj-first layer caching (NuGet packages cached across source-only edits) +- `postgres:18.3-alpine` with the postgres 18 volume layout (`/var/lib/postgresql`) + +--- + ## Cleanup `EntityChangeTracker` implements `IDisposable`. Disposing it stops all publisher services: diff --git a/examples/Kafka.Microservices/Directory.Build.props b/examples/Kafka.Microservices/Directory.Build.props new file mode 100644 index 0000000..63c8d8f --- /dev/null +++ b/examples/Kafka.Microservices/Directory.Build.props @@ -0,0 +1,22 @@ + + + + + + false + + + + + + + + false + false + + + diff --git a/examples/Kafka.Microservices/Directory.Packages.props b/examples/Kafka.Microservices/Directory.Packages.props new file mode 100644 index 0000000..67fd41a --- /dev/null +++ b/examples/Kafka.Microservices/Directory.Packages.props @@ -0,0 +1,13 @@ + + + + + + + + + diff --git a/examples/Kafka.Microservices/Kafka.Microservices.slnx b/examples/Kafka.Microservices/Kafka.Microservices.slnx new file mode 100644 index 0000000..1d5f75a --- /dev/null +++ b/examples/Kafka.Microservices/Kafka.Microservices.slnx @@ -0,0 +1,11 @@ + + + + + + + + + + + diff --git a/examples/Kafka.Microservices/NotificationService/Dockerfile b/examples/Kafka.Microservices/NotificationService/Dockerfile new file mode 100644 index 0000000..9e792d6 --- /dev/null +++ b/examples/Kafka.Microservices/NotificationService/Dockerfile @@ -0,0 +1,37 @@ +# Build context: repository root (../../ from this file). The compose file sets this explicitly +# so that ProjectReference paths from NotificationService.csproj into ../../../src/RayTree.* resolve. + +FROM mcr.microsoft.com/dotnet/sdk:10.0-alpine AS build +WORKDIR /src + +# ── Layer 1: central build infrastructure ──────────────────────────────────────────────────────── +# Copied first because it changes rarely. All subsequent layers inherit TargetFramework, Nullable, +# TreatWarningsAsErrors, and centrally-pinned NuGet versions from these files. +COPY Directory.Build.props Directory.Packages.props NuGet.config ./ + +# ── Layer 2: .csproj files only (restore cache) ────────────────────────────────────────────────── +# Copy project files for NotificationService and every ProjectReference in its dependency graph +# (direct: Shared + five RayTree plugins; transitive: all plugins → RayTree.Core). +# NotificationService has no PostgreSQL dependency — only the Kafka + serializer + compressor stack. +COPY src/RayTree.Core/RayTree.Core.csproj src/RayTree.Core/ +COPY src/RayTree.Hosting/RayTree.Hosting.csproj src/RayTree.Hosting/ +COPY src/RayTree.Plugins.Kafka/RayTree.Plugins.Kafka.csproj src/RayTree.Plugins.Kafka/ +COPY src/RayTree.Plugins.Serializers.MessagePack/RayTree.Plugins.Serializers.MessagePack.csproj src/RayTree.Plugins.Serializers.MessagePack/ +COPY src/RayTree.Plugins.Compressors.Gzip/RayTree.Plugins.Compressors.Gzip.csproj src/RayTree.Plugins.Compressors.Gzip/ +COPY examples/Kafka.Microservices/Directory.Build.props examples/Kafka.Microservices/ +COPY examples/Kafka.Microservices/Directory.Packages.props examples/Kafka.Microservices/ +COPY examples/Kafka.Microservices/Shared/Shared.csproj examples/Kafka.Microservices/Shared/ +COPY examples/Kafka.Microservices/NotificationService/NotificationService.csproj examples/Kafka.Microservices/NotificationService/ +RUN dotnet restore examples/Kafka.Microservices/NotificationService/NotificationService.csproj + +# ── Layer 3: source files → publish ────────────────────────────────────────────────────────────── +# Packages are already restored above; --no-restore skips the redundant network round-trip. +COPY src/ ./src/ +COPY examples/Kafka.Microservices/ ./examples/Kafka.Microservices/ +WORKDIR /src/examples/Kafka.Microservices/NotificationService +RUN dotnet publish -c Release -o /app /p:UseAppHost=false --no-restore + +FROM mcr.microsoft.com/dotnet/runtime:10.0-alpine AS runtime +WORKDIR /app +COPY --from=build /app ./ +ENTRYPOINT ["dotnet", "KafkaMicroservices.NotificationService.dll"] diff --git a/examples/Kafka.Microservices/NotificationService/NotificationService.csproj b/examples/Kafka.Microservices/NotificationService/NotificationService.csproj new file mode 100644 index 0000000..10fed15 --- /dev/null +++ b/examples/Kafka.Microservices/NotificationService/NotificationService.csproj @@ -0,0 +1,20 @@ + + + Exe + KafkaMicroservices.NotificationService + KafkaMicroservices.NotificationService + + + + + + + + + + + + + + + diff --git a/examples/Kafka.Microservices/NotificationService/Program.cs b/examples/Kafka.Microservices/NotificationService/Program.cs new file mode 100644 index 0000000..b137479 --- /dev/null +++ b/examples/Kafka.Microservices/NotificationService/Program.cs @@ -0,0 +1,82 @@ +using KafkaMicroservices.Shared; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using RayTree.Core.Models; +using RayTree.Core.Plugins; +using RayTree.Core.Plugins.Serialization; +using RayTree.Hosting; +using RayTree.Plugins.Compressors.Gzip; +using RayTree.Plugins.Kafka; +using RayTree.Plugins.Serializers.MessagePack; + +var kafkaBootstrap = Environment.GetEnvironmentVariable("KAFKA_BOOTSTRAP_SERVERS") ?? "localhost:9092"; + +var builder = Host.CreateApplicationBuilder(args); + +// KafkaConsumer requires an ILoggerFactory at construction. We reuse a dedicated console-logging +// factory for the consumer instance (the host's own pipeline is unaffected). +var pluginLoggerFactory = LoggerFactory.Create(b => b + .AddConsole() + .SetMinimumLevel(LogLevel.Information)); + +// Handlers close over this logger so structured log properties (OrderId, Status, etc.) flow through +// the Generic Host pipeline — level filtering, formatters, and scopes — rather than bypassing it +// via Console.WriteLine. +var handlerLogger = pluginLoggerFactory.CreateLogger("Notifications"); + +builder.Services.AddChangeTracking(builder.Configuration, cfg => +{ + cfg + // Payload pipeline MUST match OrderService exactly (MessagePack + Gzip). + // A mismatch here means deserialization throws on the first envelope and the consumer crashes. + .UseSerializer(_ => new MessagePackSerializerPlugin()) + .UseCompressor(_ => new GzipCompressorPlugin()) + .ForEntity(e => e + // FromEarliest = true (default) so a fresh consumer group replays from offset 0 — this is + // why notification-service is allowed to start before order-service. AckAfterHandler = false + // (default) means the offset is committed on the poll thread immediately after parsing + // (at-most-once). Switch to true plus SubscriberOptions.MaxDegreeOfParallelism = 1 for + // at-least-once delivery. + .UseConsumer(new KafkaConsumer(new KafkaConsumerOptions + { + BootstrapServers = kafkaBootstrap, + Topic = "raytree.order_changes", + GroupId = "notification-service", + }, pluginLoggerFactory)) + // Shared-handler dispatch: all three handlers run sequentially in registration order + // on every matching delivery. Each handler binds to exactly one ChangeType. + .OnInsert(LogInsertAsync) + .OnUpdate(LogUpdateAsync) + .OnDelete(LogDeleteAsync)); +}); + +// ChangeTrackingHostedService (registered by AddChangeTracking) drives StartAsync/StopAsync. +// IHostApplicationLifetime handles graceful shutdown for Ctrl+C and `docker compose down`. +await builder.Build().RunAsync(); + +// ---- handlers ------------------------------------------------------------------------------- +// Local functions close over handlerLogger — non-static so they capture the variable above. + +Task LogInsertAsync(EntityChange change, CancellationToken ct) +{ + var order = change.State; + handlerLogger.LogInformation( + "[NOTIFY] NEW order {OrderId} — customer={Customer} total={Total:C} status={Status}", + order?.Id, order?.CustomerName, order?.TotalAmount, order?.Status); + return Task.CompletedTask; +} + +Task LogUpdateAsync(EntityChange change, CancellationToken ct) +{ + var order = change.State; + handlerLogger.LogInformation( + "[NOTIFY] UPDATED order {OrderId} — status={Status} total={Total:C}", + order?.Id, order?.Status, order?.TotalAmount); + return Task.CompletedTask; +} + +Task LogDeleteAsync(EntityChange change, CancellationToken ct) +{ + handlerLogger.LogInformation("[NOTIFY] DELETED order {OrderId}", change.EntityId); + return Task.CompletedTask; +} diff --git a/examples/Kafka.Microservices/OrderService/Dockerfile b/examples/Kafka.Microservices/OrderService/Dockerfile new file mode 100644 index 0000000..ac27cf2 --- /dev/null +++ b/examples/Kafka.Microservices/OrderService/Dockerfile @@ -0,0 +1,39 @@ +# Build context: repository root (../../ from this file). The compose file sets this explicitly +# so that ProjectReference paths from OrderService.csproj into ../../../src/RayTree.* resolve. + +FROM mcr.microsoft.com/dotnet/sdk:10.0-alpine AS build +WORKDIR /src + +# ── Layer 1: central build infrastructure ──────────────────────────────────────────────────────── +# Copied first because it changes rarely. All subsequent layers inherit TargetFramework, Nullable, +# TreatWarningsAsErrors, and centrally-pinned NuGet versions from these files. +COPY Directory.Build.props Directory.Packages.props NuGet.config ./ + +# ── Layer 2: .csproj files only (restore cache) ────────────────────────────────────────────────── +# Copy project files for OrderService and every ProjectReference in its dependency graph +# (direct: Shared + six RayTree plugins; transitive: all plugins → RayTree.Core). +# Restore runs before source is copied so the NuGet package layer is reused on every build that +# does not change a .csproj or .props file — only source edits skip directly to the publish step. +COPY src/RayTree.Core/RayTree.Core.csproj src/RayTree.Core/ +COPY src/RayTree.Hosting/RayTree.Hosting.csproj src/RayTree.Hosting/ +COPY src/RayTree.Plugins.PostgreSQL/RayTree.Plugins.PostgreSQL.csproj src/RayTree.Plugins.PostgreSQL/ +COPY src/RayTree.Plugins.Kafka/RayTree.Plugins.Kafka.csproj src/RayTree.Plugins.Kafka/ +COPY src/RayTree.Plugins.Serializers.MessagePack/RayTree.Plugins.Serializers.MessagePack.csproj src/RayTree.Plugins.Serializers.MessagePack/ +COPY src/RayTree.Plugins.Compressors.Gzip/RayTree.Plugins.Compressors.Gzip.csproj src/RayTree.Plugins.Compressors.Gzip/ +COPY examples/Kafka.Microservices/Directory.Build.props examples/Kafka.Microservices/ +COPY examples/Kafka.Microservices/Directory.Packages.props examples/Kafka.Microservices/ +COPY examples/Kafka.Microservices/Shared/Shared.csproj examples/Kafka.Microservices/Shared/ +COPY examples/Kafka.Microservices/OrderService/OrderService.csproj examples/Kafka.Microservices/OrderService/ +RUN dotnet restore examples/Kafka.Microservices/OrderService/OrderService.csproj + +# ── Layer 3: source files → publish ────────────────────────────────────────────────────────────── +# Packages are already restored above; --no-restore skips the redundant network round-trip. +COPY src/ ./src/ +COPY examples/Kafka.Microservices/ ./examples/Kafka.Microservices/ +WORKDIR /src/examples/Kafka.Microservices/OrderService +RUN dotnet publish -c Release -o /app /p:UseAppHost=false --no-restore + +FROM mcr.microsoft.com/dotnet/runtime:10.0-alpine AS runtime +WORKDIR /app +COPY --from=build /app ./ +ENTRYPOINT ["dotnet", "KafkaMicroservices.OrderService.dll"] diff --git a/examples/Kafka.Microservices/OrderService/OrderService.csproj b/examples/Kafka.Microservices/OrderService/OrderService.csproj new file mode 100644 index 0000000..6428f17 --- /dev/null +++ b/examples/Kafka.Microservices/OrderService/OrderService.csproj @@ -0,0 +1,21 @@ + + + Exe + KafkaMicroservices.OrderService + KafkaMicroservices.OrderService + + + + + + + + + + + + + + + + diff --git a/examples/Kafka.Microservices/OrderService/OrderSimulator.cs b/examples/Kafka.Microservices/OrderService/OrderSimulator.cs new file mode 100644 index 0000000..13624e2 --- /dev/null +++ b/examples/Kafka.Microservices/OrderService/OrderSimulator.cs @@ -0,0 +1,118 @@ +using KafkaMicroservices.Shared; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using RayTree.Core.Plugins.Repository; +using RayTree.Core.Tracking; + +namespace KafkaMicroservices.OrderService; + +/// +/// Drives the example by periodically creating, updating, and deleting +/// rows. Each operation writes through the *and* calls +/// the matching TrackXxxAsync on the so the outbox +/// receives the change event. +/// +/// Known limitation: the repository write and the outbox write are +/// two separate transactions; a crash between them can leave the tables inconsistent. The +/// README explains this and points to RayTree.EntityFrameworkCore as the production +/// transactional path. +/// +internal sealed class OrderSimulator : BackgroundService +{ + private static readonly string[] s_CustomerNames = + ["Alice", "Bob", "Carol", "Dave", "Eve", "Frank", "Grace", "Heidi"]; + private static readonly string[] s_Statuses = + ["Pending", "Confirmed", "Shipped", "Delivered"]; + + private readonly IRepository _repository; + private readonly EntityChangeTracker _tracker; + private readonly ILogger _logger; + private readonly Random _random = new(); + + public OrderSimulator( + IRepository repository, + EntityChangeTracker tracker, + ILogger logger) + { + _repository = repository; + _tracker = tracker; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + // Wait briefly so the tracker's InitializeAsync (which runs schema migration) + // and the hosted service's StartAsync complete before we start emitting events. + await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken); + + var liveOrders = new List(); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + await StepAsync(liveOrders, stoppingToken); + } + catch (OperationCanceledException) + { + break; + } + catch (Exception ex) + { + _logger.LogError(ex, "OrderSimulator step failed"); + } + + await Task.Delay(TimeSpan.FromSeconds(1.5), stoppingToken); + } + } + + private async Task StepAsync(List liveOrders, CancellationToken ct) + { + // Bias toward inserts when the pool is small; mix in updates and deletes once we have orders. + var pickInsert = liveOrders.Count < 3 || _random.NextDouble() < 0.4; + var pickDelete = liveOrders.Count > 0 && _random.NextDouble() < 0.2; + + if (pickInsert) + { + var order = new Order + { + Id = Guid.NewGuid(), + CustomerName = s_CustomerNames[_random.Next(s_CustomerNames.Length)], + TotalAmount = Math.Round((decimal)(_random.NextDouble() * 500 + 10), 2), + Status = s_Statuses[0], + }; + + await _repository.InsertAsync(order, ct); + await _tracker.TrackInsertAsync(order, ct); + liveOrders.Add(order); + + _logger.LogInformation( + "Inserted order {OrderId} for {Customer} totalling {Total:C}", + order.Id, order.CustomerName, order.TotalAmount); + } + else if (pickDelete) + { + var idx = _random.Next(liveOrders.Count); + var order = liveOrders[idx]; + liveOrders.RemoveAt(idx); + + await _repository.DeleteAsync(order, ct); + await _tracker.TrackDeleteAsync(order, ct); + + _logger.LogInformation("Deleted order {OrderId}", order.Id); + } + else + { + var order = liveOrders[_random.Next(liveOrders.Count)]; + order.Status = s_Statuses[_random.Next(s_Statuses.Length)]; + order.TotalAmount = Math.Max(0.01m, Math.Round(order.TotalAmount + (decimal)((_random.NextDouble() - 0.5) * 20), 2)); + + await _repository.UpdateAsync(order, ct); + await _tracker.TrackUpdateAsync(order, ct); + + _logger.LogInformation( + "Updated order {OrderId} → status={Status} total={Total:C}", + order.Id, order.Status, order.TotalAmount); + } + } +} diff --git a/examples/Kafka.Microservices/OrderService/Program.cs b/examples/Kafka.Microservices/OrderService/Program.cs new file mode 100644 index 0000000..53b8389 --- /dev/null +++ b/examples/Kafka.Microservices/OrderService/Program.cs @@ -0,0 +1,80 @@ +using KafkaMicroservices.OrderService; +using KafkaMicroservices.Shared; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using RayTree.Core.Plugins; +using RayTree.Core.Plugins.Repository; +using RayTree.Core.Plugins.Serialization; +using RayTree.Hosting; +using RayTree.Plugins.Compressors.Gzip; +using RayTree.Plugins.Kafka; +using RayTree.Plugins.PostgreSQL.Outbox; +using RayTree.Plugins.PostgreSQL.Repository; +using RayTree.Plugins.Serializers.MessagePack; + +// Connection info comes from environment variables so the same binary works under +// docker-compose (service names 'postgres' / 'kafka') and against a developer's localhost. +var pgConnection = Environment.GetEnvironmentVariable("POSTGRES_CONNECTION") + ?? "Host=localhost;Port=5432;Database=raytree_example;Username=postgres;Password=postgres"; +var kafkaBootstrap = Environment.GetEnvironmentVariable("KAFKA_BOOTSTRAP_SERVERS") ?? "localhost:9092"; + +var builder = Host.CreateApplicationBuilder(args); + +// Plugin instances need an ILoggerFactory at construction; AddChangeTracking's configure callback +// runs inside a DI singleton factory which doesn't expose the IServiceProvider. We build a +// dedicated console-logging factory and reuse it for the outbox, repository, and consumer plugins. +var pluginLoggerFactory = LoggerFactory.Create(b => b + .AddConsole() + .SetMinimumLevel(LogLevel.Information)); + +// Single PostgreSqlRepository instance is shared between: +// 1) The EntityChangeTracker (via .UseRepository) — so its InitializeAsync runs on startup. +// 2) DI (as IRepository) — so OrderSimulator can call repository.InsertAsync/UpdateAsync/DeleteAsync. +var orderRepository = new PostgreSqlRepository( + new PostgreSqlRepositoryOptions + { + ConnectionString = pgConnection, + TableName = "orders", + }, + pluginLoggerFactory); + +builder.Services.AddSingleton>(orderRepository); + +// Register RayTree via the documented "primary registration path" from RayTree.Hosting. +// AddChangeTracking wires the EntityChangeTracker singleton, RayTreeMeter, and ChangeTrackingHostedService. +builder.Services.AddChangeTracking(builder.Configuration, cfg => +{ + cfg + // 500 ms polling keeps the demo snappy without hammering the DB. + // For production, prefer NOTIFY/LISTEN via PostgreSqlOutboxOptions.UseNotificationChannel. + .UsePublisherOptions(o => o.PollingInterval = TimeSpan.FromMilliseconds(500)) + // MessagePack + Gzip on the payload pipeline. NotificationService MUST register the same pair. + .UseSerializer(_ => new MessagePackSerializerPlugin()) + .UseCompressor(_ => new GzipCompressorPlugin()) + .ForEntity(e => e + .UseRepository(orderRepository) + .UseOutbox(new PostgreSqlOutbox( + new PostgreSqlOutboxOptions + { + ConnectionString = pgConnection, + OutboxTableName = "order_outbox", + }, + pluginLoggerFactory)) + // Default KeySelector is "{EntityType}:{EntityId}" — all changes for the same Order land on + // the same Kafka partition, preserving per-entity ordering. Override KeySelector to shard + // by tenant or aggregate root. + .UsePublisher(new KafkaPublisher(new KafkaPublisherOptions + { + BootstrapServers = kafkaBootstrap, + Topic = "raytree.order_changes", + }))); +}); + +// OrderSimulator drives the demo: inserts, updates, and deletes Orders in a loop so the +// notification side has a steady stream of events to react to. +builder.Services.AddHostedService(); + +// Graceful shutdown (Ctrl+C / SIGTERM / `docker compose down`) is driven by IHostApplicationLifetime +// — no manual cancellation wiring is needed. +await builder.Build().RunAsync(); diff --git a/examples/Kafka.Microservices/README.md b/examples/Kafka.Microservices/README.md new file mode 100644 index 0000000..d0e2eaf --- /dev/null +++ b/examples/Kafka.Microservices/README.md @@ -0,0 +1,265 @@ +# Kafka Microservices Example + +This example shows **RayTree's outbox-to-Kafka pipeline** using two cooperating microservices: + +- **OrderService** — inserts, updates, and deletes `Order` rows in PostgreSQL; writes every change to a PostgreSQL outbox; and publishes each outbox record to the Kafka topic `raytree.order_changes`. +- **NotificationService** — consumes from that topic and logs each change to the console. + +The example mirrors `examples/RabbitMQ.Microservices` but uses Kafka (KRaft, no Zookeeper) and PostgreSQL as the backing store. + +> **Note:** this example is intentionally **not** part of `RayTree.slnx`. To open it in an IDE, open `examples/Kafka.Microservices/Kafka.Microservices.slnx` directly. + +--- + +## Prerequisites + +| Tool | Version | +|---|---| +| Docker (with Compose v2) | 24+ | +| .NET SDK | 10.0+ (local dev only — not needed for `docker compose up`) | + +--- + +## Running with Docker Compose + +```bash +# from the repository root +cd examples/Kafka.Microservices +docker compose up --build +``` + +Both services start after Kafka passes its healthcheck (TCP probe on port 9092 with a 60-second start window that accommodates KRaft controller-election latency). `OrderService` additionally waits for PostgreSQL to be ready. + +**Expected console output** (interleaved from both services): + +``` +order-service | Inserted order a1b2… for Alice totalling $213.45 +order-service | Inserted order c3d4… for Bob totalling $87.00 +notification-service | [NOTIFY] NEW order a1b2… — customer=Alice total=$213.45 status=Pending +notification-service | [NOTIFY] NEW order c3d4… — customer=Bob total=$87.00 status=Pending +order-service | Updated order a1b2… → status=Confirmed total=$231.00 +notification-service | [NOTIFY] UPDATED order a1b2… — status=Confirmed total=$231.00 +order-service | Deleted order c3d4… +notification-service | [NOTIFY] DELETED order c3d4… +``` + +To stop cleanly: + +```bash +docker compose down +``` + +Both named volumes (`postgres-data`, `kafka-data`) are preserved on restart. To wipe them: + +```bash +docker compose down -v +``` + +--- + +## Connection Details (for Local Dev) + +| Service | Address | +|---|---| +| Kafka broker | `localhost:9092` | +| PostgreSQL | `localhost:5432` — database `raytree_example`, user `postgres`, password `postgres` | + +Override with environment variables when running outside Compose: + +```bash +# OrderService (requires a running PostgreSQL and Kafka) +KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \ +POSTGRES_CONNECTION="Host=localhost;Port=5432;Database=raytree_example;Username=postgres;Password=postgres" \ +dotnet run --project OrderService + +# NotificationService (requires a running Kafka; no PostgreSQL dependency) +KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \ +dotnet run --project NotificationService +``` + +--- + +## Project Structure + +``` +examples/Kafka.Microservices/ +├── Kafka.Microservices.slnx # Standalone solution — open this in your IDE +├── Directory.Build.props # Inherits from repo root; sets IsPackable=false +├── Directory.Packages.props # Inherits from repo root; adds Microsoft.Extensions.Hosting +├── docker-compose.yml # Postgres + Kafka (KRaft) + OrderService + NotificationService +│ +├── Shared/ +│ ├── Shared.csproj +│ └── Order.cs # Shared entity — [Table("orders")], [Key] Guid Id, ... +│ +├── OrderService/ +│ ├── OrderService.csproj +│ ├── Program.cs # AddChangeTracking: outbox → KafkaPublisher +│ ├── OrderSimulator.cs # BackgroundService: inserts/updates/deletes in a loop +│ └── Dockerfile # Multi-stage; build context = repo root +│ +└── NotificationService/ + ├── NotificationService.csproj + ├── Program.cs # AddChangeTracking: KafkaConsumer → OnInsert/Update/Delete + └── Dockerfile # Multi-stage; build context = repo root +``` + +--- + +## How It Works + +``` +OrderSimulator + → IRepository.InsertAsync / UpdateAsync / DeleteAsync (PostgreSQL: orders table) + → EntityChangeTracker.TrackInsertAsync / TrackUpdateAsync / TrackDeleteAsync + → PostgreSqlOutbox (PostgreSQL: order_outbox table) + ↑ polled every 500 ms by OutboxPublisherService + → MessagePack serialize → Gzip compress → MessageEnvelope + → KafkaPublisher → topic: raytree.order_changes (partition key: Order:) + → KafkaConsumer (NotificationService) + → Gzip decompress → MessagePack deserialize + → OnInsert / OnUpdate / OnDelete handlers → ILogger +``` + +### Why an outbox? + +Publishing directly to Kafka inside the same database transaction is not possible — Kafka is not a transactional resource that participates in two-phase commit. The outbox table acts as a durable staging area: `TrackXxxAsync` writes to it atomically with the entity change (or in a logically equivalent step), then a background polling loop reads the outbox and publishes to Kafka, marking records published once the broker confirms receipt. This guarantees that no change is silently lost even if the process crashes between the DB write and the Kafka send. + +### Partition key + +The default `KeySelector` produces `"{EntityType}:{EntityId}"` — all events for the same `Order` ID land on the same Kafka partition, preserving per-entity ordering. With `KAFKA_NUM_PARTITIONS=3` in this example, different order IDs will spread across partitions, so you can observe the routing in action. + +--- + +## Known Limitations + +The `OrderSimulator` calls the repository and the outbox as two **separate, non-atomic** operations: + +```csharp +await _repository.InsertAsync(order, ct); // 1. Write to PostgreSQL orders table +await _tracker.TrackInsertAsync(order, ct); // 2. Write to PostgreSQL order_outbox table +``` + +A process crash between steps 1 and 2 leaves the entity in the `orders` table without a corresponding outbox record — the change is never published to Kafka. + +**Production path:** use `RayTree.EntityFrameworkCore` and `EntityChangeInterceptor`, which hooks into `SaveChangesAsync` and calls `TrackXxxAsync` inside the same EF Core transaction, making both writes atomic. See the `EntityChangeInterceptor` class in `src/RayTree.EntityFrameworkCore` for the implementation. + +--- + +## Going Further + +### NOTIFY/LISTEN fast-path + +The default 500 ms poll interval is readable but adds latency. Enable the PostgreSQL `NOTIFY`/`LISTEN` fast path to publish within milliseconds of the outbox write: + +```csharp +.UseOutbox(new PostgreSqlOutbox(new PostgreSqlOutboxOptions +{ + ConnectionString = pgConnection, + OutboxTableName = "order_outbox", + UseNotificationChannel = true, // trigger fires pg_notify on INSERT into outbox + NotificationChannel = "order_outbox_notify", +}, pluginLoggerFactory)) +``` + +The poll loop becomes a safety net (catches anything the NOTIFY path misses); the notification channel drives normal delivery. + +### At-least-once delivery + +The example uses at-most-once delivery (the default): Kafka offsets are committed immediately after the message is parsed, before the handler runs. If the process crashes mid-handler the message is not redelivered. + +To switch to at-least-once: + +```csharp +.UseConsumer(new KafkaConsumer(new KafkaConsumerOptions +{ + BootstrapServers = kafkaBootstrap, + Topic = "raytree.order_changes", + GroupId = "notification-service", + AckAfterHandler = true, // commit after handler confirms success +}, pluginLoggerFactory)) +``` + +> **Important:** also set `SubscriberOptions.MaxDegreeOfParallelism = 1` (the default). Kafka offset commits are monotonic — committing a higher offset first would advance past any in-flight lower-offset messages, silently skipping them on restart and defeating the guarantee. Sequential processing avoids this entirely. + +### Custom partition key (sharding by tenant / aggregate root) + +```csharp +.UsePublisher(new KafkaPublisher(new KafkaPublisherOptions +{ + BootstrapServers = kafkaBootstrap, + Topic = "raytree.order_changes", + KeySelector = envelope => envelope.Metadata.TryGetValue("TenantId", out var tid) + ? tid + : $"{envelope.EntityType}:{envelope.EntityId}", +})) +``` + +### Isolated-handler dispatch (one consumer group per handler) + +The example uses **shared-handler** mode: a single `KafkaConsumer` delivers each message to all handlers in sequence. For independent downstream systems each requiring their own offset tracking, use **isolated-handler** mode with `UseConsumerFactory`: + +```csharp +.UseConsumerFactory(handlerName => new KafkaConsumer(new KafkaConsumerOptions +{ + BootstrapServers = kafkaBootstrap, + Topic = "raytree.order_changes", + GroupId = $"notification-service-{handlerName}", +}, pluginLoggerFactory)) +.OnInsert("email-handler", SendEmailAsync) +.OnUpdate("audit-handler", WriteAuditLogAsync) +``` + +Each named handler gets its own subscription and Kafka consumer group — offsets advance independently. + +--- + +## Consumer-Group Scaling + +Because all `NotificationService` replicas share the same `GroupId = "notification-service"`, Kafka automatically rebalances the three partitions across them. + +Start two replicas: + +```bash +docker compose up --scale notification-service=2 +``` + +Watch the logs: each replica logs which partitions it was assigned after the rebalance. With 3 partitions and 2 replicas, one replica handles 2 partitions and the other handles 1. Adding a third replica gives each replica exactly one partition. + +A fourth replica receives no partitions (Kafka never assigns more partitions than it has to a single consumer group). Scale beyond the partition count only when you need hot standbys for failover. + +--- + +## Partition-Key Behaviour + +Each `Order` is assigned a Kafka partition based on its ID via the default key `"Order:"`. To inspect which messages landed on which partition, use the Kafka console consumer directly against the running container: + +```bash +# List messages on partition 0 +docker exec kafka /opt/kafka/bin/kafka-console-consumer.sh \ + --topic raytree.order_changes \ + --partition 0 \ + --bootstrap-server localhost:9092 \ + --from-beginning \ + --max-messages 20 +``` + +Repeat with `--partition 1` and `--partition 2`. You should observe: +- All events for a given `Order.Id` appear on exactly one partition. +- Different `Order.Id`s spread across the three partitions according to the Murmur2 hash of the key. + +To remove the partition-key guarantee (round-robin distribution), set `KeySelector = _ => string.Empty` in `KafkaPublisherOptions`. Per-entity ordering is then no longer preserved. + +--- + +## `FromEarliest = true` — How It Works + +`KafkaConsumerOptions.FromEarliest = true` (the default) means that when a **new** consumer group first connects to a topic, it begins reading from offset 0 — the very beginning of the partition log. Once it commits an offset, subsequent restarts of the same group resume from the last committed offset, not from the beginning. + +This is why `notification-service` has no `depends_on: order-service` in `docker-compose.yml`. The sequence is safe: + +1. `notification-service` starts, subscribes to `raytree.order_changes`, finds no messages yet, waits. +2. `order-service` starts, publishes messages starting from offset 0. +3. `notification-service` reads from offset 0 and processes every message — nothing is missed. + +If you restart `notification-service` after it has processed some messages, it resumes from its last committed offset rather than replaying from the beginning. diff --git a/examples/Kafka.Microservices/Shared/Order.cs b/examples/Kafka.Microservices/Shared/Order.cs new file mode 100644 index 0000000..de57295 --- /dev/null +++ b/examples/Kafka.Microservices/Shared/Order.cs @@ -0,0 +1,17 @@ +using System.ComponentModel.DataAnnotations; +using System.ComponentModel.DataAnnotations.Schema; + +namespace KafkaMicroservices.Shared; + +[Table("orders")] +public class Order +{ + [Key] + public Guid Id { get; set; } + + public string CustomerName { get; set; } = string.Empty; + + public decimal TotalAmount { get; set; } + + public string Status { get; set; } = string.Empty; +} diff --git a/examples/Kafka.Microservices/Shared/Shared.csproj b/examples/Kafka.Microservices/Shared/Shared.csproj new file mode 100644 index 0000000..4b60129 --- /dev/null +++ b/examples/Kafka.Microservices/Shared/Shared.csproj @@ -0,0 +1,6 @@ + + + KafkaMicroservices.Shared + KafkaMicroservices.Shared + + diff --git a/examples/Kafka.Microservices/docker-compose.yml b/examples/Kafka.Microservices/docker-compose.yml new file mode 100644 index 0000000..0536990 --- /dev/null +++ b/examples/Kafka.Microservices/docker-compose.yml @@ -0,0 +1,93 @@ +services: + postgres: + image: postgres:18.3-alpine + environment: + POSTGRES_DB: raytree_example + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + ports: + - "5432:5432" + volumes: + # postgres 18+ stores data under a major-version subdirectory inside /var/lib/postgresql + # (e.g. /var/lib/postgresql/18/main). Mount the parent so the named volume captures + # the version-specific path and survives upgrades via pg_upgrade. + - postgres-data:/var/lib/postgresql + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres -d raytree_example"] + interval: 5s + timeout: 5s + retries: 10 + + kafka: + # Pinned (not :latest) so the example stays reproducible as upstream Kafka releases land. + image: apache/kafka:3.9.0 + ports: + - "9092:9092" + environment: + # --- KRaft single-node configuration (no Zookeeper) --- + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093" + # Bind address omitted (just :port) so the listener accepts on all interfaces without + # placing "0.0.0.0" into the configuration — apache/kafka:3.9.0 rejects that address + # in advertised.listeners validation during storage formatting. + KAFKA_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093" + KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092" + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT" + # Explicit log directory matches the apache/kafka official Docker example; without it + # the wrapper picks a tmpfs path that differs between image versions. + KAFKA_LOG_DIRS: /var/lib/kafka/data + # --- Topic / cluster defaults --- + # 3 partitions on auto-created topics so the EntityType:EntityId partition key has somewhere + # to spread to. With 1 partition every message lands on partition 0 and the routing story is + # vacuous. + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + KAFKA_NUM_PARTITIONS: 3 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + volumes: + - kafka-data:/var/lib/kafka/data + healthcheck: + # TCP-level probe: lightest reliable signal that the broker is accepting client connections. + # `kafka-topics.sh` would wait for cluster metadata to converge — too slow for cold KRaft start. + test: ["CMD-SHELL", "nc -z localhost 9092 || exit 1"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 60s + + order-service: + build: + # Build context is the repo root so ProjectReference paths to ../../../src/RayTree.* resolve. + context: ../.. + dockerfile: examples/Kafka.Microservices/OrderService/Dockerfile + environment: + KAFKA_BOOTSTRAP_SERVERS: kafka:9092 + POSTGRES_CONNECTION: "Host=postgres;Port=5432;Database=raytree_example;Username=postgres;Password=postgres" + DOTNET_ENVIRONMENT: Development + depends_on: + postgres: + condition: service_healthy + kafka: + condition: service_healthy + + notification-service: + build: + context: ../.. + dockerfile: examples/Kafka.Microservices/NotificationService/Dockerfile + environment: + KAFKA_BOOTSTRAP_SERVERS: kafka:9092 + DOTNET_ENVIRONMENT: Development + # NO dependency on order-service — Kafka auto-creates the topic on Subscribe and FromEarliest=true + # makes the consumer replay from offset 0 once order-service later publishes. + depends_on: + kafka: + condition: service_healthy + +volumes: + postgres-data: + kafka-data: diff --git a/examples/RabbitMQ.Microservices/NotificationService/Dockerfile b/examples/RabbitMQ.Microservices/NotificationService/Dockerfile index 236596a..da5d895 100644 --- a/examples/RabbitMQ.Microservices/NotificationService/Dockerfile +++ b/examples/RabbitMQ.Microservices/NotificationService/Dockerfile @@ -4,13 +4,32 @@ FROM mcr.microsoft.com/dotnet/sdk:10.0-alpine AS build WORKDIR /src +# ── Layer 1: central build infrastructure ──────────────────────────────────────────────────────── +# Copied first because it changes rarely. All subsequent layers inherit TargetFramework, Nullable, +# TreatWarningsAsErrors, and centrally-pinned NuGet versions from these files. COPY Directory.Build.props Directory.Packages.props NuGet.config ./ + +# ── Layer 2: .csproj files only (restore cache) ────────────────────────────────────────────────── +# Copy project files for NotificationService and every ProjectReference in its dependency graph +# (direct: Shared + five RayTree plugins; transitive: all plugins → RayTree.Core). +# NotificationService has no PostgreSQL dependency — only the RabbitMQ + serializer + compressor stack. +COPY src/RayTree.Core/RayTree.Core.csproj src/RayTree.Core/ +COPY src/RayTree.Hosting/RayTree.Hosting.csproj src/RayTree.Hosting/ +COPY src/RayTree.Plugins.RabbitMQ/RayTree.Plugins.RabbitMQ.csproj src/RayTree.Plugins.RabbitMQ/ +COPY src/RayTree.Plugins.Serializers.MessagePack/RayTree.Plugins.Serializers.MessagePack.csproj src/RayTree.Plugins.Serializers.MessagePack/ +COPY src/RayTree.Plugins.Compressors.Gzip/RayTree.Plugins.Compressors.Gzip.csproj src/RayTree.Plugins.Compressors.Gzip/ +COPY examples/RabbitMQ.Microservices/Directory.Build.props examples/RabbitMQ.Microservices/ +COPY examples/RabbitMQ.Microservices/Directory.Packages.props examples/RabbitMQ.Microservices/ +COPY examples/RabbitMQ.Microservices/Shared/Shared.csproj examples/RabbitMQ.Microservices/Shared/ +COPY examples/RabbitMQ.Microservices/NotificationService/NotificationService.csproj examples/RabbitMQ.Microservices/NotificationService/ +RUN dotnet restore examples/RabbitMQ.Microservices/NotificationService/NotificationService.csproj + +# ── Layer 3: source files → publish ────────────────────────────────────────────────────────────── +# Packages are already restored above; --no-restore skips the redundant network round-trip. COPY src/ ./src/ COPY examples/RabbitMQ.Microservices/ ./examples/RabbitMQ.Microservices/ - WORKDIR /src/examples/RabbitMQ.Microservices/NotificationService -RUN dotnet restore -RUN dotnet publish -c Release -o /app /p:UseAppHost=false +RUN dotnet publish -c Release -o /app /p:UseAppHost=false --no-restore FROM mcr.microsoft.com/dotnet/runtime:10.0-alpine AS runtime WORKDIR /app diff --git a/examples/RabbitMQ.Microservices/NotificationService/Program.cs b/examples/RabbitMQ.Microservices/NotificationService/Program.cs index ce05e3b..efd4036 100644 --- a/examples/RabbitMQ.Microservices/NotificationService/Program.cs +++ b/examples/RabbitMQ.Microservices/NotificationService/Program.cs @@ -1,7 +1,7 @@ -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using RabbitMqMicroservices.Shared; +using RayTree.Core.Models; using RayTree.Core.Plugins; using RayTree.Core.Plugins.Serialization; using RayTree.Hosting; @@ -13,6 +13,13 @@ var builder = Host.CreateApplicationBuilder(args); +// Handlers close over this logger so structured log properties (OrderId, Status, etc.) flow through +// the Generic Host logging pipeline rather than bypassing it via Console.WriteLine. +var loggerFactory = LoggerFactory.Create(b => b + .AddConsole() + .SetMinimumLevel(LogLevel.Information)); +var handlerLogger = loggerFactory.CreateLogger("Notifications"); + builder.Services.AddChangeTracking(builder.Configuration, cfg => { cfg @@ -33,6 +40,11 @@ // OrderService's RoutingKeySelector this yields keys like `change.Order.insert`. ExchangeName = "raytree.changes", BindingKey = "change.Order.*", + // Probe the exchange passively until order-service declares it. This replaces + // the compose-level depends_on: order-service with an application-level readiness + // check, correctly decoupling startup order without tight container coupling. + WaitForTopology = true, + TopologyWaitInterval = TimeSpan.FromSeconds(5), })) // Shared-handler dispatch: all three handlers run sequentially in registration order // on every matching delivery. Each handler binds to exactly one ChangeType. @@ -46,25 +58,28 @@ await builder.Build().RunAsync(); // ---- handlers ------------------------------------------------------------------------------- +// Local functions close over handlerLogger — non-static so they capture the variable above. -static Task LogInsertAsync(RayTree.Core.Models.EntityChange change, CancellationToken ct) +Task LogInsertAsync(EntityChange change, CancellationToken ct) { var order = change.State; - Console.WriteLine( - $"[NOTIFY] NEW order {order?.Id} — customer={order?.CustomerName} total={order?.TotalAmount:C} status={order?.Status}"); + handlerLogger.LogInformation( + "[NOTIFY] NEW order {OrderId} — customer={Customer} total={Total:C} status={Status}", + order?.Id, order?.CustomerName, order?.TotalAmount, order?.Status); return Task.CompletedTask; } -static Task LogUpdateAsync(RayTree.Core.Models.EntityChange change, CancellationToken ct) +Task LogUpdateAsync(EntityChange change, CancellationToken ct) { var order = change.State; - Console.WriteLine( - $"[NOTIFY] UPDATED order {order?.Id} — status={order?.Status} total={order?.TotalAmount:C}"); + handlerLogger.LogInformation( + "[NOTIFY] UPDATED order {OrderId} — status={Status} total={Total:C}", + order?.Id, order?.Status, order?.TotalAmount); return Task.CompletedTask; } -static Task LogDeleteAsync(RayTree.Core.Models.EntityChange change, CancellationToken ct) +Task LogDeleteAsync(EntityChange change, CancellationToken ct) { - Console.WriteLine($"[NOTIFY] DELETED order {change.EntityId}"); + handlerLogger.LogInformation("[NOTIFY] DELETED order {OrderId}", change.EntityId); return Task.CompletedTask; } diff --git a/examples/RabbitMQ.Microservices/OrderService/Dockerfile b/examples/RabbitMQ.Microservices/OrderService/Dockerfile index 7205705..e971e8d 100644 --- a/examples/RabbitMQ.Microservices/OrderService/Dockerfile +++ b/examples/RabbitMQ.Microservices/OrderService/Dockerfile @@ -4,15 +4,34 @@ FROM mcr.microsoft.com/dotnet/sdk:10.0-alpine AS build WORKDIR /src -# Copy the bits the example needs to restore + build. Copying selectively keeps the image small -# and avoids invalidating the cache when unrelated parts of the repo change. +# ── Layer 1: central build infrastructure ──────────────────────────────────────────────────────── +# Copied first because it changes rarely. All subsequent layers inherit TargetFramework, Nullable, +# TreatWarningsAsErrors, and centrally-pinned NuGet versions from these files. COPY Directory.Build.props Directory.Packages.props NuGet.config ./ + +# ── Layer 2: .csproj files only (restore cache) ────────────────────────────────────────────────── +# Copy project files for OrderService and every ProjectReference in its dependency graph +# (direct: Shared + six RayTree plugins; transitive: all plugins → RayTree.Core). +# Restore runs before source is copied so the NuGet package layer is reused on every build that +# does not change a .csproj or .props file — only source edits skip directly to the publish step. +COPY src/RayTree.Core/RayTree.Core.csproj src/RayTree.Core/ +COPY src/RayTree.Hosting/RayTree.Hosting.csproj src/RayTree.Hosting/ +COPY src/RayTree.Plugins.PostgreSQL/RayTree.Plugins.PostgreSQL.csproj src/RayTree.Plugins.PostgreSQL/ +COPY src/RayTree.Plugins.RabbitMQ/RayTree.Plugins.RabbitMQ.csproj src/RayTree.Plugins.RabbitMQ/ +COPY src/RayTree.Plugins.Serializers.MessagePack/RayTree.Plugins.Serializers.MessagePack.csproj src/RayTree.Plugins.Serializers.MessagePack/ +COPY src/RayTree.Plugins.Compressors.Gzip/RayTree.Plugins.Compressors.Gzip.csproj src/RayTree.Plugins.Compressors.Gzip/ +COPY examples/RabbitMQ.Microservices/Directory.Build.props examples/RabbitMQ.Microservices/ +COPY examples/RabbitMQ.Microservices/Directory.Packages.props examples/RabbitMQ.Microservices/ +COPY examples/RabbitMQ.Microservices/Shared/Shared.csproj examples/RabbitMQ.Microservices/Shared/ +COPY examples/RabbitMQ.Microservices/OrderService/OrderService.csproj examples/RabbitMQ.Microservices/OrderService/ +RUN dotnet restore examples/RabbitMQ.Microservices/OrderService/OrderService.csproj + +# ── Layer 3: source files → publish ────────────────────────────────────────────────────────────── +# Packages are already restored above; --no-restore skips the redundant network round-trip. COPY src/ ./src/ COPY examples/RabbitMQ.Microservices/ ./examples/RabbitMQ.Microservices/ - WORKDIR /src/examples/RabbitMQ.Microservices/OrderService -RUN dotnet restore -RUN dotnet publish -c Release -o /app /p:UseAppHost=false +RUN dotnet publish -c Release -o /app /p:UseAppHost=false --no-restore FROM mcr.microsoft.com/dotnet/runtime:10.0-alpine AS runtime WORKDIR /app diff --git a/examples/RabbitMQ.Microservices/OrderService/OrderSimulator.cs b/examples/RabbitMQ.Microservices/OrderService/OrderSimulator.cs index 06b409c..3430ab2 100644 --- a/examples/RabbitMQ.Microservices/OrderService/OrderSimulator.cs +++ b/examples/RabbitMQ.Microservices/OrderService/OrderSimulator.cs @@ -105,7 +105,7 @@ private async Task StepAsync(List liveOrders, CancellationToken ct) { var order = liveOrders[_random.Next(liveOrders.Count)]; order.Status = s_Statuses[_random.Next(s_Statuses.Length)]; - order.TotalAmount = Math.Round(order.TotalAmount + (decimal)((_random.NextDouble() - 0.5) * 20), 2); + order.TotalAmount = Math.Max(0.01m, Math.Round(order.TotalAmount + (decimal)((_random.NextDouble() - 0.5) * 20), 2)); await _repository.UpdateAsync(order, ct); await _tracker.TrackUpdateAsync(order, ct); diff --git a/examples/RabbitMQ.Microservices/README.md b/examples/RabbitMQ.Microservices/README.md index 4c33219..facf9a5 100644 --- a/examples/RabbitMQ.Microservices/README.md +++ b/examples/RabbitMQ.Microservices/README.md @@ -15,16 +15,22 @@ PostgreSqlRepository → queue: notification-service.orders └──────────────────────────────────────────┘ ``` -`OrderService` continuously inserts/updates/deletes `Order` rows. Each change is written to a PostgreSQL outbox table and picked up by the publisher loop, serialized with MessagePack, compressed with Gzip, and published to RabbitMQ. `NotificationService` subscribes to the topic exchange, decompresses + deserializes, and dispatches to per-`ChangeType` handlers that simply log to stdout. +`OrderService` continuously inserts/updates/deletes `Order` rows. Each change is written to a PostgreSQL outbox table and picked up by the publisher loop, serialized with MessagePack, compressed with Gzip, and published to RabbitMQ. `NotificationService` subscribes to the topic exchange, decompresses + deserializes, and dispatches to per-`ChangeType` handlers that log to the console via `ILogger`. -This example is intentionally **not** part of the main `RayTree.slnx` solution. Open `examples/RabbitMQ.Microservices/RabbitMQ.Microservices.slnx` directly. +> **Note:** this example is intentionally **not** part of `RayTree.slnx`. To open it in an IDE, open `examples/RabbitMQ.Microservices/RabbitMQ.Microservices.slnx` directly. + +--- ## Prerequisites -- Docker / Docker Compose -- .NET 10 SDK (only required if you want to build / debug outside Docker) +| Tool | Version | +|---|---| +| Docker (with Compose v2) | 24+ | +| .NET SDK | 10.0+ (local dev only — not needed for `docker compose up`) | + +--- -## Run +## Running with Docker Compose From `examples/RabbitMQ.Microservices/`: @@ -32,77 +38,204 @@ From `examples/RabbitMQ.Microservices/`: docker compose up --build ``` -Expected console output (interleaved between the two services): +Both services start after RabbitMQ passes its healthcheck (`rabbitmq-diagnostics ping`). `OrderService` additionally waits for PostgreSQL. `NotificationService` does not depend on `OrderService` — it probes the exchange via `WaitForTopology = true` and begins consuming as soon as `OrderService` declares it. + +**Expected console output** (interleaved, simplified — log lines include level and category): + +``` +order-service-1 | info: Inserted order 5a1c... for Alice totalling $342.18 +order-service-1 | info: Inserted order 7b2d... for Bob totalling $87.00 +notification-service-1 | info: [NOTIFY] NEW order 5a1c... — customer=Alice total=$342.18 status=Pending +notification-service-1 | info: [NOTIFY] NEW order 7b2d... — customer=Bob total=$87.00 status=Pending +order-service-1 | info: Updated order 5a1c... → status=Shipped total=$348.92 +notification-service-1 | info: [NOTIFY] UPDATED order 5a1c... — status=Shipped total=$348.92 +order-service-1 | info: Deleted order 7b2d... +notification-service-1 | info: [NOTIFY] DELETED order 7b2d... +``` + +To stop cleanly: +```bash +docker compose down ``` -order-service-1 | Inserted order 5a1c... for Alice totalling $342.18 -notification-service-1 | [NOTIFY] NEW order 5a1c... — customer=Alice total=$342.18 status=Pending -order-service-1 | Updated order 5a1c... → status=Shipped total=$348.92 -notification-service-1 | [NOTIFY] UPDATED order 5a1c... — status=Shipped total=$348.92 -order-service-1 | Deleted order 5a1c... -notification-service-1 | [NOTIFY] DELETED order 5a1c... + +Both named volumes (`postgres-data`, `rabbitmq-data`) are preserved on restart. To wipe them: + +```bash +docker compose down -v ``` -Stop with `Ctrl+C` then `docker compose down` to remove containers. +--- -## What to look at +## Connection Details (for Local Dev) -| Surface | URL / location | +| Service | Address | |---|---| -| RabbitMQ management UI | `http://localhost:15672` (login `guest` / `guest`) | -| PostgreSQL | `localhost:5432`, db `raytree_example`, user/pass `postgres`/`postgres` | -| Outbox table | `order_outbox` — see published vs unpublished rows | -| Entity table | `orders` — keys-only journal (the outbox payload carries the full state) | +| RabbitMQ broker | `localhost:5672` | +| RabbitMQ management UI | `http://localhost:15672` — login `guest` / `guest` | +| PostgreSQL | `localhost:5432` — database `raytree_example`, user `postgres`, password `postgres` | -In RabbitMQ's management UI you can: +Override with environment variables when running outside Compose: + +```bash +# OrderService (requires a running PostgreSQL and RabbitMQ) +RABBITMQ_HOST=localhost \ +POSTGRES_CONNECTION="Host=localhost;Port=5432;Database=raytree_example;Username=postgres;Password=postgres" \ +dotnet run --project OrderService + +# NotificationService (requires a running RabbitMQ; no PostgreSQL dependency) +RABBITMQ_HOST=localhost \ +dotnet run --project NotificationService +``` + +--- + +## What to Look at + +In the RabbitMQ management UI (`http://localhost:15672`): - Inspect the `raytree.changes` topic exchange and the `notification-service.orders` queue. -- Watch live message rates. -- Browse a single payload to see the compressed MessagePack bytes. +- Watch live message rates on the queue's **Message rates** chart. +- Browse a single payload under **Get messages** to see the compressed MessagePack bytes (the body is binary, not readable text). -## Project structure +In PostgreSQL (`localhost:5432`, database `raytree_example`): +- `orders` — the entity table; one row per live order. +- `order_outbox` — the staging table; `published = true` rows have been sent to RabbitMQ and will be cleaned up by the outbox rotation loop. + +--- + +## Project Structure ``` examples/RabbitMQ.Microservices/ -├── Directory.Build.props # Imports root props, disables packaging for console apps -├── Directory.Packages.props # Imports root props, adds Microsoft.Extensions.Hosting -├── RabbitMQ.Microservices.slnx # Standalone solution — NOT in RayTree.slnx -├── docker-compose.yml -├── README.md +├── RabbitMQ.Microservices.slnx # Standalone solution — open this in your IDE +├── Directory.Build.props # Inherits from repo root; sets IsPackable=false +├── Directory.Packages.props # Inherits from repo root; adds Microsoft.Extensions.Hosting +├── docker-compose.yml # Postgres + RabbitMQ + OrderService + NotificationService +│ ├── Shared/ │ ├── Shared.csproj -│ └── Order.cs # POCO with [Table("orders")] + [Key] Id (no MessagePack attrs) +│ └── Order.cs # Shared entity — [Table("orders")], [Key] Guid Id, ... +│ ├── OrderService/ │ ├── OrderService.csproj -│ ├── Dockerfile -│ ├── Program.cs # Host.CreateApplicationBuilder + AddChangeTracking -│ └── OrderSimulator.cs # BackgroundService that drives the demo +│ ├── Program.cs # AddChangeTracking: outbox → RabbitMqPublisher +│ ├── OrderSimulator.cs # BackgroundService: inserts/updates/deletes in a loop +│ └── Dockerfile # Multi-stage; build context = repo root +│ └── NotificationService/ ├── NotificationService.csproj - ├── Dockerfile - └── Program.cs # Shared-handler consumer with OnInsert/OnUpdate/OnDelete + ├── Program.cs # AddChangeTracking: RabbitMqConsumer → OnInsert/Update/Delete + └── Dockerfile # Multi-stage; build context = repo root +``` + +--- + +## How It Works + +``` +OrderSimulator + → IRepository.InsertAsync / UpdateAsync / DeleteAsync (PostgreSQL: orders table) + → EntityChangeTracker.TrackInsertAsync / TrackUpdateAsync / TrackDeleteAsync + → PostgreSqlOutbox (PostgreSQL: order_outbox table) + ↑ polled every 500 ms by OutboxPublisherService + → MessagePack serialize → Gzip compress → MessageEnvelope + → RabbitMqPublisher → exchange: raytree.changes (routing key: change.Order.) + → RabbitMqConsumer (NotificationService) + → Gzip decompress → MessagePack deserialize + → OnInsert / OnUpdate / OnDelete handlers → ILogger ``` -## Configuration +### Startup sequencing — `WaitForTopology` -Both services read connection info from environment variables: +`NotificationService` does **not** have a `depends_on: order-service` in `docker-compose.yml`. Instead, `RabbitMqConsumerOptions.WaitForTopology = true` makes the consumer probe the exchange (`raytree.changes`) via AMQP passive declares and retry every 5 seconds until `OrderService` declares it. This correctly decouples startup order at the application level rather than relying on Compose container ordering, and it mirrors how the services would behave in a Kubernetes or ECS deployment where container scheduling is non-deterministic. -| Variable | Default (localhost) | Compose value | -|---|---|---| -| `POSTGRES_CONNECTION` | `Host=localhost;Port=5432;Database=raytree_example;Username=postgres;Password=postgres` | `Host=postgres;…` | -| `RABBITMQ_HOST` | `localhost` | `rabbitmq` | +--- -## Known limitations +## Known Limitations **The example is not transactionally safe between the entity table and the outbox.** -`OrderSimulator` calls `repository.InsertAsync(order)` and then `tracker.TrackInsertAsync(order)` — two separate database round-trips. A crash between them can leave `orders` and `order_outbox` inconsistent. The outbox pattern usually relies on a single transaction wrapping both writes; that's exactly what `RayTree.EntityFrameworkCore` (specifically the `EntityChangeInterceptor`) does inside `SaveChangesAsync`. Production code targeting transactional safety should use that integration, **not** this example's pattern. +`OrderSimulator` calls `repository.InsertAsync(order)` and then `tracker.TrackInsertAsync(order)` — two separate database round-trips. A crash between them can leave `orders` and `order_outbox` inconsistent. + +**Production path:** use `RayTree.EntityFrameworkCore` and `EntityChangeInterceptor`, which hooks into `SaveChangesAsync` and calls `TrackXxxAsync` inside the same EF Core transaction, making both writes atomic. See `src/RayTree.EntityFrameworkCore` for the implementation. + +--- + +## Going Further + +### NOTIFY/LISTEN fast-path + +The default 500 ms poll interval adds latency. Enable the PostgreSQL `NOTIFY`/`LISTEN` fast path to publish within milliseconds of the outbox write: + +```csharp +.UseOutbox(new PostgreSqlOutbox(new PostgreSqlOutboxOptions +{ + ConnectionString = pgConnection, + OutboxTableName = "order_outbox", + UseNotificationChannel = true, + NotificationChannel = "order_outbox_notify", +}, pluginLoggerFactory)) +``` + +The poll loop becomes a safety net; the notification channel drives normal delivery. + +### At-least-once delivery + +The example uses at-most-once delivery (the default): the broker ACK is sent immediately when the message is received, before any handler runs. To switch to at-least-once: + +```csharp +.UseConsumer(new RabbitMqConsumer(new RabbitMqConsumerOptions +{ + ... + AckAfterHandler = true, // ACK deferred until all handlers succeed +})) +``` + +On handler retry-exhaustion the consumer issues `BasicNack(requeue: true)`, returning the message to the queue for redelivery. + +### Topology wait (topology owned by another service) + +In the example, `OrderService` declares the exchange (`DeclareExchange = true`) and `NotificationService` waits for it (`WaitForTopology = true`). If your topology is owned by a third service (e.g. a dedicated infrastructure bootstrap), apply `WaitForTopology` on the publisher side too: + +```csharp +.UsePublisher(new RabbitMqPublisher(new RabbitMqPublisherOptions +{ + ExchangeName = "raytree.changes", + DeclareExchange = false, // owned externally + WaitForTopology = true, + TopologyWaitTimeout = TimeSpan.FromMinutes(5), +})) +``` -This example deliberately omits EF Core to keep the focus on RabbitMQ wiring, but the limitation is real — please don't copy `OrderSimulator` into a service that needs durable consistency. +### Isolated-handler dispatch (one subscription per handler) + +The example uses **shared-handler** mode: a single `RabbitMqConsumer` delivers each message to all handlers in sequence. For independent downstream systems each needing their own queue, use `UseConsumerFactory`: + +```csharp +.UseConsumerFactory(handlerName => new RabbitMqConsumer(new RabbitMqConsumerOptions +{ + HostName = rmqHost, + QueueName = $"notification-service.orders.{handlerName}", + ExchangeName = "raytree.changes", + BindingKey = "change.Order.*", + DeclareQueue = true, + Durable = true, +})) +.OnInsert("email-handler", SendEmailAsync) +.OnUpdate("audit-handler", WriteAuditLogAsync) +``` + +Each named handler gets its own queue and consumer — offsets and retry budgets are isolated. + +### Distributed deduplication + +The default in-memory deduplication store does not survive restarts. For durable, cross-replica dedup swap in Redis: + +```csharp +builder.Services.AddChangeTracking(builder.Configuration, cfg => + cfg.UseDeduplicationStore(new RedisDeduplicationStore("localhost:6379"))); +``` -## Going further +### OpenTelemetry -- **NOTIFY/LISTEN fast-path** — set `PostgreSqlOutboxOptions.UseNotificationChannel = true` to get sub-100 ms publish latency instead of polling. A DB trigger fires `pg_notify` on every outbox INSERT; the publisher subscribes and claims rows atomically. -- **At-least-once delivery** — set `RabbitMqConsumerOptions.AckAfterHandler = true` so the broker ACK is deferred until all handlers complete. Combined with the deduplication store, this yields effectively-once semantics. -- **Isolated-handler dispatch** — replace `e.UseConsumer(...)` with `e.UseConsumerFactory(name => new RabbitMqConsumer(...))` to give each named handler its own broker subscription, retry budget, and dedup namespace. -- **Distributed deduplication** — swap the default in-memory dedup store for `RedisDeduplicationStore` so processed `correlationId`s survive restarts and are visible across replicas. -- **OpenTelemetry** — add `RayTree.OpenTelemetry` and call `meterProvider.AddRayTreeMetrics()` to export the 18 RayTree instruments (outbox lag, publish duration, handler attempts, etc.) to your OTel collector. +Add `RayTree.OpenTelemetry` and call `meterProvider.AddRayTreeMetrics()` to export the 18 RayTree instruments (outbox lag, publish duration, handler attempts, payload size, queue depth) to your OTel collector. diff --git a/examples/RabbitMQ.Microservices/docker-compose.yml b/examples/RabbitMQ.Microservices/docker-compose.yml index 50d5cfc..394de1e 100644 --- a/examples/RabbitMQ.Microservices/docker-compose.yml +++ b/examples/RabbitMQ.Microservices/docker-compose.yml @@ -8,7 +8,10 @@ services: ports: - "5432:5432" volumes: - - postgres-data:/var/lib/postgresql/data + # postgres 18+ stores data under a major-version subdirectory inside /var/lib/postgresql + # (e.g. /var/lib/postgresql/18/main). Mount the parent so the named volume captures + # the version-specific path and survives upgrades via pg_upgrade. + - postgres-data:/var/lib/postgresql healthcheck: test: ["CMD-SHELL", "pg_isready -U postgres -d raytree_example"] interval: 5s @@ -16,10 +19,13 @@ services: retries: 10 rabbitmq: + # Pinned (not :latest) so the example stays reproducible as upstream RabbitMQ releases land. image: rabbitmq:4.3.0-management-alpine ports: - "5672:5672" - - "15672:15672" + - "15672:15672" # management UI — http://localhost:15672 (guest/guest) + volumes: + - rabbitmq-data:/var/lib/rabbitmq healthcheck: test: ["CMD", "rabbitmq-diagnostics", "ping"] interval: 10s @@ -48,11 +54,13 @@ services: environment: RABBITMQ_HOST: rabbitmq DOTNET_ENVIRONMENT: Development + # NO hard dependency on order-service. WaitForTopology = true in the consumer options probes + # the exchange passively and retries until order-service declares it — decoupling startup order + # without relying on compose depends_on for application-level topology readiness. depends_on: rabbitmq: condition: service_healthy - order-service: - condition: service_started volumes: postgres-data: + rabbitmq-data: diff --git a/openspec/changes/archive/2026-05-22-kafka-microservices-example/.openspec.yaml b/openspec/changes/archive/2026-05-22-kafka-microservices-example/.openspec.yaml new file mode 100644 index 0000000..4a1c677 --- /dev/null +++ b/openspec/changes/archive/2026-05-22-kafka-microservices-example/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-05-22 diff --git a/openspec/changes/archive/2026-05-22-kafka-microservices-example/design.md b/openspec/changes/archive/2026-05-22-kafka-microservices-example/design.md new file mode 100644 index 0000000..4702acc --- /dev/null +++ b/openspec/changes/archive/2026-05-22-kafka-microservices-example/design.md @@ -0,0 +1,119 @@ +## Context + +RayTree has full Kafka support (`KafkaPublisher`, `KafkaConsumer`, configurable partition-key selector, `AckAfterHandler`) but no runnable multi-service example. All existing Kafka coverage lives in integration tests (`RayTree.Plugins.Kafka.Tests`) that use Testcontainers and focus on correctness, not on illustrating real-world wiring. The new RabbitMQ example establishes the pattern; this change mirrors it for Kafka, adapting where the two brokers genuinely differ (topic-based routing vs. exchange/queue, partition keys, consumer groups, KRaft Docker image). + +The example must be completely self-contained: it cannot modify library source, must not be compiled as part of the main solution, and must run with a single `docker compose up`. + +## Goals / Non-Goals + +**Goals:** +- Show a realistic producer microservice (`OrderService`) that tracks `Order` entity changes into a PostgreSQL outbox and publishes to a Kafka topic. +- Show a realistic consumer microservice (`NotificationService`) that consumes order change messages from Kafka and dispatches to typed handlers. +- Demonstrate shared-consumer handler dispatch mode (all handlers on one consumer). +- Include `docker-compose.yml` that starts PostgreSQL, Kafka (KRaft — no Zookeeper), and both services. +- Use `PostgreSqlOutbox` and `PostgreSqlRepository` in `OrderService` to demonstrate the durable outbox path. +- Produce `README.md` explaining how to run and what to observe. + +**Non-Goals:** +- Isolated-consumer (per-handler subscription) mode — shared mode is sufficient. +- EF Core integration — that belongs in a separate dedicated example. +- OpenTelemetry wiring — out of scope for this example. +- Kafka Streams, KSQL, or Schema Registry — this example stays at the raw producer/consumer level. +- Production-hardened configuration (SSL/SASL, retry policies) — kept simple for readability. +- Changing any library source file or existing test. + +## Decisions + +### D1: Separate .NET solution file for the example + +The example uses its own `Kafka.Microservices.slnx` rather than being added to `RayTree.slnx`. This keeps CI unaffected and makes the example clearly standalone. Projects reference RayTree via `` so developers can hack on both simultaneously. + +**Alternative considered**: add to main solution. Rejected — same reason as the RMQ example: it couples the example's compile state to every library PR. + +### D2: Two console apps in one solution, not two separate repos + +`OrderService` and `NotificationService` are both console apps inside `examples/Kafka.Microservices/`, each a `Program.cs` using top-level statements. Keeping them together simplifies the `docker-compose.yml` and README. + +### D3: PostgreSQL outbox and repository + +`PostgreSqlOutbox` and `PostgreSqlRepository` are used for `OrderService`, identical to the RMQ example. The outbox survives restarts, schema migration runs automatically on `InitializeAsync`, and the repository provides typed INSERT/UPDATE/DELETE/SELECT. Both share the same `POSTGRES_CONNECTION` environment variable. + +`NotificationService` is subscriber-only — no outbox or repository. + +**Alternative considered**: in-memory outbox. Rejected — the user explicitly asked for PostgreSQL storage and outbox. + +### D4: Single Kafka topic, default partition key, multi-partition + +A single topic `raytree.order_changes` carries all change types (Insert, Update, Delete). The default `KafkaPublisherOptions.KeySelector` (`envelope => $"{envelope.EntityType}:{envelope.EntityId}"`) is used — no custom selector. This guarantees all changes for the same `Order` entity land on the same partition, preserving per-entity ordering. + +The broker is configured with `KAFKA_NUM_PARTITIONS=3` so auto-created topics get 3 partitions by default. Without this, `KAFKA_AUTO_CREATE_TOPICS_ENABLE=true` creates topics with a single partition — every message would land on partition 0 regardless of key, and the partition-key story would be vacuously true. Three is the minimum that lets a reader open the Kafka console and visually confirm different entities are spread across partitions. + +The topic name uses underscores (`order_changes`) instead of mixing `.` and `-` because Kafka's JMX metric subsystem treats `.` and `_` as interchangeable for metric naming, so mixed separators can produce ambiguous metric names in monitoring tools. + +`NotificationService` uses a single `KafkaConsumer` subscribed to `raytree.order_changes` with `GroupId = "notification-service"`. The `ChangeSubscriber` dispatches to Insert / Update / Delete handlers based on `envelope.ChangeType`. + +**Alternative considered**: separate topics per change type (`orders.insert`, `orders.update`, `orders.delete`). Rejected — it multiplies topic configuration for no benefit at demo scale; it also prevents per-entity ordering across change types. + +**Alternative considered**: custom partition key by `EntityId` only (strip type prefix). Rejected — the default already produces per-entity ordering without extra configuration. Showing the override belongs in a more advanced example. + +### D5: Shared-handler dispatch mode + +`NotificationService` uses `UseConsumer(consumer)` (shared mode) with `OnInsert`, `OnUpdate`, and `OnDelete` handlers chained on the returned `ISharedHandlerBuilder`. This mirrors the RMQ example and covers the common case without the boilerplate of `UseConsumerFactory`. + +### D6: KRaft mode Kafka image (no Zookeeper) + +The `docker-compose.yml` uses `apache/kafka:3.9.0` in KRaft mode — Kafka 3.x+, no separate Zookeeper container. The image tag is pinned (not `:latest`) so the example stays reproducible as upstream releases land. This simplifies the compose file (two backing services instead of three: PostgreSQL + Kafka) and reflects current Kafka best practice. + +Topic auto-creation is enabled (`KAFKA_AUTO_CREATE_TOPICS_ENABLE=true`) so neither service needs to pre-create the topic; Kafka creates `raytree.order_changes` on the first produce or subscribe call. `KAFKA_NUM_PARTITIONS=3` ensures the auto-created topic has enough partitions to demonstrate the partition-key behaviour (see D4). + +**Healthcheck**: a TCP probe (`nc -z localhost 9092`) with `start_period: 60s`. KRaft brokers can take 10–20 s to complete controller election on cold start, and `kafka-topics.sh` (the obvious alternative) waits for cluster metadata to converge which can add another 10 s. A TCP probe is the lightest reliable signal that the broker is accepting client connections. + +**Alternative considered**: Bitnami Kafka image with Zookeeper. Rejected — adds a third container and is the legacy path. + +**Alternative considered**: `confluentinc/confluent-local`. Rejected — it bundles Schema Registry and ksqlDB that this example doesn't use, and `apache/kafka` is the official upstream image with smaller surface. + +**Alternative considered**: require manual topic creation before running. Rejected — introduces friction that makes the example harder to follow. + +**Alternative considered**: `kafka-topics.sh` as the healthcheck. Rejected — slow first-pass convergence and brittle on cold KRaft start. + +### D7: `RayTree.Hosting` over raw builder + +Both services use `Host.CreateApplicationBuilder(args)` + `services.AddChangeTracking(configuration, configure)` from `RayTree.Hosting`. Same rationale as the RMQ example: graceful shutdown, structured logging, and `IConfiguration` binding without bespoke env-var parsing. + +### D8: Repository / outbox atomicity is a known simplification + +Identical caveat to the RMQ example. The example is not transactionally safe between the entity table and the outbox. Documented prominently in the README with a pointer to `RayTree.EntityFrameworkCore`. + +### D9: Shared class library for the `Order` entity + +A `Shared/Shared.csproj` class library holds `Order.cs` with `[Key]`, `[Table("orders")]`, and the four standard properties (`Id`, `CustomerName`, `TotalAmount`, `Status`). Identical structure to the RMQ example so readers can compare the two examples side-by-side without re-learning the entity definition. + +### D10: Local Directory.Build.props and Directory.Packages.props that inherit the root + +Same solution as the RMQ example. The example needs `Microsoft.Extensions.Hosting` (full, not abstractions-only) and `Confluent.Kafka` (via `RayTree.Plugins.Kafka` transitive reference). The root `Directory.Packages.props` must not be modified. Local files in `examples/Kafka.Microservices/` import the parent via `$([MSBuild]::GetPathOfFileAbove(...))` and add example-only entries. + +### D11: MessagePack serializer + Gzip compressor + +Same payload pipeline as the RMQ example: `RayTree.Plugins.Serializers.MessagePack` + `RayTree.Plugins.Compressors.Gzip`. Both services register the same pair; the `Order` POCO needs no MessagePack-specific attributes. `MessageEnvelope.Payload` on the broker is `gzip(messagepack(EntityChange))`. + +### D12: At-most-once delivery (default `AckAfterHandler = false`) + +The example uses the default offset-commit behaviour: `KafkaConsumer` commits the offset on the poll thread immediately after parsing the message, before dispatch to `ChangeSubscriber` — at-most-once semantics. (This is RayTree's behaviour, not Confluent.Kafka's time-based auto-commit, which is disabled internally by RayTree.) A process crash between commit and handler completion loses the message because the committed offset has already advanced. + +This keeps the example simple and avoids the `MaxDegreeOfParallelism = 1` caveat that at-least-once requires. The README notes that setting `AckAfterHandler = true` plus `MaxDegreeOfParallelism = 1` enables at-least-once delivery for production use — and explains the parallelism constraint (Kafka offset commits are monotonic; concurrent commits of out-of-order offsets could advance past in-flight messages and undo the guarantee). + +### D13: NotificationService does NOT depend on order-service in compose + +In the RMQ example the consumer depends on `order-service: service_started` because the consumer only *binds* the exchange (`DeclareExchange = false` on the consumer side) and would fail if the exchange does not exist yet. In Kafka there is no equivalent binding step — the consumer's `Subscribe` call auto-creates the topic when `KAFKA_AUTO_CREATE_TOPICS_ENABLE=true`, and `FromEarliest = true` (the default in `KafkaConsumerOptions`) makes the consumer replay from offset 0 once `order-service` later publishes. So the dependency is unnecessary and the consumer is allowed to start first. + +**Alternative considered**: keep the dependency "for log readability". Rejected — it would teach the wrong mental model. Kafka consumers should be free to start independently of producers; that's a Kafka design property worth showing. + +## Risks / Trade-offs + +- **PostgreSQL schema migration on every startup** → `InitializeAsync` is idempotent; safe. Adds a few hundred milliseconds. +- **Both services reference library projects directly** → example fails to compile if the library has errors. Acceptable; no NuGet publish step needed. +- **KRaft Kafka container first-boot latency** → KRaft broker may take 5–15 s to become ready. Mitigated by a `healthcheck` on the `kafka` service that polls the broker API, and `condition: service_healthy` in both service `depends_on` blocks. +- **Topic auto-creation** → `KAFKA_AUTO_CREATE_TOPICS_ENABLE=true` creates `raytree.order-changes` with default partition count (1) and replication factor (1). Acceptable for a single-broker example; production deployments would pre-create topics with explicit settings. +- **Non-atomic repository + outbox writes** → See D8. Documented in README. +- **Default 5 s outbox polling sluggish for a demo** → Mitigated by setting `PollingInterval = TimeSpan.FromMilliseconds(500)` in `OrderService`. +- **Hard-coded localhost broker addresses** → Runtime behaviour configurable via `KAFKA_BOOTSTRAP_SERVERS` and `POSTGRES_CONNECTION` environment variables; README documents the pattern. diff --git a/openspec/changes/archive/2026-05-22-kafka-microservices-example/proposal.md b/openspec/changes/archive/2026-05-22-kafka-microservices-example/proposal.md new file mode 100644 index 0000000..14bf5b3 --- /dev/null +++ b/openspec/changes/archive/2026-05-22-kafka-microservices-example/proposal.md @@ -0,0 +1,26 @@ +## Why + +The library has a runnable RabbitMQ multi-service example but no equivalent for Kafka. Developers evaluating RayTree in a Kafka-based platform need a concrete, working reference that shows how to wire producer and consumer microservices end-to-end — including PostgreSQL-backed outbox, partition-key routing, and Docker Compose spin-up — without relying on the RabbitMQ example for guidance. + +## What Changes + +- Add a new `examples/Kafka.Microservices/` solution demonstrating two microservices: an `OrderService` that tracks order entity changes via a PostgreSQL outbox and publishes to Kafka, and a `NotificationService` that consumes those changes and handles them. +- The example covers: PostgreSQL-backed outbox and repository for `OrderService`; `KafkaPublisher` targeting a single topic with the default partition key (`EntityType:EntityId`); `KafkaConsumer` in `NotificationService`; MessagePack serialization with Gzip compression; shared-handler dispatch mode; and Docker Compose for local spin-up (PostgreSQL + Kafka + Zookeeper + both services). +- No changes to library source code or existing tests. + +## Capabilities + +### New Capabilities + +- `kafka-microservices-example`: A self-contained multi-microservice example project demonstrating RayTree's Kafka integration for entity change streaming across service boundaries. + +### Modified Capabilities + + + +## Impact + +- New `examples/Kafka.Microservices/` directory; no changes to existing library source, tests, or CI. +- Requires `RayTree.Core`, `RayTree.Hosting`, `RayTree.Plugins.PostgreSQL`, `RayTree.Plugins.Kafka`, `RayTree.Plugins.Serializers.MessagePack`, and `RayTree.Plugins.Compressors.Gzip` project references (all exist in the solution). +- Example is excluded from the main `RayTree.slnx` solution file but can be opened standalone. +- Adds a `docker-compose.yml` with Kafka (KRaft mode — no Zookeeper), PostgreSQL, and both services; does not affect existing Testcontainers-based integration tests. diff --git a/openspec/changes/archive/2026-05-22-kafka-microservices-example/specs/kafka-microservices-example/spec.md b/openspec/changes/archive/2026-05-22-kafka-microservices-example/specs/kafka-microservices-example/spec.md new file mode 100644 index 0000000..3d90941 --- /dev/null +++ b/openspec/changes/archive/2026-05-22-kafka-microservices-example/specs/kafka-microservices-example/spec.md @@ -0,0 +1,170 @@ +## ADDED Requirements + +### Requirement: Example solution structure +The example SHALL be a standalone .NET solution at `examples/Kafka.Microservices/` containing three projects (a `Shared` class library, an `OrderService` console app, and a `NotificationService` console app) plus a `docker-compose.yml`. It SHALL NOT be included in the main `RayTree.slnx` solution. + +#### Scenario: Solution compiles independently +- **WHEN** a developer runs `dotnet build` inside `examples/Kafka.Microservices/` +- **THEN** `Shared`, `OrderService`, and `NotificationService` build successfully without errors + +#### Scenario: Projects reference library via ProjectReference +- **WHEN** the example projects are opened in an IDE +- **THEN** `OrderService` and `NotificationService` reference RayTree assemblies via `` pointing to the `src/` directory, and both reference the `Shared` project for the `Order` entity + +#### Scenario: Central package management is inherited via local props files +- **WHEN** a developer inspects the example `.csproj` files +- **THEN** package references carry no `Version=` attribute and resolve via `examples/Kafka.Microservices/Directory.Packages.props`, which itself ``s the repo-root `Directory.Packages.props` and only appends example-only packages (e.g. `Microsoft.Extensions.Hosting`) + +#### Scenario: Local Directory.Build.props isolates packaging metadata +- **WHEN** a developer inspects `examples/Kafka.Microservices/Directory.Build.props` +- **THEN** it imports the repo-root `Directory.Build.props` and overrides packaging metadata (`false`, no `` / author / license inheritance for the console apps), leaving the root file untouched + +### Requirement: Order entity definition +Both services SHALL share a common `Order` entity class with at least `Id` (Guid, explicitly annotated `[Key]`), `CustomerName` (string), `TotalAmount` (decimal), and `Status` (string) properties. The class SHALL carry `[Table("orders")]` so the `PostgreSqlRepository` source table is the plural form. The entity SHALL live in a `Shared` class library project referenced by both services. + +#### Scenario: Entity properties round-trip through MessagePack +- **WHEN** an `Order` is serialized by `RayTree.Plugins.Serializers.MessagePack` and then deserialized +- **THEN** all four properties (`Id`, `CustomerName`, `TotalAmount`, `Status`) round-trip with their original values + +#### Scenario: No MessagePack-specific attributes required +- **WHEN** the `Order` POCO is inspected +- **THEN** it carries no `[MessagePackObject]` / `[Key(int)]` attributes — the contractless resolver handles plain properties + +### Requirement: OrderService publishes change events +`OrderService` SHALL configure `EntityChangeTracker` with a `PostgreSqlOutbox`, a `PostgreSqlRepository`, a `KafkaPublisher` targeting a topic named `raytree.order_changes`, the MessagePack serializer (`.UseMessagePackSerializer()`), and the Gzip compressor (`.UseGzipCompressor()`). It SHALL periodically create, update, and delete `Order` entities — writing through the repository and tracking changes — to generate a continuous stream of change events. + +#### Scenario: Insert event published +- **WHEN** `OrderService` calls `TrackInsertAsync` for a new `Order` +- **THEN** a `MessageEnvelope` with `ChangeType = Insert` is published to the `raytree.order_changes` Kafka topic with a partition key of `"Order:"` + +#### Scenario: Update event published +- **WHEN** `OrderService` calls `TrackUpdateAsync` for an existing `Order` +- **THEN** a `MessageEnvelope` with `ChangeType = Update` is published to the `raytree.order_changes` topic with the same partition key, guaranteeing the update lands on the same partition as the preceding Insert + +#### Scenario: Delete event published +- **WHEN** `OrderService` calls `TrackDeleteAsync` for an `Order` +- **THEN** a `MessageEnvelope` with `ChangeType = Delete` is published to the `raytree.order_changes` topic on the same partition as the Insert and Update for that entity + +### Requirement: NotificationService consumes change events +`NotificationService` SHALL configure `EntityChangeTracker` with a `KafkaConsumer` subscribed to the `raytree.order_changes` topic using `GroupId = "notification-service"`, plus the matching MessagePack serializer (`.UseMessagePackSerializer()`) and Gzip compressor (`.UseGzipCompressor()`). It SHALL register separate handlers for Insert, Update, and Delete change types using shared-handler dispatch mode. + +#### Scenario: Insert handler invoked +- **WHEN** a `MessageEnvelope` with `ChangeType = Insert` is consumed from Kafka +- **THEN** the registered `OnInsert` handler is invoked with the deserialized `EntityChange` + +#### Scenario: Update handler invoked +- **WHEN** a `MessageEnvelope` with `ChangeType = Update` is consumed from Kafka +- **THEN** the registered `OnUpdate` handler is invoked with the deserialized `EntityChange` + +#### Scenario: Delete handler invoked +- **WHEN** a `MessageEnvelope` with `ChangeType = Delete` is consumed from Kafka +- **THEN** the registered `OnDelete` handler is invoked with the deserialized `EntityChange` + +#### Scenario: Serializer / compressor mismatch fails fast +- **WHEN** `NotificationService` is started with a different serializer or compressor than `OrderService` +- **THEN** deserialization throws on the first received envelope (illustrating that both services must register the same payload pipeline) + +### Requirement: Default partition key preserves per-entity ordering +`OrderService` SHALL use the default `KafkaPublisherOptions.KeySelector` (`envelope => $"{envelope.EntityType}:{envelope.EntityId}"`). All changes for the same `Order` entity SHALL be published to the same Kafka partition, so `NotificationService` processes Insert / Update / Delete events for a given entity in the order they were written. The example topic SHALL be created with **at least 3 partitions** so the partition-key effect is observable (with 1 partition every message lands on the same partition regardless of key). + +#### Scenario: Same entity lands on same partition +- **WHEN** Insert, Update, and Delete events are published for the same `Order.Id` against a multi-partition topic +- **THEN** all three `DeliveryResult` records returned by `Confluent.Kafka` report the same `Partition` value + +#### Scenario: Different entities spread across partitions +- **WHEN** Insert events are published for several different `Order.Id` values against a multi-partition topic +- **THEN** the resulting partition assignments are distributed (not all messages on partition 0), demonstrating that the default key selector shards by entity + +### Requirement: Default at-most-once delivery +The example SHALL use the default `KafkaConsumerOptions.AckAfterHandler = false`. The offset is committed on the poll thread immediately after parsing the message, before dispatch to `ChangeSubscriber`. The README SHALL document this and SHALL explain how to opt in to at-least-once delivery (`AckAfterHandler = true`) and the accompanying `MaxDegreeOfParallelism = 1` requirement. + +#### Scenario: Default is at-most-once +- **WHEN** the `KafkaConsumer` is constructed in `NotificationService` +- **THEN** `KafkaConsumerOptions.AckAfterHandler` is left at its default `false`, and no explicit `MaxDegreeOfParallelism` override is set on `SubscriberOptions` + +### Requirement: Consumer group enables horizontal scaling +The `NotificationService` `GroupId` SHALL be a stable identifier (`"notification-service"`) so that multiple replicas of `NotificationService` form a single Kafka consumer group. Kafka SHALL assign disjoint subsets of the topic's partitions to each replica, parallelising consumption without any RayTree-level configuration. + +#### Scenario: Two replicas share the partitions +- **WHEN** a second instance of `NotificationService` is started with the same `GroupId` against a topic with at least 2 partitions +- **THEN** the Kafka broker triggers a consumer-group rebalance and each replica receives a non-overlapping subset of the partitions + +### Requirement: PostgreSQL outbox and repository schema +`OrderService` SHALL use `PostgreSqlOutbox` and `PostgreSqlRepository`. On startup, `InitializeAsync` SHALL automatically create or migrate the `order_outbox` table and the `orders` table (set by `[Table("orders")]` on the entity). + +#### Scenario: Schema created on first run +- **WHEN** `OrderService` starts against an empty PostgreSQL database +- **THEN** `order_outbox` and `orders` tables are created with all required columns and indexes before the publish loop starts + +#### Scenario: Schema migration is idempotent +- **WHEN** `OrderService` is restarted against a database that already has the tables +- **THEN** startup completes without error and no duplicate tables or columns are created + +### Requirement: Outbox polling interval +`OrderService` SHALL configure `OutboxPublisherOptions.PollingInterval = TimeSpan.FromMilliseconds(500)` so demonstration changes surface in Kafka within roughly half a second of being tracked, rather than the default 5 s. + +#### Scenario: Demo events appear quickly +- **WHEN** `OrderService` calls `TrackInsertAsync` +- **THEN** the corresponding `MessageEnvelope` is observable in Kafka within ~1 s under steady-state polling + +### Requirement: Docker Compose local run +A `docker-compose.yml` at the example root SHALL define services for PostgreSQL, Kafka (KRaft mode, no Zookeeper), `order-service`, and `notification-service`. Running `docker compose up` SHALL start all services without manual configuration steps. The Kafka service SHALL pin a concrete image tag (e.g. `apache/kafka:3.9.0`), not `:latest`. The `order-service` SHALL declare `depends_on` health-check dependencies on both `postgres` and `kafka`. The `notification-service` SHALL declare only a health-check dependency on `kafka` — it does NOT depend on `order-service` because Kafka auto-creates the topic on the first `Subscribe` call, so the consumer can start before the producer without losing messages (the `FromEarliest = true` consumer default replays from offset 0 when `order-service` later begins publishing). + +The Kafka healthcheck SHALL use a TCP-level probe (`nc -z localhost 9092` or equivalent) with a `start_period` of at least 60 seconds to accommodate KRaft controller election latency on cold start. The healthcheck SHALL NOT depend on `kafka-topics.sh` or other JVM-startup-bound tooling. + +#### Scenario: Services start with docker compose up +- **WHEN** a developer runs `docker compose up` from `examples/Kafka.Microservices/` +- **THEN** PostgreSQL starts on port 5432, Kafka starts on port 9092, and both .NET services connect and begin producing/consuming messages + +#### Scenario: Kafka broker address is configurable +- **WHEN** a developer sets the `KAFKA_BOOTSTRAP_SERVERS` environment variable before running +- **THEN** both services use that address instead of the default `localhost:9092` + +#### Scenario: PostgreSQL connection is configurable +- **WHEN** a developer sets the `POSTGRES_CONNECTION` environment variable before running +- **THEN** `OrderService` uses that connection string instead of the default + +### Requirement: Topic auto-creation with multiple partitions +The `docker-compose.yml` SHALL configure the Kafka broker with `KAFKA_AUTO_CREATE_TOPICS_ENABLE=true` AND `KAFKA_NUM_PARTITIONS=3` (or higher) so the `raytree.order_changes` topic is created automatically on the first produce or subscribe call with enough partitions to demonstrate per-entity routing. No manual topic administration step SHALL be required. + +#### Scenario: Topic created on first publish +- **WHEN** `OrderService` publishes its first message to `raytree.order_changes` +- **THEN** the broker creates the topic automatically with 3 partitions + +#### Scenario: Topic created on first subscribe +- **WHEN** `NotificationService` starts before `OrderService` and calls `Subscribe` +- **THEN** the broker creates the topic automatically with 3 partitions, and `NotificationService` begins receiving messages as soon as `OrderService` publishes + +### Requirement: Generic Host integration and graceful shutdown +Both services SHALL bootstrap via `Host.CreateApplicationBuilder(args)` and register RayTree through `services.AddChangeTracking(configuration, configure)` from `RayTree.Hosting`. The `IHostApplicationLifetime` SHALL drive graceful shutdown — Ctrl+C, SIGTERM, or `docker compose down` SHALL stop the publisher and consumer loops cleanly. + +#### Scenario: Ctrl+C triggers graceful shutdown +- **WHEN** a developer presses Ctrl+C in the `OrderService` console +- **THEN** `OutboxPublisherService` stops polling, in-flight publishes complete, and the process exits with code 0 + +#### Scenario: docker compose down stops cleanly +- **WHEN** a developer runs `docker compose down` +- **THEN** both services receive SIGTERM, `ChangeTrackingHostedService.StopAsync` runs, and containers exit gracefully + +### Requirement: Atomicity caveat is documented +Because `PostgreSqlRepository` and `PostgreSqlOutbox` do not share a transaction in this example, the README SHALL contain a prominent caveat explaining that a crash between the repository write and the outbox write can leave the two tables inconsistent, and SHALL point readers to `RayTree.EntityFrameworkCore` (`EntityChangeInterceptor`) as the production-grade transactional path. + +#### Scenario: Caveat is visible in README +- **WHEN** a developer reads the example README +- **THEN** a clearly marked section explains the non-atomic write path and recommends EF Core integration for production use + +### Requirement: README with run instructions +A `README.md` SHALL accompany the example explaining prerequisites, how to run with Docker Compose, what to observe in the console output, and a brief description of the project structure. It SHALL cover the following Kafka-specific topics: + +- **Default partition-key strategy** — `EntityType:EntityId` keeps per-entity ordering; how to override `KafkaPublisherOptions.KeySelector` to shard by tenant or aggregate root. +- **`FromEarliest = true` default** — a new consumer group reads from offset 0; restarting the same group resumes from the last committed offset. +- **Delivery guarantees** — the example uses the default `AckAfterHandler = false` (at-most-once); document how to switch to at-least-once (`AckAfterHandler = true`) AND the accompanying `SubscriberOptions.MaxDegreeOfParallelism = 1` requirement (Kafka offset commits are monotonic — concurrent commits could skip in-flight messages). +- **Consumer-group scaling** — running multiple `NotificationService` replicas with the same `GroupId` causes Kafka to rebalance partitions across them automatically, with no RayTree-level configuration. + +#### Scenario: Developer can follow README without prior RayTree knowledge +- **WHEN** a developer reads the README and follows its steps +- **THEN** they can run the example and see change events flowing from `OrderService` to `NotificationService` in under 5 minutes + +#### Scenario: README documents at-least-once trade-off +- **WHEN** a developer reads the delivery-guarantees section +- **THEN** they see both the flag (`AckAfterHandler = true`) and the parallelism constraint (`MaxDegreeOfParallelism = 1`) called out explicitly diff --git a/openspec/changes/archive/2026-05-22-kafka-microservices-example/tasks.md b/openspec/changes/archive/2026-05-22-kafka-microservices-example/tasks.md new file mode 100644 index 0000000..21090a0 --- /dev/null +++ b/openspec/changes/archive/2026-05-22-kafka-microservices-example/tasks.md @@ -0,0 +1,62 @@ +## 1. Solution Scaffold + +- [x] 1.1 Create `examples/Kafka.Microservices/` directory structure +- [x] 1.2 Create `Kafka.Microservices.slnx` solution file (standalone — not added to `RayTree.slnx`) +- [x] 1.3 Add `Shared/Shared.csproj` class library project (target framework `net10.0`) — holds the `Order` entity +- [x] 1.4 Add `OrderService/OrderService.csproj` console-app project referencing `Shared`, `RayTree.Core`, `RayTree.Hosting`, `RayTree.Plugins.PostgreSQL`, `RayTree.Plugins.Kafka`, `RayTree.Plugins.Serializers.MessagePack`, `RayTree.Plugins.Compressors.Gzip` +- [x] 1.5 Add `NotificationService/NotificationService.csproj` console-app project referencing `Shared`, `RayTree.Core`, `RayTree.Hosting`, `RayTree.Plugins.Kafka`, `RayTree.Plugins.Serializers.MessagePack`, `RayTree.Plugins.Compressors.Gzip` +- [x] 1.6 Add all three projects to the solution file +- [x] 1.7 Create `examples/Kafka.Microservices/Directory.Build.props` that ``s the repo-root `Directory.Build.props` and overrides `false` plus blanks out packaging metadata that doesn't apply to console apps +- [x] 1.8 Create `examples/Kafka.Microservices/Directory.Packages.props` that ``s the repo-root `Directory.Packages.props` and appends `` +- [x] 1.9 Verify no `Version=` attributes appear on any `` so central package management governs versions + +## 2. Shared Entity + +- [x] 2.1 Create `Shared/Order.cs` with `[Table("orders")]` on the class and the following properties: `[Key] Guid Id`, `string CustomerName`, `decimal TotalAmount`, `string Status` +- [x] 2.2 Ensure `Order` is a plain POCO — no `[MessagePackObject]` / `[Key(int)]` attributes are required because the plugin uses the contractless typeless resolver + +## 3. OrderService Implementation + +- [x] 3.1 In `OrderService/Program.cs` use `Host.CreateApplicationBuilder(args)` and register RayTree via `builder.Services.AddChangeTracking(builder.Configuration, configure => { ... })` +- [x] 3.2 Inside the `configure` callback: register `PostgreSqlOutbox` and `PostgreSqlRepository` against the connection string from `POSTGRES_CONNECTION` (default `Host=localhost;Port=5432;Database=raytree_example;Username=postgres;Password=postgres`) +- [x] 3.3 Register `KafkaPublisher` targeting topic `raytree.order_changes`; use the default `KeySelector` (`envelope => $"{envelope.EntityType}:{envelope.EntityId}"`) — no explicit override required +- [x] 3.4 Call `.UseMessagePackSerializer()` and `.UseGzipCompressor()` on the builder so the payload pipeline is MessagePack + Gzip +- [x] 3.5 Configure `OutboxPublisherOptions.PollingInterval = TimeSpan.FromMilliseconds(500)` for snappy demo behaviour +- [x] 3.6 Read Kafka broker address from `KAFKA_BOOTSTRAP_SERVERS` environment variable (default `localhost:9092`) +- [x] 3.7 Add a `BackgroundService` (e.g. `OrderSimulator`) that periodically inserts, updates, and deletes `Order` rows via `IRepository` and tracks each change via `EntityChangeTracker.TrackXxxAsync`. Use a short delay between operations (e.g. 1–2 s) so output is readable. +- [x] 3.8 Log every operation to the console using `ILogger` (structured logging via Generic Host defaults) +- [x] 3.9 Rely on `IHostApplicationLifetime` for graceful shutdown — no manual `Console.CancelKeyPress` wiring needed + +## 4. NotificationService Implementation + +- [x] 4.1 In `NotificationService/Program.cs` use `Host.CreateApplicationBuilder(args)` and register RayTree via `builder.Services.AddChangeTracking(builder.Configuration, configure => { ... })` +- [x] 4.2 Register a `KafkaConsumer` subscribed to topic `raytree.order_changes` with `GroupId = "notification-service"` and `BootstrapServers` from `KAFKA_BOOTSTRAP_SERVERS` (default `localhost:9092`); leave `FromEarliest = true` (the default in `KafkaConsumerOptions`) and `AckAfterHandler = false` (also the default) — at-most-once is the example baseline +- [x] 4.3 Call `.UseMessagePackSerializer()` and `.UseGzipCompressor()` on the builder — must match `OrderService`'s payload pipeline exactly +- [x] 4.4 Inside `ForEntity(b => b.UseConsumer(...))`, chain `OnInsert` / `OnUpdate` / `OnDelete` handlers in shared-handler mode that each log the change details with `ILogger` +- [x] 4.5 Rely on `ChangeTrackingHostedService` (registered by `AddChangeTracking`) for `StartAsync`/`StopAsync` — no manual lifetime code + +## 5. Docker Compose + +- [x] 5.1 Create `docker-compose.yml` with `postgres:18.1-alpine3.22` service on port 5432, env vars `POSTGRES_DB=raytree_example`, `POSTGRES_USER=postgres`, `POSTGRES_PASSWORD=postgres`, and a `healthcheck` running `pg_isready -U postgres -d raytree_example` every 5 s +- [x] 5.2 Add a Kafka service using **`apache/kafka:3.9.0`** (pinned, not `:latest`) in KRaft single-node mode exposing port 9092. Required env vars: `KAFKA_AUTO_CREATE_TOPICS_ENABLE=true`, `KAFKA_NUM_PARTITIONS=3` (so the partition-key behaviour is observable — with 1 partition every message lands on the same partition regardless of key), `KAFKA_PROCESS_ROLES=broker,controller`, `KAFKA_NODE_ID=1`, `KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093`, `KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093`, `KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092`, `KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER`, `KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT`, `KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1`, `KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0` +- [x] 5.3 Configure Kafka healthcheck as a TCP probe (`test: ["CMD-SHELL", "nc -z localhost 9092 || exit 1"]`) with `interval: 10s`, `timeout: 5s`, `retries: 10`, and `start_period: 60s` — accommodates KRaft controller-election latency on cold start without depending on `kafka-topics.sh` or other JVM-bound tooling +- [x] 5.4 Add `order-service` with env `KAFKA_BOOTSTRAP_SERVERS=kafka:9092`, `POSTGRES_CONNECTION=Host=postgres;Port=5432;Database=raytree_example;Username=postgres;Password=postgres`, and `depends_on: {postgres: {condition: service_healthy}, kafka: {condition: service_healthy}}` +- [x] 5.5 Add `notification-service` with env `KAFKA_BOOTSTRAP_SERVERS=kafka:9092` and `depends_on: {kafka: {condition: service_healthy}}` — **no dependency on `order-service`** because Kafka auto-creates the topic on the consumer's `Subscribe` call and the `FromEarliest = true` default makes the consumer replay from offset 0 once `order-service` later publishes (per design D13) +- [x] 5.6 Define a named volume for PostgreSQL data so restarts preserve the outbox state + +## 6. Dockerfiles + +- [x] 6.1 Add multi-stage `OrderService/Dockerfile` (`mcr.microsoft.com/dotnet/sdk:10.0` build → `mcr.microsoft.com/dotnet/runtime:10.0` runtime) +- [x] 6.2 Add multi-stage `NotificationService/Dockerfile` with the same base images +- [x] 6.3 Ensure both Dockerfiles copy and restore from the repo root so `ProjectReference` paths to `src/RayTree.*` resolve correctly inside the build context + +## 7. Documentation + +- [x] 7.1 Write `README.md` covering: prerequisites (Docker, .NET 10 SDK for local dev), `docker compose up` instructions, expected console output, Kafka broker address (`localhost:9092`), PostgreSQL connection details, project structure overview +- [x] 7.2 Add a **"Known limitations"** section explaining the non-atomic repository + outbox writes (per design decision D8) and pointing readers to `RayTree.EntityFrameworkCore` / `EntityChangeInterceptor` as the production-grade transactional path +- [x] 7.3 Add a **"Going further"** section covering: NOTIFY/LISTEN fast-path (`PostgreSqlOutboxOptions.UseNotificationChannel = true`); **at-least-once delivery** — set `KafkaConsumerOptions.AckAfterHandler = true` AND `SubscriberOptions.MaxDegreeOfParallelism = 1` together (explain explicitly that Kafka offset commits are monotonic and concurrent commits of out-of-order offsets would advance past in-flight messages, undoing the guarantee); custom `KeySelector` for sharding by tenant or aggregate root; isolated-handler dispatch mode for per-handler consumer groups +- [x] 7.4 Add a **"Consumer-group scaling"** section showing that running multiple `NotificationService` replicas with the same `GroupId` causes Kafka to rebalance the 3 partitions across them automatically — demonstrate by running `docker compose up --scale notification-service=2` and observing partition assignment in the logs +- [x] 7.5 Add a **"Partition-key behaviour"** section showing how to inspect partition assignment (`docker exec kafka /opt/kafka/bin/kafka-console-consumer.sh --topic raytree.order_changes --partition --bootstrap-server localhost:9092 --from-beginning`) and verify that all events for one `Order.Id` land on the same partition while different `Id`s spread across partitions +- [x] 7.6 Document the **`FromEarliest = true`** default — a new consumer group reads from offset 0; restarting the same group resumes from the last committed offset. This is why `notification-service` can start before `order-service` and still see every message. +- [x] 7.7 Note in README that the example is intentionally not part of `RayTree.slnx` — open `examples/Kafka.Microservices/Kafka.Microservices.slnx` directly +- [x] 7.8 Add inline code comments in both `Program.cs` files explaining the key builder calls (one short line per non-obvious step) diff --git a/openspec/specs/kafka-microservices-example/spec.md b/openspec/specs/kafka-microservices-example/spec.md new file mode 100644 index 0000000..3d90941 --- /dev/null +++ b/openspec/specs/kafka-microservices-example/spec.md @@ -0,0 +1,170 @@ +## ADDED Requirements + +### Requirement: Example solution structure +The example SHALL be a standalone .NET solution at `examples/Kafka.Microservices/` containing three projects (a `Shared` class library, an `OrderService` console app, and a `NotificationService` console app) plus a `docker-compose.yml`. It SHALL NOT be included in the main `RayTree.slnx` solution. + +#### Scenario: Solution compiles independently +- **WHEN** a developer runs `dotnet build` inside `examples/Kafka.Microservices/` +- **THEN** `Shared`, `OrderService`, and `NotificationService` build successfully without errors + +#### Scenario: Projects reference library via ProjectReference +- **WHEN** the example projects are opened in an IDE +- **THEN** `OrderService` and `NotificationService` reference RayTree assemblies via `` pointing to the `src/` directory, and both reference the `Shared` project for the `Order` entity + +#### Scenario: Central package management is inherited via local props files +- **WHEN** a developer inspects the example `.csproj` files +- **THEN** package references carry no `Version=` attribute and resolve via `examples/Kafka.Microservices/Directory.Packages.props`, which itself ``s the repo-root `Directory.Packages.props` and only appends example-only packages (e.g. `Microsoft.Extensions.Hosting`) + +#### Scenario: Local Directory.Build.props isolates packaging metadata +- **WHEN** a developer inspects `examples/Kafka.Microservices/Directory.Build.props` +- **THEN** it imports the repo-root `Directory.Build.props` and overrides packaging metadata (`false`, no `` / author / license inheritance for the console apps), leaving the root file untouched + +### Requirement: Order entity definition +Both services SHALL share a common `Order` entity class with at least `Id` (Guid, explicitly annotated `[Key]`), `CustomerName` (string), `TotalAmount` (decimal), and `Status` (string) properties. The class SHALL carry `[Table("orders")]` so the `PostgreSqlRepository` source table is the plural form. The entity SHALL live in a `Shared` class library project referenced by both services. + +#### Scenario: Entity properties round-trip through MessagePack +- **WHEN** an `Order` is serialized by `RayTree.Plugins.Serializers.MessagePack` and then deserialized +- **THEN** all four properties (`Id`, `CustomerName`, `TotalAmount`, `Status`) round-trip with their original values + +#### Scenario: No MessagePack-specific attributes required +- **WHEN** the `Order` POCO is inspected +- **THEN** it carries no `[MessagePackObject]` / `[Key(int)]` attributes — the contractless resolver handles plain properties + +### Requirement: OrderService publishes change events +`OrderService` SHALL configure `EntityChangeTracker` with a `PostgreSqlOutbox`, a `PostgreSqlRepository`, a `KafkaPublisher` targeting a topic named `raytree.order_changes`, the MessagePack serializer (`.UseMessagePackSerializer()`), and the Gzip compressor (`.UseGzipCompressor()`). It SHALL periodically create, update, and delete `Order` entities — writing through the repository and tracking changes — to generate a continuous stream of change events. + +#### Scenario: Insert event published +- **WHEN** `OrderService` calls `TrackInsertAsync` for a new `Order` +- **THEN** a `MessageEnvelope` with `ChangeType = Insert` is published to the `raytree.order_changes` Kafka topic with a partition key of `"Order:"` + +#### Scenario: Update event published +- **WHEN** `OrderService` calls `TrackUpdateAsync` for an existing `Order` +- **THEN** a `MessageEnvelope` with `ChangeType = Update` is published to the `raytree.order_changes` topic with the same partition key, guaranteeing the update lands on the same partition as the preceding Insert + +#### Scenario: Delete event published +- **WHEN** `OrderService` calls `TrackDeleteAsync` for an `Order` +- **THEN** a `MessageEnvelope` with `ChangeType = Delete` is published to the `raytree.order_changes` topic on the same partition as the Insert and Update for that entity + +### Requirement: NotificationService consumes change events +`NotificationService` SHALL configure `EntityChangeTracker` with a `KafkaConsumer` subscribed to the `raytree.order_changes` topic using `GroupId = "notification-service"`, plus the matching MessagePack serializer (`.UseMessagePackSerializer()`) and Gzip compressor (`.UseGzipCompressor()`). It SHALL register separate handlers for Insert, Update, and Delete change types using shared-handler dispatch mode. + +#### Scenario: Insert handler invoked +- **WHEN** a `MessageEnvelope` with `ChangeType = Insert` is consumed from Kafka +- **THEN** the registered `OnInsert` handler is invoked with the deserialized `EntityChange` + +#### Scenario: Update handler invoked +- **WHEN** a `MessageEnvelope` with `ChangeType = Update` is consumed from Kafka +- **THEN** the registered `OnUpdate` handler is invoked with the deserialized `EntityChange` + +#### Scenario: Delete handler invoked +- **WHEN** a `MessageEnvelope` with `ChangeType = Delete` is consumed from Kafka +- **THEN** the registered `OnDelete` handler is invoked with the deserialized `EntityChange` + +#### Scenario: Serializer / compressor mismatch fails fast +- **WHEN** `NotificationService` is started with a different serializer or compressor than `OrderService` +- **THEN** deserialization throws on the first received envelope (illustrating that both services must register the same payload pipeline) + +### Requirement: Default partition key preserves per-entity ordering +`OrderService` SHALL use the default `KafkaPublisherOptions.KeySelector` (`envelope => $"{envelope.EntityType}:{envelope.EntityId}"`). All changes for the same `Order` entity SHALL be published to the same Kafka partition, so `NotificationService` processes Insert / Update / Delete events for a given entity in the order they were written. The example topic SHALL be created with **at least 3 partitions** so the partition-key effect is observable (with 1 partition every message lands on the same partition regardless of key). + +#### Scenario: Same entity lands on same partition +- **WHEN** Insert, Update, and Delete events are published for the same `Order.Id` against a multi-partition topic +- **THEN** all three `DeliveryResult` records returned by `Confluent.Kafka` report the same `Partition` value + +#### Scenario: Different entities spread across partitions +- **WHEN** Insert events are published for several different `Order.Id` values against a multi-partition topic +- **THEN** the resulting partition assignments are distributed (not all messages on partition 0), demonstrating that the default key selector shards by entity + +### Requirement: Default at-most-once delivery +The example SHALL use the default `KafkaConsumerOptions.AckAfterHandler = false`. The offset is committed on the poll thread immediately after parsing the message, before dispatch to `ChangeSubscriber`. The README SHALL document this and SHALL explain how to opt in to at-least-once delivery (`AckAfterHandler = true`) and the accompanying `MaxDegreeOfParallelism = 1` requirement. + +#### Scenario: Default is at-most-once +- **WHEN** the `KafkaConsumer` is constructed in `NotificationService` +- **THEN** `KafkaConsumerOptions.AckAfterHandler` is left at its default `false`, and no explicit `MaxDegreeOfParallelism` override is set on `SubscriberOptions` + +### Requirement: Consumer group enables horizontal scaling +The `NotificationService` `GroupId` SHALL be a stable identifier (`"notification-service"`) so that multiple replicas of `NotificationService` form a single Kafka consumer group. Kafka SHALL assign disjoint subsets of the topic's partitions to each replica, parallelising consumption without any RayTree-level configuration. + +#### Scenario: Two replicas share the partitions +- **WHEN** a second instance of `NotificationService` is started with the same `GroupId` against a topic with at least 2 partitions +- **THEN** the Kafka broker triggers a consumer-group rebalance and each replica receives a non-overlapping subset of the partitions + +### Requirement: PostgreSQL outbox and repository schema +`OrderService` SHALL use `PostgreSqlOutbox` and `PostgreSqlRepository`. On startup, `InitializeAsync` SHALL automatically create or migrate the `order_outbox` table and the `orders` table (set by `[Table("orders")]` on the entity). + +#### Scenario: Schema created on first run +- **WHEN** `OrderService` starts against an empty PostgreSQL database +- **THEN** `order_outbox` and `orders` tables are created with all required columns and indexes before the publish loop starts + +#### Scenario: Schema migration is idempotent +- **WHEN** `OrderService` is restarted against a database that already has the tables +- **THEN** startup completes without error and no duplicate tables or columns are created + +### Requirement: Outbox polling interval +`OrderService` SHALL configure `OutboxPublisherOptions.PollingInterval = TimeSpan.FromMilliseconds(500)` so demonstration changes surface in Kafka within roughly half a second of being tracked, rather than the default 5 s. + +#### Scenario: Demo events appear quickly +- **WHEN** `OrderService` calls `TrackInsertAsync` +- **THEN** the corresponding `MessageEnvelope` is observable in Kafka within ~1 s under steady-state polling + +### Requirement: Docker Compose local run +A `docker-compose.yml` at the example root SHALL define services for PostgreSQL, Kafka (KRaft mode, no Zookeeper), `order-service`, and `notification-service`. Running `docker compose up` SHALL start all services without manual configuration steps. The Kafka service SHALL pin a concrete image tag (e.g. `apache/kafka:3.9.0`), not `:latest`. The `order-service` SHALL declare `depends_on` health-check dependencies on both `postgres` and `kafka`. The `notification-service` SHALL declare only a health-check dependency on `kafka` — it does NOT depend on `order-service` because Kafka auto-creates the topic on the first `Subscribe` call, so the consumer can start before the producer without losing messages (the `FromEarliest = true` consumer default replays from offset 0 when `order-service` later begins publishing). + +The Kafka healthcheck SHALL use a TCP-level probe (`nc -z localhost 9092` or equivalent) with a `start_period` of at least 60 seconds to accommodate KRaft controller election latency on cold start. The healthcheck SHALL NOT depend on `kafka-topics.sh` or other JVM-startup-bound tooling. + +#### Scenario: Services start with docker compose up +- **WHEN** a developer runs `docker compose up` from `examples/Kafka.Microservices/` +- **THEN** PostgreSQL starts on port 5432, Kafka starts on port 9092, and both .NET services connect and begin producing/consuming messages + +#### Scenario: Kafka broker address is configurable +- **WHEN** a developer sets the `KAFKA_BOOTSTRAP_SERVERS` environment variable before running +- **THEN** both services use that address instead of the default `localhost:9092` + +#### Scenario: PostgreSQL connection is configurable +- **WHEN** a developer sets the `POSTGRES_CONNECTION` environment variable before running +- **THEN** `OrderService` uses that connection string instead of the default + +### Requirement: Topic auto-creation with multiple partitions +The `docker-compose.yml` SHALL configure the Kafka broker with `KAFKA_AUTO_CREATE_TOPICS_ENABLE=true` AND `KAFKA_NUM_PARTITIONS=3` (or higher) so the `raytree.order_changes` topic is created automatically on the first produce or subscribe call with enough partitions to demonstrate per-entity routing. No manual topic administration step SHALL be required. + +#### Scenario: Topic created on first publish +- **WHEN** `OrderService` publishes its first message to `raytree.order_changes` +- **THEN** the broker creates the topic automatically with 3 partitions + +#### Scenario: Topic created on first subscribe +- **WHEN** `NotificationService` starts before `OrderService` and calls `Subscribe` +- **THEN** the broker creates the topic automatically with 3 partitions, and `NotificationService` begins receiving messages as soon as `OrderService` publishes + +### Requirement: Generic Host integration and graceful shutdown +Both services SHALL bootstrap via `Host.CreateApplicationBuilder(args)` and register RayTree through `services.AddChangeTracking(configuration, configure)` from `RayTree.Hosting`. The `IHostApplicationLifetime` SHALL drive graceful shutdown — Ctrl+C, SIGTERM, or `docker compose down` SHALL stop the publisher and consumer loops cleanly. + +#### Scenario: Ctrl+C triggers graceful shutdown +- **WHEN** a developer presses Ctrl+C in the `OrderService` console +- **THEN** `OutboxPublisherService` stops polling, in-flight publishes complete, and the process exits with code 0 + +#### Scenario: docker compose down stops cleanly +- **WHEN** a developer runs `docker compose down` +- **THEN** both services receive SIGTERM, `ChangeTrackingHostedService.StopAsync` runs, and containers exit gracefully + +### Requirement: Atomicity caveat is documented +Because `PostgreSqlRepository` and `PostgreSqlOutbox` do not share a transaction in this example, the README SHALL contain a prominent caveat explaining that a crash between the repository write and the outbox write can leave the two tables inconsistent, and SHALL point readers to `RayTree.EntityFrameworkCore` (`EntityChangeInterceptor`) as the production-grade transactional path. + +#### Scenario: Caveat is visible in README +- **WHEN** a developer reads the example README +- **THEN** a clearly marked section explains the non-atomic write path and recommends EF Core integration for production use + +### Requirement: README with run instructions +A `README.md` SHALL accompany the example explaining prerequisites, how to run with Docker Compose, what to observe in the console output, and a brief description of the project structure. It SHALL cover the following Kafka-specific topics: + +- **Default partition-key strategy** — `EntityType:EntityId` keeps per-entity ordering; how to override `KafkaPublisherOptions.KeySelector` to shard by tenant or aggregate root. +- **`FromEarliest = true` default** — a new consumer group reads from offset 0; restarting the same group resumes from the last committed offset. +- **Delivery guarantees** — the example uses the default `AckAfterHandler = false` (at-most-once); document how to switch to at-least-once (`AckAfterHandler = true`) AND the accompanying `SubscriberOptions.MaxDegreeOfParallelism = 1` requirement (Kafka offset commits are monotonic — concurrent commits could skip in-flight messages). +- **Consumer-group scaling** — running multiple `NotificationService` replicas with the same `GroupId` causes Kafka to rebalance partitions across them automatically, with no RayTree-level configuration. + +#### Scenario: Developer can follow README without prior RayTree knowledge +- **WHEN** a developer reads the README and follows its steps +- **THEN** they can run the example and see change events flowing from `OrderService` to `NotificationService` in under 5 minutes + +#### Scenario: README documents at-least-once trade-off +- **WHEN** a developer reads the delivery-guarantees section +- **THEN** they see both the flag (`AckAfterHandler = true`) and the parallelism constraint (`MaxDegreeOfParallelism = 1`) called out explicitly