Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Exclude build artefacts so they are not sent to the Docker daemon as part of the build context.
# Without this file, local bin/ and obj/ directories (which include large native Confluent.Kafka
# and Npgsql binaries) inflate the context to several GB and make every build slow.

# .NET build output
**/bin/
**/obj/

# IDE and editor directories
.vs/
.idea/
.vscode/

# Git metadata
.git/

# Local user overrides
*.user
*.suo
*.DotSettings.user

# OS noise
.DS_Store
Thumbs.db
17 changes: 17 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,23 @@ var consumer = new RabbitMqConsumer(new RabbitMqConsumerOptions
See the [Configuration Guide](configuration.md#rabbitmq-topology-wait) for the full option
reference and for consumer-factory / Generic Host patterns.

## Examples

The `examples/` directory contains two complete runnable microservice demos showing the full outbox-to-broker pipeline end to end. Both are standalone solutions (not part of `RayTree.slnx`) and start with a single `docker compose up --build`:

| Example | Broker | Key concepts demonstrated |
|---|---|---|
| [`examples/RabbitMQ.Microservices/`](../examples/RabbitMQ.Microservices/README.md) | RabbitMQ 4 (topic exchange) | Outbox → publish → consume, `WaitForTopology`, at-most-once vs at-least-once, management UI |
| [`examples/Kafka.Microservices/`](../examples/Kafka.Microservices/README.md) | Apache Kafka 3.9 (KRaft, no Zookeeper) | Partition-key ordering, `FromEarliest` consumer replay, consumer-group scaling, 3-partition routing |

Both examples share the same structure:
- `OrderService` — `PostgreSqlRepository<Order>` + `PostgreSqlOutbox<Order>` + broker publisher + `OrderSimulator` background service
- `NotificationService` — broker consumer + `OnInsert`/`OnUpdate`/`OnDelete` handlers logging via `ILogger`
- Multi-stage Dockerfiles with csproj-first layer caching (NuGet packages cached across source-only edits)
- `postgres:18.3-alpine` with the postgres 18 volume layout (`/var/lib/postgresql`)

---

## Cleanup

`EntityChangeTracker` implements `IDisposable`. Disposing it stops all publisher services:
Expand Down
22 changes: 22 additions & 0 deletions examples/Kafka.Microservices/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<Project>
<!--
Import the repo-root Directory.Build.props to inherit framework settings (TargetFramework,
Nullable, ImplicitUsings, LangVersion, TreatWarningsAsErrors). Then override the packaging
metadata that does not apply to the example console apps.
-->
<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.props', '$(MSBuildThisFileDirectory)../'))" />

<PropertyGroup>
<IsPackable>false</IsPackable>
<VersionPrefix></VersionPrefix>
<VersionSuffix></VersionSuffix>
<Authors></Authors>
<Copyright></Copyright>
<PackageLicenseExpression></PackageLicenseExpression>
<RepositoryUrl></RepositoryUrl>
<RepositoryType></RepositoryType>
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
<IncludeSymbols>false</IncludeSymbols>
<SymbolPackageFormat></SymbolPackageFormat>
</PropertyGroup>
</Project>
13 changes: 13 additions & 0 deletions examples/Kafka.Microservices/Directory.Packages.props
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project>
<!--
Import the repo-root Directory.Packages.props to inherit centrally-pinned versions
(Microsoft.Extensions.*, Npgsql, Confluent.Kafka, MessagePack, etc.), then append
example-only packages that the library projects do not consume.
-->
<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Packages.props', '$(MSBuildThisFileDirectory)../'))" />

<ItemGroup>
<!-- Generic Host is needed for Host.CreateApplicationBuilder in the console apps. -->
<PackageVersion Include="Microsoft.Extensions.Hosting" Version="10.0.8" />
</ItemGroup>
</Project>
11 changes: 11 additions & 0 deletions examples/Kafka.Microservices/Kafka.Microservices.slnx
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<Solution>
<Folder Name="/src/">
<Project Path="Shared/Shared.csproj" />
<Project Path="OrderService/OrderService.csproj" />
<Project Path="NotificationService/NotificationService.csproj" />
</Folder>
<Folder Name="/Solution Items/">
<File Path="docker-compose.yml" />
<File Path="README.md" />
</Folder>
</Solution>
37 changes: 37 additions & 0 deletions examples/Kafka.Microservices/NotificationService/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Build context: repository root (../../ from this file). The compose file sets this explicitly
# so that ProjectReference paths from NotificationService.csproj into ../../../src/RayTree.* resolve.

FROM mcr.microsoft.com/dotnet/sdk:10.0-alpine AS build
WORKDIR /src

# ── Layer 1: central build infrastructure ────────────────────────────────────────────────────────
# Copied first because it changes rarely. All subsequent layers inherit TargetFramework, Nullable,
# TreatWarningsAsErrors, and centrally-pinned NuGet versions from these files.
COPY Directory.Build.props Directory.Packages.props NuGet.config ./

# ── Layer 2: .csproj files only (restore cache) ──────────────────────────────────────────────────
# Copy project files for NotificationService and every ProjectReference in its dependency graph
# (direct: Shared + five RayTree plugins; transitive: all plugins → RayTree.Core).
# NotificationService has no PostgreSQL dependency — only the Kafka + serializer + compressor stack.
COPY src/RayTree.Core/RayTree.Core.csproj src/RayTree.Core/
COPY src/RayTree.Hosting/RayTree.Hosting.csproj src/RayTree.Hosting/
COPY src/RayTree.Plugins.Kafka/RayTree.Plugins.Kafka.csproj src/RayTree.Plugins.Kafka/
COPY src/RayTree.Plugins.Serializers.MessagePack/RayTree.Plugins.Serializers.MessagePack.csproj src/RayTree.Plugins.Serializers.MessagePack/
COPY src/RayTree.Plugins.Compressors.Gzip/RayTree.Plugins.Compressors.Gzip.csproj src/RayTree.Plugins.Compressors.Gzip/
COPY examples/Kafka.Microservices/Directory.Build.props examples/Kafka.Microservices/
COPY examples/Kafka.Microservices/Directory.Packages.props examples/Kafka.Microservices/
COPY examples/Kafka.Microservices/Shared/Shared.csproj examples/Kafka.Microservices/Shared/
COPY examples/Kafka.Microservices/NotificationService/NotificationService.csproj examples/Kafka.Microservices/NotificationService/
RUN dotnet restore examples/Kafka.Microservices/NotificationService/NotificationService.csproj

# ── Layer 3: source files → publish ──────────────────────────────────────────────────────────────
# Packages are already restored above; --no-restore skips the redundant network round-trip.
COPY src/ ./src/
COPY examples/Kafka.Microservices/ ./examples/Kafka.Microservices/
WORKDIR /src/examples/Kafka.Microservices/NotificationService
RUN dotnet publish -c Release -o /app /p:UseAppHost=false --no-restore

FROM mcr.microsoft.com/dotnet/runtime:10.0-alpine AS runtime
WORKDIR /app
COPY --from=build /app ./
ENTRYPOINT ["dotnet", "KafkaMicroservices.NotificationService.dll"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<RootNamespace>KafkaMicroservices.NotificationService</RootNamespace>
<AssemblyName>KafkaMicroservices.NotificationService</AssemblyName>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Shared\Shared.csproj" />
<ProjectReference Include="..\..\..\src\RayTree.Core\RayTree.Core.csproj" />
<ProjectReference Include="..\..\..\src\RayTree.Hosting\RayTree.Hosting.csproj" />
<ProjectReference Include="..\..\..\src\RayTree.Plugins.Kafka\RayTree.Plugins.Kafka.csproj" />
<ProjectReference Include="..\..\..\src\RayTree.Plugins.Serializers.MessagePack\RayTree.Plugins.Serializers.MessagePack.csproj" />
<ProjectReference Include="..\..\..\src\RayTree.Plugins.Compressors.Gzip\RayTree.Plugins.Compressors.Gzip.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" />
</ItemGroup>
</Project>
82 changes: 82 additions & 0 deletions examples/Kafka.Microservices/NotificationService/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using KafkaMicroservices.Shared;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RayTree.Core.Models;
using RayTree.Core.Plugins;
using RayTree.Core.Plugins.Serialization;
using RayTree.Hosting;
using RayTree.Plugins.Compressors.Gzip;
using RayTree.Plugins.Kafka;
using RayTree.Plugins.Serializers.MessagePack;

var kafkaBootstrap = Environment.GetEnvironmentVariable("KAFKA_BOOTSTRAP_SERVERS") ?? "localhost:9092";

var builder = Host.CreateApplicationBuilder(args);

// KafkaConsumer requires an ILoggerFactory at construction. We reuse a dedicated console-logging
// factory for the consumer instance (the host's own pipeline is unaffected).
var pluginLoggerFactory = LoggerFactory.Create(b => b
.AddConsole()
.SetMinimumLevel(LogLevel.Information));

// Handlers close over this logger so structured log properties (OrderId, Status, etc.) flow through
// the Generic Host pipeline — level filtering, formatters, and scopes — rather than bypassing it
// via Console.WriteLine.
var handlerLogger = pluginLoggerFactory.CreateLogger("Notifications");

builder.Services.AddChangeTracking(builder.Configuration, cfg =>
{
cfg
// Payload pipeline MUST match OrderService exactly (MessagePack + Gzip).
// A mismatch here means deserialization throws on the first envelope and the consumer crashes.
.UseSerializer<IChangeSerializer>(_ => new MessagePackSerializerPlugin())
.UseCompressor<IChangeCompressor>(_ => new GzipCompressorPlugin())
.ForEntity<Order>(e => e
// FromEarliest = true (default) so a fresh consumer group replays from offset 0 — this is
// why notification-service is allowed to start before order-service. AckAfterHandler = false
// (default) means the offset is committed on the poll thread immediately after parsing
// (at-most-once). Switch to true plus SubscriberOptions.MaxDegreeOfParallelism = 1 for
// at-least-once delivery.
.UseConsumer(new KafkaConsumer(new KafkaConsumerOptions
{
BootstrapServers = kafkaBootstrap,
Topic = "raytree.order_changes",
GroupId = "notification-service",
}, pluginLoggerFactory))
// Shared-handler dispatch: all three handlers run sequentially in registration order
// on every matching delivery. Each handler binds to exactly one ChangeType.
.OnInsert(LogInsertAsync)
.OnUpdate(LogUpdateAsync)
.OnDelete(LogDeleteAsync));
});

// ChangeTrackingHostedService (registered by AddChangeTracking) drives StartAsync/StopAsync.
// IHostApplicationLifetime handles graceful shutdown for Ctrl+C and `docker compose down`.
await builder.Build().RunAsync();

// ---- handlers -------------------------------------------------------------------------------
// Local functions close over handlerLogger — non-static so they capture the variable above.

Task LogInsertAsync(EntityChange<Order> change, CancellationToken ct)
{
var order = change.State;
handlerLogger.LogInformation(
"[NOTIFY] NEW order {OrderId} — customer={Customer} total={Total:C} status={Status}",
order?.Id, order?.CustomerName, order?.TotalAmount, order?.Status);
return Task.CompletedTask;
}

Task LogUpdateAsync(EntityChange<Order> change, CancellationToken ct)
{
var order = change.State;
handlerLogger.LogInformation(
"[NOTIFY] UPDATED order {OrderId} — status={Status} total={Total:C}",
order?.Id, order?.Status, order?.TotalAmount);
return Task.CompletedTask;
}

Task LogDeleteAsync(EntityChange<Order> change, CancellationToken ct)
{
handlerLogger.LogInformation("[NOTIFY] DELETED order {OrderId}", change.EntityId);
return Task.CompletedTask;
}
39 changes: 39 additions & 0 deletions examples/Kafka.Microservices/OrderService/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Build context: repository root (../../ from this file). The compose file sets this explicitly
# so that ProjectReference paths from OrderService.csproj into ../../../src/RayTree.* resolve.

FROM mcr.microsoft.com/dotnet/sdk:10.0-alpine AS build
WORKDIR /src

# ── Layer 1: central build infrastructure ────────────────────────────────────────────────────────
# Copied first because it changes rarely. All subsequent layers inherit TargetFramework, Nullable,
# TreatWarningsAsErrors, and centrally-pinned NuGet versions from these files.
COPY Directory.Build.props Directory.Packages.props NuGet.config ./

# ── Layer 2: .csproj files only (restore cache) ──────────────────────────────────────────────────
# Copy project files for OrderService and every ProjectReference in its dependency graph
# (direct: Shared + six RayTree plugins; transitive: all plugins → RayTree.Core).
# Restore runs before source is copied so the NuGet package layer is reused on every build that
# does not change a .csproj or .props file — only source edits skip directly to the publish step.
COPY src/RayTree.Core/RayTree.Core.csproj src/RayTree.Core/
COPY src/RayTree.Hosting/RayTree.Hosting.csproj src/RayTree.Hosting/
COPY src/RayTree.Plugins.PostgreSQL/RayTree.Plugins.PostgreSQL.csproj src/RayTree.Plugins.PostgreSQL/
COPY src/RayTree.Plugins.Kafka/RayTree.Plugins.Kafka.csproj src/RayTree.Plugins.Kafka/
COPY src/RayTree.Plugins.Serializers.MessagePack/RayTree.Plugins.Serializers.MessagePack.csproj src/RayTree.Plugins.Serializers.MessagePack/
COPY src/RayTree.Plugins.Compressors.Gzip/RayTree.Plugins.Compressors.Gzip.csproj src/RayTree.Plugins.Compressors.Gzip/
COPY examples/Kafka.Microservices/Directory.Build.props examples/Kafka.Microservices/
COPY examples/Kafka.Microservices/Directory.Packages.props examples/Kafka.Microservices/
COPY examples/Kafka.Microservices/Shared/Shared.csproj examples/Kafka.Microservices/Shared/
COPY examples/Kafka.Microservices/OrderService/OrderService.csproj examples/Kafka.Microservices/OrderService/
RUN dotnet restore examples/Kafka.Microservices/OrderService/OrderService.csproj

# ── Layer 3: source files → publish ──────────────────────────────────────────────────────────────
# Packages are already restored above; --no-restore skips the redundant network round-trip.
COPY src/ ./src/
COPY examples/Kafka.Microservices/ ./examples/Kafka.Microservices/
WORKDIR /src/examples/Kafka.Microservices/OrderService
RUN dotnet publish -c Release -o /app /p:UseAppHost=false --no-restore

FROM mcr.microsoft.com/dotnet/runtime:10.0-alpine AS runtime
WORKDIR /app
COPY --from=build /app ./
ENTRYPOINT ["dotnet", "KafkaMicroservices.OrderService.dll"]
21 changes: 21 additions & 0 deletions examples/Kafka.Microservices/OrderService/OrderService.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<RootNamespace>KafkaMicroservices.OrderService</RootNamespace>
<AssemblyName>KafkaMicroservices.OrderService</AssemblyName>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Shared\Shared.csproj" />
<ProjectReference Include="..\..\..\src\RayTree.Core\RayTree.Core.csproj" />
<ProjectReference Include="..\..\..\src\RayTree.Hosting\RayTree.Hosting.csproj" />
<ProjectReference Include="..\..\..\src\RayTree.Plugins.PostgreSQL\RayTree.Plugins.PostgreSQL.csproj" />
<ProjectReference Include="..\..\..\src\RayTree.Plugins.Kafka\RayTree.Plugins.Kafka.csproj" />
<ProjectReference Include="..\..\..\src\RayTree.Plugins.Serializers.MessagePack\RayTree.Plugins.Serializers.MessagePack.csproj" />
<ProjectReference Include="..\..\..\src\RayTree.Plugins.Compressors.Gzip\RayTree.Plugins.Compressors.Gzip.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" />
</ItemGroup>
</Project>
Loading
Loading