diff --git a/src/HealthChecks.AzureServiceBus/AzureServiceBusQueueHealthCheck.cs b/src/HealthChecks.AzureServiceBus/AzureServiceBusQueueHealthCheck.cs index aeb694c0c7..e4f73c2403 100644 --- a/src/HealthChecks.AzureServiceBus/AzureServiceBusQueueHealthCheck.cs +++ b/src/HealthChecks.AzureServiceBus/AzureServiceBusQueueHealthCheck.cs @@ -26,7 +26,11 @@ public async Task CheckHealthAsync(HealthCheckContext context { if (Options.UsePeekMode) await CheckWithReceiver().ConfigureAwait(false); - else + + if (Options.UseCreateMessageBatchAsyncMode) + await CheckWithSender().ConfigureAwait(false); + + if (Options.UsePeekMode is false && Options.UseCreateMessageBatchAsyncMode is false) await CheckWithManagement().ConfigureAwait(false); return HealthCheckResult.Healthy(); @@ -47,6 +51,17 @@ async Task CheckWithReceiver() await receiver.PeekMessageAsync(cancellationToken: cancellationToken).ConfigureAwait(false); } + async Task CheckWithSender() + { + var client = await ClientCache.GetOrAddAsyncDisposableAsync(ConnectionKey, _ => CreateClient()).ConfigureAwait(false); + var sender = await ClientCache.GetOrAddAsyncDisposableAsync( + _queueKey, + _ => client.CreateSender(Options.QueueName)) + .ConfigureAwait(false); + + await sender.CreateMessageBatchAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + } + Task CheckWithManagement() { var managementClient = ClientCache.GetOrAdd(ConnectionKey, _ => CreateManagementClient()); diff --git a/src/HealthChecks.AzureServiceBus/AzureServiceBusTopicHealthCheck.cs b/src/HealthChecks.AzureServiceBus/AzureServiceBusTopicHealthCheck.cs index 42c8398d9c..1ceeeba0fd 100644 --- a/src/HealthChecks.AzureServiceBus/AzureServiceBusTopicHealthCheck.cs +++ b/src/HealthChecks.AzureServiceBus/AzureServiceBusTopicHealthCheck.cs @@ -5,10 +5,14 @@ namespace HealthChecks.AzureServiceBus; public class AzureServiceBusTopicHealthCheck : AzureServiceBusHealthCheck, IHealthCheck { + private readonly string _topicKey; + public AzureServiceBusTopicHealthCheck(AzureServiceBusTopicHealthCheckOptions options, ServiceBusClientProvider clientProvider) : base(options, clientProvider) { Guard.ThrowIfNull(options.TopicName, true); + + _topicKey = $"{nameof(AzureServiceBusTopicHealthCheck)}_{ConnectionKey}_{Options.TopicName}"; } public AzureServiceBusTopicHealthCheck(AzureServiceBusTopicHealthCheckOptions options) @@ -21,9 +25,10 @@ public async Task CheckHealthAsync(HealthCheckContext context { try { - var managementClient = ClientCache.GetOrAdd(ConnectionKey, _ => CreateManagementClient()); - - _ = await managementClient.GetTopicRuntimePropertiesAsync(Options.TopicName, cancellationToken).ConfigureAwait(false); + if (Options.UseCreateMessageBatchAsyncMode) + await CheckWithSender().ConfigureAwait(false); + else + await CheckWithManagement().ConfigureAwait(false); return HealthCheckResult.Healthy(); } @@ -31,5 +36,23 @@ public async Task CheckHealthAsync(HealthCheckContext context { return new HealthCheckResult(context.Registration.FailureStatus, exception: ex); } + + async Task CheckWithSender() + { + var client = await ClientCache.GetOrAddAsyncDisposableAsync(ConnectionKey, _ => CreateClient()).ConfigureAwait(false); + var sender = await ClientCache.GetOrAddAsyncDisposableAsync( + _topicKey, + _ => client.CreateSender(Options.TopicName)) + .ConfigureAwait(false); + + await sender.CreateMessageBatchAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + } + + Task CheckWithManagement() + { + var managementClient = ClientCache.GetOrAdd(ConnectionKey, _ => CreateManagementClient()); + + return managementClient.GetTopicRuntimePropertiesAsync(Options.TopicName, cancellationToken); + } } } diff --git a/src/HealthChecks.AzureServiceBus/Configuration/AzureServiceBusQueueHealthCheckOptions.cs b/src/HealthChecks.AzureServiceBus/Configuration/AzureServiceBusQueueHealthCheckOptions.cs index d4969c3986..fe8bea5d9c 100644 --- a/src/HealthChecks.AzureServiceBus/Configuration/AzureServiceBusQueueHealthCheckOptions.cs +++ b/src/HealthChecks.AzureServiceBus/Configuration/AzureServiceBusQueueHealthCheckOptions.cs @@ -21,6 +21,17 @@ public class AzureServiceBusQueueHealthCheckOptions : AzureServiceBusHealthCheck /// public bool UsePeekMode { get; set; } = true; + /// + /// Will use CreateMessageBatchAsync method to determine status if set to (default), + /// otherwise; will use GetProperties* method. + /// + /// + /// CreateMessageBatch requires Send claim to work. However, if only Receiver claim using the Azure built-in roles (RBAC) + /// Azure Service Bus Data Receiver + /// is used set this to . By default . + /// + public bool UseCreateMessageBatchAsyncMode { get; set; } = true; + public AzureServiceBusQueueHealthCheckOptions(string queueName) { QueueName = queueName; diff --git a/src/HealthChecks.AzureServiceBus/Configuration/AzureServiceBusTopicHealthCheckOptions.cs b/src/HealthChecks.AzureServiceBus/Configuration/AzureServiceBusTopicHealthCheckOptions.cs index 2a6baf33f4..f3707d63e8 100644 --- a/src/HealthChecks.AzureServiceBus/Configuration/AzureServiceBusTopicHealthCheckOptions.cs +++ b/src/HealthChecks.AzureServiceBus/Configuration/AzureServiceBusTopicHealthCheckOptions.cs @@ -10,6 +10,17 @@ public class AzureServiceBusTopicHealthCheckOptions : AzureServiceBusHealthCheck /// public string TopicName { get; set; } + /// + /// Will use CreateMessageBatchAsync method to determine status if set to (default), + /// otherwise; will use GetProperties* method. + /// + /// + /// CreateMessageBatch requires Send claim to work. However, if only Receiver claim using the Azure built-in roles (RBAC) + /// Azure Service Bus Data Receiver + /// is used set this to . By default . + /// + public bool UseCreateMessageBatchAsyncMode { get; set; } = true; + public AzureServiceBusTopicHealthCheckOptions(string topicName) { TopicName = topicName; diff --git a/src/HealthChecks.AzureServiceBus/HealthChecks.AzureServiceBus.csproj b/src/HealthChecks.AzureServiceBus/HealthChecks.AzureServiceBus.csproj index 9b95489fe9..9784cbe34c 100644 --- a/src/HealthChecks.AzureServiceBus/HealthChecks.AzureServiceBus.csproj +++ b/src/HealthChecks.AzureServiceBus/HealthChecks.AzureServiceBus.csproj @@ -10,5 +10,4 @@ - diff --git a/test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusQueueHealthCheckTests.cs b/test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusQueueHealthCheckTests.cs index e186d5ae37..0feab6d5db 100644 --- a/test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusQueueHealthCheckTests.cs +++ b/test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusQueueHealthCheckTests.cs @@ -14,8 +14,12 @@ public class azureservicebusqueuehealthcheck_should private readonly string ConnectionString; private readonly string FullyQualifiedName; private readonly string QueueName; + private readonly string OtherQueueName; private readonly ServiceBusClient _serviceBusClient; - private readonly ServiceBusReceiver _serviceBusReceiver; + private readonly ServiceBusReceiver _serviceBusQueueReceiver; + private readonly ServiceBusSender _serviceBusQueueSender; + private readonly ServiceBusReceiver _serviceBusOtherQueueReceiver; + private readonly ServiceBusSender _serviceBusOtherQueueSender; private readonly ServiceBusClientProvider _clientProvider; private readonly ServiceBusAdministrationClient _serviceBusAdministrationClient; private readonly TokenCredential _tokenCredential; @@ -25,9 +29,13 @@ public azureservicebusqueuehealthcheck_should() ConnectionString = Guid.NewGuid().ToString(); FullyQualifiedName = Guid.NewGuid().ToString(); QueueName = Guid.NewGuid().ToString(); + OtherQueueName = Guid.NewGuid().ToString(); _serviceBusClient = Substitute.For(); - _serviceBusReceiver = Substitute.For(); + _serviceBusQueueReceiver = Substitute.For(); + _serviceBusQueueSender = Substitute.For(); + _serviceBusOtherQueueReceiver = Substitute.For(); + _serviceBusOtherQueueSender = Substitute.For(); _clientProvider = Substitute.For(); _serviceBusAdministrationClient = Substitute.For(); _tokenCredential = Substitute.For(); @@ -36,28 +44,40 @@ public azureservicebusqueuehealthcheck_should() _clientProvider.CreateClient(FullyQualifiedName, _tokenCredential).Returns(_serviceBusClient); _clientProvider.CreateManagementClient(ConnectionString).Returns(_serviceBusAdministrationClient); _clientProvider.CreateManagementClient(FullyQualifiedName, _tokenCredential).Returns(_serviceBusAdministrationClient); - _serviceBusClient.CreateReceiver(QueueName).Returns(_serviceBusReceiver); + _serviceBusClient.CreateReceiver(QueueName).Returns(_serviceBusQueueReceiver); + _serviceBusClient.CreateSender(QueueName).Returns(_serviceBusQueueSender); + _serviceBusClient.CreateReceiver(OtherQueueName).Returns(_serviceBusOtherQueueReceiver); + _serviceBusClient.CreateSender(OtherQueueName).Returns(_serviceBusOtherQueueSender); } [Theory] - [InlineData(true)] - [InlineData(false)] - public async Task can_create_client_with_connection_string(bool peakMode) + [InlineData(true, true)] + [InlineData(true, false)] + [InlineData(false, true)] + [InlineData(false, false)] + public async Task can_create_client_with_connection_string(bool peakMode, bool createMessageBatchAsyncMode) { using var tokenSource = new CancellationTokenSource(); await ExecuteHealthCheckAsync( QueueName, peakMode, + createMessageBatchAsyncMode, connectionString: ConnectionString, cancellationToken: tokenSource.Token); - if (peakMode) + if (peakMode || createMessageBatchAsyncMode) { _clientProvider .Received(1) .CreateClient(ConnectionString); } + else if (peakMode && createMessageBatchAsyncMode) + { + _clientProvider + .Received(2) + .CreateClient(ConnectionString); + } else { _clientProvider @@ -67,38 +87,70 @@ await ExecuteHealthCheckAsync( } [Theory] - [InlineData(true)] - [InlineData(false)] - public async Task reuses_existing_client_when_using_same_connection_string_with_different_queue(bool peakMode) + [InlineData(true, true)] + [InlineData(true, false)] + [InlineData(false, true)] + [InlineData(false, false)] + public async Task reuses_existing_client_when_using_same_connection_string_with_different_queue(bool peakMode, bool createMessageBatchAsyncMode) { using var tokenSource = new CancellationTokenSource(); - var otherQueueName = Guid.NewGuid().ToString(); await ExecuteHealthCheckAsync( QueueName, peakMode, + createMessageBatchAsyncMode, connectionString: ConnectionString, cancellationToken: tokenSource.Token); await ExecuteHealthCheckAsync( - otherQueueName, + OtherQueueName, peakMode, + createMessageBatchAsyncMode, connectionString: ConnectionString, cancellationToken: tokenSource.Token); - if (peakMode) + if (peakMode || createMessageBatchAsyncMode) { _clientProvider .Received(1) .CreateClient(ConnectionString); - _serviceBusClient - .Received(1) - .CreateReceiver(QueueName); - - _serviceBusClient - .Received(1) - .CreateReceiver(otherQueueName); + if (peakMode) + { + _serviceBusClient + .Received(1) + .CreateReceiver(QueueName); + + _serviceBusClient + .Received(1) + .CreateReceiver(OtherQueueName); + + await _serviceBusQueueReceiver + .Received(1) + .PeekMessageAsync(cancellationToken: tokenSource.Token); + + await _serviceBusOtherQueueReceiver + .Received(1) + .PeekMessageAsync(cancellationToken: tokenSource.Token); + } + else if (createMessageBatchAsyncMode) + { + _serviceBusClient + .Received(1) + .CreateSender(QueueName); + + _serviceBusClient + .Received(1) + .CreateSender(OtherQueueName); + + await _serviceBusQueueSender + .Received(1) + .CreateMessageBatchAsync(cancellationToken: tokenSource.Token); + + await _serviceBusOtherQueueSender + .Received(1) + .CreateMessageBatchAsync(cancellationToken: tokenSource.Token); + } } else { @@ -112,24 +164,27 @@ await _serviceBusAdministrationClient await _serviceBusAdministrationClient .Received(1) - .GetQueueRuntimePropertiesAsync(otherQueueName, tokenSource.Token); + .GetQueueRuntimePropertiesAsync(OtherQueueName, tokenSource.Token); } } [Theory] - [InlineData(true)] - [InlineData(false)] - public async Task can_create_client_with_fully_qualified_endpoint(bool peakMode) + [InlineData(true, true)] + [InlineData(true, false)] + [InlineData(false, true)] + [InlineData(false, false)] + public async Task can_create_client_with_fully_qualified_endpoint(bool peakMode, bool createMessageBatchAsyncMode) { using var tokenSource = new CancellationTokenSource(); await ExecuteHealthCheckAsync( QueueName, peakMode, + createMessageBatchAsyncMode, fullyQualifiedName: FullyQualifiedName, cancellationToken: tokenSource.Token); - if (peakMode) + if (peakMode || createMessageBatchAsyncMode) { _clientProvider .Received(1) @@ -144,38 +199,71 @@ await ExecuteHealthCheckAsync( } [Theory] - [InlineData(true)] - [InlineData(false)] - public async Task reuses_existing_client_when_using_same_fully_qualified_name_with_different_queue(bool peakMode) + [InlineData(true, true)] + [InlineData(true, false)] + [InlineData(false, true)] + [InlineData(false, false)] + public async Task reuses_existing_client_when_using_same_fully_qualified_name_with_different_queue(bool peakMode, bool createMessageBatchAsyncMode) { using var tokenSource = new CancellationTokenSource(); - var otherQueueName = Guid.NewGuid().ToString(); await ExecuteHealthCheckAsync( QueueName, peakMode, + createMessageBatchAsyncMode, fullyQualifiedName: FullyQualifiedName, cancellationToken: tokenSource.Token); await ExecuteHealthCheckAsync( - otherQueueName, + OtherQueueName, peakMode, + createMessageBatchAsyncMode, fullyQualifiedName: FullyQualifiedName, cancellationToken: tokenSource.Token); - if (peakMode) + if (peakMode || createMessageBatchAsyncMode) { _clientProvider .Received(1) .CreateClient(FullyQualifiedName, _tokenCredential); - _serviceBusClient - .Received(1) - .CreateReceiver(QueueName); - - _serviceBusClient - .Received(1) - .CreateReceiver(otherQueueName); + if (peakMode) + { + _serviceBusClient + .Received(1) + .CreateReceiver(QueueName); + + _serviceBusClient + .Received(1) + .CreateReceiver(OtherQueueName); + + await _serviceBusQueueReceiver + .Received(1) + .PeekMessageAsync(cancellationToken: tokenSource.Token); + + await _serviceBusOtherQueueReceiver + .Received(1) + .PeekMessageAsync(cancellationToken: tokenSource.Token); + } + else if (createMessageBatchAsyncMode) + { + _serviceBusClient + .Received(1) + .CreateSender(QueueName); + + _serviceBusClient + .Received(1) + .CreateSender(OtherQueueName); + + await _serviceBusQueueSender + .Received(1) + .CreateMessageBatchAsync(cancellationToken: tokenSource.Token); + + await _serviceBusOtherQueueSender + .Received(1) + .CreateMessageBatchAsync(cancellationToken: tokenSource.Token); + + } } else { @@ -189,7 +277,7 @@ await _serviceBusAdministrationClient await _serviceBusAdministrationClient .Received(1) - .GetQueueRuntimePropertiesAsync(otherQueueName, tokenSource.Token); + .GetQueueRuntimePropertiesAsync(OtherQueueName, tokenSource.Token); } } @@ -201,6 +289,7 @@ public async Task return_healthy_when_checking_healthy_service_through_peek_and_ var actual = await ExecuteHealthCheckAsync( QueueName, true, + false, connectionString: ConnectionString, cancellationToken: tokenSource.Token); @@ -210,11 +299,34 @@ public async Task return_healthy_when_checking_healthy_service_through_peek_and_ .Received(1) .CreateReceiver(QueueName); - await _serviceBusReceiver + await _serviceBusQueueReceiver .Received(1) .PeekMessageAsync(cancellationToken: tokenSource.Token); } + [Fact] + public async Task return_healthy_when_checking_healthy_service_through_createmessagebatch_and_connection_string() + { + using var tokenSource = new CancellationTokenSource(); + + var actual = await ExecuteHealthCheckAsync( + QueueName, + false, + true, + connectionString: ConnectionString, + cancellationToken: tokenSource.Token); + + actual.Status.ShouldBe(HealthStatus.Healthy); + + _serviceBusClient + .Received(1) + .CreateSender(QueueName); + + await _serviceBusQueueSender + .Received(1) + .CreateMessageBatchAsync(cancellationToken: tokenSource.Token); + } + [Fact] public async Task return_healthy_when_checking_healthy_service_through_peek_and_fully_qualified_name() { @@ -223,6 +335,7 @@ public async Task return_healthy_when_checking_healthy_service_through_peek_and_ var actual = await ExecuteHealthCheckAsync( QueueName, true, + false, fullyQualifiedName: FullyQualifiedName, cancellationToken: tokenSource.Token); @@ -232,35 +345,87 @@ public async Task return_healthy_when_checking_healthy_service_through_peek_and_ .Received(1) .CreateReceiver(QueueName); - await _serviceBusReceiver + await _serviceBusQueueReceiver .Received(1) .PeekMessageAsync(cancellationToken: tokenSource.Token); } [Fact] - public async Task return_unhealthy_when_exception_is_thrown_by_client() + public async Task return_healthy_when_checking_healthy_service_through_createmessagebatch_and_fully_qualified_name() { using var tokenSource = new CancellationTokenSource(); - _serviceBusReceiver + var actual = await ExecuteHealthCheckAsync( + QueueName, + false, + true, + fullyQualifiedName: FullyQualifiedName, + cancellationToken: tokenSource.Token); + + actual.Status.ShouldBe(HealthStatus.Healthy); + + _serviceBusClient + .Received(1) + .CreateSender(QueueName); + + await _serviceBusQueueSender + .Received(1) + .CreateMessageBatchAsync(cancellationToken: tokenSource.Token); + } + + [Theory] + [InlineData(true, true)] + [InlineData(true, false)] + [InlineData(false, true)] + public async Task return_unhealthy_when_exception_is_thrown_by_client(bool peakMode, bool createMessageBatchAsyncMode) + { + using var tokenSource = new CancellationTokenSource(); + + _serviceBusQueueReceiver .PeekMessageAsync(cancellationToken: tokenSource.Token) .ThrowsAsyncForAnyArgs(new InvalidOperationException()); + _serviceBusQueueSender + .CreateMessageBatchAsync(cancellationToken: tokenSource.Token) + .ThrowsAsyncForAnyArgs(new InvalidOperationException()); + var actual = await ExecuteHealthCheckAsync( QueueName, - true, + peakMode, + createMessageBatchAsyncMode, connectionString: ConnectionString, cancellationToken: tokenSource.Token); actual.Status.ShouldBe(HealthStatus.Unhealthy); - _serviceBusClient - .Received(1) - .CreateReceiver(QueueName); + if (peakMode || createMessageBatchAsyncMode) + { + _clientProvider + .Received(1) + .CreateClient(ConnectionString); + } - await _serviceBusReceiver - .Received(1) - .PeekMessageAsync(cancellationToken: tokenSource.Token); + if (peakMode) + { + _serviceBusClient + .ReceivedCalls().Count(call => call.GetMethodInfo().Name == "CreateReceiver" && call.GetArguments()[0]?.Equals(QueueName) == true) + .ShouldBeLessThanOrEqualTo(1); + + _serviceBusQueueReceiver + .ReceivedCalls().Count(call => call.GetMethodInfo().Name == "PeekMessageAsync") + .ShouldBeLessThanOrEqualTo(1); + } + + if (createMessageBatchAsyncMode) + { + _serviceBusClient + .ReceivedCalls().Count(call => call.GetMethodInfo().Name == "CreateSender" && call.GetArguments()[0]?.Equals(QueueName) == true) + .ShouldBeLessThanOrEqualTo(1); + + _serviceBusQueueSender + .ReceivedCalls().Count(call => call.GetMethodInfo().Name == "CreateMessageBatchAsync") + .ShouldBeLessThanOrEqualTo(1); + } } [Fact] @@ -271,6 +436,7 @@ public async Task return_healthy_when_checking_healthy_service_through_administr var actual = await ExecuteHealthCheckAsync( QueueName, false, + false, connectionString: ConnectionString, cancellationToken: tokenSource.Token); @@ -289,6 +455,7 @@ public async Task return_healthy_when_checking_healthy_service_through_administr var actual = await ExecuteHealthCheckAsync( QueueName, false, + false, fullyQualifiedName: FullyQualifiedName, cancellationToken: tokenSource.Token); @@ -311,6 +478,7 @@ public async Task return_unhealthy_when_exception_is_thrown_by_administration_cl var actual = await ExecuteHealthCheckAsync( QueueName, false, + false, connectionString: ConnectionString, cancellationToken: tokenSource.Token); @@ -324,6 +492,7 @@ await _serviceBusAdministrationClient private Task ExecuteHealthCheckAsync( string queueName, bool peakMode, + bool createMessageBatchAsyncMode, string? connectionString = null, string? fullyQualifiedName = null, CancellationToken cancellationToken = default) @@ -334,6 +503,7 @@ private Task ExecuteHealthCheckAsync( FullyQualifiedNamespace = fullyQualifiedName, Credential = fullyQualifiedName is null ? null : _tokenCredential, UsePeekMode = peakMode, + UseCreateMessageBatchAsyncMode = createMessageBatchAsyncMode }; var healthCheck = new AzureServiceBusQueueHealthCheck(options, _clientProvider); var context = new HealthCheckContext diff --git a/test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusTopicHealthCheckTests.cs b/test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusTopicHealthCheckTests.cs index a131d77788..ab845f1d7f 100644 --- a/test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusTopicHealthCheckTests.cs +++ b/test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusTopicHealthCheckTests.cs @@ -1,4 +1,5 @@ using Azure.Core; +using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; using HealthChecks.AzureServiceBus.Configuration; using NSubstitute; @@ -13,7 +14,11 @@ public class azureservicebustopichealthcheck_should private readonly string ConnectionString; private readonly string FullyQualifiedName; private readonly string TopicName; + private readonly string OtherTopicName; + private readonly ServiceBusSender _serviceBusTopicSender; + private readonly ServiceBusSender _serviceBusOtherTopicSender; private readonly ServiceBusClientProvider _clientProvider; + private readonly ServiceBusClient _serviceBusClient; private readonly ServiceBusAdministrationClient _serviceBusAdministrationClient; private readonly TokenCredential _tokenCredential; @@ -22,110 +27,202 @@ public azureservicebustopichealthcheck_should() ConnectionString = Guid.NewGuid().ToString(); FullyQualifiedName = Guid.NewGuid().ToString(); TopicName = Guid.NewGuid().ToString(); + OtherTopicName = Guid.NewGuid().ToString(); + _serviceBusClient = Substitute.For(); _clientProvider = Substitute.For(); + _serviceBusTopicSender = Substitute.For(); + _serviceBusOtherTopicSender = Substitute.For(); _serviceBusAdministrationClient = Substitute.For(); _tokenCredential = Substitute.For(); + + _clientProvider.CreateClient(ConnectionString).Returns(_serviceBusClient); + _clientProvider.CreateClient(FullyQualifiedName, _tokenCredential).Returns(_serviceBusClient); _clientProvider.CreateManagementClient(ConnectionString).Returns(_serviceBusAdministrationClient); _clientProvider.CreateManagementClient(FullyQualifiedName, _tokenCredential).Returns(_serviceBusAdministrationClient); + _serviceBusClient.CreateSender(TopicName).Returns(_serviceBusTopicSender); + _serviceBusClient.CreateSender(OtherTopicName).Returns(_serviceBusOtherTopicSender); } - [Fact] - public async Task can_create_client_with_connection_string() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task can_create_client_with_connection_string(bool createMessageBatchAsyncMode) { using var tokenSource = new CancellationTokenSource(); await ExecuteHealthCheckAsync( TopicName, + createMessageBatchAsyncMode, connectionString: ConnectionString, cancellationToken: tokenSource.Token); - _clientProvider - .Received(1) - .CreateManagementClient(ConnectionString); + if (createMessageBatchAsyncMode) + { + _clientProvider + .Received(1) + .CreateClient(ConnectionString); + } + else + { + _clientProvider + .Received(1) + .CreateManagementClient(ConnectionString); + } } - [Fact] - public async Task reuses_existing_client_when_using_same_connection_string_with_different_topic() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task reuses_existing_client_when_using_same_connection_string_with_different_topic(bool createMessageBatchAsyncMode) { using var tokenSource = new CancellationTokenSource(); - var otherTopicName = Guid.NewGuid().ToString(); + await ExecuteHealthCheckAsync( TopicName, + createMessageBatchAsyncMode, connectionString: ConnectionString, cancellationToken: tokenSource.Token); await ExecuteHealthCheckAsync( - otherTopicName, + OtherTopicName, + createMessageBatchAsyncMode, connectionString: ConnectionString, cancellationToken: tokenSource.Token); - _clientProvider - .Received(1) - .CreateManagementClient(ConnectionString); - - await _serviceBusAdministrationClient - .Received(1) - .GetTopicRuntimePropertiesAsync(TopicName, tokenSource.Token); - - await _serviceBusAdministrationClient - .Received(1) - .GetTopicRuntimePropertiesAsync(otherTopicName, tokenSource.Token); + if (createMessageBatchAsyncMode) + { + _clientProvider + .Received(1) + .CreateClient(ConnectionString); + + _serviceBusClient + .Received(1) + .CreateSender(TopicName); + + _serviceBusClient + .Received(1) + .CreateSender(OtherTopicName); + + await _serviceBusTopicSender + .Received(1) + .CreateMessageBatchAsync(cancellationToken: tokenSource.Token); + + await _serviceBusTopicSender + .Received(1) + .CreateMessageBatchAsync(cancellationToken: tokenSource.Token); + } + else + { + _clientProvider + .Received(1) + .CreateManagementClient(ConnectionString); + + await _serviceBusAdministrationClient + .Received(1) + .GetTopicRuntimePropertiesAsync(TopicName, tokenSource.Token); + + await _serviceBusAdministrationClient + .Received(1) + .GetTopicRuntimePropertiesAsync(OtherTopicName, tokenSource.Token); + } } - [Fact] - public async Task can_create_client_with_fully_qualified_name() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task can_create_client_with_fully_qualified_name(bool createMessageBatchAsyncMode) { using var tokenSource = new CancellationTokenSource(); await ExecuteHealthCheckAsync( TopicName, + createMessageBatchAsyncMode, fullyQualifiedName: FullyQualifiedName, cancellationToken: tokenSource.Token); - _clientProvider - .Received(1) - .CreateManagementClient(FullyQualifiedName, _tokenCredential); + + if (createMessageBatchAsyncMode) + { + _clientProvider + .Received(1) + .CreateClient(FullyQualifiedName, _tokenCredential); + } + else + { + _clientProvider + .Received(1) + .CreateManagementClient(FullyQualifiedName, _tokenCredential); + } } - [Fact] - public async Task reuses_existing_client_when_using_same_fully_qualified_name_with_different_topic() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task reuses_existing_client_when_using_same_fully_qualified_name_with_different_topic(bool createMessageBatchAsyncMode) { using var tokenSource = new CancellationTokenSource(); - var otherTopicName = Guid.NewGuid().ToString(); await ExecuteHealthCheckAsync( TopicName, + createMessageBatchAsyncMode, fullyQualifiedName: FullyQualifiedName, cancellationToken: tokenSource.Token); await ExecuteHealthCheckAsync( - otherTopicName, + OtherTopicName, + createMessageBatchAsyncMode, fullyQualifiedName: FullyQualifiedName, cancellationToken: tokenSource.Token); - _clientProvider - .Received(1) - .CreateManagementClient(FullyQualifiedName, _tokenCredential); - - await _serviceBusAdministrationClient - .Received(1) - .GetTopicRuntimePropertiesAsync(TopicName, tokenSource.Token); - - await _serviceBusAdministrationClient - .Received(1) - .GetTopicRuntimePropertiesAsync(otherTopicName, tokenSource.Token); + if (createMessageBatchAsyncMode) + { + _clientProvider + .Received(1) + .CreateClient(FullyQualifiedName, _tokenCredential); + + _serviceBusClient + .Received(1) + .CreateSender(TopicName); + + _serviceBusClient + .Received(1) + .CreateSender(OtherTopicName); + + await _serviceBusTopicSender + .Received(1) + .CreateMessageBatchAsync(cancellationToken: tokenSource.Token); + + await _serviceBusTopicSender + .Received(1) + .CreateMessageBatchAsync(cancellationToken: tokenSource.Token); + } + else + { + _clientProvider + .Received(1) + .CreateManagementClient(FullyQualifiedName, _tokenCredential); + + await _serviceBusAdministrationClient + .Received(1) + .GetTopicRuntimePropertiesAsync(TopicName, tokenSource.Token); + + await _serviceBusAdministrationClient + .Received(1) + .GetTopicRuntimePropertiesAsync(OtherTopicName, tokenSource.Token); + } } [Fact] - public async Task return_healthy_when_only_checking_healthy_service_through_administration_and_connection_string() + public async Task return_healthy_when_checking_healthy_service_through_administration_and_connection_string() { using var tokenSource = new CancellationTokenSource(); var actual = await ExecuteHealthCheckAsync( TopicName, + false, connectionString: ConnectionString, cancellationToken: tokenSource.Token); @@ -141,12 +238,13 @@ await _serviceBusAdministrationClient } [Fact] - public async Task return_healthy_when_only_checking_healthy_service_through_administration_and_fully_qualified_name() + public async Task return_healthy_when_checking_healthy_service_through_administration_and_fully_qualified_name() { using var tokenSource = new CancellationTokenSource(); var actual = await ExecuteHealthCheckAsync( TopicName, + false, fullyQualifiedName: FullyQualifiedName, cancellationToken: tokenSource.Token); @@ -161,6 +259,50 @@ await _serviceBusAdministrationClient .GetTopicRuntimePropertiesAsync(TopicName, cancellationToken: tokenSource.Token); } + [Fact] + public async Task return_healthy_when_checking_healthy_service_through_createmessagebatch_and_connection_string() + { + using var tokenSource = new CancellationTokenSource(); + + var actual = await ExecuteHealthCheckAsync( + TopicName, + true, + connectionString: ConnectionString, + cancellationToken: tokenSource.Token); + + actual.Status.ShouldBe(HealthStatus.Healthy); + + _serviceBusClient + .Received(1) + .CreateSender(TopicName); + + await _serviceBusTopicSender + .Received(1) + .CreateMessageBatchAsync(cancellationToken: tokenSource.Token); + } + + [Fact] + public async Task return_healthy_when_checking_healthy_service_through_createmessagebatch_and_fully_qualified_name() + { + using var tokenSource = new CancellationTokenSource(); + + var actual = await ExecuteHealthCheckAsync( + TopicName, + true, + fullyQualifiedName: FullyQualifiedName, + cancellationToken: tokenSource.Token); + + actual.Status.ShouldBe(HealthStatus.Healthy); + + _serviceBusClient + .Received(1) + .CreateSender(TopicName); + + await _serviceBusTopicSender + .Received(1) + .CreateMessageBatchAsync(cancellationToken: tokenSource.Token); + } + [Fact] public async Task return_unhealthy_when_exception_is_thrown_by_administration_client() { @@ -172,6 +314,7 @@ public async Task return_unhealthy_when_exception_is_thrown_by_administration_cl var actual = await ExecuteHealthCheckAsync( TopicName, + false, connectionString: ConnectionString, cancellationToken: tokenSource.Token); @@ -182,8 +325,41 @@ await _serviceBusAdministrationClient .GetTopicRuntimePropertiesAsync(TopicName, cancellationToken: tokenSource.Token); } + [Fact] + public async Task return_unhealthy_when_exception_is_thrown_by_client() + { + using var tokenSource = new CancellationTokenSource(); + + _serviceBusTopicSender + .CreateMessageBatchAsync(cancellationToken: tokenSource.Token) + .ThrowsAsyncForAnyArgs(new InvalidOperationException()); + + var actual = await ExecuteHealthCheckAsync( + TopicName, + true, + connectionString: ConnectionString, + cancellationToken: tokenSource.Token); + + actual.Status.ShouldBe(HealthStatus.Unhealthy); + + _clientProvider + .Received(1) + .CreateClient(ConnectionString); + + + _serviceBusClient + .Received(1) + .CreateSender(TopicName); + + await _serviceBusTopicSender + .Received(1) + .CreateMessageBatchAsync(tokenSource.Token); + + } + private Task ExecuteHealthCheckAsync( string topicName, + bool createMessageBatchAsyncMode, string? connectionString = null, string? fullyQualifiedName = null, CancellationToken cancellationToken = default) @@ -193,6 +369,7 @@ private Task ExecuteHealthCheckAsync( ConnectionString = connectionString, FullyQualifiedNamespace = fullyQualifiedName, Credential = fullyQualifiedName is null ? null : _tokenCredential, + UseCreateMessageBatchAsyncMode = createMessageBatchAsyncMode }; var healthCheck = new AzureServiceBusTopicHealthCheck(options, _clientProvider); var context = new HealthCheckContext