-
Notifications
You must be signed in to change notification settings - Fork 866
Expand file tree
/
Copy pathAzureServiceBusSubscriptionHealthCheck.cs
More file actions
67 lines (56 loc) · 2.75 KB
/
AzureServiceBusSubscriptionHealthCheck.cs
File metadata and controls
67 lines (56 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
using HealthChecks.AzureServiceBus.Configuration;
using Microsoft.Extensions.Diagnostics.HealthChecks;
namespace HealthChecks.AzureServiceBus;
public class AzureServiceBusSubscriptionHealthCheck : AzureServiceBusHealthCheck<AzureServiceBusSubscriptionHealthCheckHealthCheckOptions>, IHealthCheck
{
private readonly string _subscriptionKey;
public AzureServiceBusSubscriptionHealthCheck(AzureServiceBusSubscriptionHealthCheckHealthCheckOptions options, ServiceBusClientProvider clientProvider)
: base(options, clientProvider)
{
Guard.ThrowIfNull(options.TopicName, true);
Guard.ThrowIfNull(options.SubscriptionName, true);
_subscriptionKey = $"{nameof(AzureServiceBusSubscriptionHealthCheck)}_{ConnectionKey}_{Options.TopicName}_{Options.SubscriptionName}";
}
public AzureServiceBusSubscriptionHealthCheck(AzureServiceBusSubscriptionHealthCheckHealthCheckOptions options)
: this(options, new ServiceBusClientProvider())
{ }
/// <inheritdoc />
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
{
try
{
if (Options.UsePeekMode)
await CheckWithReceiver().ConfigureAwait(false);
else
await CheckWithManagement().ConfigureAwait(false);
return HealthCheckResult.Healthy();
}
catch (Exception ex)
{
return new HealthCheckResult(context.Registration.FailureStatus, exception: ex);
}
async Task CheckWithReceiver()
{
var client = await ClientCache.GetOrAddAsyncDisposableAsync(ConnectionKey, _ => CreateClient()).ConfigureAwait(false);
var receiver = await ClientCache.GetOrAddAsyncDisposableAsync(
_subscriptionKey,
_ => client.CreateReceiver(Options.TopicName, Options.SubscriptionName))
.ConfigureAwait(false);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var cancel_task = Task.Delay(Timeout.Infinite, cts.Token);
var peek_task = receiver.PeekMessageAsync(cancellationToken: cancellationToken);
var winner = await Task.WhenAny(peek_task, cancel_task).ConfigureAwait(false);
if (winner == peek_task)
{
cts.Cancel();
}
await winner.ConfigureAwait(false);
}
Task CheckWithManagement()
{
var managementClient = ClientCache.GetOrAdd(ConnectionKey, _ => CreateManagementClient());
return managementClient.GetSubscriptionRuntimePropertiesAsync(
Options.TopicName, Options.SubscriptionName, cancellationToken);
}
}
}