Skip to content

Commit 79da359

Browse files
committed
Cleanup from cancellation for RabbitMQ
1 parent 3141829 commit 79da359

6 files changed

Lines changed: 23 additions & 22 deletions

File tree

src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Middleware/ConfigureRabbitMqTopologyFilter.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@ public async Task Send(ChannelContext context, IPipe<ChannelContext> next)
4040
}
4141
}
4242

43+
public void Probe(ProbeContext context)
44+
{
45+
var scope = context.CreateFilterScope("configureTopology");
46+
47+
_brokerTopology.Probe(scope);
48+
}
49+
4350
public async Task<OneTimeContext<ConfigureTopologyContext<TSettings>>> Configure(ChannelContext context, CancellationToken cancellationToken)
4451
{
4552
return await context.OneTimeSetup<ConfigureTopologyContext<TSettings>>(() =>
@@ -49,13 +56,6 @@ public async Task<OneTimeContext<ConfigureTopologyContext<TSettings>>> Configure
4956
}).ConfigureAwait(false);
5057
}
5158

52-
public void Probe(ProbeContext context)
53-
{
54-
var scope = context.CreateFilterScope("configureTopology");
55-
56-
_brokerTopology.Probe(scope);
57-
}
58-
5959
async Task ConfigureTopology(ChannelContext context, CancellationToken cancellationToken)
6060
{
6161
await Task.WhenAll(_brokerTopology.Queues.Select(queue => Declare(context, queue, cancellationToken))).ConfigureAwait(false);
@@ -71,7 +71,8 @@ static Task Declare(ChannelContext context, Exchange exchange, CancellationToken
7171
{
7272
RabbitMqLogMessages.DeclareExchange(exchange);
7373

74-
return context.ExchangeDeclare(exchange.ExchangeName, exchange.ExchangeType, exchange.Durable, exchange.AutoDelete, exchange.ExchangeArguments, cancellationToken);
74+
return context.ExchangeDeclare(exchange.ExchangeName, exchange.ExchangeType, exchange.Durable, exchange.AutoDelete, exchange.ExchangeArguments,
75+
cancellationToken);
7576
}
7677

7778
static async Task Declare(ChannelContext context, Queue queue, CancellationToken cancellationToken)
@@ -103,6 +104,7 @@ static async Task Bind(ChannelContext context, ExchangeToQueueBinding binding, C
103104
{
104105
RabbitMqLogMessages.BindToQueue(binding);
105106

106-
await context.QueueBind(binding.Destination.QueueName, binding.Source.ExchangeName, binding.RoutingKey, binding.Arguments, cancellationToken).ConfigureAwait(false);
107+
await context.QueueBind(binding.Destination.QueueName, binding.Source.ExchangeName, binding.RoutingKey, binding.Arguments, cancellationToken)
108+
.ConfigureAwait(false);
107109
}
108110
}

src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqChannelContext.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,11 @@ public RabbitMqChannelContext(ConnectionContext connectionContext, IChannel chan
2727
{
2828
ConnectionContext = connectionContext;
2929

30-
_cancellationToken = cancellationToken;
31-
_tokenSource = CancellationTokenSource.CreateLinkedTokenSource(connectionContext.CancellationToken, cancellationToken);
32-
3330
_channel = channel;
3431
_agent = agent;
3532

36-
_channel.ContinuationTimeout = ConnectionContext.ContinuationTimeout;
33+
_cancellationToken = cancellationToken;
34+
_tokenSource = CancellationTokenSource.CreateLinkedTokenSource(connectionContext.CancellationToken, cancellationToken);
3735
}
3836

3937
public override CancellationToken CancellationToken => _tokenSource?.Token ?? _cancellationToken;
@@ -162,9 +160,8 @@ public async ValueTask DisposeAsync()
162160

163161
await _channel.Cleanup(200, message, CancellationToken).ConfigureAwait(false);
164162

165-
var tokenSource = _tokenSource;
163+
_tokenSource?.Dispose();
166164
_tokenSource = null;
167-
tokenSource.Dispose();
168165
}
169166
}
170167
}

src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/RabbitMqConnectionContext.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,11 @@ public async Task<IChannel> CreateChannel(ushort? concurrentMessageLimit, Cancel
4949
{
5050
var options = new CreateChannelOptions(PublisherConfirmation, PublisherConfirmation, consumerDispatchConcurrency: concurrentMessageLimit);
5151

52-
return await Connection.CreateChannelAsync(options, cancellationToken).ConfigureAwait(false);
52+
var channel = await Connection.CreateChannelAsync(options, cancellationToken).ConfigureAwait(false);
53+
54+
channel.ContinuationTimeout = ContinuationTimeout;
55+
56+
return channel;
5357
}
5458

5559
public async Task<ChannelContext> CreateChannelContext(IAgent agent, ushort? concurrentMessageLimit, CancellationToken cancellationToken)

src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/ScopeChannelContext.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,8 @@ public void NotifyFaulted(Exception exception, Uri contextInputAddress)
135135

136136
public void Dispose()
137137
{
138-
var tokenSource = _tokenSource;
138+
_tokenSource?.Dispose();
139139
_tokenSource = null;
140-
tokenSource.Dispose();
141140
}
142141
}
143142
}

src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/SharedChannelContext.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,8 @@ public void NotifyFaulted(Exception exception, Uri contextInputAddress)
135135

136136
public void Dispose()
137137
{
138-
var tokenSource = _tokenSource;
138+
_tokenSource?.Dispose();
139139
_tokenSource = null;
140-
tokenSource.Dispose();
141140
}
142141
}
143142
}

src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/SharedConnectionContext.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public SharedConnectionContext(ConnectionContext context, CancellationToken canc
2222
{
2323
_context = context;
2424
_cancellationToken = cancellationToken;
25+
2526
_tokenSource = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken, cancellationToken);
2627
}
2728

@@ -53,9 +54,8 @@ public async Task<ChannelContext> CreateChannelContext(IAgent agent, ushort? con
5354

5455
public void Dispose()
5556
{
56-
var tokenSource = _tokenSource;
57+
_tokenSource?.Dispose();
5758
_tokenSource = null;
58-
tokenSource.Dispose();
5959
}
6060
}
6161
}

0 commit comments

Comments
 (0)