Configuration Guide

The primary configuration API is EntityChangeTracker.Create(), which returns an IChangeTrackingBuilder. The builder registers per-entity plugins, sets global defaults, and produces an EntityChangeTracker via Build() or BuildAsync().

EntityChangeTracker.Create()

Per-Entity Configuration

ForEntity<T> accepts a callback that scopes all per-entity configuration. The parent builder is always returned, so multiple entity registrations chain cleanly:

var builder = EntityChangeTracker.Create();

builder
    .ForEntity<Product>(e => e
        .UseOutbox(new PostgreSqlOutbox<Product>(new PostgreSqlOutboxOptions
        {
            ConnectionString = connectionString
            // OutboxTableName defaults to "product_outbox"
        }))
        .UsePublisher(new InMemoryQueue())
        .UseSerializer(new JsonSerializerPlugin())
        .UseCompressor(new GzipCompressorPlugin()))
    .ForEntity<Order>(e => e
        .UseOutbox(new PostgreSqlOutbox<Order>(new PostgreSqlOutboxOptions
        {
            ConnectionString = connectionString
            // OutboxTableName defaults to "order_outbox"
        }))
        .UsePublisher(new InMemoryQueue())
        .UseSerializer(new ProtobufSerializerPlugin())
        .UseCompressor(new Lz4CompressorPlugin()));

var tracker = builder.Build();

Global Serializer / Compressor

Extension methods on IChangeTrackingBuilder set a default factory applied to every entity type that does not have an explicit override:

var builder = EntityChangeTracker.Create();
builder.UseJsonSerializer();      // RayTree.Plugins.Serializers.Json
builder.UseGzipCompressor();      // RayTree.Plugins.Compressors.Gzip
// builder.UseProtobufSerializer()
// builder.UseMessagePackSerializer()
// builder.UseLz4Compressor()
// builder.UseBrotliCompressor()
// builder.UseNoOpCompressor()

builder.ForEntity<Product>(e => e
    .UseOutbox(new PostgreSqlOutbox<Product>(new PostgreSqlOutboxOptions
    {
        ConnectionString = connectionString
    }))
    .UsePublisher(new InMemoryQueue()));
// Inherits JsonSerializer + GzipCompressor from global defaults

var tracker = builder.Build();

Publisher Options

Control the polling interval, batch size, and retry behaviour for all OutboxPublisherService instances:

builder.UsePublisherOptions(opt =>
{
    opt.PollingInterval = TimeSpan.FromSeconds(5);
    opt.BatchSize       = 100;
    opt.MaxRetryCount   = 3;
    opt.RetryDelay      = TimeSpan.FromSeconds(2);
});

PostgreSQL Repository

Register a source table alongside the outbox:

builder.ForEntity<Product>(e => e
    .UseRepository(new PostgreSqlRepository<Product>(new PostgreSqlRepositoryOptions
    {
        ConnectionString = connectionString
        // TableName defaults to "product"
    }))
    .UseOutbox(new PostgreSqlOutbox<Product>(new PostgreSqlOutboxOptions
    {
        ConnectionString = connectionString
    }))
    .UsePublisher(new InMemoryQueue())
    .UseSerializer(new JsonSerializerPlugin())
    .UseCompressor(new GzipCompressorPlugin()));

Outbox Notification Mode (Low-Latency)

Enable PostgreSQL NOTIFY/LISTEN on the outbox, then create a NotificationBasedPublisher alongside the tracker:

var outboxOptions = new PostgreSqlOutboxOptions
{
    ConnectionString = connectionString
};
outboxOptions.UseNotificationChannel("product_notify")
             .WithFallbackPolling(TimeSpan.FromSeconds(30));

builder.ForEntity<Product>(e => e
    .UseOutbox(new PostgreSqlOutbox<Product>(outboxOptions))
    .UsePublisher(new InMemoryQueue())
    .UseSerializer(new JsonSerializerPlugin())
    .UseCompressor(new GzipCompressorPlugin()));

var tracker = builder.Build(); // creates table + trigger

var notificationPublisher = new NotificationBasedPublisher(
    tracker,
    new NotificationBasedPublisherOptions
    {
        ConnectionString        = connectionString,
        ChannelName             = "product_notify",
        FallbackPollingInterval = TimeSpan.FromSeconds(30)
    },
    loggerFactory);  // ILoggerFactory — required; use NullLoggerFactory.Instance in tests

await notificationPublisher.StartAsync();

See trigger-setup.md for full details and hosting in ASP.NET Core.

ChangeTrackingConfiguration

ChangeTrackingConfiguration is a thin wrapper around IChangeTrackingBuilder that adds WithPollingInterval() and WithBatchSize() convenience methods. It does not expose per-entity fluent configuration — use EntityChangeTracker.Create() directly for most scenarios.

var config = new ChangeTrackingConfiguration()
    .WithPollingInterval(TimeSpan.FromSeconds(5))
    .WithBatchSize(50);

// Register plugins via the underlying builder's factory methods (no per-entity scoping)
config.UseSerializer<IChangeSerializer>(_ => new JsonSerializerPlugin());
config.UseCompressor<IChangeCompressor>(_ => new GzipCompressorPlugin());
config.UseOutbox<IOutbox>(_ => new InMemoryOutbox());
config.UsePublisher<IQueuePublisher>(_ => new InMemoryQueue());

var tracker = config.Build();

Tracking Changes

// Typed convenience methods — State is captured automatically
await tracker.TrackInsertAsync(new Product { Id = 1, Name = "Widget" });
await tracker.TrackUpdateAsync(new Product { Id = 1, Name = "Widget Pro" });
await tracker.TrackDeleteAsync(new Product { Id = 1, Name = "Widget Pro" });

// Generic overload (when change type is dynamic)
await tracker.TrackChangeAsync(entity, ChangeType.Insert);

ChangeSubscriberBuilder

Configure the subscriber side using ChangeSubscriberBuilder. Global defaults (serializer, compressor, options) apply to every entity registration; per-entity callbacks can override any of them. The builder produces a ChangeSubscriber via Build().

Basic usage

var subscriber = new ChangeSubscriberBuilder()
    .UseSerializer(new JsonSerializerPlugin())   // global default
    .UseCompressor(new GzipCompressorPlugin())   // global default
    .UseOptions(opt =>
    {
        opt.MaxRetries    = 3;
        opt.RetryDelay    = TimeSpan.FromSeconds(1);
        opt.SkipOnFailure = false;
    })
    .ForEntity<Order>(e => e
        .UseConsumer(myConsumer)                 // IQueueConsumer for Order messages
        .OnInsert(async (change, ct) =>
        {
            var order = change.State;            // fully-typed Order
            Console.WriteLine($"New order: {order?.Id}");
        })
        .OnUpdate(async (change, ct) => { /* ... */ })
        .OnDelete(async (change, ct) => { /* ... */ }))
    .Build();

Multiple entities with global defaults

Set a serializer and compressor once globally, then register each entity with only the overrides it needs:

var subscriber = new ChangeSubscriberBuilder()
    .UseSerializer(new JsonSerializerPlugin())
    .UseCompressor(new GzipCompressorPlugin())
    .ForEntity<Order>(e => e
        .UseConsumer(orderConsumer)
        // inherits global serializer + compressor
        .OnInsert(async (change, ct) => { /* ... */ }))
    .ForEntity<Product>(e => e
        .UseSerializer(new ProtobufSerializerPlugin())  // per-entity override
        .UseConsumer(productConsumer)
        .OnInsert(async (change, ct) => { /* ... */ }))
    .Build();

Per-entity options override

Fine-tune retry behaviour for individual entity types while keeping global defaults for others:

var subscriber = new ChangeSubscriberBuilder()
    .UseOptions(opt => opt.MaxRetries = 2)   // global default
    .ForEntity<Order>(e => e
        .UseOptions(opt => opt.MaxRetries = 5)  // Order-only override
        .UseConsumer(orderConsumer)
        .OnInsert(async (change, ct) => { /* ... */ }))
    .ForEntity<Product>(e => e
        .UseConsumer(productConsumer)
        // inherits MaxRetries = 2 from global
        .OnInsert(async (change, ct) => { /* ... */ }))
    .Build();

Kafka publisher — partition key

KafkaPublisherOptions.KeySelector controls which Kafka partition key is stamped on each outgoing message. Messages with the same key land on the same partition (as long as the topic's partition count does not change), so they are consumed in order relative to each other.

The default selector uses EntityType:EntityId:

builder.ForEntity<Order>(e => e
    .UsePublisher(new KafkaPublisher(new KafkaPublisherOptions
    {
        BootstrapServers = "localhost:9092",
        Topic            = "orders"
        // KeySelector defaults to envelope => $"{envelope.EntityType}:{envelope.EntityId}"
    })));

Override KeySelector to shard by a different field. For example, shard by tenant so that all of a tenant's changes land on the same partition, while different tenants can be processed in parallel by separate consumer-group members:

new KafkaPublisherOptions
{
    BootstrapServers = "localhost:9092",
    Topic            = "orders",
    KeySelector      = envelope => envelope.EntityId.Split(':')[0]  // "tenantId:entityId" → tenantId
}

Or use any envelope metadata — change type, entity type, a custom field embedded in EntityId, etc. The selector runs on the publisher side; the consumer side is unaffected.
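
The effect of the tenant selector can be illustrated with a small self-contained sketch. It does not use RayTree or a Kafka client: the envelope is modelled as a plain tuple, the "tenantId:entityId" convention is taken from the example above, and PartitionFor is a simplified stand-in partitioner (an FNV-1a hash modulo an assumed partition count), not the hash a real Kafka client uses. Only the "same key, same partition" property is shown.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the envelope fields named in the text (not the RayTree type).
var envelopes = new (string EntityType, string EntityId)[]
{
    ("Order", "acme:1001"),
    ("Order", "globex:2001"),
    ("Order", "acme:1002"),
    ("Order", "globex:2002"),
};

// Same shape as the tenant selector: "tenantId:entityId" -> tenantId.
Func<(string EntityType, string EntityId), string> keySelector =
    envelope => envelope.EntityId.Split(':')[0];

// Simplified partitioner: stable hash of the key modulo the partition count.
// Real Kafka clients use their own hash functions; this only illustrates determinism.
int PartitionFor(string key, int count)
{
    uint hash = 2166136261u;            // FNV-1a offset basis
    foreach (char c in key)
    {
        hash ^= c;
        hash *= 16777619u;              // FNV-1a prime (wraps on overflow)
    }
    return (int)(hash % (uint)count);
}

const int partitionCount = 6;           // assumed topic size

// For each tenant, collect the distinct partitions its messages would be sent to.
var byTenant = envelopes
    .GroupBy(keySelector)
    .ToDictionary(
        g => g.Key,
        g => g.Select(e => PartitionFor(keySelector(e), partitionCount)).Distinct().ToList());

foreach (var (tenant, partitions) in byTenant)
    Console.WriteLine($"{tenant} -> partition(s) {string.Join(",", partitions)}");
```

Each tenant maps to exactly one partition, so its changes stay ordered. Two tenants may still share a partition, because the key only guarantees co-location, not exclusivity.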

Consumer-group parallelism

Kafka distributes partitions across all members of a consumer group. To process different entities (or entity key ranges) in parallel, run multiple KafkaConsumer instances that share the same GroupId and point at the same topic — Kafka assigns each instance a disjoint set of partitions automatically. No RayTree configuration is needed beyond the standard KafkaConsumerOptions:

// Instance A and Instance B both use GroupId = "order-processors"
// Kafka assigns ~half the partitions to each.
new KafkaConsumerOptions
{
    BootstrapServers = "localhost:9092",
    Topic            = "orders",
    GroupId          = "order-processors"
}

With AckAfterHandler = true, keep MaxDegreeOfParallelism = 1 per consumer instance (offset commits are monotonic — out-of-order commits can skip messages).
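
The reason behind the MaxDegreeOfParallelism = 1 advice can be shown with a small simulation. This is a sketch of Kafka's offset-commit semantics in general (a committed position means "everything below this is done"), not of RayTree's consumer code. The Simulate helper and its parameters are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Returns the committed position and the offsets that are never redelivered
// even though their handler did not finish before the crash.
// Committing offset N stores N + 1 as the position to resume from.
(long Committed, List<long> Lost) Simulate(int[] completedInOrder, int lastOffset)
{
    long committed = 0;
    foreach (var offset in completedInOrder)
        committed = Math.Max(committed, offset + 1L);

    // Anything below the committed position is skipped after a restart, finished or not.
    var lost = Enumerable.Range(0, lastOffset + 1)
        .Where(o => o < committed && Array.IndexOf(completedInOrder, o) < 0)
        .Select(o => (long)o)
        .ToList();
    return (committed, lost);
}

// Parallel handlers: offset 3 finishes and commits while offset 2 is still running, then a crash.
var parallel = Simulate(new[] { 0, 1, 3 }, lastOffset: 4);
Console.WriteLine($"parallel:   committed={parallel.Committed} lost=[{string.Join(",", parallel.Lost)}]");
// prints "parallel:   committed=4 lost=[2]"

// One message at a time: offsets complete strictly in order, so nothing below the commit is unfinished.
var sequential = Simulate(new[] { 0, 1 }, lastOffset: 4);
Console.WriteLine($"sequential: committed={sequential.Committed} lost=[{string.Join(",", sequential.Lost)}]");
// prints "sequential: committed=2 lost=[]"
```

In the parallel run, offset 2 sits below the committed position and is never seen again. That is the skipped message the note warns about.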

Broker-specific queue helpers

Call the broker extension inside the ForEntity callback:

// Kafka
.ForEntity<Order>(e => e
    .UseKafka(opt =>
    {
        opt.BootstrapServers = "localhost:9092";
        opt.Topic            = "orders";
        opt.GroupId          = "my-service";
    })
    .OnInsert(async (change, ct) => { /* ... */ }))

// RabbitMQ
.ForEntity<Order>(e => e
    .UseRabbitMq(opt =>
    {
        opt.HostName  = "localhost";
        opt.QueueName = "orders";
    })
    .OnInsert(async (change, ct) => { /* ... */ }))

RabbitMQ publisher — routing key

RabbitMqPublisherOptions.RoutingKeySelector controls the AMQP routing key stamped on each message. On a topic exchange, consumers bind queues with wildcard patterns to receive only the messages they need — that is how RabbitMQ routes and parallelises processing.

The default produces {RoutingKey}.{EntityType}.{changeType} (e.g. change.Order.insert):

builder.ForEntity<Order>(e => e
    .UsePublisher(new RabbitMqPublisher(new RabbitMqPublisherOptions
    {
        ExchangeName = "entity_changes",
        RoutingKey   = "change"
        // RoutingKeySelector is null → falls back to "change.Order.insert" / "change.Order.update" etc.
    })));

Override RoutingKeySelector to route by any envelope field. For example, to shard by tenant so each tenant's messages land on a dedicated queue:

new RabbitMqPublisherOptions
{
    ExchangeName       = "entity_changes",
    RoutingKeySelector = envelope => $"change.{envelope.EntityId.Split(':')[0]}.{envelope.EntityType}"
    // "tenantId:entityId" → "change.tenantId.Order"
    // Consumer binds with "change.acme.*" to receive only ACME tenant messages
}

When RoutingKeySelector is set it takes full control of the key; the RoutingKey base prefix is ignored.
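
To make the binding patterns concrete, here is a minimal sketch of AMQP topic-exchange matching, where `*` matches exactly one dot-separated word and `#` matches zero or more. It is an illustration written for this guide, not RabbitMQ's or RayTree's code. Matches and MatchFrom are hypothetical helpers, and the entity values assume the "tenantId:entityId" convention from the example above.

```csharp
using System;

// Entry point: split both sides into words and match from the start.
bool Matches(string bindingPattern, string candidateKey) =>
    MatchFrom(bindingPattern.Split('.'), 0, candidateKey.Split('.'), 0);

// Recursive word-by-word matcher.
bool MatchFrom(string[] pattern, int p, string[] key, int k)
{
    if (p == pattern.Length) return k == key.Length;    // pattern exhausted: key must be too
    if (pattern[p] == "#")
    {
        // '#' may absorb zero or more words; try every possible split point.
        for (int next = k; next <= key.Length; next++)
            if (MatchFrom(pattern, p + 1, key, next)) return true;
        return false;
    }
    if (k == key.Length) return false;                  // pattern needs a word, key has none left
    return (pattern[p] == "*" || pattern[p] == key[k])
        && MatchFrom(pattern, p + 1, key, k + 1);
}

// Hypothetical envelope values, built the same way as the RoutingKeySelector above.
string entityType = "Order", entityId = "acme:1001";
string routingKey = $"change.{entityId.Split(':')[0]}.{entityType}";

Console.WriteLine(routingKey);                              // change.acme.Order
Console.WriteLine(Matches("change.acme.*", routingKey));    // True
Console.WriteLine(Matches("change.globex.*", routingKey));  // False
Console.WriteLine(Matches("change.*.Order", routingKey));   // True
```

A queue bound with "change.acme.*" therefore receives only ACME messages, while "change.*.Order" receives Order changes for every tenant.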

RabbitMQ topology wait

In microservice deployments one service owns and declares the exchange or queue while other services consume it. If the owning service has not yet started when InitializeAsync is called, the broker returns NOT_FOUND and the client crashes. The opt-in topology wait loop retries with AMQP passive declares until the topology appears.

Three options control the wait on both RabbitMqPublisherOptions and RabbitMqConsumerOptions:

| Option | Default | Description |
| --- | --- | --- |
| WaitForTopology | false | Enable the wait loop. |
| TopologyWaitInterval | 5 s | Delay between passive-declare attempts. |
| TopologyWaitTimeout | null (unlimited) | Hard deadline; null means retry until cancellation. |

When does the publisher probe? When WaitForTopology = true and DeclareExchange = false. The publisher probes ExchangeName with ExchangeDeclarePassiveAsync before connecting.

When does the consumer probe?

  • When WaitForTopology = true and DeclareQueue = false — probes QueueName with QueueDeclarePassiveAsync.
  • When WaitForTopology = true and ExchangeName is non-empty — also probes the exchange with ExchangeDeclarePassiveAsync before QueueBindAsync.

Only NOT_FOUND (404) triggers a retry. All other errors (PRECONDITION_FAILED, ACCESS_REFUSED, connection failures) propagate immediately.
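
The retry semantics described above (retry on "not found", honour an optional deadline, stop on cancellation, let every other error propagate) can be sketched as a generic loop. This is not RayTree's implementation. WaitForTopologyAsync and the boolean probe delegate are hypothetical, with `false` standing in for a NOT_FOUND reply from a passive declare.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// probe returns true once the exchange/queue exists and false for "not found".
// Any exception thrown by probe propagates to the caller without a retry.
async Task WaitForTopologyAsync(
    Func<CancellationToken, Task<bool>> probe,
    TimeSpan interval,
    TimeSpan? timeout,                      // null = retry until cancellation
    CancellationToken cancellationToken)
{
    var elapsed = Stopwatch.StartNew();
    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();
        if (await probe(cancellationToken)) return;

        // Give up if the next wait would run past the deadline.
        if (timeout is { } limit && elapsed.Elapsed + interval > limit)
            throw new TimeoutException("Topology did not appear before the deadline.");

        await Task.Delay(interval, cancellationToken);
    }
}

// Usage: a fake probe that reports "not found" twice, then succeeds.
int attempts = 0;
await WaitForTopologyAsync(
    _ => Task.FromResult(++attempts >= 3),
    interval: TimeSpan.FromMilliseconds(10),
    timeout: TimeSpan.FromSeconds(5),
    cancellationToken: CancellationToken.None);
Console.WriteLine($"topology found after {attempts} attempts");
```

Passing a cancellable token rather than CancellationToken.None is what makes an unbounded wait (timeout = null) safe to shut down.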

// Publisher — waits for an exchange declared by another service
builder.ForEntity<Order>(e => e
    .UsePublisher(new RabbitMqPublisher(new RabbitMqPublisherOptions
    {
        HostName              = "rabbitmq",
        ExchangeName          = "entity_changes",
        DeclareExchange       = false,        // owned by topology-service
        WaitForTopology       = true,
        TopologyWaitInterval  = TimeSpan.FromSeconds(2),
        TopologyWaitTimeout   = TimeSpan.FromMinutes(5)
    })));

// Consumer — waits for a queue and binding exchange
builder.ForEntity<Order>(e => e
    .UseSerializer(new JsonSerializerPlugin())
    .UseConsumer(new RabbitMqConsumer(new RabbitMqConsumerOptions
    {
        HostName              = "rabbitmq",
        QueueName             = "order-events",
        ExchangeName          = "entity_changes",
        DeclareQueue          = false,        // queue owned externally
        WaitForTopology       = true,
        TopologyWaitInterval  = TimeSpan.FromSeconds(2)
        // TopologyWaitTimeout = null → retry until CancellationToken is cancelled
    }))
    .OnInsert(async (change, ct) => { /* ... */ }));

The default (WaitForTopology = false) is unchanged — a missing exchange or queue surfaces the underlying OperationInterruptedException immediately.

Kafka topic wait

WaitForTopic is the Kafka analogue of WaitForTopology, intended for deployments where the topic owner (often a dedicated "schema-owner" pod) comes up after the publisher or consumer that depends on it. When WaitForTopic = true, InitializeAsync probes the broker via IAdminClient.GetMetadata and retries until the topic is reported available.

Three options on both KafkaPublisherOptions and KafkaConsumerOptions:

| Option | Default | Description |
| --- | --- | --- |
| WaitForTopic | false | Enable the wait loop. |
| TopicWaitInterval | 5 s | Delay between metadata probe attempts. |
| TopicWaitTimeout | null (unlimited) | Hard deadline; null means retry until cancellation. |

Retryable responses — the probe retries on any of:

  • Empty Metadata.Topics collection (some broker versions return no entry rather than a placeholder).
  • Per-topic ErrorCode.UnknownTopicOrPart.
  • Per-topic ErrorCode.LeaderNotAvailable (transient during cluster bootstrap and partition-leader election).
  • Transient transport-level KafkaExceptions: Local_Transport (connection refused / socket closed), Local_AllBrokersDown, Local_Resolve (DNS failure), Local_TimedOut. This covers the common startup race where the broker pod has not yet finished starting.

All other broker errors, fatal KafkaExceptions (Error.IsFatal == true), and OperationCanceledException propagate immediately.

Probe placement

  • The publisher probes inside the lazy producer-init path so both InitializeAsync and PublishAsync benefit (the probe runs at most once per KafkaPublisher lifetime, then a volatile bool flag short-circuits subsequent calls).
  • The consumer probes before allocating the native IConsumer handle and re-checks cancellation immediately after, so a Ctrl+C during a slow probe never leaks a librdkafka handle.

Cancellation latency. Each GetMetadata call is bounded by a small fixed timeout (~1 s), decoupled from TopicWaitInterval. A shutdown request therefore waits at most about that long for an in-flight probe, regardless of how long the interval is.

// Publisher — waits for an externally-owned topic
builder.ForEntity<Order>(e => e
    .UsePublisher(new KafkaPublisher(new KafkaPublisherOptions
    {
        BootstrapServers   = "kafka:9092",
        Topic              = "orders.events",
        WaitForTopic       = true,
        TopicWaitInterval  = TimeSpan.FromSeconds(2),
        TopicWaitTimeout   = TimeSpan.FromMinutes(5)
    }, loggerFactory))); // pass loggerFactory so probe progress is observable

// Consumer — waits for the same topic
builder.ForEntity<Order>(e => e
    .UseSerializer(new JsonSerializerPlugin())
    .UseKafka(o =>
    {
        o.BootstrapServers  = "kafka:9092";
        o.Topic             = "orders.events";
        o.GroupId           = "orders-service";
        o.WaitForTopic      = true;
        o.TopicWaitInterval = TimeSpan.FromSeconds(2);
        // TopicWaitTimeout = null → retry until CancellationToken is cancelled
    }, loggerFactory)
    .OnInsert(async (change, ct) => { /* ... */ }));

Caveats

  • Auto-create. Brokers with auto.create.topics.enable=true (the default on many distributions, including the stock confluentinc/cp-kafka image) will create the topic in response to the metadata probe itself, defeating the wait. Set the broker option to false in deployments that depend on this feature.
  • Sync Build() + TopicWaitTimeout = null. The synchronous ChangeTrackingBuilder.Build() overload (which AddChangeTracking uses) does not plumb a cancellation token through. With an unbounded wait, host startup will block indefinitely with no SIGTERM escape. Either set a non-null timeout, or use BuildAsync(cancellationToken) with the host's ApplicationStopping token.
  • Logger plumbing. The publisher's ILoggerFactory? parameter and the subscriber-side UseKafka(configure, loggerFactory) overload both default to silent (NullLoggerFactory.Instance). Pass the host's logger factory explicitly when using WaitForTopic so the first-miss / recovery Information logs are visible; otherwise a stuck startup produces no log output at all.

The default (WaitForTopic = false) is unchanged — a missing topic surfaces as UnknownTopicOrPart on the first ProduceAsync (publisher) or as silent no-message returns from Consume (consumer).

// InMemory (testing): the same ForEntity helper pattern as the Kafka and RabbitMQ helpers above
.ForEntity<Order>(e => e
    .UseInMemoryQueue(inMemoryQueue)
    .OnInsert(async (change, ct) => { /* ... */ }))

ASP.NET Core (DI)

AddChangeSubscriber registers ChangeSubscriber as a singleton and ChangeSubscriberHostedService as a hosted service. It returns IChangeSubscriberBuilder so you chain entity registrations directly. Options are bound from the ChangeTracking:Subscriber configuration section:

builder.Services
    .AddChangeSubscriber(builder.Configuration)
    .UseRedisDeduplication(multiplexer)          // optional; default is in-memory
    .ForEntity<Order>(e => e
        .UseInMemoryQueue(orderQueue)
        .UseSerializer(new JsonSerializerPlugin())
        .UseCompressor(new GzipCompressorPlugin())
        .OnInsert(async (change, ct) =>
            Console.WriteLine($"New order: {change.State?.Id}")));

appsettings.json:

{
  "ChangeTracking": {
    "Subscriber": {
      "MaxRetries": 3,
      "RetryDelay": "00:00:01",
      "SkipOnFailure": false
    }
  }
}

Deduplication

Every processed CorrelationId is recorded so duplicate deliveries (at-least-once brokers) are silently dropped.

| Store | Package | When to use |
| --- | --- | --- |
| InMemoryDeduplicationStore | built-in | Single-process, testing |
| RedisDeduplicationStore | RayTree.Plugins.Deduplication.Redis | Multiple subscriber instances or cross-restart dedup |

// Redis: supply an IConnectionMultiplexer from StackExchange.Redis
using StackExchange.Redis;
using RayTree.Plugins.Deduplication.Redis;

var multiplexer = await ConnectionMultiplexer.ConnectAsync("localhost:6379");

subscriber = new ChangeSubscriberBuilder()
    .UseRedisDeduplication(multiplexer)                 // default options
    .ForEntity<Order>(e => e /* ... */)
    .Build();

// With custom options
subscriber = new ChangeSubscriberBuilder()
    .UseRedisDeduplication(multiplexer, opt =>
    {
        opt.KeyPrefix       = "my-service";  // namespace on shared Redis; default "default"
        opt.RetentionPeriod = TimeSpan.FromHours(48);
        opt.Database        = 1;             // logical DB index; default -1 (connection default)
    })
    .ForEntity<Order>(e => e /* ... */)
    .Build();

// Custom store
subscriber = new ChangeSubscriberBuilder()
    .UseDeduplicationStore(new MyCustomStore())
    .ForEntity<Order>(e => e /* ... */)
    .Build();
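
As a rough picture of what a deduplication store does, here is a self-contained sketch that records each CorrelationId with a timestamp and rejects repeats inside a retention window. It is not the InMemoryDeduplicationStore or a RayTree interface. TryMarkProcessed is a hypothetical helper, and the 48-hour window simply mirrors the RetentionPeriod value in the Redis example above.

```csharp
using System;
using System.Collections.Concurrent;

var retention = TimeSpan.FromHours(48);     // assumed retention window
var seen = new ConcurrentDictionary<string, DateTimeOffset>();

// Returns true the first time an id is seen within the retention window,
// false when it is a duplicate delivery that should be dropped.
bool TryMarkProcessed(string correlationId, DateTimeOffset now)
{
    if (seen.TryAdd(correlationId, now)) return true;       // first delivery

    var firstSeen = seen[correlationId];                    // entries are never removed in this sketch
    if (now - firstSeen < retention) return false;          // duplicate inside the window

    // Entry has expired: replace it atomically. If another thread wins the race,
    // that thread owns the delivery and this call counts as a duplicate.
    return seen.TryUpdate(correlationId, now, firstSeen);
}

var t0 = DateTimeOffset.UtcNow;
Console.WriteLine(TryMarkProcessed("abc", t0));                 // True  (first delivery)
Console.WriteLine(TryMarkProcessed("abc", t0.AddMinutes(1)));   // False (duplicate)
Console.WriteLine(TryMarkProcessed("abc", t0.AddHours(49)));    // True  (retention elapsed)
```

An in-process dictionary like this is lost on restart and is not shared between instances. Those are the cases in which the table recommends the Redis store.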

Logging

RayTree uses Microsoft.Extensions.Logging throughout. All runtime service classes require a logger — there is no silent NullLogger fallback inside services.

Standalone (no DI)

Pass an ILoggerFactory to EntityChangeTracker.Create():

// No logging (tests, scripts)
var tracker = EntityChangeTracker.Create().Build();

// With logging
using var loggerFactory = LoggerFactory.Create(b => b.AddConsole());
var tracker = EntityChangeTracker.Create(loggerFactory).Build();

EntityChangeTracker.Create() normalises null to NullLoggerFactory.Instance, so calling EntityChangeTracker.Create() without an argument produces a working tracker with no log output.

ASP.NET Core (DI)

AddChangeTracking resolves ILoggerFactory from the DI container automatically:

builder.Services.AddLogging(b => b.AddConsole()); // standard host setup
builder.Services.AddChangeTracking(builder.Configuration, tracking => { ... });
// No UseLoggerFactory call needed — the host's ILoggerFactory is wired in automatically

Broker-specific consumers

KafkaConsumer and RabbitMqConsumer require ILoggerFactory as a second constructor argument. When constructing them directly outside a builder, pass the factory explicitly:

// In tests
var consumer = new KafkaConsumer(options, NullLoggerFactory.Instance);

// In production code
var consumer = new KafkaConsumer(options, loggerFactory);

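When the .UseKafka(...) / .UseRabbitMq(...) extension methods are called inside a ForEntity callback without a logger factory, NullLoggerFactory.Instance is used internally. To get real logging from these consumers, pass the factory to an overload that accepts one (such as the UseKafka(configure, loggerFactory) form shown under Kafka topic wait), or construct the consumer directly and pass it to .UseConsumer(consumer).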

What gets logged

Configuration & build phase

| Class | Level | When | Structured properties |
| --- | --- | --- | --- |
| ChangeTrackingBuilder | Information | Each global Use* call (UseOutbox, UsePublisher, UseSerializer, UseCompressor, UseRepository, UseDeduplicationStore, UseMeter, UsePublisherOptions, UseSubscriberOptions) | {Plugin} |
| ChangeTrackingBuilder | Information | ForEntity<TEntity> invocation | {EntityType} |
| ChangeTrackingBuilder | Debug | BuildInternal falls back to a default RayTreeMeter (no UseMeter) | |
| ChangeTrackingBuilder | Information | "ChangeTracker built" summary, once per Build() / BuildAsync() | {EntityTypes}, {Plugins}, {HasCustomMeter}, {HasCustomDeduplicationStore}, {HasCustomLoggerFactory} |
| EntityBuilder<TEntity> | Debug | Per-entity publisher overrides (UseOutbox, UsePublisher, UseSerializer, UseCompressor, UseRepository, UseSubscriberOptions, UseConsumer, UseConsumerFactory) | {EntityType}, {Override}, {Plugin} |
| SharedHandlerBuilder<TEntity> | Debug | OnInsert / OnUpdate / OnDelete / OnChange handler registrations | {EntityType}, {Override}, {Plugin} |
| IsolatedHandlerBuilder<TEntity> | Debug | Named handler registrations | {EntityType}, {Override} (e.g. OnInsert:audit), {Plugin} |

Tracker lifecycle

| Class | Level | When | Structured properties |
| --- | --- | --- | --- |
| EntityChangeTracker | Information | "tracker initialization started" (entered InitializeAsync) | |
| EntityChangeTracker | Debug | Publisher init completed | {EntityTypeCount} |
| EntityChangeTracker | Debug | Consumer connections initialized | {ConsumerCount} |
| EntityChangeTracker | Information | "tracker initialization completed" (success) | |
| EntityChangeTracker | Warning | "tracker initialization aborted" (abort point marker; the inner plugin's Error carries the exception payload) | |
| ChangeTrackingHostedService | Information | "ChangeTracking starting" (DI startup, once per host) | {ConfigurationBound} |

Runtime

| Class | Level | When |
| --- | --- | --- |
| OutboxPublisherService | Information | Polling loop start / stop |
| OutboxPublisherService | Warning | Per-retry publish failure |
| OutboxPublisherService | Error | Batch error; retries exhausted |
| ChangeSubscriber | Warning | Unknown entity type in envelope |
| ChangeSubscriber | Debug | Dedup hit; no handlers matched |
| ChangeSubscriber | Warning | Handler retry attempt |
| ChangeSubscriber | Error | Handler dropped (SkipOnFailure) |
| ChangePublisher | Information | Publisher service registered per entity |
| EntityChangeTracker | Information | Consumer loop start per entity type / handler (in StartAsync) |
| ChangeTrackingHostedService | Information | Service stop |
| NotificationBasedPublisher | Information | Start / stop |
| NotificationBasedPublisher | Warning | Listen-loop error; fallback-poll error; per-change publish failure |
| KafkaConsumer | Error | Fatal Kafka error |
| KafkaConsumer | Warning | Consume error; envelope parse failure |
| RabbitMqConsumer | Warning | Message processing error (before requeue) |

All configuration- and lifecycle-time log calls are guarded by ILogger.IsEnabled(...), so under NullLoggerFactory.Instance they produce zero allocations and zero output. Each builder owns an ILogger<Self> so per-category filtering works as expected (e.g. silence Debug from IsolatedHandlerBuilder while keeping Information from ChangeTrackingBuilder).

Example startup output

For a tracker configured with one global serializer + compressor and two entities, the captured log stream looks like:

info: RayTree.Core.Tracking.ChangeTrackingBuilder[0]
      ChangeTracking: registered global serializer JsonSerializerPlugin
info: RayTree.Core.Tracking.ChangeTrackingBuilder[0]
      ChangeTracking: registered global compressor NoOpCompressorPlugin
info: RayTree.Core.Tracking.ChangeTrackingBuilder[0]
      ChangeTracking: configuring entity Order
dbug: RayTree.Core.Tracking.EntityBuilder`1[Order][0]
      ChangeTracking: entity override applied EntityType=Order Override=Outbox Plugin=PostgreSqlOutbox`1
dbug: RayTree.Core.Handling.SharedHandlerBuilder`1[Order][0]
      ChangeTracking: entity override applied EntityType=Order Override=OnInsert Plugin=MyService
info: RayTree.Core.Tracking.ChangeTrackingBuilder[0]
      ChangeTracker built. EntityTypes=["Order", "Customer"] Plugins=Outbox=<none> Publisher=<none> Serializer=JsonSerializerPlugin Compressor=NoOpCompressorPlugin Repository=<none> HasCustomMeter=False HasCustomDeduplicationStore=False HasCustomLoggerFactory=True
info: RayTree.Core.Tracking.EntityChangeTracker[0]
      ChangeTracking: tracker initialization started
dbug: RayTree.Core.Tracking.EntityChangeTracker[0]
      ChangeTracking: publisher initialized EntityTypeCount=2
dbug: RayTree.Core.Tracking.EntityChangeTracker[0]
      ChangeTracking: consumers initialized ConsumerCount=1
info: RayTree.Core.Tracking.EntityChangeTracker[0]
      ChangeTracking: tracker initialization completed
info: RayTree.Hosting.ChangeTrackingHostedService[0]
      ChangeTracking starting. ConfigurationBound=True

On failure, the inner plugin emits the Error with the exception, and the tracker emits a single Warning marking the abort point so operators can grep back to the cause without losing context.

Observability — OpenTelemetry Metrics

RayTree emits System.Diagnostics.Metrics instruments on a Meter named "RayTree" (counters, histograms, and an observable gauge for outbox depth). Instrument calls are silent no-ops when no listener is attached, so there is no overhead for consumers that opt out.

Default (built-in meter)

EntityChangeTracker.Create() creates a RayTreeMeter automatically and EntityChangeTracker disposes it. To collect the metrics, attach a MeterListener to the meter named "RayTree", or use the OTel SDK via the RayTree.OpenTelemetry package:

services.AddOpenTelemetry()
    .WithMetrics(b => b
        .AddRayTreeMetrics()
        .AddPrometheusExporter());
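
For the MeterListener route, a minimal sketch using only System.Diagnostics.Metrics (.NET 6 or later) might look like the following. To stay runnable without RayTree it creates its own stand-in Meter named "RayTree" with a hypothetical counter called "demo.changes". The real instrument names and measurement types are listed in opentelemetry-metrics.md and are not assumed here beyond using `long` for the demo.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

long total = 0;

using var listener = new MeterListener();

// Subscribe only to instruments published by a meter named "RayTree".
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == "RayTree")
        l.EnableMeasurementEvents(instrument);
};

// Callbacks are registered per measurement type; long is an assumption for this demo.
listener.SetMeasurementEventCallback<long>((instrument, measurement, tags, state) =>
{
    total += measurement;
    Console.WriteLine($"{instrument.Name} += {measurement}");
});
listener.Start();

// Stand-in meter so the sketch runs on its own; the instrument name is hypothetical.
using var demoMeter = new Meter("RayTree");
var demoCounter = demoMeter.CreateCounter<long>("demo.changes");
demoCounter.Add(3);
demoCounter.Add(2);

Console.WriteLine($"total = {total}");   // total = 5
```

In a real application the stand-in meter is dropped, and instruments with other value types (for example double histograms) need their own SetMeasurementEventCallback registration.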

Custom meter

Pass a RayTreeMeter instance to share it across trackers or to control its lifetime:

var meter = new RayTreeMeter();
var tracker = EntityChangeTracker.Create(loggerFactory)
    .UseMeter(meter)
    .ForEntity<Order>(/* ... */)
    .Build();
// Caller-supplied meter is NOT disposed by the tracker.

Full instrument inventory, unit conventions, suggested bucket boundaries, and sample dashboard queries are in opentelemetry-metrics.md.

Cleanup

// EntityChangeTracker is IDisposable — stops all publisher services
tracker.Dispose();

// Or use 'using'
using var tracker = builder.Build();