Skip to content

Commit df4bd59

Browse files
committed
Fixed in-memory outbox telemetry to capture ExecutionContext, which should properly flow activity traces
1 parent 8fbb1f9 commit df4bd59

5 files changed

Lines changed: 143 additions & 54 deletions

File tree

src/MassTransit/Middleware/InMemoryOutbox/InMemoryOutboxConsumeContext.cs

Lines changed: 6 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ public class InMemoryOutboxConsumeContext :
1414
OutboxContext
1515
{
1616
readonly TaskCompletionSource<InMemoryOutboxConsumeContext> _clearToSend;
17+
readonly InMemoryOutboxDeferredMethodCollection _deferredMethods;
1718
readonly InMemoryOutboxMessageSchedulerContext _outboxSchedulerContext;
18-
readonly List<Func<Task>> _pendingActions;
1919

2020
protected InMemoryOutboxConsumeContext(ConsumeContext context)
2121
: base(context)
@@ -27,9 +27,10 @@ protected InMemoryOutboxConsumeContext(ConsumeContext context)
2727
ReceiveContext = outboxReceiveContext;
2828
PublishEndpointProvider = outboxReceiveContext.PublishEndpointProvider;
2929

30-
_pendingActions = [];
3130
_clearToSend = TaskUtil.GetTask<InMemoryOutboxConsumeContext>();
3231

32+
_deferredMethods = new InMemoryOutboxDeferredMethodCollection(_clearToSend.Task);
33+
3334
if (context.TryGetPayload(out MessageSchedulerContext schedulerContext))
3435
{
3536
_outboxSchedulerContext = (InMemoryOutboxMessageSchedulerContext)context.AddOrUpdatePayload<MessageSchedulerContext>(
@@ -44,45 +45,14 @@ protected InMemoryOutboxConsumeContext(ConsumeContext context)
4445

4546
public Task Add(Func<Task> method)
4647
{
47-
if (_clearToSend.Task.IsCompleted)
48-
return method();
49-
50-
lock (_pendingActions)
51-
{
52-
_pendingActions.Add(method);
53-
54-
return Task.CompletedTask;
55-
}
48+
return _deferredMethods.Add(method);
5649
}
5750

5851
public virtual async Task ExecutePendingActions(bool concurrentMessageDelivery)
5952
{
6053
_clearToSend.TrySetResult(this);
6154

62-
Func<Task>[] pendingActions;
63-
lock (_pendingActions)
64-
pendingActions = _pendingActions.ToArray();
65-
66-
if (pendingActions.Length > 0)
67-
{
68-
if (concurrentMessageDelivery)
69-
{
70-
var collection = new PendingTaskCollection(pendingActions.Length);
71-
72-
collection.Add(pendingActions.Select(action => action()));
73-
74-
await collection.Completed().ConfigureAwait(false);
75-
}
76-
else
77-
{
78-
foreach (Func<Task> action in pendingActions)
79-
{
80-
var task = action();
81-
if (task != null)
82-
await task.ConfigureAwait(false);
83-
}
84-
}
85-
}
55+
await _deferredMethods.Execute(concurrentMessageDelivery).ConfigureAwait(false);
8656

8757
if (_outboxSchedulerContext != null)
8858
{
@@ -99,8 +69,7 @@ public virtual async Task ExecutePendingActions(bool concurrentMessageDelivery)
9969

10070
public virtual async Task DiscardPendingActions()
10171
{
102-
lock (_pendingActions)
103-
_pendingActions.Clear();
72+
await _deferredMethods.Discard().ConfigureAwait(false);
10473

10574
if (_outboxSchedulerContext != null)
10675
{
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
namespace MassTransit.Middleware.InMemoryOutbox;
2+
3+
using System;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
8+
public class InMemoryOutboxDeferredMethod :
9+
IDisposable
10+
{
11+
readonly Func<Task> _method;
12+
ExecutionContext _executionContext;
13+
14+
public InMemoryOutboxDeferredMethod(ExecutionContext executionContext, Func<Task> method)
15+
{
16+
_executionContext = executionContext;
17+
_method = method;
18+
}
19+
20+
public void Dispose()
21+
{
22+
_executionContext?.Dispose();
23+
}
24+
25+
public async Task Run()
26+
{
27+
try
28+
{
29+
using var ec = _executionContext.CreateCopy();
30+
31+
Task task = null;
32+
33+
ExecutionContext.Run(ec, _ => task = _method(), null);
34+
35+
await task.ConfigureAwait(false);
36+
}
37+
finally
38+
{
39+
_executionContext?.Dispose();
40+
_executionContext = null;
41+
}
42+
}
43+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
#nullable enable
2+
namespace MassTransit.Middleware.InMemoryOutbox;
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using Util;
10+
11+
12+
public class InMemoryOutboxDeferredMethodCollection
13+
{
14+
readonly Task? _clearToSend;
15+
readonly List<InMemoryOutboxDeferredMethod> _pendingMethods;
16+
17+
public InMemoryOutboxDeferredMethodCollection(Task? clearToSend = null)
18+
{
19+
_clearToSend = clearToSend;
20+
_pendingMethods = [];
21+
}
22+
23+
public Task Add(Func<Task> method)
24+
{
25+
if (_clearToSend?.IsCompleted ?? false)
26+
return method();
27+
28+
var executionContext = ExecutionContext.Capture();
29+
30+
lock (_pendingMethods)
31+
{
32+
_pendingMethods.Add(new InMemoryOutboxDeferredMethod(executionContext, method));
33+
34+
return Task.CompletedTask;
35+
}
36+
}
37+
38+
public async Task Execute(bool concurrent)
39+
{
40+
InMemoryOutboxDeferredMethod[] pendingActions;
41+
lock (_pendingMethods)
42+
pendingActions = _pendingMethods.ToArray();
43+
44+
try
45+
{
46+
if (pendingActions.Length > 0)
47+
{
48+
if (concurrent)
49+
{
50+
var collection = new PendingTaskCollection(pendingActions.Length);
51+
52+
collection.Add(pendingActions.Select(method => method.Run()));
53+
54+
await collection.Completed().ConfigureAwait(false);
55+
}
56+
else
57+
{
58+
foreach (var method in pendingActions)
59+
{
60+
var task = method.Run();
61+
if (task != null)
62+
await task.ConfigureAwait(false);
63+
}
64+
}
65+
}
66+
}
67+
finally
68+
{
69+
foreach (var deferredMethod in pendingActions)
70+
deferredMethod.Dispose();
71+
}
72+
}
73+
74+
public async Task Discard()
75+
{
76+
InMemoryOutboxDeferredMethod[] pendingMethods;
77+
lock (_pendingMethods)
78+
{
79+
pendingMethods = _pendingMethods.ToArray();
80+
_pendingMethods.Clear();
81+
}
82+
83+
foreach (var method in pendingMethods)
84+
method.Dispose();
85+
}
86+
}

src/MassTransit/Middleware/InMemoryOutbox/InMemoryOutboxMessageSchedulerContext.cs

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
public class InMemoryOutboxMessageSchedulerContext :
1111
MessageSchedulerContext
1212
{
13-
readonly List<Func<Task>> _cancelMessages;
13+
readonly InMemoryOutboxDeferredMethodCollection _cancelMessages;
1414
readonly Task _clearToSend;
1515
readonly Uri _inputAddress;
1616
readonly object _listLock = new object();
@@ -26,8 +26,8 @@ public InMemoryOutboxMessageSchedulerContext(ConsumeContext consumeContext, Mess
2626

2727
_scheduler = new Lazy<IMessageScheduler>(() => schedulerFactory(consumeContext));
2828

29-
_scheduledMessages = new List<ScheduledMessage>();
30-
_cancelMessages = new List<Func<Task>>();
29+
_scheduledMessages = [];
30+
_cancelMessages = new InMemoryOutboxDeferredMethodCollection();
3131
}
3232

3333
public MessageSchedulerFactory SchedulerFactory { get; }
@@ -422,20 +422,7 @@ public Task CancelAllScheduledMessages()
422422

423423
public Task ExecutePendingActions()
424424
{
425-
Func<Task>[] cancelMessages;
426-
lock (_listLock)
427-
{
428-
if (_cancelMessages.Count == 0)
429-
return Task.CompletedTask;
430-
431-
cancelMessages = _cancelMessages.ToArray();
432-
}
433-
434-
var tasks = new PendingTaskCollection(cancelMessages.Length);
435-
foreach (Func<Task> cancel in cancelMessages)
436-
tasks.Add(cancel());
437-
438-
return tasks.Completed();
425+
return _cancelMessages.Execute(true);
439426
}
440427
}
441428
}

tests/MassTransit.Tests/ContainerTests/Batch_Specs.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace MassTransit.Tests.ContainerTests
22
{
33
using System;
4+
using System.Diagnostics;
45
using System.Linq;
56
using System.Threading;
67
using System.Threading.Tasks;
@@ -49,6 +50,7 @@ public class When_retry_and_in_memory_outbox_are_used_with_batch_consumers
4950
public async Task Should_deliver_the_batch_to_the_consumer()
5051
{
5152
await using var provider = new ServiceCollection()
53+
.AddTelemetryListener()
5254
.AddMassTransitTestHarness(x =>
5355
{
5456
x.AddConsumer<TestOutboxBatchConsumer>();
@@ -141,6 +143,8 @@ class TestOutboxBatchConsumer :
141143
{
142144
public Task Consume(ConsumeContext<Batch<BatchItem>> context)
143145
{
146+
using var activity = new Activity("TestOutboxBatchConsumer").Start();
147+
144148
if (context.TryGetPayload<InMemoryOutboxConsumeContext>(out _))
145149
{
146150
context.Respond(new BatchResult

0 commit comments

Comments
 (0)