diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index e54407c9309c..dc757965f680 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -3,6 +3,9 @@ ## 1.0.0b54 (Unreleased) ### Features Added +- Add `StatsbeatManager.add_additional_metric_callbacks` to let SDKs/distros add their own metric + observations to built-in statsbeat metrics + ([#47363](https://github.com/Azure/azure-sdk-for-python/pull/47363)) ### Breaking Changes - Customer Facing SDKStats: Renamed metric dimension attributes from snake_case/dotted to camelCase diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/api.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/api.md index ffa8878c7e30..5b1f9d356d25 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/api.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/api.md @@ -160,6 +160,14 @@ namespace azure.monitor.opentelemetry.exporter.statsbeat def __init__(self) -> None: ... + def add_additional_metric_callbacks( + self, + metric_name: str, + callback: Callable[[CallbackOptions], Iterable[Observation]] + ) -> None: ... + + def get_additional_metric_callbacks(self, metric_name: str) -> Iterable[Callable[[CallbackOptions], Iterable[Observation]]]: ... + def get_current_config(self) -> Optional[StatsbeatConfig]: ... def initialize(self, config: StatsbeatConfig) -> bool: ... 
diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/api.metadata.yml b/sdk/monitor/azure-monitor-opentelemetry-exporter/api.metadata.yml index b4d22b0534a0..1a672ccb930f 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/api.metadata.yml +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/api.metadata.yml @@ -1,3 +1,3 @@ -apiMdSha256: 4e626c08830ccebb3dab6dab00472543831b70bae5cd17f3bf1432d938991f5b +apiMdSha256: e927f060406b601099194e0e8346efaed5f68fee04cc95eb441de90f67b1b3d6 parserVersion: 0.3.28 pythonVersion: 3.13.14 diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_manager.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_manager.py index b435a95064db..3fea11c08c6c 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_manager.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_manager.py @@ -2,8 +2,9 @@ # Licensed under the MIT License. import logging import threading -from typing import Optional, Any, Dict +from typing import Callable, Iterable, List, Optional, Any, Dict +from opentelemetry.metrics import CallbackOptions, Observation from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource @@ -161,6 +162,38 @@ def __init__(self) -> None: # Set during first initialization, preserved in shutdown for potential re-initialization self._config: Optional[StatsbeatConfig] = None # type: ignore + # Extra observation callbacks contributed by SDKs/distros. 
+ self._additional_callbacks: Dict[str, List[Callable[[CallbackOptions], Iterable[Observation]]]] = {} + + def add_additional_metric_callbacks( + self, + metric_name: str, + callback: Callable[[CallbackOptions], Iterable[Observation]], + ) -> None: + """Register additional callbacks for a built-in statsbeat metric. + + :param metric_name: Name of the built-in statsbeat metric. + :type metric_name: str + :param callback: Callback that yields observations for the metric. + :type callback: Callable[[~opentelemetry.metrics.CallbackOptions], Iterable[~opentelemetry.metrics.Observation]] + """ + callbacks = self._additional_callbacks.setdefault(metric_name, []) + if callback not in callbacks: + callbacks.append(callback) + + def get_additional_metric_callbacks( + self, + metric_name: str, + ) -> Iterable[Callable[[CallbackOptions], Iterable[Observation]]]: + """Return registered callbacks for a built-in statsbeat metric. + + :param metric_name: Name of the built-in statsbeat metric. + :type metric_name: str + :return: Registered callbacks for the provided metric name. + :rtype: Iterable[Callable[[~opentelemetry.metrics.CallbackOptions], Iterable[~opentelemetry.metrics.Observation]]] # pylint: disable=line-too-long + """ + return self._additional_callbacks.get(metric_name, ()) + @staticmethod def _validate_config(config: Optional[StatsbeatConfig]) -> bool: """Validate that a configuration has all required fields. 
diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_statsbeat_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_statsbeat_metrics.py index d0f20511aadf..651ed2f83f1a 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_statsbeat_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_statsbeat_metrics.py @@ -40,6 +40,9 @@ get_statsbeat_customer_sdkstats_feature_set, get_statsbeat_browser_sdk_loader_feature_set, ) +from azure.monitor.opentelemetry.exporter.statsbeat._utils import ( + _get_additional_observations, +) from azure.monitor.opentelemetry.exporter import _utils @@ -379,6 +382,7 @@ def _get_success_count(self, options: CallbackOptions) -> Iterable[Observation]: if count != 0: observations.append(Observation(int(count), dict(attributes))) _REQUESTS_MAP[_REQ_SUCCESS_NAME[1]] = 0 + observations.extend(_get_additional_observations(_REQ_SUCCESS_NAME[0], options)) return observations # pylint: disable=unused-argument @@ -393,6 +397,7 @@ def _get_failure_count(self, options: CallbackOptions) -> Iterable[Observation]: attributes["statusCode"] = code observations.append(Observation(int(count), dict(attributes))) _REQUESTS_MAP[_REQ_FAILURE_NAME[1]][code] = 0 # type: ignore + observations.extend(_get_additional_observations(_REQ_FAILURE_NAME[0], options)) return observations # pylint: disable=unused-argument @@ -409,6 +414,7 @@ def _get_average_duration(self, options: CallbackOptions) -> Iterable[Observatio observations.append(Observation(result * 1000, dict(attributes))) _REQUESTS_MAP[_REQ_DURATION_NAME[1]] = 0 _REQUESTS_MAP["count"] = 0 + observations.extend(_get_additional_observations(_REQ_DURATION_NAME[0], options)) return observations # pylint: disable=unused-argument @@ -423,6 +429,7 @@ def _get_retry_count(self, options: 
CallbackOptions) -> Iterable[Observation]: attributes["statusCode"] = code observations.append(Observation(int(count), dict(attributes))) _REQUESTS_MAP[_REQ_RETRY_NAME[1]][code] = 0 # type: ignore + observations.extend(_get_additional_observations(_REQ_RETRY_NAME[0], options)) return observations # pylint: disable=unused-argument @@ -437,6 +444,7 @@ def _get_throttle_count(self, options: CallbackOptions) -> Iterable[Observation] attributes["statusCode"] = code observations.append(Observation(int(count), dict(attributes))) _REQUESTS_MAP[_REQ_THROTTLE_NAME[1]][code] = 0 # type: ignore + observations.extend(_get_additional_observations(_REQ_THROTTLE_NAME[0], options)) return observations # pylint: disable=unused-argument @@ -451,6 +459,7 @@ def _get_exception_count(self, options: CallbackOptions) -> Iterable[Observation attributes["exceptionType"] = code observations.append(Observation(int(count), dict(attributes))) _REQUESTS_MAP[_REQ_EXCEPTION_NAME[1]][code] = 0 # type: ignore + observations.extend(_get_additional_observations(_REQ_EXCEPTION_NAME[0], options)) return observations diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py index 07ed150cd33d..1feb90359d54 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py @@ -4,7 +4,8 @@ import logging import json from collections.abc import Iterable # pylint: disable=import-error -from typing import Optional, Dict +from typing import Optional, Dict, List +from opentelemetry.metrics import CallbackOptions, Observation from azure.monitor.opentelemetry.exporter._constants import ( _APPLICATIONINSIGHTS_STATS_CONNECTION_STRING_ENV_NAME, @@ -165,3 +166,39 @@ def 
_get_connection_string_for_region_from_config(target_region: str, settings: "Unexpected error getting stats connection string for region '%s': %s", target_region, str(ex) ) return None + + +def _get_additional_observations(metric_name: str, options: CallbackOptions) -> List[Observation]: + """Return observations contributed by extra callbacks registered on :class:`StatsbeatManager`. + + Invoked by the built-in ``_StatsbeatMetrics`` callbacks at collection time. + Reads callbacks registered on the singleton :class:`StatsbeatManager`. + Exceptions raised by individual callbacks are caught, logged, and skipped. + + :param metric_name: Name of the built-in statsbeat metric being collected. + :type metric_name: str + :param options: OpenTelemetry callback options forwarded to each registered callback. + :type options: ~opentelemetry.metrics.CallbackOptions + :returns: List of observations contributed by registered callbacks. + :rtype: list[~opentelemetry.metrics.Observation] + """ + # Lazy import to avoid a circular import between _manager and _utils. 
+ from azure.monitor.opentelemetry.exporter.statsbeat._manager import ( # pylint: disable=import-outside-toplevel + StatsbeatManager, + ) + + callbacks = StatsbeatManager().get_additional_metric_callbacks(metric_name) + + observations: List[Observation] = [] + iter_logger = logging.getLogger(__name__) + for cb in callbacks: + try: + observations.extend(cb(options)) + except Exception: # pylint: disable=broad-except + iter_logger.debug( + "Extra statsbeat callback %r for %r raised; skipping.", + cb, + metric_name, + exc_info=True, + ) + return observations diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_metrics.py index 572074e32b31..be3bdfad634c 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_metrics.py @@ -20,6 +20,9 @@ _REQ_SUCCESS_NAME, _REQ_THROTTLE_NAME, ) +from opentelemetry.metrics import Observation +from azure.monitor.opentelemetry.exporter.statsbeat import _utils as statsbeat_utils +from azure.monitor.opentelemetry.exporter.statsbeat._manager import StatsbeatManager from azure.monitor.opentelemetry.exporter.statsbeat._state import ( _REQUESTS_MAP, _STATSBEAT_STATE, @@ -35,6 +38,9 @@ _AttachTypes, _RP_Names, ) +from azure.monitor.opentelemetry.exporter.statsbeat._utils import ( + _get_additional_observations, +) class MockResponse(object): @@ -967,4 +973,115 @@ def test_shorten_host(self): self.assertEqual(_shorten_host(url), "fakehost-5") +# pylint: disable=protected-access +class TestAdditionalObservationCallbacks(unittest.TestCase): + """Tests for statsbeat callback registration and _get_additional_observations.""" + + def setUp(self): + _REQUESTS_MAP.clear() + # Force a fresh StatsbeatManager so its __init__ runs again (which + # rebuilds an empty _additional_callbacks dict on the instance). 
+ StatsbeatManager._instances.pop(StatsbeatManager, None) + + def tearDown(self): + _REQUESTS_MAP.clear() + StatsbeatManager._instances.pop(StatsbeatManager, None) + + @staticmethod + def _register(metric_name, callback): + StatsbeatManager().add_additional_metric_callbacks(metric_name, callback) + + def _make_metric(self): + return _StatsbeatMetrics( + MeterProvider(), + "1aa11111-bbbb-1ccc-8ddd-eeeeffff3334", + "https://westus-1.in.applicationinsights.azure.com/", + False, + 0, + False, + ) + + # ---- _get_additional_observations ---- + + def test_get_unregistered_name_returns_empty(self): + self.assertEqual(_get_additional_observations(_REQ_SUCCESS_NAME[0], None), []) + + def test_get_returns_observations_from_registered_callback(self): + obs = Observation(7, {"endpoint": "ep1"}) + + def cb(_options): + yield obs + + self._register(_REQ_SUCCESS_NAME[0], cb) + self.assertEqual(_get_additional_observations(_REQ_SUCCESS_NAME[0], None), [obs]) + + def test_get_aggregates_across_multiple_callbacks(self): + obs1 = Observation(1, {"endpoint": "ep1"}) + obs2 = Observation(2, {"endpoint": "ep2"}) + self._register(_REQ_SUCCESS_NAME[0], lambda _options: [obs1]) + self._register(_REQ_SUCCESS_NAME[0], lambda _options: [obs2]) + self.assertEqual( + _get_additional_observations(_REQ_SUCCESS_NAME[0], None), + [obs1, obs2], + ) + + def test_get_swallows_callback_exception_and_continues(self): + good_obs = Observation(42, {"endpoint": "ok"}) + + def bad_cb(_options): + raise RuntimeError("boom") + + self._register(_REQ_SUCCESS_NAME[0], bad_cb) + self._register(_REQ_SUCCESS_NAME[0], lambda _options: [good_obs]) + # Should not raise; should still emit the good observation. 
+ self.assertEqual( + _get_additional_observations(_REQ_SUCCESS_NAME[0], None), + [good_obs], + ) + + def test_get_callbacks_for_other_metrics_not_invoked(self): + called = [] + self._register(_REQ_FAILURE_NAME[0], lambda _options: called.append("failure") or []) + _get_additional_observations(_REQ_SUCCESS_NAME[0], None) + self.assertEqual(called, []) + + # ---- integration with built-in callbacks ---- + + def test_success_count_callback_emits_extras(self): + metric = self._make_metric() + _REQUESTS_MAP[_REQ_SUCCESS_NAME[1]] = 5 + + extra = Observation(99, {"endpoint": "extra-ep", "statusCode": 200}) + self._register(_REQ_SUCCESS_NAME[0], lambda _options: [extra]) + + observations = metric._get_success_count(options=None) + + # Built-in observation followed by the extra one. + self.assertEqual(len(observations), 2) + self.assertEqual(observations[0].value, 5) + self.assertIs(observations[-1], extra) + + def test_success_count_callback_unchanged_without_extras(self): + metric = self._make_metric() + _REQUESTS_MAP[_REQ_SUCCESS_NAME[1]] = 3 + + observations = metric._get_success_count(options=None) + + self.assertEqual(len(observations), 1) + self.assertEqual(observations[0].value, 3) + + def test_extras_for_other_metric_do_not_leak_into_success(self): + metric = self._make_metric() + _REQUESTS_MAP[_REQ_SUCCESS_NAME[1]] = 1 + + unrelated = Observation(123, {"endpoint": "other"}) + self._register(_REQ_FAILURE_NAME[0], lambda _options: [unrelated]) + + observations = metric._get_success_count(options=None) + + self.assertEqual(len(observations), 1) + self.assertEqual(observations[0].value, 1) + self.assertNotIn(unrelated, observations) + + # cSpell:enable