Skip to content

Commit 2d7ad75

Browse files
committed
Fixed scenario where the Fault<TJob> event was not published when UseMessageRetry is configured on the bus
1 parent 56e5d30 commit 2d7ad75

5 files changed

Lines changed: 69 additions & 61 deletions

File tree

src/MassTransit/Consumers/Configuration/JobConsumerMessageConnector.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,7 @@ ConnectHandle ConnectStartJobConsumer(IConsumePipeConnector consumePipe, IConsum
9999
ConnectHandle ConnectFinalizeJobConsumer(IConsumePipeConnector consumePipe, IConsumerSpecification<FinalizeJobConsumer<TJob>> specification,
100100
Guid jobTypeId)
101101
{
102-
var consumerFactory = new DelegateConsumerFactory<FinalizeJobConsumer<TJob>>(() => new FinalizeJobConsumer<TJob>(jobTypeId,
103-
TypeCache<TConsumer>.ShortName));
102+
var consumerFactory = new DelegateConsumerFactory<FinalizeJobConsumer<TJob>>(() => new FinalizeJobConsumer<TJob>(jobTypeId));
104103

105104
return _finalizeJobConsumerConnector.ConnectConsumer(consumePipe, consumerFactory, specification);
106105
}

src/MassTransit/Contexts/ConsumeContextEndpointExtensions.cs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
namespace MassTransit
22
{
33
using System;
4+
using System.Linq;
45
using System.Threading.Tasks;
6+
using Events;
7+
using Metadata;
8+
using Middleware;
59
using Transports;
610

711

@@ -135,5 +139,59 @@ async Task<ISendEndpoint> GetResponseEndpointAsync()
135139

136140
return GetResponseEndpointAsync();
137141
}
142+
143+
internal static async Task GenerateFault<T>(this ConsumeContext<T> context, Exception exception)
144+
where T : class
145+
{
146+
if (context.ReceiveContext.PublishFaults || context.FaultAddress != null || context.ResponseAddress != null)
147+
{
148+
Fault<T> fault = new FaultEvent<T>(context.Message, context.MessageId, HostMetadataCache.Host, exception,
149+
context.SupportedMessageTypes.ToArray());
150+
151+
var faultPipe = new FaultPipe<T>(context);
152+
153+
var faultContext = InternalOutboxExtensions.SkipOutbox(context);
154+
155+
var faultEndpoint = await faultContext.GetFaultEndpoint<T>().ConfigureAwait(false);
156+
157+
await faultEndpoint.Send(fault, faultPipe, context.CancellationToken).ConfigureAwait(false);
158+
}
159+
}
160+
161+
162+
class FaultPipe<T> :
163+
IPipe<SendContext<Fault<T>>>
164+
where T : class
165+
{
166+
readonly ConsumeContext<T> _context;
167+
168+
public FaultPipe(ConsumeContext<T> context)
169+
{
170+
_context = context;
171+
}
172+
173+
public Task Send(SendContext<Fault<T>> context)
174+
{
175+
context.TransferConsumeContextHeaders(_context);
176+
177+
context.CorrelationId = _context.CorrelationId;
178+
context.RequestId = _context.RequestId;
179+
180+
if (_context.TryGetPayload(out ConsumeRetryContext? consumeRetryContext) && consumeRetryContext.RetryCount > 0)
181+
context.Headers.Set(MessageHeaders.FaultRetryCount, consumeRetryContext.RetryCount);
182+
else if (_context.TryGetPayload(out RetryContext? retryContext) && retryContext.RetryCount > 0)
183+
context.Headers.Set(MessageHeaders.FaultRetryCount, retryContext.RetryCount);
184+
185+
var redeliveryCount = _context.Headers.Get<int>(MessageHeaders.RedeliveryCount);
186+
if (redeliveryCount.HasValue)
187+
context.Headers.Set(MessageHeaders.FaultRedeliveryCount, redeliveryCount);
188+
189+
return Task.CompletedTask;
190+
}
191+
192+
public void Probe(ProbeContext context)
193+
{
194+
}
195+
}
138196
}
139197
}

src/MassTransit/Contexts/Context/BaseConsumeContext.cs

Lines changed: 2 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,8 @@ namespace MassTransit.Context
44
using System;
55
using System.Collections.Generic;
66
using System.Diagnostics.CodeAnalysis;
7-
using System.Linq;
87
using System.Threading;
98
using System.Threading.Tasks;
10-
using Events;
11-
using Metadata;
12-
using Middleware;
139
using Transports;
1410

1511

@@ -253,22 +249,10 @@ async Task RespondInternalAsync()
253249
return RespondInternalAsync();
254250
}
255251

256-
protected virtual async Task GenerateFault<T>(ConsumeContext<T> context, Exception exception)
252+
protected virtual Task GenerateFault<T>(ConsumeContext<T> context, Exception exception)
257253
where T : class
258254
{
259-
if (context.ReceiveContext.PublishFaults || context.FaultAddress != null || context.ResponseAddress != null)
260-
{
261-
Fault<T> fault = new FaultEvent<T>(context.Message, context.MessageId, HostMetadataCache.Host, exception,
262-
context.SupportedMessageTypes.ToArray());
263-
264-
var faultPipe = new FaultPipe<T>(context);
265-
266-
var faultContext = InternalOutboxExtensions.SkipOutbox(context);
267-
268-
var faultEndpoint = await faultContext.GetFaultEndpoint<T>().ConfigureAwait(false);
269-
270-
await faultEndpoint.Send(fault, faultPipe, context.CancellationToken).ConfigureAwait(false);
271-
}
255+
return context.GenerateFault(exception);
272256
}
273257

274258
Task ConsumeTask(Task task)
@@ -284,41 +268,5 @@ protected override async Task<ISendEndpoint> GetPublishSendEndpoint<T>()
284268

285269
return new ConsumeSendEndpoint(publishSendEndpoint, this);
286270
}
287-
288-
289-
class FaultPipe<T> :
290-
IPipe<SendContext<Fault<T>>>
291-
where T : class
292-
{
293-
readonly ConsumeContext<T> _context;
294-
295-
public FaultPipe(ConsumeContext<T> context)
296-
{
297-
_context = context;
298-
}
299-
300-
public Task Send(SendContext<Fault<T>> context)
301-
{
302-
context.TransferConsumeContextHeaders(_context);
303-
304-
context.CorrelationId = _context.CorrelationId;
305-
context.RequestId = _context.RequestId;
306-
307-
if (_context.TryGetPayload(out ConsumeRetryContext? consumeRetryContext) && consumeRetryContext.RetryCount > 0)
308-
context.Headers.Set(MessageHeaders.FaultRetryCount, consumeRetryContext.RetryCount);
309-
else if (_context.TryGetPayload(out RetryContext? retryContext) && retryContext.RetryCount > 0)
310-
context.Headers.Set(MessageHeaders.FaultRetryCount, retryContext.RetryCount);
311-
312-
var redeliveryCount = _context.Headers.Get<int>(MessageHeaders.RedeliveryCount);
313-
if (redeliveryCount.HasValue)
314-
context.Headers.Set(MessageHeaders.FaultRedeliveryCount, redeliveryCount);
315-
316-
return Task.CompletedTask;
317-
}
318-
319-
public void Probe(ProbeContext context)
320-
{
321-
}
322-
}
323271
}
324272
}

src/MassTransit/JobService/JobService/FinalizeJobConsumer.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,11 @@ public class FinalizeJobConsumer<TJob> :
1212
IConsumer<CompleteJob>
1313
where TJob : class
1414
{
15-
readonly string _jobConsumerTypeName;
1615
readonly Guid _jobTypeId;
1716

18-
public FinalizeJobConsumer(Guid jobTypeId, string jobConsumerTypeName)
17+
public FinalizeJobConsumer(Guid jobTypeId)
1918
{
2019
_jobTypeId = jobTypeId;
21-
_jobConsumerTypeName = jobConsumerTypeName;
2220
}
2321

2422
public Task Consume(ConsumeContext<CompleteJob> context)
@@ -50,7 +48,7 @@ public Task Consume(ConsumeContext<FaultJob> context)
5048

5149
var jobContext = new FaultJobContext<TJob>(context, job);
5250

53-
return jobContext.NotifyFaulted(message.Duration ?? TimeSpan.Zero, _jobConsumerTypeName, new ExceptionInfoException(message.Exceptions));
51+
return jobContext.GenerateFault(new ExceptionInfoException(message.Exceptions));
5452
}
5553
}
5654
}

tests/MassTransit.Tests/JobConsumerFault_Specs.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,11 @@ public class JobConsumerFault_Specs
4949
[Test]
5050
public async Task Should_detect_the_faulted_job()
5151
{
52-
await using var provider = SetupServiceCollection(x => x.AddConsumer<OddJobFaultConsumer>());
52+
await using var provider = SetupServiceCollection(x =>
53+
{
54+
x.AddConsumer<OddJobFaultConsumer>();
55+
x.AddScoped<ICustomDependency, CustomDependency>();
56+
});
5357

5458
var harness = provider.GetTestHarness();
5559

@@ -125,6 +129,7 @@ static ServiceProvider SetupServiceCollection(Action<IBusRegistrationConfigurato
125129
x.UsingInMemory((context, cfg) =>
126130
{
127131
cfg.UseDelayedMessageScheduler();
132+
cfg.UseMessageRetry(r => r.Immediate(2));
128133

129134
var options = new ServiceInstanceOptions()
130135
.SetEndpointNameFormatter(context.GetService<IEndpointNameFormatter>() ??

0 commit comments

Comments
 (0)