RayTree's plugin system allows you to implement custom providers for outbox storage, queue publishing/consuming, serialization, and compression.
IOutbox stores changes and tracks their publish state.
public interface IOutbox
{
    Task InitializeAsync(CancellationToken cancellationToken = default);

    Task WriteAsync<TEntity>(EntityChange<TEntity> change, CancellationToken cancellationToken = default)
        where TEntity : class;

    Task<IReadOnlyList<EntityChange<TEntity>>> GetUnpublishedAsync<TEntity>(
        int batchSize,
        CancellationToken cancellationToken = default)
        where TEntity : class;

    Task<IReadOnlyList<EntityChange<TEntity>>> GetUnpublishedAsync<TEntity>(
        ChangeType? changeType = null,
        DateTime? since = null,
        int batchSize = 100,
        CancellationToken cancellationToken = default)
        where TEntity : class;

    Task MarkPublishedAsync(long id, CancellationToken cancellationToken = default);

    Task<int> CleanupPublishedAsync(TimeSpan retentionPeriod, CancellationToken cancellationToken = default);

    Task<EntityChange<TEntity>?> GetByIdAsync<TEntity>(long id, CancellationToken cancellationToken = default)
        where TEntity : class;
}

Implementation notes:
- WriteAsync should set change.Id to the auto-generated row ID (use RETURNING id in PostgreSQL)
- GetUnpublishedAsync returns entries ordered by Timestamp, limited by batchSize
- MarkPublishedAsync sets Published = true for the given ID
- CleanupPublishedAsync deletes published rows older than retentionPeriod and returns the count deleted
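The four behaviours in the notes above can be sketched in memory. This is a minimal illustration, not RayTree's implementation: OutboxRow and InMemoryOutboxSketch are hypothetical stand-ins (a real provider implements IOutbox against EntityChange<TEntity>), and measuring retention against the row's Timestamp is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a stored outbox row; a real provider works with
// RayTree's EntityChange<TEntity> instead.
public sealed class OutboxRow
{
    public long Id { get; set; }
    public DateTime Timestamp { get; set; }
    public bool Published { get; set; }
}

public sealed class InMemoryOutboxSketch
{
    private readonly object _gate = new();
    private readonly List<OutboxRow> _rows = new();
    private long _nextId;

    // Mirrors WriteAsync: store the row and write the generated ID back onto it.
    public void Write(OutboxRow row)
    {
        lock (_gate)
        {
            row.Id = ++_nextId;
            _rows.Add(row);
        }
    }

    // Mirrors GetUnpublishedAsync: unpublished rows, oldest first, capped at batchSize.
    public IReadOnlyList<OutboxRow> GetUnpublished(int batchSize)
    {
        lock (_gate)
        {
            return _rows.Where(r => !r.Published)
                        .OrderBy(r => r.Timestamp)
                        .Take(batchSize)
                        .ToList();
        }
    }

    // Mirrors MarkPublishedAsync: flag the row with the given ID, if it exists.
    public void MarkPublished(long id)
    {
        lock (_gate)
        {
            var row = _rows.FirstOrDefault(r => r.Id == id);
            if (row is not null) row.Published = true;
        }
    }

    // Mirrors CleanupPublishedAsync: delete published rows older than the
    // retention period and return how many were removed.
    // Assumption: age is measured from Timestamp; utcNow is passed in for testability.
    public int CleanupPublished(TimeSpan retentionPeriod, DateTime utcNow)
    {
        lock (_gate)
        {
            var cutoff = utcNow - retentionPeriod;
            return _rows.RemoveAll(r => r.Published && r.Timestamp < cutoff);
        }
    }
}
```

In a database-backed provider the same contract would typically map to an INSERT that returns the generated key, an ordered SELECT with a row limit, an UPDATE, and a DELETE.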
IRepository<TEntity> provides CRUD operations for source entity persistence.
public interface IRepository<TEntity> : IRepository where TEntity : class
{
    Task InsertAsync(TEntity entity, CancellationToken cancellationToken = default);
    Task UpdateAsync(TEntity entity, CancellationToken cancellationToken = default);
    Task DeleteAsync(TEntity entity, CancellationToken cancellationToken = default);
    Task<TEntity?> GetByIdAsync(object[] keyValues, CancellationToken cancellationToken = default);
}

public interface IRepository
{
    Task InitializeAsync(CancellationToken cancellationToken = default);
}

IQueuePublisher publishes a MessageEnvelope to a message broker. The envelope is the only thing that crosses the queue boundary: it contains change metadata plus the already-serialized and compressed entity state as byte[] Payload.
public interface IQueuePublisher
{
    Task InitializeAsync(CancellationToken cancellationToken = default);
    Task PublishAsync(MessageEnvelope envelope, CancellationToken cancellationToken = default);
}

Implementation notes:
- InitializeAsync should create the broker-side infrastructure (exchange, topic, queue) if it does not exist
- Payload is already compressed; write it to the broker as-is without re-encoding
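Both notes can be illustrated with a small in-process stand-in for a broker. Everything here is hypothetical (EnvelopeSketch, FakeBroker and FakeBrokerPublisher are not RayTree types); the sketch only shows an idempotent InitializeAsync and a PublishAsync that forwards Payload untouched.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for RayTree's MessageEnvelope (only Payload is modelled).
public sealed record EnvelopeSketch(byte[] Payload);

// Hypothetical in-process "broker": topic name -> queued message bodies.
public sealed class FakeBroker
{
    public ConcurrentDictionary<string, ConcurrentQueue<byte[]>> Topics { get; } = new();
}

public sealed class FakeBrokerPublisher
{
    private readonly FakeBroker _broker;
    private readonly string _topic;

    public FakeBrokerPublisher(FakeBroker broker, string topic)
    {
        _broker = broker;
        _topic = topic;
    }

    // Idempotent: the topic is created only if it is missing, so repeated calls are safe.
    public Task InitializeAsync(CancellationToken cancellationToken = default)
    {
        _broker.Topics.GetOrAdd(_topic, _ => new ConcurrentQueue<byte[]>());
        return Task.CompletedTask;
    }

    // Payload is already serialized and compressed, so it is forwarded unchanged.
    public Task PublishAsync(EnvelopeSketch envelope, CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        if (!_broker.Topics.TryGetValue(_topic, out var queue))
            throw new InvalidOperationException("Call InitializeAsync before PublishAsync.");
        queue.Enqueue(envelope.Payload);
        return Task.CompletedTask;
    }
}
```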
IQueueConsumer receives MessageEnvelope messages from a broker and exposes them as an async stream.
public interface IQueueConsumer
{
    Task InitializeAsync(CancellationToken cancellationToken = default);
    IAsyncEnumerable<MessageEnvelope> ConsumeAsync(CancellationToken cancellationToken = default);
}

Implementation notes:
- InitializeAsync opens the connection and sets up the subscription
- ConsumeAsync must be called after InitializeAsync
- All broker operations (consume + ack) must run on the same thread for brokers with native single-thread requirements (e.g., Confluent.Kafka); use a dedicated background thread with a Channel<MessageEnvelope> buffer
- Accept ILoggerFactory loggerFactory as a required constructor parameter and create a typed ILogger<T> from it; do not add a NullLoggerFactory.Instance fallback inside the class itself
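The dedicated-thread pattern from the notes above might look roughly like this. It is a sketch under stated assumptions: EnvelopeStandIn replaces MessageEnvelope, the poll delegate is a hypothetical blocking broker call that returns null when idle, and acknowledgements and logging are left out (acks would have to be handed back to the same thread).

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for RayTree's MessageEnvelope.
public sealed record EnvelopeStandIn(byte[] Payload);

public sealed class SingleThreadConsumerSketch : IDisposable
{
    // Hypothetical blocking broker call; expected to wait briefly and return null when idle.
    private readonly Func<CancellationToken, EnvelopeStandIn?> _poll;
    private readonly Channel<EnvelopeStandIn> _buffer =
        Channel.CreateBounded<EnvelopeStandIn>(new BoundedChannelOptions(256)
        {
            SingleWriter = true,
            FullMode = BoundedChannelFullMode.Wait
        });
    private readonly CancellationTokenSource _stop = new();
    private Thread? _thread;

    public SingleThreadConsumerSketch(Func<CancellationToken, EnvelopeStandIn?> poll) => _poll = poll;

    // Starts the one thread that is allowed to talk to the broker client.
    public Task InitializeAsync(CancellationToken cancellationToken = default)
    {
        _thread = new Thread(PollLoop) { IsBackground = true, Name = "broker-consumer" };
        _thread.Start();
        return Task.CompletedTask;
    }

    private void PollLoop()
    {
        try
        {
            while (!_stop.IsCancellationRequested)
            {
                var envelope = _poll(_stop.Token);
                if (envelope is null) continue;
                // Blocks this dedicated thread (not a pool thread) until the buffer has room.
                _buffer.Writer.WriteAsync(envelope, _stop.Token).AsTask().GetAwaiter().GetResult();
            }
            _buffer.Writer.TryComplete();
        }
        catch (OperationCanceledException) { _buffer.Writer.TryComplete(); }
        catch (Exception ex) { _buffer.Writer.TryComplete(ex); }
    }

    // Callers read from the channel on any thread; the broker is never touched here.
    public IAsyncEnumerable<EnvelopeStandIn> ConsumeAsync(CancellationToken cancellationToken = default)
    {
        if (_thread is null)
            throw new InvalidOperationException("Call InitializeAsync before ConsumeAsync.");
        return _buffer.Reader.ReadAllAsync(cancellationToken);
    }

    public void Dispose()
    {
        _stop.Cancel();
        _thread?.Join();
        _stop.Dispose();
    }
}
```

A bounded channel is used so that a slow handler applies back-pressure to the polling thread instead of letting the buffer grow without limit; the capacity of 256 is an arbitrary choice.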
IChangeSerializer serializes an EntityChange<TEntity> (including its typed State) into a byte stream and deserializes it back.
public interface IChangeSerializer
{
    string Name { get; }

    Task SerializeAsync<TEntity>(
        EntityChange<TEntity> change,
        Stream destination,
        CancellationToken cancellationToken = default)
        where TEntity : class;

    Task<EntityChange<TEntity>> DeserializeAsync<TEntity>(
        Stream source,
        CancellationToken cancellationToken = default)
        where TEntity : class;
}

IChangeCompressor compresses and decompresses byte streams.
public interface IChangeCompressor
{
    string Name { get; }
    Task CompressAsync(Stream source, Stream destination, CancellationToken cancellationToken = default);
    Task DecompressAsync(Stream source, Stream destination, CancellationToken cancellationToken = default);
}

Changes flow through the pipeline in this order on the publisher side:
EntityChange<T>
→ IChangeSerializer.SerializeAsync (writes entity state to a MemoryStream)
→ IChangeCompressor.CompressAsync (compresses into another MemoryStream)
→ MessageEnvelope { Payload = compressed bytes }
→ IQueuePublisher.PublishAsync (sends envelope to broker)
On the subscriber side the envelope is received and the stages reverse:
MessageEnvelope
→ IChangeCompressor.DecompressAsync (expands Payload)
→ IChangeSerializer.DeserializeAsync<TEntity>
→ EntityChange<TEntity> { State = typed entity }
→ ChangeHandlerAsync<TEntity>(change, cancellationToken)
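The serialize, compress, decompress, deserialize round trip can be sketched with System.Text.Json and GZipStream. This is one possible pairing, not necessarily what RayTree ships: ChangeSketch<TEntity> is a hypothetical stand-in for EntityChange<TEntity>, and PipelineSketch collapses the serializer and compressor stages into two helper methods for brevity.

```csharp
using System.IO;
using System.IO.Compression;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for EntityChange<TEntity>, reduced to two members.
public sealed class ChangeSketch<TEntity> where TEntity : class
{
    public string EntityId { get; set; } = "";
    public TEntity? State { get; set; }
}

public static class PipelineSketch
{
    // Publisher side: serialize into a MemoryStream, then compress into a second one.
    public static async Task<byte[]> ToPayloadAsync<TEntity>(
        ChangeSketch<TEntity> change, CancellationToken ct = default) where TEntity : class
    {
        using var serialized = new MemoryStream();
        await JsonSerializer.SerializeAsync(serialized, change, cancellationToken: ct);
        serialized.Position = 0;

        using var compressed = new MemoryStream();
        // leaveOpen keeps `compressed` readable; disposing the GZipStream flushes the gzip trailer.
        using (var gzip = new GZipStream(compressed, CompressionMode.Compress, leaveOpen: true))
        {
            await serialized.CopyToAsync(gzip, ct);
        }
        return compressed.ToArray();
    }

    // Subscriber side: decompress the payload, then deserialize the typed change.
    public static async Task<ChangeSketch<TEntity>?> FromPayloadAsync<TEntity>(
        byte[] payload, CancellationToken ct = default) where TEntity : class
    {
        using var source = new MemoryStream(payload);
        using var gzip = new GZipStream(source, CompressionMode.Decompress);
        return await JsonSerializer.DeserializeAsync<ChangeSketch<TEntity>>(gzip, cancellationToken: ct);
    }
}
```

The byte array returned by ToPayloadAsync plays the role of MessageEnvelope.Payload; a queue publisher would forward it unchanged.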
builder.ForEntity<MyEntity>()
    .UseOutbox(new MyCustomOutbox(connectionString))
    .UsePublisher(new MyCustomQueuePublisher(brokerOptions))
    .UseSerializer(new MyCustomSerializer())
    .UseCompressor(new MyCustomCompressor());

builder.UseSerializer<IChangeSerializer>(_ => new MyCustomSerializer());
builder.UseCompressor<IChangeCompressor>(_ => new MyCustomCompressor());

public static class MyOutboxExtensions
{
    public static IEntityBuilder UseMyCustomOutbox(
        this IEntityBuilder builder,
        string connectionString)
        => builder.UseOutbox(new MyCustomOutbox(connectionString));
}

Use the builder to wire plugins and construct EntityChangeTracker:
using var tracker = EntityChangeTracker.Create()
    .ForEntity<MyEntity>(e => e
        .UseOutbox(new InMemoryOutbox())
        .UsePublisher(new InMemoryQueue())
        .UseSerializer(new MyCustomSerializer())
        .UseCompressor(new MyCustomCompressor()))
    .Build();

Verify serializer round-trips:
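// The serializer under test; MyCustomSerializer is the example plugin used above.
var serializer = new MyCustomSerializer();

var change = new EntityChange<MyEntity>
{
    EntityId = "1",
    ChangeType = ChangeType.Insert,
    EntityType = typeof(MyEntity).FullName!,
    State = new MyEntity { Id = 1 }
};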
using var stream = new MemoryStream();
await serializer.SerializeAsync(change, stream);
stream.Position = 0;
var deserialized = await serializer.DeserializeAsync<MyEntity>(stream);
Assert.That(deserialized.EntityId, Is.EqualTo(change.EntityId));
Assert.That(deserialized.State!.Id, Is.EqualTo(1));