Skip to content

Commit ad513f9

Browse files
committed
Updated Azure Session to pass proper cancellation tokens for all operations, including topology configuration
1 parent 79da359 commit ad513f9

19 files changed

Lines changed: 226 additions & 170 deletions

src/Transports/MassTransit.Azure.ServiceBus.Core/AzureServiceBusTransport/ConnectionContext.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace MassTransit.AzureServiceBusTransport
22
{
33
using System;
4+
using System.Threading;
45
using System.Threading.Tasks;
56
using Azure.Messaging.ServiceBus;
67
using Azure.Messaging.ServiceBus.Administration;
@@ -29,30 +30,35 @@ public interface ConnectionContext :
2930
/// Create a queue in the host namespace (which is scoped to the full ServiceUri)
3031
/// </summary>
3132
/// <param name="createQueueOptions"></param>
33+
/// <param name="cancellationToken"></param>
3234
/// <returns></returns>
33-
Task<QueueProperties> CreateQueue(CreateQueueOptions createQueueOptions);
35+
Task<QueueProperties> CreateQueue(CreateQueueOptions createQueueOptions, CancellationToken cancellationToken);
3436

3537
/// <summary>
3638
/// Create a topic in the root namespace
3739
/// </summary>
3840
/// <param name="createTopicOptions"></param>
41+
/// <param name="cancellationToken"></param>
3942
/// <returns></returns>
40-
Task<TopicProperties> CreateTopic(CreateTopicOptions createTopicOptions);
43+
Task<TopicProperties> CreateTopic(CreateTopicOptions createTopicOptions, CancellationToken cancellationToken);
4144

4245
/// <summary>
4346
/// Create a topic subscription
4447
/// </summary>
4548
/// <param name="createSubscriptionOptions"></param>
4649
/// <param name="rule"></param>
4750
/// <param name="filter"></param>
51+
/// <param name="cancellationToken"></param>
4852
/// <returns></returns>
49-
Task<SubscriptionProperties> CreateTopicSubscription(CreateSubscriptionOptions createSubscriptionOptions, CreateRuleOptions rule, RuleFilter filter);
53+
Task<SubscriptionProperties> CreateTopicSubscription(CreateSubscriptionOptions createSubscriptionOptions, CreateRuleOptions rule, RuleFilter filter,
54+
CancellationToken cancellationToken);
5055

5156
/// <summary>
5257
/// Delete a subscription from the topic
5358
/// </summary>
5459
/// <param name="subscriptionOptions"></param>
60+
/// <param name="cancellationToken"></param>
5561
/// <returns></returns>
56-
Task DeleteTopicSubscription(CreateSubscriptionOptions subscriptionOptions);
62+
Task DeleteTopicSubscription(CreateSubscriptionOptions subscriptionOptions, CancellationToken cancellationToken);
5763
}
5864
}

src/Transports/MassTransit.Azure.ServiceBus.Core/AzureServiceBusTransport/ConnectionContextSupervisor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public ISendEndpointContextSupervisor CreateSendEndpointContextSupervisor(SendSe
6767

6868
var configureTopology = new ConfigureServiceBusTopologyFilter<SendSettings>(settings, settings.GetBrokerTopology(), false);
6969

70-
var contextFactory = new SendEndpointContextFactory(this, configureTopology.ToPipe<SendEndpointContext>(), settings);
70+
var contextFactory = new SendEndpointContextFactory(this, configureTopology, settings);
7171

7272
return new SendEndpointContextSupervisor(contextFactory);
7373
}

src/Transports/MassTransit.Azure.ServiceBus.Core/AzureServiceBusTransport/Contexts/MessageSendEndpointContext.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace MassTransit.AzureServiceBusTransport
22
{
33
using System;
4+
using System.Threading;
45
using System.Threading.Tasks;
56
using Azure.Messaging.ServiceBus;
67
using MassTransit.Middleware;
@@ -22,19 +23,19 @@ public MessageSendEndpointContext(ConnectionContext connectionContext, ServiceBu
2223

2324
public string EntityPath => _client.EntityPath;
2425

25-
public Task Send(ServiceBusMessage message)
26+
public Task Send(ServiceBusMessage message, CancellationToken cancellationToken)
2627
{
27-
return _client.SendMessageAsync(message);
28+
return _client.SendMessageAsync(message, cancellationToken);
2829
}
2930

30-
public Task<long> ScheduleSend(ServiceBusMessage message, DateTime scheduleEnqueueTimeUtc)
31+
public Task<long> ScheduleSend(ServiceBusMessage message, DateTime scheduleEnqueueTimeUtc, CancellationToken cancellationToken)
3132
{
32-
return _client.ScheduleMessageAsync(message, scheduleEnqueueTimeUtc);
33+
return _client.ScheduleMessageAsync(message, scheduleEnqueueTimeUtc, cancellationToken);
3334
}
3435

35-
public Task CancelScheduledSend(long sequenceNumber)
36+
public Task CancelScheduledSend(long sequenceNumber, CancellationToken cancellationToken)
3637
{
37-
return _client.CancelScheduledMessageAsync(sequenceNumber);
38+
return _client.CancelScheduledMessageAsync(sequenceNumber, cancellationToken);
3839
}
3940
}
4041
}

src/Transports/MassTransit.Azure.ServiceBus.Core/AzureServiceBusTransport/Contexts/ServiceBusConnectionContext.cs

Lines changed: 48 additions & 53 deletions
Large diffs are not rendered by default.

src/Transports/MassTransit.Azure.ServiceBus.Core/AzureServiceBusTransport/Contexts/ServiceBusMessageLockContext.cs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ namespace MassTransit.AzureServiceBusTransport
22
{
33
using System;
44
using System.Collections.Generic;
5+
using System.Threading;
56
using System.Threading.Tasks;
67
using Azure.Messaging.ServiceBus;
78
using Util;
@@ -10,21 +11,23 @@ namespace MassTransit.AzureServiceBusTransport
1011
public class ServiceBusMessageLockContext :
1112
MessageLockContext
1213
{
14+
readonly CancellationToken _cancellationToken;
1315
readonly ProcessMessageEventArgs _eventArgs;
1416
readonly ServiceBusReceivedMessage _message;
1517
bool _deadLettered;
1618

17-
public ServiceBusMessageLockContext(ProcessMessageEventArgs eventArgs, ServiceBusReceivedMessage message)
19+
public ServiceBusMessageLockContext(ProcessMessageEventArgs eventArgs, ServiceBusReceivedMessage message, CancellationToken cancellationToken)
1820
{
1921
_eventArgs = eventArgs;
2022
_message = message;
23+
_cancellationToken = cancellationToken;
2124
}
2225

2326
public Task Complete()
2427
{
2528
return _deadLettered
2629
? Task.CompletedTask
27-
: _eventArgs.CompleteMessageAsync(_message);
30+
: _eventArgs.CompleteMessageAsync(_message, _cancellationToken);
2831
}
2932

3033
public Task Abandon(Exception exception)
@@ -34,12 +37,12 @@ public Task Abandon(Exception exception)
3437

3538
(Dictionary<string, object> dictionary, _) = ExceptionUtil.GetExceptionHeaderDetail(exception, ServiceBusSendTransportContext.Adapter);
3639

37-
return _eventArgs.AbandonMessageAsync(_message, dictionary);
40+
return _eventArgs.AbandonMessageAsync(_message, dictionary, _cancellationToken);
3841
}
3942

4043
public async Task DeadLetter()
4144
{
42-
await _eventArgs.DeadLetterMessageAsync(_message, new Dictionary<string, object> { { MessageHeaders.Reason, "dead-letter" } })
45+
await _eventArgs.DeadLetterMessageAsync(_message, new Dictionary<string, object> { { MessageHeaders.Reason, "dead-letter" } }, _cancellationToken)
4346
.ConfigureAwait(false);
4447

4548
_deadLettered = true;
@@ -49,7 +52,7 @@ public async Task DeadLetter(Exception exception)
4952
{
5053
(Dictionary<string, object> dictionary, _) = ExceptionUtil.GetExceptionHeaderDetail(exception, ServiceBusSendTransportContext.Adapter);
5154

52-
await _eventArgs.DeadLetterMessageAsync(_message, dictionary).ConfigureAwait(false);
55+
await _eventArgs.DeadLetterMessageAsync(_message, dictionary, _cancellationToken).ConfigureAwait(false);
5356

5457
_deadLettered = true;
5558
}

src/Transports/MassTransit.Azure.ServiceBus.Core/AzureServiceBusTransport/Contexts/ServiceBusMessageSessionContext.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace MassTransit.AzureServiceBusTransport
22
{
33
using System;
4+
using System.Threading;
45
using System.Threading.Tasks;
56
using Azure.Messaging.ServiceBus;
67

@@ -9,20 +10,22 @@ public class ServiceBusMessageSessionContext :
910
MessageSessionContext
1011
{
1112
readonly ProcessSessionMessageEventArgs _session;
13+
readonly CancellationToken _cancellationToken;
1214

13-
public ServiceBusMessageSessionContext(ProcessSessionMessageEventArgs session)
15+
public ServiceBusMessageSessionContext(ProcessSessionMessageEventArgs session, CancellationToken cancellationToken)
1416
{
1517
_session = session;
18+
_cancellationToken = cancellationToken;
1619
}
1720

1821
public Task<BinaryData> GetStateAsync()
1922
{
20-
return _session.GetSessionStateAsync();
23+
return _session.GetSessionStateAsync(_cancellationToken);
2124
}
2225

2326
public Task SetStateAsync(BinaryData state)
2427
{
25-
return _session.SetSessionStateAsync(state);
28+
return _session.SetSessionStateAsync(state, _cancellationToken);
2629
}
2730

2831
public Task RenewLockAsync(ServiceBusReceivedMessage message)

src/Transports/MassTransit.Azure.ServiceBus.Core/AzureServiceBusTransport/Contexts/ServiceBusReceiveContext.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public ServiceBusReceiveContext(ServiceBusReceivedMessage message, ReceiveEndpoi
2828

2929
public override MessageBody Body { get; }
3030

31+
ulong? ITransportSequenceNumber.SequenceNumber => (ulong)SequenceNumber;
32+
3133
public string MessageId => _message.MessageId;
3234

3335
public string CorrelationId => _message.CorrelationId;
@@ -41,8 +43,6 @@ public ServiceBusReceiveContext(ServiceBusReceivedMessage message, ReceiveEndpoi
4143
public int DeliveryCount => _message.DeliveryCount;
4244

4345
public string Label => _message.Subject;
44-
45-
ulong? ITransportSequenceNumber.SequenceNumber => (ulong)SequenceNumber;
4646
public long SequenceNumber => _message.SequenceNumber;
4747

4848
public long EnqueuedSequenceNumber => _message.EnqueuedSequenceNumber;
@@ -85,7 +85,7 @@ public IDictionary<string, object> GetTransportProperties()
8585

8686
protected override ContentType GetContentType()
8787
{
88-
ContentType contentType = default;
88+
ContentType contentType = null;
8989
if (!string.IsNullOrWhiteSpace(_message.ContentType))
9090
contentType = ConvertToContentType(_message.ContentType);
9191

src/Transports/MassTransit.Azure.ServiceBus.Core/AzureServiceBusTransport/Contexts/ServiceBusSessionMessageLockContext.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ namespace MassTransit.AzureServiceBusTransport
22
{
33
using System;
44
using System.Collections.Generic;
5+
using System.Threading;
56
using System.Threading.Tasks;
67
using Azure.Messaging.ServiceBus;
78
using Util;
@@ -10,21 +11,24 @@ namespace MassTransit.AzureServiceBusTransport
1011
public class ServiceBusSessionMessageLockContext :
1112
MessageLockContext
1213
{
14+
readonly CancellationToken _cancellationToken;
1315
readonly ServiceBusReceivedMessage _message;
1416
readonly ProcessSessionMessageEventArgs _session;
1517
bool _deadLettered;
1618

17-
public ServiceBusSessionMessageLockContext(ProcessSessionMessageEventArgs session, ServiceBusReceivedMessage message)
19+
public ServiceBusSessionMessageLockContext(ProcessSessionMessageEventArgs session, ServiceBusReceivedMessage message,
20+
CancellationToken cancellationToken)
1821
{
1922
_session = session;
2023
_message = message;
24+
_cancellationToken = cancellationToken;
2125
}
2226

2327
public Task Complete()
2428
{
2529
return _deadLettered
2630
? Task.CompletedTask
27-
: _session.CompleteMessageAsync(_message);
31+
: _session.CompleteMessageAsync(_message, _cancellationToken);
2832
}
2933

3034
public Task Abandon(Exception exception)
@@ -34,7 +38,7 @@ public Task Abandon(Exception exception)
3438

3539
(Dictionary<string, object> dictionary, _) = ExceptionUtil.GetExceptionHeaderDetail(exception, ServiceBusSendTransportContext.Adapter);
3640

37-
return _session.AbandonMessageAsync(_message, dictionary);
41+
return _session.AbandonMessageAsync(_message, dictionary, _cancellationToken);
3842
}
3943

4044
public async Task DeadLetter()
@@ -43,7 +47,7 @@ public async Task DeadLetter()
4347

4448
var headers = new Dictionary<string, object> { { MessageHeaders.Reason, reason } };
4549

46-
await _session.DeadLetterMessageAsync(_message, headers, reason).ConfigureAwait(false);
50+
await _session.DeadLetterMessageAsync(_message, headers, reason, cancellationToken: _cancellationToken).ConfigureAwait(false);
4751

4852
_deadLettered = true;
4953
}
@@ -54,7 +58,7 @@ public async Task DeadLetter(Exception exception)
5458

5559
(Dictionary<string, object> dictionary, var message) = ExceptionUtil.GetExceptionHeaderDetail(exception, ServiceBusSendTransportContext.Adapter);
5660

57-
await _session.DeadLetterMessageAsync(_message, dictionary, reason, message).ConfigureAwait(false);
61+
await _session.DeadLetterMessageAsync(_message, dictionary, reason, message, _cancellationToken).ConfigureAwait(false);
5862

5963
_deadLettered = true;
6064
}

src/Transports/MassTransit.Azure.ServiceBus.Core/AzureServiceBusTransport/Contexts/SharedConnectionContext.cs

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,25 @@
1010

1111
public class SharedConnectionContext :
1212
ProxyPipeContext,
13-
ConnectionContext
13+
ConnectionContext,
14+
IDisposable
1415
{
16+
readonly CancellationToken _cancellationToken;
1517
readonly ConnectionContext _context;
18+
CancellationTokenSource _tokenSource;
1619

1720
public SharedConnectionContext(ConnectionContext context, CancellationToken cancellationToken)
1821
: base(context)
1922
{
2023
_context = context;
21-
CancellationToken = cancellationToken;
24+
25+
_cancellationToken = cancellationToken;
26+
_tokenSource = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken, cancellationToken);
2227
}
2328

24-
public override CancellationToken CancellationToken { get; }
29+
public override CancellationToken CancellationToken => _tokenSource?.Token ?? _cancellationToken;
30+
31+
public Uri Endpoint => _context.Endpoint;
2532

2633
public ServiceBusProcessor CreateQueueProcessor(ReceiveSettings settings)
2734
{
@@ -48,27 +55,39 @@ public ServiceBusSender CreateMessageSender(string entityPath)
4855
return _context.CreateMessageSender(entityPath);
4956
}
5057

51-
public Task<QueueProperties> CreateQueue(CreateQueueOptions createQueueOptions)
58+
public Task<QueueProperties> CreateQueue(CreateQueueOptions createQueueOptions, CancellationToken cancellationToken)
5259
{
53-
return _context.CreateQueue(createQueueOptions);
60+
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken);
61+
62+
return _context.CreateQueue(createQueueOptions, tokenSource.Token);
5463
}
5564

56-
public Task<TopicProperties> CreateTopic(CreateTopicOptions createTopicOptions)
65+
public Task<TopicProperties> CreateTopic(CreateTopicOptions createTopicOptions, CancellationToken cancellationToken)
5766
{
58-
return _context.CreateTopic(createTopicOptions);
67+
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken);
68+
69+
return _context.CreateTopic(createTopicOptions, tokenSource.Token);
5970
}
6071

6172
public Task<SubscriptionProperties> CreateTopicSubscription(CreateSubscriptionOptions createSubscriptionOptions, CreateRuleOptions rule,
62-
RuleFilter filter)
73+
RuleFilter filter, CancellationToken cancellationToken)
6374
{
64-
return _context.CreateTopicSubscription(createSubscriptionOptions, rule, filter);
75+
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken);
76+
77+
return _context.CreateTopicSubscription(createSubscriptionOptions, rule, filter, tokenSource.Token);
6578
}
6679

67-
public Task DeleteTopicSubscription(CreateSubscriptionOptions subscriptionOptions)
80+
public Task DeleteTopicSubscription(CreateSubscriptionOptions subscriptionOptions, CancellationToken cancellationToken)
6881
{
69-
return _context.DeleteTopicSubscription(subscriptionOptions);
82+
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken);
83+
84+
return _context.DeleteTopicSubscription(subscriptionOptions, tokenSource.Token);
7085
}
7186

72-
Uri ConnectionContext.Endpoint => _context.Endpoint;
87+
public void Dispose()
88+
{
89+
_tokenSource?.Dispose();
90+
_tokenSource = null;
91+
}
7392
}
7493
}

0 commit comments

Comments
 (0)