Skip to content

Commit 763159b

Browse files
committed
Verify duplicate baggage in telemetry only captures most recent value
1 parent 1a9c435 commit 763159b

2 files changed

Lines changed: 106 additions & 2 deletions

File tree

src/MassTransit/Logging/Diagnostics/LogContextActivityExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ public static class LogContextActivityExtensions
217217

218218
activity.Start();
219219

220-
IList<KeyValuePair<string, string?>>? baggage = null;
220+
List<KeyValuePair<string, string?>>? baggage = null;
221221
foreach (KeyValuePair<string, string?> pair in activity.Baggage)
222222
{
223223
if (pair.Key.Equals(DiagnosticHeaders.Messaging.ConversationId) || pair.Key.Equals(DiagnosticHeaders.CorrelationId))
@@ -226,7 +226,7 @@ public static class LogContextActivityExtensions
226226
if (string.IsNullOrWhiteSpace(pair.Value))
227227
continue;
228228

229-
baggage ??= new List<KeyValuePair<string, string?>>();
229+
baggage ??= new List<KeyValuePair<string, string?>>(1);
230230
baggage.Add(pair);
231231
}
232232

tests/MassTransit.RabbitMqTransport.Tests/OpenTelemetry_Specs.cs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,108 @@ await Assert.MultipleAsync(async () =>
7272
});
7373
}
7474

75+
[Test]
76+
public async Task Should_carry_excess_baggage_with_newtonsoft()
77+
{
78+
var services = new ServiceCollection();
79+
AddTraceListener(services, "order-api");
80+
81+
await using var provider = services
82+
.AddMassTransitTestHarness(x =>
83+
{
84+
x.AddConsumer<MonitoredSubmitOrderConsumer>();
85+
86+
x.UsingRabbitMq((context, cfg) =>
87+
{
88+
cfg.UseNewtonsoftJsonSerializer();
89+
90+
cfg.ConfigureEndpoints(context);
91+
});
92+
})
93+
.BuildServiceProvider(true);
94+
95+
var harness = provider.GetTestHarness();
96+
97+
await harness.Start();
98+
99+
IRequestClient<SubmitOrder> client = harness.GetRequestClient<SubmitOrder>();
100+
101+
await harness.Bus.Publish(new PingMessage());
102+
103+
StartedActivity? activity = LogContext.Current?.StartGenericActivity("api process");
104+
105+
activity?.Activity.SetBaggage("MyBag", "IsFull");
106+
activity?.Activity.SetBaggage("MyBag", "IsEmpty");
107+
activity?.Activity.SetBaggage("MyBag", "IsHalfFull");
108+
109+
Response<OrderSubmitted> response = await client.GetResponse<OrderSubmitted>(new
110+
{
111+
OrderId = InVar.Id,
112+
OrderNumber = "123"
113+
});
114+
115+
activity?.Stop();
116+
117+
await Assert.MultipleAsync(async () =>
118+
{
119+
Assert.That(await harness.Sent.Any<OrderSubmitted>(), Is.True);
120+
121+
Assert.That(await harness.Consumed.Any<SubmitOrder>(), Is.True);
122+
123+
Assert.That(response.Headers.Get<string>("BaggageValue"), Is.EqualTo("IsHalfFull"));
124+
});
125+
}
126+
127+
[Test]
128+
public async Task Should_carry_excess_baggage()
129+
{
130+
var services = new ServiceCollection();
131+
AddTraceListener(services, "order-api");
132+
133+
await using var provider = services
134+
.AddMassTransitTestHarness(x =>
135+
{
136+
x.AddConsumer<MonitoredSubmitOrderConsumer>();
137+
138+
x.UsingRabbitMq((context, cfg) =>
139+
{
140+
cfg.ConfigureEndpoints(context);
141+
});
142+
})
143+
.BuildServiceProvider(true);
144+
145+
var harness = provider.GetTestHarness();
146+
147+
await harness.Start();
148+
149+
IRequestClient<SubmitOrder> client = harness.GetRequestClient<SubmitOrder>();
150+
151+
await harness.Bus.Publish(new PingMessage());
152+
153+
StartedActivity? activity = LogContext.Current?.StartGenericActivity("api process");
154+
155+
activity?.Activity.SetBaggage("MyBag", "IsFull");
156+
activity?.Activity.SetBaggage("MyBag", "IsEmpty");
157+
activity?.Activity.SetBaggage("MyBag", "IsHalfFull");
158+
159+
Response<OrderSubmitted> response = await client.GetResponse<OrderSubmitted>(new
160+
{
161+
OrderId = InVar.Id,
162+
OrderNumber = "123"
163+
});
164+
165+
activity?.Stop();
166+
167+
await Assert.MultipleAsync(async () =>
168+
{
169+
Assert.That(await harness.Sent.Any<OrderSubmitted>(), Is.True);
170+
171+
Assert.That(await harness.Consumed.Any<SubmitOrder>(), Is.True);
172+
173+
Assert.That(response.Headers.Get<string>("BaggageValue"), Is.EqualTo("IsHalfFull"));
174+
});
175+
}
176+
75177
[Test]
76178
public async Task Should_report_telemetry_to_jaeger()
77179
{
@@ -374,6 +476,8 @@ public Task Consume(ConsumeContext<SubmitOrder> context)
374476
{
375477
var value = System.Diagnostics.Activity.Current?.GetBaggageItem("MyBag");
376478

479+
LogContext.Debug?.Log("Body: {Body}", context.ReceiveContext.Body.GetString());
480+
377481
return context.RespondAsync<OrderSubmitted>(context.Message, x =>
378482
{
379483
x.Headers.Set("BaggageValue", value);

0 commit comments

Comments
 (0)