Add OTelMetricsExporter step for OTLP metrics export#641
Conversation
Introduces OTelMetricsExporter, a pass-through Storey step that records OpenTelemetry metrics from event body fields as a side-effect and forwards each event downstream unchanged. - Supports all 4 synchronous OTel instrument types: gauge (set), counter (add), updown_counter (add), histogram (record) - Both flush modes: periodic (background timer) and immediate (force_flush after every event via run_in_executor) - Single and multi-metric events; per-metric instrument type override via event body field - Configurable field mapping: metric_name_field, value_field, attribute_fields, metrics_field, instrument_type_field - Lazy MeterProvider init on first event; force_flush + shutdown on termination - install: pip install storey[otel] 54 tests: 39 unit (mock exporter) + 15 integration (in-process gRPC receiver, no Docker required), covering all 4 types x both flush modes. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Adds an OpenTelemetry metrics exporting step to Storey pipelines (OTelMetricsExporter) that records metrics as a side effect while forwarding events downstream unchanged, plus unit/integration tests and an optional storey[otel] dependency group.
Changes:
- Introduces
OTelMetricsExporterwith support for gauge/counter/updown_counter/histogram and immediate vs periodic flush modes. - Exposes the step via
storey.__init__and addsotelextras for OpenTelemetry SDK + OTLP gRPC exporter. - Adds comprehensive unit tests (mock exporter) and integration tests (embedded in-process OTLP gRPC receiver).
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
storey/otel_metrics_exporter.py |
New Flow step that records metrics and exports via OTLP gRPC (side-effect), then passes events through. |
storey/__init__.py |
Exposes OTelMetricsExporter from the package top-level. |
setup.py |
Adds storey[otel] optional dependencies for OpenTelemetry metrics + OTLP gRPC exporter. |
tests/test_otel_metrics_exporter.py |
Unit tests for instrument dispatch, validation, pass-through behavior, termination behavior, and field mapping. |
integration/test_otel_metrics_exporter.py |
End-to-end tests using an embedded OTLP gRPC receiver (no Docker). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…erver teardown - Validate per-event instrument type in _register_instrument before creating instrument - Replace math.inf with sys.maxsize for export_interval_millis in immediate mode - Bind integration test gRPC server to port 0 (dynamic) to avoid port conflicts - Call .wait() on server.stop() to ensure full teardown before next test Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- Wrap force_flush/shutdown in try/except so termination never raises - Validate export_interval_millis and max_instruments are positive in __init__
This reverts commit bee3a66.
| return | ||
| if OTLPMetricExporter is None: | ||
| raise ImportError("Install with: pip install storey[otel]") | ||
| interval = sys.maxsize if self._flush_mode == "immediate" else self._export_interval_millis |
There was a problem hiding this comment.
sys.maxsize as interval for immediate mode — using an astronomically large interval to suppress periodic exports is a hack. The OTel SDK provides ManualReader precisely for this case and makes the intent explicit
There was a problem hiding this comment.
from opentelemetry.sdk.metrics.export import ManualMetricReader
if self._flush_mode == "immediate":
reader = ManualMetricReader(exporter)
else:
reader = PeriodicExportingMetricReader(exporter, export_interval_millis=self._export_interval_millis)
- immediate mode: force_flush per event is expensive — calling force_flush(timeout_millis=5_000) on every single event will be a throughput killer. The docstring should warn that immediate mode is intended for low-volume /
debugging scenarios, and periodic is the recommended default for production. - Missing required field raises KeyError, not ValueError — if metric_name_field or value_field is absent from the event body, the user gets a raw KeyError. A ValueError with a clear message about which field is missing would be
more helpful.
There was a problem hiding this comment.
There is no ManualMetricReader I used math.inf like they metntion on :
- OpenTelemetry Python SDK metrics.export docs (https://opentelemetry-python.readthedocs.io/en/latest/sdk/metrics.export.html)
- opentelemetry-python v1.41.1 source (https://github.com/open-telemetry/opentelemetry-python/blob/v1.41.1/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py)
…g fields - Replace sys.maxsize hack with math.inf to disable periodic collection (officially documented PeriodicExportingMetricReader behavior). - Log shutdown failures via self.logger (fall back to print) instead of silently swallowing exceptions. - Raise ValueError naming the missing field instead of bare KeyError when metric_name_field or value_field is absent from the event body. - Document that flush_mode="immediate" is intended for low-volume or debugging use only. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
OTelMetricsExporter, a pass-through pipeline step that exports user-defined domain metrics via OTLP gRPC as a side-effect, forwarding events downstream unchangedgauge,counter,updown_counter,histogram; bothperiodicandimmediateflush modes; single and multi-metric events; configurable field mappingpip install storey[otel]Design notes
Not a target — pass-through only.
OTelMetricsExporteris a mid-pipeline step, not a terminal target. Every event is forwarded downstream unchanged after the metric is recorded. This allows metrics export to be composed with any downstream step (e.g.ParquetTarget,Map, etc.).No ACK / delivery guarantee — by design.
Export is best-effort, matching the standard OTel contract. The OTel SDK handles batching, retries, and delivery to the collector internally. A failed export cycle does not block, error, or retry the event in the Storey pipeline. This is consistent with how every other OTel SDK integration works — the pipeline should not be coupled to metrics delivery.
Test plan
pytest tests/test_otel_metrics_exporter.py(39 tests, mock OTLP exporter)pytest integration/test_otel_metrics_exporter.py(15 tests, embedded in-process gRPC receiver, no Docker required)🤖 Generated with Claude Code