Skip to content

Commit 6803ed9

Browse files
committed
Changed AWS SQS/SNS to pass cancellationToken through all transport APIs, including topology
1 parent ad513f9 commit 6803ed9

14 files changed

Lines changed: 277 additions & 179 deletions

src/Transports/MassTransit.AmazonSqsTransport/AmazonSqsTransport/AmazonSqsClientContext.cs

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,20 @@ public AmazonSqsClientContext(ConnectionContext connectionContext,
4040

4141
public ConnectionContext ConnectionContext { get; }
4242

43-
public Task<TopicInfo> CreateTopic(Topology.Topic topic)
43+
public Task<TopicInfo> CreateTopic(Topology.Topic topic, CancellationToken cancellationToken)
4444
{
45-
return ConnectionContext.GetTopic(topic);
45+
return ConnectionContext.GetTopic(topic, cancellationToken);
4646
}
4747

48-
public Task<QueueInfo> CreateQueue(Queue queue)
48+
public Task<QueueInfo> CreateQueue(Queue queue, CancellationToken cancellationToken)
4949
{
50-
return ConnectionContext.GetQueue(queue);
50+
return ConnectionContext.GetQueue(queue, cancellationToken);
5151
}
5252

53-
public async Task<bool> CreateQueueSubscription(Topology.Topic topic, Queue queue)
53+
public async Task<bool> CreateQueueSubscription(Topology.Topic topic, Queue queue, CancellationToken cancellationToken)
5454
{
55-
var topicInfo = await ConnectionContext.GetTopic(topic).ConfigureAwait(false);
56-
var queueInfo = await ConnectionContext.GetQueue(queue).ConfigureAwait(false);
55+
var topicInfo = await ConnectionContext.GetTopic(topic, cancellationToken).ConfigureAwait(false);
56+
var queueInfo = await ConnectionContext.GetQueue(queue, cancellationToken).ConfigureAwait(false);
5757

5858
Dictionary<string, string> subscriptionAttributes = topic.TopicSubscriptionAttributes.MergeLeft(queue.QueueSubscriptionAttributes)
5959
.ToDictionary(x => x.Key, x => x.Value.ToString()!);
@@ -69,7 +69,7 @@ public async Task<bool> CreateQueueSubscription(Topology.Topic topic, Queue queu
6969
string? subscriptionArn = null;
7070
try
7171
{
72-
var response = await _snsClient.SubscribeAsync(subscribeRequest, CancellationToken).ConfigureAwait(false);
72+
var response = await _snsClient.SubscribeAsync(subscribeRequest, cancellationToken).ConfigureAwait(false);
7373

7474
response.EnsureSuccessfulResponse();
7575

@@ -79,7 +79,7 @@ public async Task<bool> CreateQueueSubscription(Topology.Topic topic, Queue queu
7979
{
8080
try
8181
{
82-
var existingSubscriptions = await _snsClient.ListSubscriptionsByTopicAsync(topicInfo.Arn, CancellationToken).ConfigureAwait(false);
82+
var existingSubscriptions = await _snsClient.ListSubscriptionsByTopicAsync(topicInfo.Arn, cancellationToken).ConfigureAwait(false);
8383
existingSubscriptions.EnsureSuccessfulResponse();
8484

8585
var existingSubscription = existingSubscriptions.Subscriptions.SingleOrDefault(x =>
@@ -88,7 +88,7 @@ public async Task<bool> CreateQueueSubscription(Topology.Topic topic, Queue queu
8888
if (existingSubscription != null)
8989
{
9090
subscriptionArn = existingSubscription.SubscriptionArn;
91-
var attributes = await _snsClient.GetSubscriptionAttributesAsync(subscriptionArn, CancellationToken)
91+
var attributes = await _snsClient.GetSubscriptionAttributesAsync(subscriptionArn, cancellationToken)
9292
.ConfigureAwait(false);
9393

9494
if (attributes.HttpStatusCode is >= HttpStatusCode.OK and < HttpStatusCode.MultipleChoices)
@@ -102,7 +102,7 @@ public async Task<bool> CreateQueueSubscription(Topology.Topic topic, Queue queu
102102
SubscriptionArn = subscriptionArn
103103
};
104104

105-
var updated = await _snsClient.SetSubscriptionAttributesAsync(request, CancellationToken).ConfigureAwait(false);
105+
var updated = await _snsClient.SetSubscriptionAttributesAsync(request, cancellationToken).ConfigureAwait(false);
106106
updated.EnsureSuccessfulResponse();
107107

108108
LogContext.Debug?.Log("Updated subscription attribute: {SubscriptionArn} {Name}={Value}", subscriptionArn, name,
@@ -124,36 +124,36 @@ public async Task<bool> CreateQueueSubscription(Topology.Topic topic, Queue queu
124124

125125
var sqsQueueArn = queueInfo.Arn;
126126

127-
return await queueInfo.UpdatePolicy(sqsQueueArn, topicInfo.Arn, CancellationToken).ConfigureAwait(false);
127+
return await queueInfo.UpdatePolicy(sqsQueueArn, topicInfo.Arn, cancellationToken).ConfigureAwait(false);
128128
}
129129

130-
public async Task DeleteTopic(Topology.Topic topic)
130+
public async Task DeleteTopic(Topology.Topic topic, CancellationToken cancellationToken)
131131
{
132-
var topicInfo = await ConnectionContext.GetTopic(topic).ConfigureAwait(false);
132+
var topicInfo = await ConnectionContext.GetTopic(topic, cancellationToken).ConfigureAwait(false);
133133

134134
TransportLogMessages.DeleteTopic(topicInfo.Arn);
135135

136-
var response = await _snsClient.DeleteTopicAsync(topicInfo.Arn, CancellationToken.None).ConfigureAwait(false);
136+
var response = await _snsClient.DeleteTopicAsync(topicInfo.Arn, cancellationToken).ConfigureAwait(false);
137137

138138
response.EnsureSuccessfulResponse();
139139

140140
await ConnectionContext.RemoveTopicByName(topic.EntityName).ConfigureAwait(false);
141141
}
142142

143-
public async Task DeleteQueue(Queue queue)
143+
public async Task DeleteQueue(Queue queue, CancellationToken cancellationToken)
144144
{
145-
var queueInfo = await ConnectionContext.GetQueue(queue).ConfigureAwait(false);
145+
var queueInfo = await ConnectionContext.GetQueue(queue, cancellationToken).ConfigureAwait(false);
146146

147147
TransportLogMessages.DeleteQueue(queueInfo.Url);
148148

149149
foreach (var subscriptionArn in queueInfo.SubscriptionArns)
150150
{
151151
TransportLogMessages.DeleteSubscription(queueInfo.Url, subscriptionArn);
152152

153-
await DeleteQueueSubscription(subscriptionArn).ConfigureAwait(false);
153+
await DeleteQueueSubscription(subscriptionArn, cancellationToken).ConfigureAwait(false);
154154
}
155155

156-
var response = await _sqsClient.DeleteQueueAsync(queueInfo.Url, CancellationToken.None).ConfigureAwait(false);
156+
var response = await _sqsClient.DeleteQueueAsync(queueInfo.Url, cancellationToken).ConfigureAwait(false);
157157

158158
response.EnsureSuccessfulResponse();
159159

@@ -162,28 +162,28 @@ public async Task DeleteQueue(Queue queue)
162162

163163
public async Task Publish(string topicName, PublishBatchRequestEntry request, CancellationToken cancellationToken)
164164
{
165-
var topicInfo = await ConnectionContext.GetTopicByName(topicName).ConfigureAwait(false);
165+
var topicInfo = await ConnectionContext.GetTopicByName(topicName, cancellationToken).ConfigureAwait(false);
166166

167167
await topicInfo.Publish(request, cancellationToken).ConfigureAwait(false);
168168
}
169169

170170
public async Task SendMessage(string queueName, SendMessageBatchRequestEntry request, CancellationToken cancellationToken)
171171
{
172-
var queueInfo = await ConnectionContext.GetQueueByName(queueName).ConfigureAwait(false);
172+
var queueInfo = await ConnectionContext.GetQueueByName(queueName, cancellationToken).ConfigureAwait(false);
173173

174174
await queueInfo.Send(request, cancellationToken).ConfigureAwait(false);
175175
}
176176

177177
public async Task DeleteMessage(string queueName, string receiptHandle, CancellationToken cancellationToken)
178178
{
179-
var queueInfo = await ConnectionContext.GetQueueByName(queueName).ConfigureAwait(false);
179+
var queueInfo = await ConnectionContext.GetQueueByName(queueName, cancellationToken).ConfigureAwait(false);
180180

181181
await queueInfo.Delete(receiptHandle, cancellationToken).ConfigureAwait(false);
182182
}
183183

184184
public async Task PurgeQueue(string queueName, CancellationToken cancellationToken)
185185
{
186-
var queueInfo = await ConnectionContext.GetQueueByName(queueName).ConfigureAwait(false);
186+
var queueInfo = await ConnectionContext.GetQueueByName(queueName, cancellationToken).ConfigureAwait(false);
187187

188188
var response = await _sqsClient.PurgeQueueAsync(queueInfo.Url, cancellationToken).ConfigureAwait(false);
189189

@@ -192,7 +192,7 @@ public async Task PurgeQueue(string queueName, CancellationToken cancellationTok
192192

193193
public async Task<IList<Message>> ReceiveMessages(string queueName, int messageLimit, int waitTime, CancellationToken cancellationToken)
194194
{
195-
var queueInfo = await ConnectionContext.GetQueueByName(queueName).ConfigureAwait(false);
195+
var queueInfo = await ConnectionContext.GetQueueByName(queueName, cancellationToken).ConfigureAwait(false);
196196

197197
var request = new ReceiveMessageRequest(queueInfo.Url)
198198
{
@@ -211,28 +211,28 @@ public async Task<IList<Message>> ReceiveMessages(string queueName, int messageL
211211
return response.Messages ?? new List<Message>();
212212
}
213213

214-
public Task<QueueInfo> GetQueueInfo(string queueName)
214+
public Task<QueueInfo> GetQueueInfo(string queueName, CancellationToken cancellationToken)
215215
{
216-
return ConnectionContext.GetQueueByName(queueName);
216+
return ConnectionContext.GetQueueByName(queueName, cancellationToken);
217217
}
218218

219-
public async Task ChangeMessageVisibility(string queueUrl, string receiptHandle, int seconds)
219+
public async Task ChangeMessageVisibility(string queueUrl, string receiptHandle, int seconds, CancellationToken cancellationToken)
220220
{
221221
var response = await _sqsClient.ChangeMessageVisibilityAsync(new ChangeMessageVisibilityRequest
222222
{
223223
QueueUrl = queueUrl,
224224
ReceiptHandle = receiptHandle,
225225
VisibilityTimeout = seconds
226-
}, CancellationToken).ConfigureAwait(false);
226+
}, cancellationToken).ConfigureAwait(false);
227227

228228
response.EnsureSuccessfulResponse();
229229
}
230230

231-
async Task DeleteQueueSubscription(string subscriptionArn)
231+
async Task DeleteQueueSubscription(string subscriptionArn, CancellationToken cancellationToken)
232232
{
233233
var unsubscribeRequest = new UnsubscribeRequest { SubscriptionArn = subscriptionArn };
234234

235-
var response = await _snsClient.UnsubscribeAsync(unsubscribeRequest, CancellationToken.None).ConfigureAwait(false);
235+
var response = await _snsClient.UnsubscribeAsync(unsubscribeRequest, cancellationToken).ConfigureAwait(false);
236236

237237
response.EnsureSuccessfulResponse();
238238
}

src/Transports/MassTransit.AmazonSqsTransport/AmazonSqsTransport/AmazonSqsConnectionContext.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public AmazonSqsConnectionContext(IConnection connection, IAmazonSqsHostConfigur
2626

2727
Topology = hostConfiguration.Topology;
2828

29-
_queueCache = new QueueCache(Connection.SqsClient, cancellationToken);
29+
_queueCache = new QueueCache(Connection.SqsClient);
3030
_topicCache = new TopicCache(Connection.SnsClient, cancellationToken);
3131
}
3232

@@ -35,29 +35,29 @@ public AmazonSqsConnectionContext(IConnection connection, IAmazonSqsHostConfigur
3535

3636
public Uri HostAddress => _hostConfiguration.HostAddress;
3737

38-
public Task<QueueInfo> GetQueue(Queue queue)
38+
public Task<QueueInfo> GetQueue(Queue queue, CancellationToken cancellationToken)
3939
{
40-
return _queueCache.Get(queue);
40+
return _queueCache.Get(queue, cancellationToken);
4141
}
4242

43-
public Task<QueueInfo> GetQueueByName(string name)
43+
public Task<QueueInfo> GetQueueByName(string name, CancellationToken cancellationToken)
4444
{
45-
return _queueCache.GetByName(name);
45+
return _queueCache.GetByName(name, cancellationToken);
4646
}
4747

4848
public Task<bool> RemoveQueueByName(string name)
4949
{
5050
return _queueCache.RemoveByName(name);
5151
}
5252

53-
public Task<TopicInfo> GetTopic(Topic topic)
53+
public Task<TopicInfo> GetTopic(Topic topic, CancellationToken cancellationToken)
5454
{
55-
return _topicCache.Get(topic);
55+
return _topicCache.Get(topic, cancellationToken);
5656
}
5757

58-
public Task<TopicInfo> GetTopicByName(string name)
58+
public Task<TopicInfo> GetTopicByName(string name, CancellationToken cancellationToken)
5959
{
60-
return _topicCache.GetByName(name);
60+
return _topicCache.GetByName(name, cancellationToken);
6161
}
6262

6363
public Task<bool> RemoveTopicByName(string name)

src/Transports/MassTransit.AmazonSqsTransport/AmazonSqsTransport/AmazonSqsReceiveLockContext.cs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public class AmazonSqsReceiveLockContext :
1414
{
1515
readonly CancellationTokenSource _activeTokenSource;
1616
readonly ClientContext _clientContext;
17+
readonly CancellationToken _cancellationToken;
1718
readonly Uri _inputAddress;
1819
readonly Message _message;
1920

@@ -22,13 +23,15 @@ public class AmazonSqsReceiveLockContext :
2223
readonly Task _visibilityTask;
2324
bool _locked;
2425

25-
public AmazonSqsReceiveLockContext(Uri inputAddress, Message message, ReceiveSettings settings, ClientContext clientContext)
26+
public AmazonSqsReceiveLockContext(Uri inputAddress, Message message, ReceiveSettings settings, ClientContext clientContext,
27+
CancellationToken cancellationToken)
2628
{
2729
_startedAt = DateTime.UtcNow;
2830
_inputAddress = inputAddress;
2931
_message = message;
3032
_settings = settings;
3133
_clientContext = clientContext;
34+
_cancellationToken = cancellationToken;
3235
_activeTokenSource = new CancellationTokenSource();
3336
_locked = true;
3437

@@ -41,7 +44,7 @@ public async Task Complete()
4144

4245
try
4346
{
44-
await _clientContext.DeleteMessage(_settings.EntityName, _message.ReceiptHandle).ConfigureAwait(false);
47+
await _clientContext.DeleteMessage(_settings.EntityName, _message.ReceiptHandle, _cancellationToken).ConfigureAwait(false);
4548

4649
await _visibilityTask.ConfigureAwait(false);
4750
}
@@ -72,8 +75,8 @@ public async Task Faulted(Exception exception)
7275

7376
if (!_clientContext.CancellationToken.IsCancellationRequested && _settings.QueueUrl != null)
7477
{
75-
await _clientContext.ChangeMessageVisibility(_settings.QueueUrl, _message.ReceiptHandle, _settings.RedeliverVisibilityTimeout)
76-
.ConfigureAwait(false);
78+
await _clientContext.ChangeMessageVisibility(_settings.QueueUrl, _message.ReceiptHandle, _settings.RedeliverVisibilityTimeout,
79+
_cancellationToken).ConfigureAwait(false);
7780
}
7881

7982
_locked = false;
@@ -135,7 +138,9 @@ await Task.Delay(delay, _activeTokenSource.Token)
135138
break;
136139

137140
if (_settings.QueueUrl != null)
138-
await _clientContext.ChangeMessageVisibility(_settings.QueueUrl, _message.ReceiptHandle, visibilityTimeout).ConfigureAwait(false);
141+
await _clientContext
142+
.ChangeMessageVisibility(_settings.QueueUrl, _message.ReceiptHandle, visibilityTimeout, _cancellationToken)
143+
.ConfigureAwait(false);
139144

140145
if (DateTime.UtcNow - _startedAt.AddSeconds(visibilityTimeout) >= _settings.MaxVisibilityTimeout)
141146
{

src/Transports/MassTransit.AmazonSqsTransport/AmazonSqsTransport/ClientContext.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,27 +13,27 @@ public interface ClientContext :
1313
{
1414
ConnectionContext ConnectionContext { get; }
1515

16-
Task<TopicInfo> CreateTopic(Topology.Topic topic);
16+
Task<TopicInfo> CreateTopic(Topology.Topic topic, CancellationToken cancellationToken);
1717

18-
Task<QueueInfo> CreateQueue(Queue queue);
18+
Task<QueueInfo> CreateQueue(Queue queue, CancellationToken cancellationToken);
1919

20-
Task<bool> CreateQueueSubscription(Topology.Topic topic, Queue queue);
20+
Task<bool> CreateQueueSubscription(Topology.Topic topic, Queue queue, CancellationToken cancellationToken);
2121

22-
Task DeleteTopic(Topology.Topic topic);
22+
Task DeleteTopic(Topology.Topic topic, CancellationToken cancellationToken);
2323

24-
Task DeleteQueue(Queue queue);
24+
Task DeleteQueue(Queue queue, CancellationToken cancellationToken);
2525

26-
Task Publish(string topicName, PublishBatchRequestEntry request, CancellationToken cancellationToken = default);
26+
Task Publish(string topicName, PublishBatchRequestEntry request, CancellationToken cancellationToken);
2727

2828
Task SendMessage(string queueName, SendMessageBatchRequestEntry request, CancellationToken cancellationToken);
2929

30-
Task DeleteMessage(string queueUrl, string receiptHandle, CancellationToken cancellationToken = default);
30+
Task DeleteMessage(string queueUrl, string receiptHandle, CancellationToken cancellationToken);
3131

3232
Task PurgeQueue(string queueName, CancellationToken cancellationToken);
3333

3434
Task<IList<Message>> ReceiveMessages(string queueName, int messageLimit, int waitTime, CancellationToken cancellationToken);
3535

36-
Task<QueueInfo> GetQueueInfo(string queueName);
36+
Task<QueueInfo> GetQueueInfo(string queueName, CancellationToken cancellationToken);
3737

38-
Task ChangeMessageVisibility(string queueUrl, string receiptHandle, int seconds);
38+
Task ChangeMessageVisibility(string queueUrl, string receiptHandle, int seconds, CancellationToken cancellationToken);
3939
}

src/Transports/MassTransit.AmazonSqsTransport/AmazonSqsTransport/ConnectionContext.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ public interface ConnectionContext :
2121

2222
IAmazonSqsBusTopology Topology { get; }
2323

24-
Task<QueueInfo> GetQueue(Queue queue);
25-
Task<QueueInfo> GetQueueByName(string name);
24+
Task<QueueInfo> GetQueue(Queue queue, CancellationToken cancellationToken);
25+
Task<QueueInfo> GetQueueByName(string name, CancellationToken cancellationToken);
2626
Task<bool> RemoveQueueByName(string name);
2727

28-
Task<TopicInfo> GetTopic(Topic topic);
29-
Task<TopicInfo> GetTopicByName(string name);
28+
Task<TopicInfo> GetTopic(Topic topic, CancellationToken cancellationToken);
29+
Task<TopicInfo> GetTopicByName(string name, CancellationToken cancellationToken);
3030
Task<bool> RemoveTopicByName(string name);
3131

3232
ClientContext CreateClientContext(CancellationToken cancellationToken);

src/Transports/MassTransit.AmazonSqsTransport/AmazonSqsTransport/Middleware/AmazonSqsMessageReceiver.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ protected override async Task ActiveAndActualAgentsCompleted(StopContext context
5151

5252
async Task Consume()
5353
{
54-
await GetQueueAttributes().ConfigureAwait(false);
54+
await GetQueueAttributes(_context.CancellationToken).ConfigureAwait(false);
5555

5656
using var algorithm = new RequestRateAlgorithm(new RequestRateAlgorithmOptions
5757
{
@@ -64,7 +64,7 @@ async Task Consume()
6464

6565
Task Handle(Message message, CancellationToken cancellationToken)
6666
{
67-
var lockContext = new AmazonSqsReceiveLockContext(_context.InputAddress, message, _receiveSettings, _client);
67+
var lockContext = new AmazonSqsReceiveLockContext(_context.InputAddress, message, _receiveSettings, _client, cancellationToken);
6868

6969
return _receiveSettings.IsOrdered
7070
? _executorPool.Run(message, () => HandleMessage(message, lockContext), cancellationToken)
@@ -85,9 +85,9 @@ Task Handle(Message message, CancellationToken cancellationToken)
8585
}
8686
}
8787

88-
async Task GetQueueAttributes()
88+
async Task GetQueueAttributes(CancellationToken cancellationToken)
8989
{
90-
var queueInfo = await _client.GetQueueInfo(_receiveSettings.EntityName).ConfigureAwait(false);
90+
var queueInfo = await _client.GetQueueInfo(_receiveSettings.EntityName, cancellationToken).ConfigureAwait(false);
9191

9292
_receiveSettings.QueueUrl = queueInfo.Url;
9393

0 commit comments

Comments
 (0)