Skip to content

Commit c54cf31

Browse files
committed
Reworked the CancellationToken handling for RabbitMQ transport contexts (connection and channel) to ensure all asynchronous methods are invoked with the proper cancellation tokens.
1 parent 58cbcf4 commit c54cf31

18 files changed

Lines changed: 290 additions & 172 deletions

src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/ChannelContext.cs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,31 +33,32 @@ public interface ChannelContext :
3333
/// <param name="basicProperties">The message properties</param>
3434
/// <param name="body">The message body</param>
3535
/// <param name="awaitAck"></param>
36+
/// <param name="cancellationToken"></param>
3637
/// <returns>
3738
/// An awaitable Task that is completed when the message is acknowledged by the broker
3839
/// </returns>
39-
Task BasicPublishAsync(string exchange, string routingKey, bool mandatory, BasicProperties basicProperties, byte[] body, bool awaitAck);
40+
Task BasicPublishAsync(string exchange, string routingKey, bool mandatory, BasicProperties basicProperties, byte[] body, bool awaitAck, CancellationToken cancellationToken);
4041

41-
Task ExchangeBind(string destination, string source, string routingKey, IDictionary<string, object> arguments);
42-
Task ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);
43-
Task ExchangeDeclarePassive(string exchange);
42+
Task ExchangeBind(string destination, string source, string routingKey, IDictionary<string, object> arguments, CancellationToken cancellationToken);
43+
Task ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments, CancellationToken cancellationToken);
44+
Task ExchangeDeclarePassive(string exchange, CancellationToken cancellationToken);
4445

45-
Task QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);
46-
Task<QueueDeclareOk> QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);
47-
Task<QueueDeclareOk> QueueDeclarePassive(string queue);
46+
Task QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments, CancellationToken cancellationToken);
47+
Task<QueueDeclareOk> QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments, CancellationToken cancellationToken);
48+
Task<QueueDeclareOk> QueueDeclarePassive(string queue, CancellationToken cancellationToken);
4849

49-
Task<uint> QueuePurge(string queue);
50+
Task<uint> QueuePurge(string queue, CancellationToken cancellationToken);
5051

51-
Task BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
52+
Task BasicQos(uint prefetchSize, ushort prefetchCount, bool global, CancellationToken cancellationToken);
5253

53-
ValueTask BasicAck(ulong deliveryTag, bool multiple);
54+
ValueTask BasicAck(ulong deliveryTag, bool multiple, CancellationToken cancellationToken);
5455

55-
Task BasicNack(ulong deliveryTag, bool multiple, bool requeue);
56+
Task BasicNack(ulong deliveryTag, bool multiple, bool requeue, CancellationToken cancellationToken);
5657

5758
Task<string> BasicConsume(string queue, bool noAck, bool exclusive, IDictionary<string, object> arguments, IAsyncBasicConsumer consumer,
5859
string consumerTag, CancellationToken cancellationToken);
5960

60-
Task BasicCancel(string consumerTag);
61+
Task BasicCancel(string consumerTag, CancellationToken cancellationToken);
6162

6263
void NotifyFaulted(Exception exception, Uri inputAddress);
6364
}

src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/ChannelContextFactory.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,15 @@ void RemoveHandlers()
6666
return asyncContext;
6767
}
6868

69-
public IActivePipeContextAgent<ChannelContext> CreateActiveContext(ISupervisor supervisor,
70-
PipeContextHandle<ChannelContext> context, CancellationToken cancellationToken)
69+
public IActivePipeContextAgent<ChannelContext> CreateActiveContext(ISupervisor supervisor, PipeContextHandle<ChannelContext> context,
70+
CancellationToken cancellationToken)
7171
{
7272
return supervisor.AddActiveContext(context, CreateSharedChannel(context.Context, cancellationToken));
7373
}
7474

7575
static async Task<ChannelContext> CreateSharedChannel(Task<ChannelContext> context, CancellationToken cancellationToken)
7676
{
77-
return context.IsCompletedSuccessfully()
77+
return context.Status == TaskStatus.RanToCompletion
7878
? new ScopeChannelContext(context.Result, cancellationToken)
7979
: new ScopeChannelContext(await context.OrCanceled(cancellationToken).ConfigureAwait(false), cancellationToken);
8080
}
@@ -84,7 +84,7 @@ Task<ChannelContext> CreateChannel(IAsyncPipeContextAgent<ChannelContext> asyncC
8484
Task<ChannelContext> CreateChannelContext(ConnectionContext connectionContext, CancellationToken createCancellationToken,
8585
ushort? concurrentMessageLimit)
8686
{
87-
return connectionContext.CreateChannelContext(createCancellationToken, concurrentMessageLimit, asyncContext);
87+
return connectionContext.CreateChannelContext(asyncContext, concurrentMessageLimit, createCancellationToken);
8888
}
8989

9090
return _supervisor.CreateAgent(asyncContext, (context, token) => CreateChannelContext(context, token, _concurrentMessageLimit), cancellationToken);

src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/ChannelContextSupervisor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public ChannelContextSupervisor(IConnectionContextSupervisor connectionContextSu
1414
}
1515

1616
public ChannelContextSupervisor(IChannelContextSupervisor channelContextSupervisor)
17-
: base(new ScopeChannelContextFactory(channelContextSupervisor))
17+
: base(new SharedChannelContextFactory(channelContextSupervisor))
1818
{
1919
channelContextSupervisor.AddSendAgent(this);
2020
}

src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/ConnectionContext.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,15 @@ public interface ConnectionContext :
4242
/// Create a channel on the connection
4343
/// </summary>
4444
/// <returns></returns>
45-
Task<IChannel> CreateChannel(CancellationToken cancellationToken, ushort? concurrentMessageLimit1);
45+
Task<IChannel> CreateChannel(ushort? concurrentMessageLimit, CancellationToken cancellationToken);
4646

4747
/// <summary>
4848
/// Create a channel, and return the <see cref="ChannelContext" />.
4949
/// </summary>
50-
/// <param name="cancellationToken"></param>
51-
/// <param name="concurrentMessageLimit"></param>
5250
/// <param name="agent"></param>
51+
/// <param name="concurrentMessageLimit"></param>
52+
/// <param name="cancellationToken"></param>
5353
/// <returns></returns>
54-
Task<ChannelContext> CreateChannelContext(CancellationToken cancellationToken, ushort? concurrentMessageLimit, IAgent agent);
54+
Task<ChannelContext> CreateChannelContext(IAgent agent, ushort? concurrentMessageLimit, CancellationToken cancellationToken);
5555
}
5656
}

src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/ConnectionContextFactory.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public ConnectionContextFactory(IRabbitMqHostConfiguration hostConfiguration)
2626
_connectionFactory = new Lazy<ConnectionFactory>(() => _hostConfiguration.Settings.GetConnectionFactory());
2727
}
2828

29-
IPipeContextAgent<ConnectionContext> IPipeContextFactory<ConnectionContext>.CreateContext(ISupervisor supervisor)
29+
public IPipeContextAgent<ConnectionContext> CreateContext(ISupervisor supervisor)
3030
{
3131
Task<ConnectionContext> context = Task.Run(() => CreateConnection(supervisor), supervisor.Stopped);
3232

@@ -62,15 +62,15 @@ void RemoveHandler(Task _)
6262
return contextHandle;
6363
}
6464

65-
IActivePipeContextAgent<ConnectionContext> IPipeContextFactory<ConnectionContext>.CreateActiveContext(ISupervisor supervisor,
66-
PipeContextHandle<ConnectionContext> context, CancellationToken cancellationToken)
65+
public IActivePipeContextAgent<ConnectionContext> CreateActiveContext(ISupervisor supervisor, PipeContextHandle<ConnectionContext> context,
66+
CancellationToken cancellationToken)
6767
{
6868
return supervisor.AddActiveContext(context, CreateSharedConnection(context.Context, cancellationToken));
6969
}
7070

7171
static async Task<ConnectionContext> CreateSharedConnection(Task<ConnectionContext> context, CancellationToken cancellationToken)
7272
{
73-
return context.IsCompletedSuccessfully()
73+
return context.Status == TaskStatus.RanToCompletion
7474
? new SharedConnectionContext(context.Result, cancellationToken)
7575
: new SharedConnectionContext(await context.OrCanceled(cancellationToken).ConfigureAwait(false), cancellationToken);
7676
}
@@ -96,7 +96,7 @@ async Task<ConnectionContext> CreateConnection(ISupervisor supervisor)
9696
}
9797
else
9898
{
99-
var hostNames = new List<string>(1) { _hostConfiguration.Settings.Host };
99+
List<string> hostNames = [_hostConfiguration.Settings.Host];
100100

101101
connection = await _connectionFactory.Value.CreateConnectionAsync(hostNames, _hostConfiguration.Settings.ClientProvidedName)
102102
.ConfigureAwait(false);

src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Middleware/ConfigureRabbitMqTopologyFilter.cs

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ namespace MassTransit.RabbitMqTransport.Middleware;
22

33
using System;
44
using System.Linq;
5+
using System.Threading;
56
using System.Threading.Tasks;
67
using Topology;
78

@@ -25,7 +26,7 @@ public ConfigureRabbitMqTopologyFilter(TSettings settings, BrokerTopology broker
2526

2627
public async Task Send(ChannelContext context, IPipe<ChannelContext> next)
2728
{
28-
OneTimeContext<ConfigureTopologyContext<TSettings>> oneTimeContext = await Configure(context);
29+
OneTimeContext<ConfigureTopologyContext<TSettings>> oneTimeContext = await Configure(context, context.CancellationToken);
2930

3031
try
3132
{
@@ -39,12 +40,12 @@ public async Task Send(ChannelContext context, IPipe<ChannelContext> next)
3940
}
4041
}
4142

42-
public async Task<OneTimeContext<ConfigureTopologyContext<TSettings>>> Configure(ChannelContext context)
43+
public async Task<OneTimeContext<ConfigureTopologyContext<TSettings>>> Configure(ChannelContext context, CancellationToken cancellationToken)
4344
{
4445
return await context.OneTimeSetup<ConfigureTopologyContext<TSettings>>(() =>
4546
{
4647
context.GetOrAddPayload(() => _settings);
47-
return ConfigureTopology(context);
48+
return ConfigureTopology(context, cancellationToken);
4849
}).ConfigureAwait(false);
4950
}
5051

@@ -55,29 +56,29 @@ public void Probe(ProbeContext context)
5556
_brokerTopology.Probe(scope);
5657
}
5758

58-
async Task ConfigureTopology(ChannelContext context)
59+
async Task ConfigureTopology(ChannelContext context, CancellationToken cancellationToken)
5960
{
60-
await Task.WhenAll(_brokerTopology.Queues.Select(queue => Declare(context, queue))).ConfigureAwait(false);
61+
await Task.WhenAll(_brokerTopology.Queues.Select(queue => Declare(context, queue, cancellationToken))).ConfigureAwait(false);
6162

62-
await Task.WhenAll(_brokerTopology.Exchanges.Select(exchange => Declare(context, exchange))).ConfigureAwait(false);
63+
await Task.WhenAll(_brokerTopology.Exchanges.Select(exchange => Declare(context, exchange, cancellationToken))).ConfigureAwait(false);
6364

64-
await Task.WhenAll(_brokerTopology.QueueBindings.Select(binding => Bind(context, binding))).ConfigureAwait(false);
65+
await Task.WhenAll(_brokerTopology.QueueBindings.Select(binding => Bind(context, binding, cancellationToken))).ConfigureAwait(false);
6566

66-
await Task.WhenAll(_brokerTopology.ExchangeBindings.Select(binding => Bind(context, binding))).ConfigureAwait(false);
67+
await Task.WhenAll(_brokerTopology.ExchangeBindings.Select(binding => Bind(context, binding, cancellationToken))).ConfigureAwait(false);
6768
}
6869

69-
static Task Declare(ChannelContext context, Exchange exchange)
70+
static Task Declare(ChannelContext context, Exchange exchange, CancellationToken cancellationToken)
7071
{
7172
RabbitMqLogMessages.DeclareExchange(exchange);
7273

73-
return context.ExchangeDeclare(exchange.ExchangeName, exchange.ExchangeType, exchange.Durable, exchange.AutoDelete, exchange.ExchangeArguments);
74+
return context.ExchangeDeclare(exchange.ExchangeName, exchange.ExchangeType, exchange.Durable, exchange.AutoDelete, exchange.ExchangeArguments, cancellationToken);
7475
}
7576

76-
static async Task Declare(ChannelContext context, Queue queue)
77+
static async Task Declare(ChannelContext context, Queue queue, CancellationToken cancellationToken)
7778
{
7879
try
7980
{
80-
var ok = await context.QueueDeclare(queue.QueueName, queue.Durable, queue.Exclusive, queue.AutoDelete, queue.QueueArguments)
81+
var ok = await context.QueueDeclare(queue.QueueName, queue.Durable, queue.Exclusive, queue.AutoDelete, queue.QueueArguments, cancellationToken)
8182
.ConfigureAwait(false);
8283

8384
RabbitMqLogMessages.DeclareQueue(queue, ok.ConsumerCount, ok.MessageCount);
@@ -90,18 +91,18 @@ static async Task Declare(ChannelContext context, Queue queue)
9091
}
9192
}
9293

93-
static async Task Bind(ChannelContext context, ExchangeToExchangeBinding binding)
94+
static async Task Bind(ChannelContext context, ExchangeToExchangeBinding binding, CancellationToken cancellationToken)
9495
{
9596
RabbitMqLogMessages.BindToExchange(binding);
9697

97-
await context.ExchangeBind(binding.Destination.ExchangeName, binding.Source.ExchangeName, binding.RoutingKey, binding.Arguments)
98+
await context.ExchangeBind(binding.Destination.ExchangeName, binding.Source.ExchangeName, binding.RoutingKey, binding.Arguments, cancellationToken)
9899
.ConfigureAwait(false);
99100
}
100101

101-
static async Task Bind(ChannelContext context, ExchangeToQueueBinding binding)
102+
static async Task Bind(ChannelContext context, ExchangeToQueueBinding binding, CancellationToken cancellationToken)
102103
{
103104
RabbitMqLogMessages.BindToQueue(binding);
104105

105-
await context.QueueBind(binding.Destination.QueueName, binding.Source.ExchangeName, binding.RoutingKey, binding.Arguments).ConfigureAwait(false);
106+
await context.QueueBind(binding.Destination.QueueName, binding.Source.ExchangeName, binding.RoutingKey, binding.Arguments, cancellationToken).ConfigureAwait(false);
106107
}
107108
}

src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Middleware/PrefetchCountFilter.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@ public PrefetchCountFilter(ushort prefetchCount)
1616
_prefetchCount = prefetchCount;
1717
}
1818

19-
void IProbeSite.Probe(ProbeContext context)
19+
public void Probe(ProbeContext context)
2020
{
2121
var scope = context.CreateFilterScope("prefetchCount");
2222
scope.Add("prefetchCount", _prefetchCount);
2323
}
2424

25-
async Task IFilter<ChannelContext>.Send(ChannelContext context, IPipe<ChannelContext> next)
25+
public async Task Send(ChannelContext context, IPipe<ChannelContext> next)
2626
{
27-
await context.BasicQos(0, _prefetchCount, false).ConfigureAwait(false);
27+
await context.BasicQos(0, _prefetchCount, false, context.CancellationToken).ConfigureAwait(false);
2828

2929
await next.Send(context).ConfigureAwait(false);
3030
}

src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Middleware/PurgeOnStartupFilter.cs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
namespace MassTransit.RabbitMqTransport.Middleware
22
{
33
using System.Threading.Tasks;
4-
using RabbitMQ.Client;
54

65

76
/// <summary>
@@ -18,26 +17,26 @@ public PurgeOnStartupFilter(string queueName)
1817
_queueName = queueName;
1918
}
2019

21-
void IProbeSite.Probe(ProbeContext context)
20+
public void Probe(ProbeContext context)
2221
{
2322
context.CreateFilterScope("purgeOnStartup");
2423
}
2524

26-
async Task IFilter<ChannelContext>.Send(ChannelContext context, IPipe<ChannelContext> next)
25+
public async Task Send(ChannelContext context, IPipe<ChannelContext> next)
2726
{
28-
var queueOk = await context.QueueDeclarePassive(_queueName).ConfigureAwait(false);
27+
var queueOk = await context.QueueDeclarePassive(_queueName, context.CancellationToken).ConfigureAwait(false);
2928

3029
if (queueOk.ConsumerCount == 0 && queueOk.MessageCount > 0)
31-
await PurgeIfRequested(context, queueOk, _queueName).ConfigureAwait(false);
30+
await PurgeIfRequested(context, _queueName).ConfigureAwait(false);
3231

3332
await next.Send(context).ConfigureAwait(false);
3433
}
3534

36-
async Task PurgeIfRequested(ChannelContext context, QueueDeclareOk queueOk, string queueName)
35+
async Task PurgeIfRequested(ChannelContext context, string queueName)
3736
{
3837
if (!_queueAlreadyPurged)
3938
{
40-
var purgedMessageCount = await context.QueuePurge(queueName).ConfigureAwait(false);
39+
var purgedMessageCount = await context.QueuePurge(queueName, context.CancellationToken).ConfigureAwait(false);
4140

4241
LogContext.Debug?.Log("Purged {MessageCount} messages from queue {QueueName}", purgedMessageCount, queueName);
4342

src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqBasicConsumer.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,9 @@ public async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag,
103103
if (IsStopping)
104104
return;
105105

106-
await Dispatch(deliveryTag, context,
107-
_receiveSettings.NoAck ? NoLockReceiveContext.Instance : new RabbitMqReceiveLockContext(_channel, deliveryTag))
106+
await Dispatch(deliveryTag, context, _receiveSettings.NoAck
107+
? NoLockReceiveContext.Instance
108+
: new RabbitMqReceiveLockContext(_channel, deliveryTag, context.CancellationToken))
108109
.ConfigureAwait(false);
109110
}
110111
catch (OperationInterruptedException exception)
@@ -151,7 +152,7 @@ protected override async Task ActiveAndActualAgentsCompleted(StopContext context
151152
try
152153
{
153154
if (IsGracefulShutdown && _channel.Channel.IsOpen)
154-
await _channel.BasicCancel(_consumerTag).ConfigureAwait(false);
155+
await _channel.BasicCancel(_consumerTag, context.CancellationToken).ConfigureAwait(false);
155156
}
156157
catch (Exception exception)
157158
{

0 commit comments

Comments
 (0)