The primary configuration API is accessed via EntityChangeTracker.Create(), which returns IChangeTrackingBuilder. It registers per-entity plugins, sets global defaults, and produces an EntityChangeTracker via Build() / BuildAsync().
ForEntity<T> accepts a callback that scopes all per-entity configuration. The parent builder is always returned, so multiple entity registrations chain cleanly:
var builder = EntityChangeTracker.Create();
builder
.ForEntity<Product>(e => e
.UseOutbox(new PostgreSqlOutbox<Product>(new PostgreSqlOutboxOptions
{
ConnectionString = connectionString
// OutboxTableName defaults to "product_outbox"
}))
.UsePublisher(new InMemoryQueue())
.UseSerializer(new JsonSerializerPlugin())
.UseCompressor(new GzipCompressorPlugin()))
.ForEntity<Order>(e => e
.UseOutbox(new PostgreSqlOutbox<Order>(new PostgreSqlOutboxOptions
{
ConnectionString = connectionString
// OutboxTableName defaults to "order_outbox"
}))
.UsePublisher(new InMemoryQueue())
.UseSerializer(new ProtobufSerializerPlugin())
.UseCompressor(new Lz4CompressorPlugin()));
var tracker = builder.Build();Extension methods on IChangeTrackingBuilder set a default factory applied to every entity type that does not have an explicit override:
var builder = EntityChangeTracker.Create();
builder.UseJsonSerializer(); // RayTree.Plugins.Serializers.Json
builder.UseGzipCompressor(); // RayTree.Plugins.Compressors.Gzip
// builder.UseProtobufSerializer()
// builder.UseMessagePackSerializer()
// builder.UseLz4Compressor()
// builder.UseBrotliCompressor()
// builder.UseNoOpCompressor()
builder.ForEntity<Product>(e => e
.UseOutbox(new PostgreSqlOutbox<Product>(new PostgreSqlOutboxOptions
{
ConnectionString = connectionString
}))
.UsePublisher(new InMemoryQueue()));
// Inherits JsonSerializer + GzipCompressor from global defaults
var tracker = builder.Build();Control the polling interval, batch size, and retry behaviour for all OutboxPublisherService instances:
builder.UsePublisherOptions(opt =>
{
opt.PollingInterval = TimeSpan.FromSeconds(5);
opt.BatchSize = 100;
opt.MaxRetryCount = 3;
opt.RetryDelay = TimeSpan.FromSeconds(2);
});Register a source table alongside the outbox:
builder.ForEntity<Product>(e => e
.UseRepository(new PostgreSqlRepository<Product>(new PostgreSqlRepositoryOptions
{
ConnectionString = connectionString
// TableName defaults to "product"
}))
.UseOutbox(new PostgreSqlOutbox<Product>(new PostgreSqlOutboxOptions
{
ConnectionString = connectionString
}))
.UsePublisher(new InMemoryQueue())
.UseSerializer(new JsonSerializerPlugin())
.UseCompressor(new GzipCompressorPlugin()));Enable PostgreSQL NOTIFY/LISTEN on the outbox, then create a NotificationBasedPublisher alongside the tracker:
var outboxOptions = new PostgreSqlOutboxOptions
{
ConnectionString = connectionString
};
outboxOptions.UseNotificationChannel("product_notify")
.WithFallbackPolling(TimeSpan.FromSeconds(30));
builder.ForEntity<Product>(e => e
.UseOutbox(new PostgreSqlOutbox<Product>(outboxOptions))
.UsePublisher(new InMemoryQueue())
.UseSerializer(new JsonSerializerPlugin())
.UseCompressor(new GzipCompressorPlugin()));
var tracker = builder.Build(); // creates table + trigger
var notificationPublisher = new NotificationBasedPublisher(
tracker,
new NotificationBasedPublisherOptions
{
ConnectionString = connectionString,
ChannelName = "product_notify",
FallbackPollingInterval = TimeSpan.FromSeconds(30)
},
loggerFactory); // ILoggerFactory — required; use NullLoggerFactory.Instance in tests
await notificationPublisher.StartAsync();See trigger-setup.md for full details and hosting in ASP.NET Core.
ChangeTrackingConfiguration is a thin wrapper around IChangeTrackingBuilder that adds WithPollingInterval() and WithBatchSize() convenience methods. It does not expose per-entity fluent configuration — use EntityChangeTracker.Create() directly for most scenarios.
var config = new ChangeTrackingConfiguration()
.WithPollingInterval(TimeSpan.FromSeconds(5))
.WithBatchSize(50);
// Register per-entity via the underlying builder factory methods
config.UseSerializer<IChangeSerializer>(_ => new JsonSerializerPlugin());
config.UseCompressor<IChangeCompressor>(_ => new GzipCompressorPlugin());
config.UseOutbox<IOutbox>(_ => new InMemoryOutbox());
config.UsePublisher<IQueuePublisher>(_ => new InMemoryQueue());
var tracker = config.Build();// Typed convenience methods — State is captured automatically
await tracker.TrackInsertAsync(new Product { Id = 1, Name = "Widget" });
await tracker.TrackUpdateAsync(new Product { Id = 1, Name = "Widget Pro" });
await tracker.TrackDeleteAsync(new Product { Id = 1, Name = "Widget Pro" });
// Generic overload (when change type is dynamic)
await tracker.TrackChangeAsync(entity, ChangeType.Insert);Configure the subscriber side using ChangeSubscriberBuilder. Global defaults (serializer, compressor, options) apply to every entity registration; per-entity callbacks can override any of them. The builder produces a ChangeSubscriber via Build().
var subscriber = new ChangeSubscriberBuilder()
.UseSerializer(new JsonSerializerPlugin()) // global default
.UseCompressor(new GzipCompressorPlugin()) // global default
.UseOptions(opt =>
{
opt.MaxRetries = 3;
opt.RetryDelay = TimeSpan.FromSeconds(1);
opt.SkipOnFailure = false;
})
.ForEntity<Order>(e => e
.UseConsumer(myConsumer) // IQueueConsumer for Order messages
.OnInsert(async (change, ct) =>
{
var order = change.State; // fully-typed Order
Console.WriteLine($"New order: {order?.Id}");
})
.OnUpdate(async (change, ct) => { /* ... */ })
.OnDelete(async (change, ct) => { /* ... */ }))
.Build();Set a serializer and compressor once globally, then register each entity with only the overrides it needs:
var subscriber = new ChangeSubscriberBuilder()
.UseSerializer(new JsonSerializerPlugin())
.UseCompressor(new GzipCompressorPlugin())
.ForEntity<Order>(e => e
.UseConsumer(orderConsumer)
// inherits global serializer + compressor
.OnInsert(async (change, ct) => { /* ... */ }))
.ForEntity<Product>(e => e
.UseSerializer(new ProtobufSerializerPlugin()) // per-entity override
.UseConsumer(productConsumer)
.OnInsert(async (change, ct) => { /* ... */ }))
.Build();Fine-tune retry behaviour for individual entity types while keeping global defaults for others:
var subscriber = new ChangeSubscriberBuilder()
.UseOptions(opt => opt.MaxRetries = 2) // global default
.ForEntity<Order>(e => e
.UseOptions(opt => opt.MaxRetries = 5) // Order-only override
.UseConsumer(orderConsumer)
.OnInsert(async (change, ct) => { /* ... */ }))
.ForEntity<Product>(e => e
.UseConsumer(productConsumer)
// inherits MaxRetries = 2 from global
.OnInsert(async (change, ct) => { /* ... */ }))
.Build();KafkaPublisherOptions.KeySelector controls which Kafka partition key is stamped on each outgoing message. Messages with the same key are guaranteed to land on the same partition, so they are consumed in order.
The default selector uses EntityType:EntityId:
builder.ForEntity<Order>(e => e
.UsePublisher(new KafkaPublisher(new KafkaPublisherOptions
{
BootstrapServers = "localhost:9092",
Topic = "orders"
// KeySelector defaults to envelope => $"{envelope.EntityType}:{envelope.EntityId}"
})));Override KeySelector to shard by a different field. For example, to shard by tenant so all tenant changes land on the same partition — and different tenants can be processed in parallel by separate consumer-group members:
new KafkaPublisherOptions
{
BootstrapServers = "localhost:9092",
Topic = "orders",
KeySelector = envelope => envelope.EntityId.Split(':')[0] // "tenantId:entityId" → tenantId
}Or use any envelope metadata — change type, entity type, a custom field embedded in EntityId, etc. The selector runs on the publisher side; the consumer side is unaffected.
Kafka distributes partitions across all members of a consumer group. To process different entities (or entity key ranges) in parallel, run multiple KafkaConsumer instances that share the same GroupId and point at the same topic — Kafka assigns each instance a disjoint set of partitions automatically. No RayTree configuration is needed beyond the standard KafkaConsumerOptions:
// Instance A and Instance B both use GroupId = "order-processors"
// Kafka assigns ~half the partitions to each.
new KafkaConsumerOptions
{
BootstrapServers = "localhost:9092",
Topic = "orders",
GroupId = "order-processors"
}With AckAfterHandler = true, keep MaxDegreeOfParallelism = 1 per consumer instance (offset commits are monotonic — out-of-order commits can skip messages).
Call the broker extension inside the ForEntity callback:
// Kafka
.ForEntity<Order>(e => e
.UseKafka(opt =>
{
opt.BootstrapServers = "localhost:9092";
opt.Topic = "orders";
opt.GroupId = "my-service";
})
.OnInsert(async (change, ct) => { /* ... */ }))
// RabbitMQ
.ForEntity<Order>(e => e
.UseRabbitMq(opt =>
{
opt.HostName = "localhost";
opt.QueueName = "orders";
})
.OnInsert(async (change, ct) => { /* ... */ }))RabbitMqPublisherOptions.RoutingKeySelector controls the AMQP routing key stamped on each message. On a topic exchange, consumers bind queues with wildcard patterns to receive only the messages they need — that is how RabbitMQ routes and parallelises processing.
The default produces {RoutingKey}.{EntityType}.{changeType} (e.g. change.Order.insert):
builder.ForEntity<Order>(e => e
.UsePublisher(new RabbitMqPublisher(new RabbitMqPublisherOptions
{
ExchangeName = "entity_changes",
RoutingKey = "change"
// RoutingKeySelector is null → falls back to "change.Order.insert" / "change.Order.update" etc.
})));Override RoutingKeySelector to route by any envelope field. For example, to shard by tenant so each tenant's messages land on a dedicated queue:
new RabbitMqPublisherOptions
{
ExchangeName = "entity_changes",
RoutingKeySelector = envelope => $"change.{envelope.EntityId.Split(':')[0]}.{envelope.EntityType}"
// "tenantId:entityId" → "change.tenantId.Order"
// Consumer binds with "change.acme.*" to receive only ACME tenant messages
}When RoutingKeySelector is set it takes full control of the key; the RoutingKey base prefix is ignored.
In microservice deployments one service owns and declares the exchange or queue while other
services consume it. If the owning service has not yet started when InitializeAsync is called,
the broker returns NOT_FOUND and the client crashes. The opt-in topology wait loop retries with
AMQP passive declares until the topology appears.
Three options control the wait on both RabbitMqPublisherOptions and RabbitMqConsumerOptions:
| Option | Default | Description |
|---|---|---|
WaitForTopology |
false |
Enable the wait loop. |
TopologyWaitInterval |
5 s |
Delay between passive-declare attempts. |
TopologyWaitTimeout |
null (unlimited) |
Hard deadline; null means retry until cancellation. |
When does the publisher probe? When WaitForTopology = true and DeclareExchange = false.
The publisher probes ExchangeName with ExchangeDeclarePassiveAsync before connecting.
When does the consumer probe?
- When
WaitForTopology = trueandDeclareQueue = false— probesQueueNamewithQueueDeclarePassiveAsync. - When
WaitForTopology = trueandExchangeNameis non-empty — also probes the exchange withExchangeDeclarePassiveAsyncbeforeQueueBindAsync.
Only NOT_FOUND (404) retries. All other errors (PRECONDITION_FAILED, ACCESS_REFUSED,
connection failures) propagate immediately.
// Publisher — waits for an exchange declared by another service
builder.ForEntity<Order>(e => e
.UsePublisher(new RabbitMqPublisher(new RabbitMqPublisherOptions
{
HostName = "rabbitmq",
ExchangeName = "entity_changes",
DeclareExchange = false, // owned by topology-service
WaitForTopology = true,
TopologyWaitInterval = TimeSpan.FromSeconds(2),
TopologyWaitTimeout = TimeSpan.FromMinutes(5)
})));
// Consumer — waits for a queue and binding exchange
builder.ForEntity<Order>(e => e
.UseSerializer(new JsonSerializerPlugin())
.UseConsumer(new RabbitMqConsumer(new RabbitMqConsumerOptions
{
HostName = "rabbitmq",
QueueName = "order-events",
ExchangeName = "entity_changes",
DeclareQueue = false, // queue owned externally
WaitForTopology = true,
TopologyWaitInterval = TimeSpan.FromSeconds(2)
// TopologyWaitTimeout = null → retry until CancellationToken is cancelled
}))
.OnInsert(async (change, ct) => { /* ... */ }));The default (WaitForTopology = false) is unchanged — a missing exchange or queue surfaces the
underlying OperationInterruptedException immediately.
The Kafka analogue of WaitForTopology for deployments where the topic owner (often a
dedicated "schema-owner" pod) comes up after the publisher / consumer that depends on it.
When WaitForTopic = true, InitializeAsync probes the broker via
IAdminClient.GetMetadata and retries until the topic is reported available.
Three options on both KafkaPublisherOptions and KafkaConsumerOptions:
| Option | Default | Description |
|---|---|---|
WaitForTopic |
false |
Enable the wait loop. |
TopicWaitInterval |
5 s |
Delay between metadata probe attempts. |
TopicWaitTimeout |
null (unlimited) |
Hard deadline; null means retry until cancellation. |
Retryable responses — the probe retries on any of:
- Empty
Metadata.Topicscollection (some broker versions return no entry rather than a placeholder). - Per-topic
ErrorCode.UnknownTopicOrPart. - Per-topic
ErrorCode.LeaderNotAvailable(transient during cluster bootstrap and partition-leader election). - Transient transport-level
KafkaExceptions:Local_Transport(connection refused / socket closed),Local_AllBrokersDown,Local_Resolve(DNS failure),Local_TimedOut. This covers the common startup race where the broker pod has not yet finished starting.
All other broker errors, fatal KafkaExceptions (Error.IsFatal == true), and OperationCanceledException propagate immediately.
Probe placement
- The publisher probes inside the lazy producer-init path so both
InitializeAsyncandPublishAsyncbenefit (the probe runs at most once perKafkaPublisherlifetime, then avolatile boolflag short-circuits subsequent calls). - The consumer probes before allocating the native
IConsumerhandle and re-checks cancellation immediately after, so a Ctrl+C during a slow probe never leaks a librdkafka handle.
Cancellation latency. Each GetMetadata call is bounded by a small fixed timeout (~1 s),
decoupled from TopicWaitInterval. This keeps shutdown-thread-pinning small regardless of how
long you set the interval to.
// Publisher — waits for an externally-owned topic
builder.ForEntity<Order>(e => e
.UsePublisher(new KafkaPublisher(new KafkaPublisherOptions
{
BootstrapServers = "kafka:9092",
Topic = "orders.events",
WaitForTopic = true,
TopicWaitInterval = TimeSpan.FromSeconds(2),
TopicWaitTimeout = TimeSpan.FromMinutes(5)
}, loggerFactory))); // pass loggerFactory so probe progress is observable
// Consumer — waits for the same topic
builder.ForEntity<Order>(e => e
.UseSerializer(new JsonSerializerPlugin())
.UseKafka(o =>
{
o.BootstrapServers = "kafka:9092";
o.Topic = "orders.events";
o.GroupId = "orders-service";
o.WaitForTopic = true;
o.TopicWaitInterval = TimeSpan.FromSeconds(2);
// TopicWaitTimeout = null → retry until CancellationToken is cancelled
}, loggerFactory)
.OnInsert(async (change, ct) => { /* ... */ }));Caveats
- Auto-create. Brokers with
auto.create.topics.enable=true(the default on many distributions, including the stockconfluentinc/cp-kafkaimage) will create the topic in response to the metadata probe itself, defeating the wait. Set the broker option tofalsein deployments that depend on this feature. - Sync
Build()+TopicWaitTimeout = null. The synchronousChangeTrackingBuilder.Build()overload (whichAddChangeTrackinguses) does not plumb a cancellation token through. With an unbounded wait, host startup will block indefinitely with no SIGTERM escape. Either set a non-null timeout, or useBuildAsync(cancellationToken)with the host'sApplicationStoppingtoken. - Logger plumbing. The publisher's
ILoggerFactory?parameter and the subscriber-sideUseKafka(configure, loggerFactory)overload both default to silent (NullLoggerFactory.Instance). Pass the host's logger factory explicitly when usingWaitForTopicso the first-miss / recoveryInformationlogs are visible — otherwise a stuck startup is silently invisible.
The default (WaitForTopic = false) is unchanged — a missing topic surfaces as UnknownTopicOrPart on the first ProduceAsync (publisher) or as silent no-message returns from Consume (consumer).
// InMemory (testing)
.ForEntity<Order>(e => e
.UseInMemoryQueue(inMemoryQueue)
.OnInsert(async (change, ct) => { /* ... */ }))AddChangeSubscriber registers ChangeSubscriber as a singleton and ChangeSubscriberHostedService as a hosted service. It returns IChangeSubscriberBuilder so you chain entity registrations directly. Options are bound from the ChangeTracking:Subscriber configuration section:
builder.Services
.AddChangeSubscriber(builder.Configuration)
.UseRedisDeduplication(multiplexer) // optional; default is in-memory
.ForEntity<Order>(e => e
.UseInMemoryQueue(orderQueue)
.UseSerializer(new JsonSerializerPlugin())
.UseCompressor(new GzipCompressorPlugin())
.OnInsert(async (change, ct) =>
Console.WriteLine($"New order: {change.State?.Id}")));appsettings.json:
{
"ChangeTracking": {
"Subscriber": {
"MaxRetries": 3,
"RetryDelay": "00:00:01",
"SkipOnFailure": false
}
}
}Every processed CorrelationId is recorded so duplicate deliveries (at-least-once brokers) are silently dropped.
| Store | Package | When to use |
|---|---|---|
InMemoryDeduplicationStore |
built-in | Single-process, testing |
RedisDeduplicationStore |
RayTree.Plugins.Deduplication.Redis |
Multiple subscriber instances or cross-restart dedup |
// Redis — supply an IConnectionMultiplexer from StackExchange.Redis
using StackExchange.Redis;
using RayTree.Plugins.Deduplication.Redis;
var multiplexer = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
subscriber = new ChangeSubscriberBuilder()
.UseRedisDeduplication(multiplexer) // default options
.ForEntity<Order>(e => e /* ... */)
.Build();
// With custom options
subscriber = new ChangeSubscriberBuilder()
.UseRedisDeduplication(multiplexer, opt =>
{
opt.KeyPrefix = "my-service"; // namespace on shared Redis; default "default"
opt.RetentionPeriod = TimeSpan.FromHours(48);
opt.Database = 1; // logical DB index; default -1 (connection default)
})
.ForEntity<Order>(e => e /* ... */)
.Build();
// Custom store
subscriber = new ChangeSubscriberBuilder()
.UseDeduplicationStore(new MyCustomStore())
.ForEntity<Order>(e => e /* ... */)
.Build();RayTree uses Microsoft.Extensions.Logging throughout. All runtime service classes require a logger — there is no silent NullLogger fallback inside services.
Pass an ILoggerFactory to EntityChangeTracker.Create():
// No logging (tests, scripts)
var tracker = EntityChangeTracker.Create().Build();
// With logging
using var loggerFactory = LoggerFactory.Create(b => b.AddConsole());
var tracker = EntityChangeTracker.Create(loggerFactory).Build();EntityChangeTracker.Create() normalises null to NullLoggerFactory.Instance, so calling EntityChangeTracker.Create() without an argument produces a working tracker with no log output.
AddChangeTracking resolves ILoggerFactory from the DI container automatically:
builder.Services.AddLogging(b => b.AddConsole()); // standard host setup
builder.Services.AddChangeTracking(builder.Configuration, tracking => { ... });
// No UseLoggerFactory call needed — the host's ILoggerFactory is wired in automaticallyKafkaConsumer and RabbitMqConsumer require ILoggerFactory as a second constructor argument. When constructing them directly outside a builder, pass the factory explicitly:
// In tests
var consumer = new KafkaConsumer(options, NullLoggerFactory.Instance);
// In production code
var consumer = new KafkaConsumer(options, loggerFactory);When using the .UseKafka(...) / .UseRabbitMq(...) extension methods inside a ForEntity callback, NullLoggerFactory.Instance is used internally — to get real logging from these consumers, construct them directly and pass to .UseConsumer(consumer).
Configuration & build phase
| Class | Level | When | Structured properties |
|---|---|---|---|
ChangeTrackingBuilder |
Information |
Each global Use* call (UseOutbox, UsePublisher, UseSerializer, UseCompressor, UseRepository, UseDeduplicationStore, UseMeter, UsePublisherOptions, UseSubscriberOptions) |
{Plugin} |
ChangeTrackingBuilder |
Information |
ForEntity<TEntity> invocation |
{EntityType} |
ChangeTrackingBuilder |
Debug |
BuildInternal falls back to a default RayTreeMeter (no UseMeter) |
— |
ChangeTrackingBuilder |
Information |
"ChangeTracker built" summary, once per Build() / BuildAsync() |
{EntityTypes}, {Plugins}, {HasCustomMeter}, {HasCustomDeduplicationStore}, {HasCustomLoggerFactory} |
EntityBuilder<TEntity> |
Debug |
Per-entity publisher overrides (UseOutbox, UsePublisher, UseSerializer, UseCompressor, UseRepository, UseSubscriberOptions, UseConsumer, UseConsumerFactory) |
{EntityType}, {Override}, {Plugin} |
SharedHandlerBuilder<TEntity> |
Debug |
OnInsert / OnUpdate / OnDelete / OnChange handler registrations |
{EntityType}, {Override}, {Plugin} |
IsolatedHandlerBuilder<TEntity> |
Debug |
Named handler registrations | {EntityType}, {Override} (e.g. OnInsert:audit), {Plugin} |
Tracker lifecycle
| Class | Level | When | Structured properties |
|---|---|---|---|
EntityChangeTracker |
Information |
"tracker initialization started" — entered InitializeAsync |
— |
EntityChangeTracker |
Debug |
Publisher init completed | {EntityTypeCount} |
EntityChangeTracker |
Debug |
Consumer connections initialized | {ConsumerCount} |
EntityChangeTracker |
Information |
"tracker initialization completed" — success | — |
EntityChangeTracker |
Warning |
"tracker initialization aborted" — abort point marker; inner plugin's Error carries the exception payload |
— |
ChangeTrackingHostedService |
Information |
"ChangeTracking starting" — DI startup, once per host | {ConfigurationBound} |
Runtime
| Class | Level | When |
|---|---|---|
OutboxPublisherService |
Information |
Polling loop start / stop |
OutboxPublisherService |
Warning |
Per-retry publish failure |
OutboxPublisherService |
Error |
Batch error; retries exhausted |
ChangeSubscriber |
Warning |
Unknown entity type in envelope |
ChangeSubscriber |
Debug |
Dedup hit; no handlers matched |
ChangeSubscriber |
Warning |
Handler retry attempt |
ChangeSubscriber |
Error |
Handler dropped (SkipOnFailure) |
ChangePublisher |
Information |
Publisher service registered per entity |
EntityChangeTracker |
Information |
Consumer loop start per entity type / handler (in StartAsync) |
ChangeTrackingHostedService |
Information |
Service stop |
NotificationBasedPublisher |
Information |
Start / stop |
NotificationBasedPublisher |
Warning |
Listen-loop error; fallback-poll error; per-change publish failure |
KafkaConsumer |
Error |
Fatal Kafka error |
KafkaConsumer |
Warning |
Consume error; envelope parse failure |
RabbitMqConsumer |
Warning |
Message processing error (before requeue) |
All configuration- and lifecycle-time log calls are guarded by ILogger.IsEnabled(...), so under NullLoggerFactory.Instance they produce zero allocations and zero output. Each builder owns an ILogger<Self> so per-category filtering works as expected (e.g. silence Debug from IsolatedHandlerBuilder while keeping Information from ChangeTrackingBuilder).
For a tracker configured with one global serializer + compressor and two entities, the captured log stream looks like:
info: RayTree.Core.Tracking.ChangeTrackingBuilder[0]
ChangeTracking: registered global serializer JsonSerializerPlugin
info: RayTree.Core.Tracking.ChangeTrackingBuilder[0]
ChangeTracking: registered global compressor NoOpCompressorPlugin
info: RayTree.Core.Tracking.ChangeTrackingBuilder[0]
ChangeTracking: configuring entity Order
dbug: RayTree.Core.Tracking.EntityBuilder`1[Order][0]
ChangeTracking: entity override applied EntityType=Order Override=Outbox Plugin=PostgreSqlOutbox`1
dbug: RayTree.Core.Handling.SharedHandlerBuilder`1[Order][0]
ChangeTracking: entity override applied EntityType=Order Override=OnInsert Plugin=MyService
info: RayTree.Core.Tracking.ChangeTrackingBuilder[0]
ChangeTracker built. EntityTypes=["Order", "Customer"] Plugins=Outbox=<none> Publisher=<none> Serializer=JsonSerializerPlugin Compressor=NoOpCompressorPlugin Repository=<none> HasCustomMeter=False HasCustomDeduplicationStore=False HasCustomLoggerFactory=True
info: RayTree.Core.Tracking.EntityChangeTracker[0]
ChangeTracking: tracker initialization started
dbug: RayTree.Core.Tracking.EntityChangeTracker[0]
ChangeTracking: publisher initialized EntityTypeCount=2
dbug: RayTree.Core.Tracking.EntityChangeTracker[0]
ChangeTracking: consumers initialized ConsumerCount=1
info: RayTree.Core.Tracking.EntityChangeTracker[0]
ChangeTracking: tracker initialization completed
info: RayTree.Hosting.ChangeTrackingHostedService[0]
ChangeTracking starting. ConfigurationBound=True
On failure, the inner plugin emits the Error with the exception, and the tracker emits a single Warning marking the abort point so operators can grep back to the cause without losing context.
RayTree emits System.Diagnostics.Metrics instruments on a Meter named "RayTree" (counters, histograms, and an observable gauge for outbox depth). Instrument calls are silent no-ops when no listener is attached, so there is no overhead for consumers that opt out.
EntityChangeTracker.Create() creates a RayTreeMeter automatically and EntityChangeTracker disposes it. To collect the metrics, attach a MeterListener to the meter named "RayTree", or use the OTel SDK via the RayTree.OpenTelemetry package:
services.AddOpenTelemetry()
.WithMetrics(b => b
.AddRayTreeMetrics()
.AddPrometheusExporter());Pass a RayTreeMeter instance to share it across trackers or to control its lifetime:
var meter = new RayTreeMeter();
var tracker = EntityChangeTracker.Create(loggerFactory)
.UseMeter(meter)
.ForEntity<Order>(/* ... */)
.Build();
// Caller-supplied meter is NOT disposed by the tracker.Full instrument inventory, unit conventions, suggested bucket boundaries, and sample dashboard queries are in opentelemetry-metrics.md.
// EntityChangeTracker is IDisposable — stops all publisher services
tracker.Dispose();
// Or use 'using'
using var tracker = builder.Build();