From 1610a49cafb2a5ca152bdf9eae95235f3c13e068 Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Fri, 22 May 2026 15:30:20 -0700 Subject: [PATCH] Add CPU, memory, and accelerator resource requests to PENDING span. Sets execution.resources.cpu, execution.resources.memory, and execution.resources.accelerators, sourced from task_spec annotations. Enables distinguishing workload types (e.g. Nebius H100 vs GKE L4) and correlating resource requests to execution timing. Also clamps each status span to end at least 1 ns after its start, and switches callers from try_emit_execution_trace to emit_execution_trace. --- .../instrumentation/execution_tracing.py | 27 +++++++- .../instrumentation/metrics.py | 2 +- .../instrumentation/test_execution_tracing.py | 65 +++++++++++++++++-- 3 files changed, 87 insertions(+), 7 deletions(-) diff --git a/cloud_pipelines_backend/instrumentation/execution_tracing.py b/cloud_pipelines_backend/instrumentation/execution_tracing.py index f2e4a86..4d9568a 100644 --- a/cloud_pipelines_backend/instrumentation/execution_tracing.py +++ b/cloud_pipelines_backend/instrumentation/execution_tracing.py @@ -15,6 +15,7 @@ from .. 
import backend_types_sql as bts from ..launchers import common_annotations +from ..launchers import kubernetes_launchers _logger = logging.getLogger(__name__) _tracer = trace.get_tracer("tangle.orchestrator") @@ -144,6 +145,23 @@ def _pipeline_attrs(*, execution: bts.ExecutionNode) -> dict[str, object]: return attrs +def _resource_attrs(*, execution: bts.ExecutionNode, status: str) -> dict[str, object]: + """CPU, memory, and accelerator requests for the PENDING span.""" + if status != bts.ContainerExecutionStatus.PENDING: + return {} + annotations: dict = (execution.task_spec or {}).get("annotations", {}) + attrs: dict[str, object] = {} + if cpu := annotations.get(kubernetes_launchers.RESOURCES_CPU_ANNOTATION_KEY): + attrs["execution.resources.cpu"] = cpu + if memory := annotations.get(kubernetes_launchers.RESOURCES_MEMORY_ANNOTATION_KEY): + attrs["execution.resources.memory"] = memory + if accelerators := annotations.get( + kubernetes_launchers.RESOURCES_ACCELERATORS_ANNOTATION_KEY + ): + attrs["execution.resources.accelerators"] = accelerators + return attrs + + def _ns(*, dt: datetime.datetime) -> int: """Return *dt* as nanoseconds since the Unix epoch (required by OTel SDK).""" if dt.tzinfo is None: @@ -189,13 +207,18 @@ def emit_execution_trace(*, execution: bts.ExecutionNode) -> None: "execution.status": entry["status"], **_error_attrs(execution=execution, status=entry["status"]), **_launcher_pod_attrs(execution=execution, status=entry["status"]), + **_resource_attrs(execution=execution, status=entry["status"]), } + start_ns = _ns(dt=t_start) + end_ns = _ns(dt=t_end) + if end_ns <= start_ns: + end_ns = start_ns + 1 _tracer.start_span( f"execution.status {entry['status']}", context=root_ctx, attributes=attrs, - start_time=_ns(dt=t_start), - ).end(end_time=_ns(dt=t_end)) + start_time=start_ns, + ).end(end_time=end_ns) if history[-1]["status"] in _ERROR_TERMINAL_STATUSES: root.set_status(status=StatusCode.ERROR) diff --git 
a/cloud_pipelines_backend/instrumentation/metrics.py b/cloud_pipelines_backend/instrumentation/metrics.py index 2d7ca4b..6e949e8 100644 --- a/cloud_pipelines_backend/instrumentation/metrics.py +++ b/cloud_pipelines_backend/instrumentation/metrics.py @@ -103,4 +103,4 @@ def _handle_before_commit(session: orm.Session) -> None: exc_info=True, ) obj._status_changed = False - execution_tracing.try_emit_execution_trace(execution=obj) + execution_tracing.emit_execution_trace(execution=obj) diff --git a/tests/instrumentation/test_execution_tracing.py b/tests/instrumentation/test_execution_tracing.py index f3e4e50..73d3100 100644 --- a/tests/instrumentation/test_execution_tracing.py +++ b/tests/instrumentation/test_execution_tracing.py @@ -346,7 +346,7 @@ def test_cache_miss_sets_hit_false( self, span_exporter: InMemorySpanExporter ) -> None: execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" @@ -360,7 +360,7 @@ def test_cache_hit_sets_hit_true_and_reused_from_id( statuses=["QUEUED", "SUCCEEDED"], extra={"reused_from_execution_node_id": "source-execution-id"}, ) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" @@ -378,7 +378,7 @@ def test_root_span_carries_parent_and_task_id( execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) execution.parent_execution_id = "parent-exec-id" execution.task_id_in_parent_execution = "my-task" - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" @@ -390,10 +390,67 @@ def 
test_root_execution_omits_parent_attrs_when_absent( self, span_exporter: InMemorySpanExporter ) -> None: execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) - execution_tracing.try_emit_execution_trace(execution=execution) + execution_tracing.emit_execution_trace(execution=execution) root = next( s for s in span_exporter.get_finished_spans() if s.name == "execution" ) assert "execution.parent_id" not in (root.attributes or {}) assert "execution.task_id" not in (root.attributes or {}) + + +class TestResourceAttrs: + def test_pending_span_carries_cpu_and_memory( + self, span_exporter: InMemorySpanExporter + ) -> None: + from cloud_pipelines_backend.launchers import kubernetes_launchers + + execution = _make_execution( + statuses=["QUEUED", "PENDING", "RUNNING", "SUCCEEDED"] + ) + execution.task_spec = { + "annotations": { + kubernetes_launchers.RESOURCES_CPU_ANNOTATION_KEY: "4", + kubernetes_launchers.RESOURCES_MEMORY_ANNOTATION_KEY: "16Gi", + } + } + execution_tracing.emit_execution_trace(execution=execution) + + pending_span = next( + s + for s in span_exporter.get_finished_spans() + if s.attributes.get("execution.status") == "PENDING" + ) + assert pending_span.attributes["execution.resources.cpu"] == "4" + assert pending_span.attributes["execution.resources.memory"] == "16Gi" + + def test_pending_span_carries_accelerators_when_present( + self, span_exporter: InMemorySpanExporter + ) -> None: + from cloud_pipelines_backend.launchers import kubernetes_launchers + + execution = _make_execution(statuses=["QUEUED", "PENDING", "SUCCEEDED"]) + execution.task_spec = { + "annotations": { + kubernetes_launchers.RESOURCES_ACCELERATORS_ANNOTATION_KEY: '{"H100": 1}', + } + } + execution_tracing.emit_execution_trace(execution=execution) + + pending_span = next( + s + for s in span_exporter.get_finished_spans() + if s.attributes.get("execution.status") == "PENDING" + ) + assert ( + pending_span.attributes["execution.resources.accelerators"] == '{"H100": 1}' + ) + + def 
test_non_pending_spans_have_no_resource_attrs( + self, span_exporter: InMemorySpanExporter + ) -> None: + execution = _make_execution(statuses=["QUEUED", "SUCCEEDED"]) + execution_tracing.emit_execution_trace(execution=execution) + + for span in span_exporter.get_finished_spans(): + assert "execution.resources.cpu" not in (span.attributes or {})