diff --git a/src/ServiceControl.Transports.IBMMQ.Tests/IBMMQQueryIntegrationTests.cs b/src/ServiceControl.Transports.IBMMQ.Tests/IBMMQQueryIntegrationTests.cs new file mode 100644 index 0000000000..76ae7c5828 --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ.Tests/IBMMQQueryIntegrationTests.cs @@ -0,0 +1,276 @@ +namespace ServiceControl.Transport.Tests; + +using System; +using System.Collections; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using IBM.WMQ; +using IBM.WMQ.PCF; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Time.Testing; +using NUnit.Framework; +using Transports; +using Transports.IBMMQ; +using Transports.BrokerThroughput; + +[TestFixture] +[Category("IntegrationTests")] +class IBMMQQueryIntegrationTests +{ + const int StatisticsIntervalSeconds = 10; + const int StatisticsWaitSeconds = 25; + static readonly string TestQueueName = "SC.TEST.THROUGHPUT.Q"; + static readonly string ForwardingQueueName = "SC.TEST.STATS.FWD"; + + [OneTimeSetUp] + public async Task EnableStatistics() + { + var (qmName, props) = ConnectionProperties.Parse(ConnectionString); + using var qm = new MQQueueManager(qmName, props); + + // Enable queue-manager-wide statistics with a short interval so the integration + // test can validate stats events without waiting 30+ minutes (the broker default). + ChangeQueueManager(qm, MQC.MQIA_STATISTICS_Q, MQC.MQMON_ON); + ChangeQueueManager(qm, MQC.MQIA_STATISTICS_INTERVAL, StatisticsIntervalSeconds); + + EnsureQueue(qm, TestQueueName); + EnsureQueue(qm, ForwardingQueueName); + + // Drain anything left from prior runs so each test starts from a known empty state. + DrainQueue(qm, "SYSTEM.ADMIN.STATISTICS.QUEUE"); + DrainQueue(qm, ForwardingQueueName); + DrainQueue(qm, TestQueueName); + + await Task.CompletedTask.ConfigureAwait(false); + } + + [Test] + public async Task GetThroughputPerDay_returns_dequeue_counts_from_statistics_messages() + { + const int messagesToPut = 5; + var (qmName, props) = ConnectionProperties.Parse(ConnectionString); + using (var qm = new MQQueueManager(qmName, props)) + { + DrainQueue(qm, "SYSTEM.ADMIN.STATISTICS.QUEUE"); + PutAndGetMessages(qm, TestQueueName, messagesToPut); + } + + // Wait for the broker to emit a statistics interval covering the activity above. + await Task.Delay(TimeSpan.FromSeconds(StatisticsWaitSeconds)).ConfigureAwait(false); + + var query = CreateQuery(); + query.Initialize(EmptySettings()); + Assume.That(query.HasInitialisationErrors(out var initError), Is.False, initError); + + var queues = new List(); + await foreach (var q in query.GetQueueNames(default).ConfigureAwait(false)) + { + queues.Add(q); + } + + var testQueue = queues.FirstOrDefault(q => q.QueueName == TestQueueName); + Assert.That(testQueue, Is.Not.Null, $"GetQueueNames did not enumerate {TestQueueName}"); + + var rows = new List(); + await foreach (var row in query.GetThroughputPerDay(testQueue!, DateOnly.FromDateTime(DateTime.UtcNow.AddDays(-1)), default).ConfigureAwait(false)) + { + rows.Add(row); + } + + Assert.That(rows, Is.Not.Empty, $"No throughput rows returned for {TestQueueName}"); + var total = rows.Sum(r => r.TotalThroughput); + Assert.That(total, Is.GreaterThanOrEqualTo(messagesToPut), + $"Expected at least {messagesToPut} dequeues; got {total}. Rows: {string.Join(", ", rows.Select(r => $"{r.DateUTC}={r.TotalThroughput}"))}"); + } + + [Test] + public async Task Forwarding_publishes_a_copy_of_each_consumed_statistics_message() + { + const int messagesToPut = 3; + var (qmName, props) = ConnectionProperties.Parse(ConnectionString); + using (var qm = new MQQueueManager(qmName, props)) + { + DrainQueue(qm, "SYSTEM.ADMIN.STATISTICS.QUEUE"); + DrainQueue(qm, ForwardingQueueName); + PutAndGetMessages(qm, TestQueueName, messagesToPut); + } + + await Task.Delay(TimeSpan.FromSeconds(StatisticsWaitSeconds)).ConfigureAwait(false); + + var query = CreateQuery(); + query.Initialize(new ReadOnlyDictionary(new Dictionary + { + ["IBMMQ/StatisticsForwardingQueue"] = ForwardingQueueName + })); + Assume.That(query.HasInitialisationErrors(out var initError), Is.False, initError); + + await foreach (var _ in query.GetQueueNames(default).ConfigureAwait(false)) + { + // drain enumeration so the cache is populated as part of GetQueueNames + } + + // After draining, the forwarding queue should now contain a copy of each PCF message. + int forwardedCount; + using (var qm = new MQQueueManager(qmName, props)) + { + forwardedCount = CountMessagesViaBrowse(qm, ForwardingQueueName); + } + + Assert.That(forwardedCount, Is.GreaterThan(0), + $"Expected at least one forwarded statistics message on {ForwardingQueueName} but found {forwardedCount}."); + } + + [Test] + public async Task TestConnection_reports_actionable_error_when_statistics_disabled() + { + var (qmName, props) = ConnectionProperties.Parse(ConnectionString); + using (var qm = new MQQueueManager(qmName, props)) + { + ChangeQueueManager(qm, MQC.MQIA_STATISTICS_Q, MQC.MQMON_OFF); + } + + try + { + var query = CreateQuery(); + query.Initialize(EmptySettings()); + var (success, errors, diagnostics) = await query.TestConnection(default).ConfigureAwait(false); + + Assert.That(success, Is.False); + Assert.That(string.Join("\n", errors), Does.Contain("STATQ=OFF")); + Assert.That(diagnostics, Does.Contain("ALTER QMGR STATQ(ON)")); + } + finally + { + using var qm = new MQQueueManager(qmName, props); + ChangeQueueManager(qm, MQC.MQIA_STATISTICS_Q, MQC.MQMON_ON); + } + } + + static IBMMQQuery CreateQuery() => + new(NullLogger.Instance, new FakeTimeProvider(DateTimeOffset.UtcNow), + new TransportSettings { ConnectionString = ConnectionString }); + + static ReadOnlyDictionary EmptySettings() => + new(new Dictionary()); + + static void ChangeQueueManager(MQQueueManager qm, int parameter, int value) + { + var agent = new PCFMessageAgent(qm); + try + { + var request = new PCFMessage(MQC.MQCMD_CHANGE_Q_MGR); + request.AddParameter(parameter, value); + agent.Send(request); + } + finally + { + agent.Disconnect(); + } + } + + static void EnsureQueue(MQQueueManager qm, string queueName) + { + var agent = new PCFMessageAgent(qm); + try + { + try + { + var inquire = new PCFMessage(MQC.MQCMD_INQUIRE_Q); + inquire.AddParameter(MQC.MQCA_Q_NAME, queueName); + agent.Send(inquire); + return; // exists + } + catch (PCFException ex) when (ex.ReasonCode == MQC.MQRC_UNKNOWN_OBJECT_NAME) + { + // create + } + + var create = new PCFMessage(MQC.MQCMD_CREATE_Q); + create.AddParameter(MQC.MQCA_Q_NAME, queueName); + create.AddParameter(MQC.MQIA_Q_TYPE, MQC.MQQT_LOCAL); + agent.Send(create); + } + finally + { + agent.Disconnect(); + } + } + + static void PutAndGetMessages(MQQueueManager qm, string queueName, int count) + { + using var queue = qm.AccessQueue(queueName, + MQC.MQOO_OUTPUT | MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING); + + for (var i = 0; i < count; i++) + { + var msg = new MQMessage(); + msg.WriteString($"throughput-test-{i}"); + queue.Put(msg); + } + + var gmo = new MQGetMessageOptions { Options = MQC.MQGMO_NO_WAIT, WaitInterval = 0 }; + for (var i = 0; i < count; i++) + { + queue.Get(new MQMessage(), gmo); + } + } + + static void DrainQueue(MQQueueManager qm, string queueName) + { + try + { + using var queue = qm.AccessQueue(queueName, + MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING); + var gmo = new MQGetMessageOptions { Options = MQC.MQGMO_NO_WAIT, WaitInterval = 0 }; + while (true) + { + try + { + queue.Get(new MQMessage(), gmo); + } + catch (MQException e) when (e.ReasonCode == MQC.MQRC_NO_MSG_AVAILABLE) + { + break; + } + } + } + catch (MQException e) when (e.ReasonCode == MQC.MQRC_UNKNOWN_OBJECT_NAME) + { + // queue does not exist yet; nothing to drain + } + } + + static int CountMessagesViaBrowse(MQQueueManager qm, string queueName) + { + using var queue = qm.AccessQueue(queueName, + MQC.MQOO_BROWSE | MQC.MQOO_FAIL_IF_QUIESCING); + var count = 0; + var gmo = new MQGetMessageOptions + { + Options = MQC.MQGMO_BROWSE_FIRST | MQC.MQGMO_NO_WAIT, + WaitInterval = 0 + }; + while (true) + { + try + { + queue.Get(new MQMessage(), gmo); + count++; + gmo.Options = MQC.MQGMO_BROWSE_NEXT | MQC.MQGMO_NO_WAIT; + } + catch (MQException e) when (e.ReasonCode == MQC.MQRC_NO_MSG_AVAILABLE) + { + break; + } + } + return count; + } + + static readonly string ConnectionString = + Environment.GetEnvironmentVariable("ServiceControl_TransportTests_IBMMQ_ConnectionString") + ?? Environment.GetEnvironmentVariable("SERVICECONTROL_TRANSPORTTESTS_IBMMQ_CONNECTIONSTRING") + ?? "mq://admin:passw0rd@localhost:1414/QM1?channel=DEV.ADMIN.SVRCONN"; +} diff --git a/src/ServiceControl.Transports.IBMMQ.Tests/IBMMQQueryTests.cs b/src/ServiceControl.Transports.IBMMQ.Tests/IBMMQQueryTests.cs new file mode 100644 index 0000000000..455f14c8a1 --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ.Tests/IBMMQQueryTests.cs @@ -0,0 +1,128 @@ +namespace ServiceControl.Transport.Tests; + +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Time.Testing; +using NUnit.Framework; +using Transports; +using Transports.IBMMQ; + +[TestFixture] +class IBMMQQueryTests +{ + [Test] + public void Settings_lists_three_keys() + { + var query = new IBMMQQuery(NullLogger.Instance, new FakeTimeProvider(), new TransportSettings()); + Assert.That(query.Settings, Has.Length.EqualTo(3)); + Assert.That(query.Settings[0].Key, Is.EqualTo("IBMMQ/ConnectionString")); + Assert.That(query.Settings[1].Key, Is.EqualTo("IBMMQ/StatisticsQueue")); + Assert.That(query.Settings[2].Key, Is.EqualTo("IBMMQ/StatisticsForwardingQueue")); + } + + [Test] + public void MessageTransport_is_IBMMQ() + { + var query = new IBMMQQuery(NullLogger.Instance, new FakeTimeProvider(), new TransportSettings()); + Assert.That(query.MessageTransport, Is.EqualTo("IBMMQ")); + } + + [Test] + public void Initialize_with_no_connection_string_records_actionable_error() + { + var query = new IBMMQQuery(NullLogger.Instance, new FakeTimeProvider(), new TransportSettings()); + + query.Initialize(new ReadOnlyDictionary(new Dictionary())); + + Assert.That(query.HasInitialisationErrors(out var error), Is.True); + Assert.That(error, Does.Contain("connection string")); + } + + [Test] + public void Initialize_uses_setting_override_when_present() + { + var transportSettings = new TransportSettings { ConnectionString = "mq://transport-default-host:1414/QM1" }; + var query = new IBMMQQuery(NullLogger.Instance, new FakeTimeProvider(), transportSettings); + + var settings = new ReadOnlyDictionary(new Dictionary + { + ["IBMMQ/ConnectionString"] = "mq://override-host:9999/QM2" + }); + + // Initialize attempts a real connection; we don't care if it fails — we care that the override + // is recorded in Diagnostics so operators can see which connection string was used. + query.Initialize(settings); + + var (_, _, diagnostics) = query.TestConnection(default).GetAwaiter().GetResult(); + Assert.That(diagnostics, Does.Contain("ConnectionString set via 'IBMMQ/ConnectionString'")); + } + + [Test] + public void Initialize_diagnostics_record_default_connection_source() + { + var transportSettings = new TransportSettings { ConnectionString = "mq://transport-default-host:1414/QM1" }; + var query = new IBMMQQuery(NullLogger.Instance, new FakeTimeProvider(), transportSettings); + + query.Initialize(new ReadOnlyDictionary(new Dictionary())); + + var (_, _, diagnostics) = query.TestConnection(default).GetAwaiter().GetResult(); + Assert.That(diagnostics, Does.Contain("ConnectionString defaulted to the Primary instance's transport connection string")); + } + + [Test] + public void Initialize_records_statistics_queue_setting() + { + var transportSettings = new TransportSettings { ConnectionString = "mq://localhost:1414/QM1" }; + var query = new IBMMQQuery(NullLogger.Instance, new FakeTimeProvider(), transportSettings); + + query.Initialize(new ReadOnlyDictionary(new Dictionary + { + ["IBMMQ/StatisticsQueue"] = "MY.CUSTOM.STATS.QUEUE" + })); + + var (_, _, diagnostics) = query.TestConnection(default).GetAwaiter().GetResult(); + Assert.That(diagnostics, Does.Contain("Statistics queue: MY.CUSTOM.STATS.QUEUE")); + } + + [Test] + public void Initialize_records_default_statistics_queue_when_not_set() + { + var transportSettings = new TransportSettings { ConnectionString = "mq://localhost:1414/QM1" }; + var query = new IBMMQQuery(NullLogger.Instance, new FakeTimeProvider(), transportSettings); + + query.Initialize(new ReadOnlyDictionary(new Dictionary())); + + var (_, _, diagnostics) = query.TestConnection(default).GetAwaiter().GetResult(); + Assert.That(diagnostics, Does.Contain("Statistics queue: SYSTEM.ADMIN.STATISTICS.QUEUE")); + } + + [Test] + public void Initialize_records_forwarding_queue_when_set() + { + var transportSettings = new TransportSettings { ConnectionString = "mq://localhost:1414/QM1" }; + var query = new IBMMQQuery(NullLogger.Instance, new FakeTimeProvider(), transportSettings); + + query.Initialize(new ReadOnlyDictionary(new Dictionary + { + ["IBMMQ/StatisticsForwardingQueue"] = "MY.FORWARD.QUEUE" + })); + + var (_, _, diagnostics) = query.TestConnection(default).GetAwaiter().GetResult(); + Assert.That(diagnostics, Does.Contain("Statistics forwarding queue: MY.FORWARD.QUEUE")); + } + + [Test] + public void Initialize_records_no_forwarding_when_unset() + { + var transportSettings = new TransportSettings { ConnectionString = "mq://localhost:1414/QM1" }; + var query = new IBMMQQuery(NullLogger.Instance, new FakeTimeProvider(), transportSettings); + + query.Initialize(new ReadOnlyDictionary(new Dictionary())); + + var (_, _, diagnostics) = query.TestConnection(default).GetAwaiter().GetResult(); + Assert.That(diagnostics, Does.Contain("Statistics forwarding queue: (not configured)")); + } +} diff --git a/src/ServiceControl.Transports.IBMMQ/IBMMQQuery.cs b/src/ServiceControl.Transports.IBMMQ/IBMMQQuery.cs new file mode 100644 index 0000000000..ba2a164f46 --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ/IBMMQQuery.cs @@ -0,0 +1,566 @@ +#nullable enable +namespace ServiceControl.Transports.IBMMQ; + +using System; +using System.Collections; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Globalization; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using BrokerThroughput; +using IBM.WMQ; +using IBM.WMQ.PCF; +using Microsoft.Extensions.Logging; + +class IBMMQQuery( + ILogger logger, + TimeProvider timeProvider, + TransportSettings transportSettings) : BrokerThroughputQuery(logger, "IBMMQ") +{ + string queueManagerName = string.Empty; + Hashtable connectionProperties = []; + string statisticsQueueName = IBMMQSettings.DefaultStatisticsQueue; + string? forwardingQueueName; + + readonly SemaphoreSlim cacheLock = new(1, 1); + readonly Dictionary> throughputCache = new(StringComparer.Ordinal); + bool cachePopulated; + + protected override void InitializeCore(ReadOnlyDictionary settings) + { + // Initialize is synchronous and runs at host startup, so it must not block on broker I/O. + // Connection-time validation (broker reachability, STATQ enablement, stats-queue access) is + // performed lazily by TestConnectionCore and the first GatherThroughput run, where failures + // are surfaced through Diagnostics and operational logs without preventing the host from + // starting. Audit-based throughput collection acts as the fallback in those cases. + var connectionString = settings.TryGetValue(IBMMQSettings.ConnectionString, out var overrideConnection) && !string.IsNullOrWhiteSpace(overrideConnection) + ? overrideConnection + : transportSettings.ConnectionString; + + if (string.IsNullOrWhiteSpace(connectionString)) + { + InitialiseErrors.Add("No IBM MQ connection string configured. Set 'IBMMQ/ConnectionString' or configure the Primary instance's transport connection string."); + Diagnostics.AppendLine("ConnectionString not set"); + return; + } + + Diagnostics.AppendLine(settings.ContainsKey(IBMMQSettings.ConnectionString) + ? "ConnectionString set via 'IBMMQ/ConnectionString'" + : "ConnectionString defaulted to the Primary instance's transport connection string"); + + try + { + (queueManagerName, connectionProperties) = ConnectionProperties.Parse(connectionString); + } + catch (Exception ex) + { + InitialiseErrors.Add($"Could not parse IBM MQ connection string: {ex.Message}"); + return; + } + + statisticsQueueName = settings.TryGetValue(IBMMQSettings.StatisticsQueue, out var statsQueue) && !string.IsNullOrWhiteSpace(statsQueue) + ? statsQueue + : IBMMQSettings.DefaultStatisticsQueue; + Diagnostics.AppendLine($"Statistics queue: {statisticsQueueName}"); + + if (settings.TryGetValue(IBMMQSettings.StatisticsForwardingQueue, out var fwdQueue) && !string.IsNullOrWhiteSpace(fwdQueue)) + { + forwardingQueueName = fwdQueue; + Diagnostics.AppendLine($"Statistics forwarding queue: {forwardingQueueName}"); + } + else + { + forwardingQueueName = null; + Diagnostics.AppendLine("Statistics forwarding queue: (not configured)"); + } + } + + public override async IAsyncEnumerable GetQueueNames([EnumeratorCancellation] CancellationToken cancellationToken) + { + ResetCache(); + await PopulateThroughputCacheAsync(cancellationToken).ConfigureAwait(false); + + var queues = await Task.Run(EnumerateUserQueues, cancellationToken).ConfigureAwait(false); + foreach (var queueName in queues) + { + yield return new DefaultBrokerQueue(queueName); + } + } + + public override async IAsyncEnumerable GetThroughputPerDay( + IBrokerQueue brokerQueue, + DateOnly startDate, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + await PopulateThroughputCacheAsync(cancellationToken).ConfigureAwait(false); + + if (!throughputCache.TryGetValue(brokerQueue.QueueName, out var perDay)) + { + yield break; + } + + var today = DateOnly.FromDateTime(timeProvider.GetUtcNow().UtcDateTime); + foreach (var (day, total) in perDay.OrderBy(kv => kv.Key)) + { + if (day < startDate || day > today) + { + continue; + } + yield return new QueueThroughput { DateUTC = day, TotalThroughput = total }; + } + } + + public override KeyDescriptionPair[] Settings => + [ + new KeyDescriptionPair(IBMMQSettings.ConnectionString, IBMMQSettings.ConnectionStringDescription), + new KeyDescriptionPair(IBMMQSettings.StatisticsQueue, IBMMQSettings.StatisticsQueueDescription), + new KeyDescriptionPair(IBMMQSettings.StatisticsForwardingQueue, IBMMQSettings.StatisticsForwardingQueueDescription) + ]; + + protected override Task<(bool Success, List Errors)> TestConnectionCore(CancellationToken cancellationToken) => + Task.Run<(bool, List)>(() => + { + var errors = new List(); + try + { + var manager = new MQQueueManager(queueManagerName, connectionProperties); + try + { + Data["IBMMQVersion"] = manager.CommandLevel.ToString(CultureInfo.InvariantCulture); + + try + { + var statsQ = manager.AccessQueue(statisticsQueueName, MQC.MQOO_INQUIRE | MQC.MQOO_FAIL_IF_QUIESCING); + statsQ.Close(); + } + catch (Exception ex) + { + errors.Add( + $"Statistics queue '{statisticsQueueName}' is not accessible: {ex.Message}. " + + "Verify the queue exists and that the connecting user has +get +inq +browse permissions on it."); + } + + if (!InquireQueueManagerStatistics(manager)) + { + errors.Add( + "Statistics collection is disabled on the queue manager (STATQ=OFF). " + + "Broker-side throughput collection is unavailable. ServiceControl will " + + "continue to operate; audit-based and monitoring-based throughput collection " + + "(when configured) provide the same data without requiring queue manager " + + "changes. To enable broker-side collection, run `ALTER QMGR STATQ(ON)`."); + } + } + finally + { + try + { + manager.Disconnect(); + } + catch (Exception ex) + { + logger.LogDebug(ex, "Error disconnecting from queue manager during TestConnection"); + } + } + } + catch (Exception ex) + { + errors.Add($"Could not connect to queue manager '{queueManagerName}': {ex.Message}"); + } + + return (errors.Count == 0, errors); + }, cancellationToken); + + void ResetCache() + { + cacheLock.Wait(); + try + { + throughputCache.Clear(); + cachePopulated = false; + } + finally + { + cacheLock.Release(); + } + } + + async Task PopulateThroughputCacheAsync(CancellationToken cancellationToken) + { + if (cachePopulated) + { + return; + } + + await cacheLock.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + if (cachePopulated) + { + return; + } + await Task.Run(DrainStatisticsQueue, cancellationToken).ConfigureAwait(false); + cachePopulated = true; + } + finally + { + cacheLock.Release(); + } + } + + void DrainStatisticsQueue() + { + var manager = new MQQueueManager(queueManagerName, connectionProperties); + MQQueue? statsQueue = null; + MQQueue? forwardingQueue = null; + var processed = 0; + + try + { + statsQueue = manager.AccessQueue(statisticsQueueName, + MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING); + + if (forwardingQueueName is not null) + { + forwardingQueue = manager.AccessQueue(forwardingQueueName, + MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING); + } + + while (true) + { + var message = new MQMessage(); + var getOptions = new MQGetMessageOptions + { + Options = MQC.MQGMO_SYNCPOINT | MQC.MQGMO_FAIL_IF_QUIESCING | MQC.MQGMO_NO_WAIT + }; + + try + { + statsQueue.Get(message, getOptions); + } + catch (MQException e) when (e.ReasonCode == MQC.MQRC_NO_MSG_AVAILABLE) + { + break; + } + + ParseAndAggregate(message); + + if (forwardingQueue is not null) + { + var forwardMessage = CloneMessage(message); + var putOptions = new MQPutMessageOptions { Options = MQC.MQPMO_SYNCPOINT }; + forwardingQueue.Put(forwardMessage, putOptions); + } + + processed++; + } + + manager.Commit(); + if (processed > 0) + { + logger.LogInformation("Drained {Count} IBM MQ statistics messages from {Queue}", processed, statisticsQueueName); + } + } + catch + { + try + { + manager.Backout(); + } + catch (Exception backoutEx) + { + logger.LogWarning(backoutEx, "Backout of statistics drain failed"); + } + throw; + } + finally + { + forwardingQueue?.Close(); + statsQueue?.Close(); + try + { + manager.Disconnect(); + } + catch (Exception disconnectEx) + { + logger.LogDebug(disconnectEx, "Error disconnecting from queue manager after statistics drain"); + } + } + } + + void ParseAndAggregate(MQMessage message) + { + // The IBM .NET PCF API does not implement MQCFGR (group parameter) parsing — its + // PCFMessage(MQMessage) constructor throws "Unknown type" on any statistics message + // because per-queue stats are wrapped in MQGACF_Q_STATISTICS_DATA groups. We therefore + // walk the PCF binary layout directly using the documented MQCFH/MQCFIN/MQCFIN64/MQCFST + // /MQCFGR struct formats. See IBM MQ "Programmable Command Formats reference" for the + // wire layout. + try + { + ParseStatisticsMessage(message); + } + catch (Exception ex) + { + logger.LogWarning(ex, "Skipping malformed PCF message on {Queue}", statisticsQueueName); + } + } + + void ParseStatisticsMessage(MQMessage message) + { + message.Seek(0); + + // MQCFH header: Type, StrucLength, Version, Command, MsgSeqNumber, Control, CompCode, Reason, ParameterCount. + var headerType = message.ReadInt4(); + var headerStrucLength = message.ReadInt4(); + message.SkipBytes(4); // Version + var headerCommand = message.ReadInt4(); + message.SkipBytes(16); // MsgSeqNumber, Control, CompCode, Reason + var paramsRemaining = message.ReadInt4(); + + if (headerStrucLength > 36) + { + message.SkipBytes(headerStrucLength - 36); + } + + if (headerType != MQC.MQCFT_STATISTICS || headerCommand != MQC.MQCMD_STATISTICS_Q) + { + return; + } + + DateOnly? endDate = null; + string? currentQueue = null; + + while (paramsRemaining > 0) + { + paramsRemaining--; + + var paramType = message.ReadInt4(); + var paramStrucLength = message.ReadInt4(); + var bytesConsumed = 8; + + switch (paramType) + { + case MQC.MQCFT_GROUP: + { + var groupParam = message.ReadInt4(); + var groupParamCount = message.ReadInt4(); + bytesConsumed += 8; + if (groupParam == MQGACF_Q_STATISTICS_DATA) + { + currentQueue = null; + } + paramsRemaining += groupParamCount; + break; + } + case MQC.MQCFT_INTEGER: + { + var paramId = message.ReadInt4(); + var value = message.ReadInt4(); + bytesConsumed += 8; + HandleInt(paramId, value, currentQueue, endDate); + break; + } + case MQC.MQCFT_INTEGER64: + { + var paramId = message.ReadInt4(); + message.SkipBytes(4); // reserved + var value = message.ReadInt8(); + bytesConsumed += 16; + HandleInt(paramId, value, currentQueue, endDate); + break; + } + case MQC.MQCFT_INTEGER_LIST: + { + var paramId = message.ReadInt4(); + var count = message.ReadInt4(); + bytesConsumed += 8; + long sum = 0; + for (var i = 0; i < count; i++) + { + sum += message.ReadInt4(); + bytesConsumed += 4; + } + HandleInt(paramId, sum, currentQueue, endDate); + break; + } + case MQC.MQCFT_INTEGER64_LIST: + { + var paramId = message.ReadInt4(); + var count = message.ReadInt4(); + bytesConsumed += 8; + long sum = 0; + for (var i = 0; i < count; i++) + { + sum += message.ReadInt8(); + bytesConsumed += 8; + } + HandleInt(paramId, sum, currentQueue, endDate); + break; + } + case MQC.MQCFT_STRING: + { + var paramId = message.ReadInt4(); + message.SkipBytes(4); // CodedCharSetId + var stringLength = message.ReadInt4(); + bytesConsumed += 12; + var raw = message.ReadString(stringLength); + bytesConsumed += stringLength; + var trimmed = raw.TrimEnd(); + if (paramId == MQC.MQCA_Q_NAME) + { + currentQueue = trimmed; + } + else if (paramId == MQC.MQCAMO_END_DATE) + { + endDate = TryParseStatisticsDate(trimmed); + } + break; + } + default: + // Unknown / uninteresting parameter — skip the rest of its bytes. + break; + } + + if (paramStrucLength > bytesConsumed) + { + message.SkipBytes(paramStrucLength - bytesConsumed); + } + } + + void HandleInt(int paramId, long value, string? queue, DateOnly? day) + { + if (paramId == MQC.MQIAMO_GETS && !string.IsNullOrEmpty(queue) && day.HasValue) + { + AddToCache(queue, day.Value, value); + } + } + } + + static DateOnly? TryParseStatisticsDate(string raw) + { + if (DateOnly.TryParseExact(raw, "yyyy-MM-dd", CultureInfo.InvariantCulture, DateTimeStyles.None, out var d)) + { + return d; + } + return null; + } + + // The .NET MQC class does not expose group-parameter constants. The numeric value is + // taken from IBM MQ's CMQCFC.h: MQGACF_Q_STATISTICS_DATA = 8003. + const int MQGACF_Q_STATISTICS_DATA = 8003; + + void AddToCache(string queueName, DateOnly day, long gets) + { + if (queueName.StartsWith("SYSTEM.", StringComparison.Ordinal)) + { + return; + } + + if (!throughputCache.TryGetValue(queueName, out var perDay)) + { + perDay = []; + throughputCache[queueName] = perDay; + } + + perDay[day] = perDay.GetValueOrDefault(day, 0L) + gets; + } + + List EnumerateUserQueues() + { + var manager = new MQQueueManager(queueManagerName, connectionProperties); + var agent = new PCFMessageAgent(manager); + try + { + var request = new PCFMessage(MQC.MQCMD_INQUIRE_Q); + request.AddParameter(MQC.MQCA_Q_NAME, "*"); + request.AddParameter(MQC.MQIA_Q_TYPE, MQC.MQQT_LOCAL); + var responses = agent.Send(request); + + var queues = new List(responses.Length); + foreach (var response in responses) + { + string name; + try + { + name = response.GetStringParameterValue(MQC.MQCA_Q_NAME).TrimEnd(); + } + catch + { + continue; + } + + if (string.IsNullOrEmpty(name) || name.StartsWith("SYSTEM.", StringComparison.Ordinal)) + { + continue; + } + + queues.Add(name); + } + return queues; + } + finally + { + try + { + agent.Disconnect(); + } + catch (Exception ex) + { + logger.LogDebug(ex, "Error disconnecting PCF agent after queue enumeration"); + } + } + } + + bool InquireQueueManagerStatistics(MQQueueManager manager) + { + var agent = new PCFMessageAgent(manager); + try + { + var request = new PCFMessage(MQC.MQCMD_INQUIRE_Q_MGR); + request.AddParameter(MQC.MQIACF_Q_MGR_ATTRS, new[] { MQC.MQIA_STATISTICS_Q }); + var responses = agent.Send(request); + + foreach (var response in responses) + { + try + { + var value = response.GetIntParameterValue(MQC.MQIA_STATISTICS_Q); + return value == MQC.MQMON_ON; + } + catch + { + continue; + } + } + return false; + } + finally + { + try + { + agent.Disconnect(); + } + catch (Exception ex) + { + logger.LogDebug(ex, "Error disconnecting PCF agent after queue-manager attribute query"); + } + } + } + + static MQMessage CloneMessage(MQMessage source) + { + var clone = new MQMessage + { + Format = source.Format, + CharacterSet = source.CharacterSet, + Encoding = source.Encoding + }; + source.Seek(0); + var bytes = source.ReadBytes(source.MessageLength); + clone.Write(bytes); + return clone; + } +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.IBMMQ/IBMMQSettings.cs b/src/ServiceControl.Transports.IBMMQ/IBMMQSettings.cs new file mode 100644 index 0000000000..007d6f37d4 --- /dev/null +++ b/src/ServiceControl.Transports.IBMMQ/IBMMQSettings.cs @@ -0,0 +1,22 @@ +#nullable enable +namespace ServiceControl.Transports.IBMMQ; + +static class IBMMQSettings +{ + public const string ConnectionString = "IBMMQ/ConnectionString"; + + public const string ConnectionStringDescription = + "URI-style IBM MQ connection string used by throughput collection. When omitted, the connection string configured for the ServiceControl Primary instance is used. Format: ibmmq://user:password@host:port/QM_NAME?channel=..."; + + public const string StatisticsQueue = "IBMMQ/StatisticsQueue"; + + public const string StatisticsQueueDescription = + "Name of the queue from which IBM MQ statistics PCF messages are read. Defaults to SYSTEM.ADMIN.STATISTICS.QUEUE. Override when another consumer owns the system statistics queue and a forwarder or topic subscription delivers a per-consumer copy of stats messages to a dedicated queue."; + + public const string DefaultStatisticsQueue = "SYSTEM.ADMIN.STATISTICS.QUEUE"; + + public const string StatisticsForwardingQueue = "IBMMQ/StatisticsForwardingQueue"; + + public const string StatisticsForwardingQueueDescription = + "Optional. When set, each statistics message read by ServiceControl is also re-published to this queue in the same transactional unit, allowing other tools to consume their own copy. Mirrors NServiceBus error/audit forwarding semantics. Leave unset for single-consumer setups."; +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.IBMMQ/IBMMQTransportCustomization.cs b/src/ServiceControl.Transports.IBMMQ/IBMMQTransportCustomization.cs index 467867758e..07017f0acd 100644 --- a/src/ServiceControl.Transports.IBMMQ/IBMMQTransportCustomization.cs +++ b/src/ServiceControl.Transports.IBMMQ/IBMMQTransportCustomization.cs @@ -3,6 +3,7 @@ using System; using System.Linq; using System.Web; +using BrokerThroughput; using Microsoft.Extensions.DependencyInjection; using NServiceBus; using NServiceBus.Transport.IBMMQ; @@ -18,6 +19,9 @@ protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, IBMMQTransport transportDefinition, TransportSettings transportSettings) => transportDefinition.TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + protected override void AddTransportForPrimaryCore(IServiceCollection services, TransportSettings transportSettings) => + services.AddSingleton(); + protected override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings) { services.AddSingleton();