diff --git a/src/openarmature/llm/providers/openai.py b/src/openarmature/llm/providers/openai.py index 3ace84f..0eedbcb 100644 --- a/src/openarmature/llm/providers/openai.py +++ b/src/openarmature/llm/providers/openai.py @@ -1457,6 +1457,8 @@ def _make_llm_event( prompt_tokens=usage.prompt_tokens if usage is not None else None, completion_tokens=usage.completion_tokens if usage is not None else None, total_tokens=usage.total_tokens if usage is not None else None, + cached_tokens=usage.cached_tokens if usage is not None else None, + cache_creation_tokens=usage.cache_creation_tokens if usage is not None else None, error_type=error_type, error_message=error_message, error_category=error_category, diff --git a/src/openarmature/observability/llm_event.py b/src/openarmature/observability/llm_event.py index e7c53c2..600e146 100644 --- a/src/openarmature/observability/llm_event.py +++ b/src/openarmature/observability/llm_event.py @@ -79,6 +79,16 @@ class LlmEventPayload(BaseModel): prompt_tokens: int | None = None completion_tokens: int | None = None total_tokens: int | None = None + # Cache-stat fields sourced from Response.usage per spec proposal + # 0047 (§5.5.3.1 OA-namespace cache attributes). Absent (None) + # when the provider does not report cache stats; set to 0 when + # the provider reports zero hits (the "reported miss" case, + # distinct from absent). The OTel observer emits the + # openarmature.llm.cache_read.input_tokens span attribute when + # cached_tokens is populated; same conditional for + # cache_creation. + cached_tokens: int | None = None + cache_creation_tokens: int | None = None # error_category is the canonical llm-provider §7 category # (provider_unavailable, etc.) when the failed exception carried # one — the provider caller doesn't have a graph-engine §4 diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index 781dfbb..b654de0 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -1199,6 +1199,20 @@ def _handle_llm_event(self, event: NodeEvent) -> None: span.set_attribute("openarmature.llm.usage.completion_tokens", payload.completion_tokens) if payload.total_tokens is not None: span.set_attribute("openarmature.llm.usage.total_tokens", payload.total_tokens) + # Spec proposal 0047 §5.5.3.1: OA-namespace cache attributes. + # Conditional emission per the §5.5.3 convention — the + # absent-vs-zero distinction is preserved: absent (None) + # means the provider did not report cache stats; 0 means + # the provider reported zero hits. OA-namespace per the + # stable-only upstream adoption policy because the upstream + # OTel GenAI cache attributes are at Development status. + if payload.cached_tokens is not None: + span.set_attribute("openarmature.llm.cache_read.input_tokens", payload.cached_tokens) + if payload.cache_creation_tokens is not None: + span.set_attribute( + "openarmature.llm.cache_creation.input_tokens", + payload.cache_creation_tokens, + ) # §5.5.3 GenAI semconv response attributes (gated by # ``disable_genai_semconv``). Tokens mirror the baseline # OA-prefixed usage attributes; finish_reasons wraps the diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index bbe7d6f..d9a43c4 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -121,6 +121,13 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: # gen_ai.*) MUST raise at the ``invoke()`` boundary before # any work begins. Two cases (one per reserved prefix). "028-caller-metadata-namespace-rejection", + # v0.41.0 — proposal 0047 (§5.5.3.1 OA-namespace cache + # attributes). Three fixtures cover cache-hit emission (040), + # absence (041 — no prompt_tokens_details on the wire), and + # reported-zero (042 — distinct from absent). + "040-llm-cache-attribute-emission", + "041-llm-cache-attribute-absence", + "042-llm-cache-attribute-reported-zero", # v0.41.0 — proposal 0049 (typed LlmCompletionEvent variant on # the observer event union). Seven fixtures exercise dispatch # shape (050), type discrimination (051), opt-in caller @@ -205,6 +212,12 @@ async def test_observability_fixture(fixture_path: Path) -> None: await _run_fixture_028(spec) elif fixture_id == "038-otel-parallel-branches-dispatch-span": await _run_fixture_038(spec) + elif fixture_id in { + "040-llm-cache-attribute-emission", + "041-llm-cache-attribute-absence", + "042-llm-cache-attribute-reported-zero", + }: + await _run_llm_cache_fixture(spec) elif fixture_id == "050-llm-completion-event-dispatch": await _run_fixture_050(spec) elif fixture_id == "051-llm-completion-event-type-discrimination": @@ -2776,12 +2789,13 @@ def _build_simple_llm_graph( case: Mapping[str, Any], *, populate_caller_metadata: bool, -) -> tuple[Any, type[Any]]: +) -> tuple[Any, type[Any], Any]: """Build a single-node graph that calls the LLM provider against a mock transport. Matches the simple entry → ask → END pattern used by fixtures 050, 051, 052, 053, 056. Returns ``(compiled_graph, - state_cls)`` so the caller can construct State instances without - re-deriving the class. + state_cls, provider)`` — the caller owns the provider's lifecycle + and MUST call ``await provider.aclose()`` after invoke completes + to release the underlying httpx.AsyncClient connection pool. """ import json @@ -2837,7 +2851,7 @@ async def ask_body(_s: Any) -> dict[str, str]: builder = ( GraphBuilder(state_cls).add_node(entry_name, ask_body).add_edge(entry_name, END).set_entry(entry_name) ) - return builder.compile(), state_cls + return builder.compile(), state_cls, provider def _make_state_instance(case: Mapping[str, Any], state_cls: type[Any]) -> Any: @@ -3154,6 +3168,138 @@ async def __call__(self, event: Any) -> None: self.events.append(event) +async def _run_llm_cache_fixture(spec: Mapping[str, Any]) -> None: + """Run the proposal 0047 §5.5.3.1 cache-attribute fixtures (040, + 041, 042). All three share the same simple-shape graph and assert + on ``Response.usage`` cache fields plus the LLM provider span's + ``openarmature.llm.cache_read.input_tokens`` / + ``openarmature.llm.cache_creation.input_tokens`` attribute set. + """ + cases = cast("list[dict[str, Any]]", spec["cases"]) + for case in cases: + case_name = cast("str", case["name"]) + try: + await _run_llm_cache_fixture_case(case) + except AssertionError as e: + raise AssertionError(f"case {case_name!r}: {e}") from e + + +async def _run_llm_cache_fixture_case(case: Mapping[str, Any]) -> None: + """Build a simple LLM-calling graph, capture the response, and + assert on response_usage + llm_span_attributes / + llm_span_attributes_absent expectations. + """ + import json + + import httpx + from opentelemetry.sdk.trace.export import SimpleSpanProcessor # noqa: PLC0415 + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( # noqa: PLC0415 + InMemorySpanExporter, + ) + + from openarmature.graph import END, GraphBuilder + from openarmature.llm import OpenAIProvider, UserMessage + from openarmature.llm.response import Response + from openarmature.observability.otel import OTelObserver + + from .adapter import build_state_cls + + mock_responses = list(cast("list[dict[str, Any]]", case.get("mock_llm") or [])) + + def _handler(_request: httpx.Request) -> httpx.Response: + if not mock_responses: + raise AssertionError("mock_llm queue exhausted") + spec_resp = mock_responses.pop(0) + body = cast("dict[str, Any]", spec_resp.get("body") or {}) + return httpx.Response( + int(spec_resp.get("status", 200)), + content=json.dumps(body).encode("utf-8"), + headers={"Content-Type": "application/json"}, + ) + + provider = OpenAIProvider( + base_url="http://mock-llm.test", + model=_mock_model_from_first_response(case) or "test-model", + api_key="test", + transport=httpx.MockTransport(_handler), + ) + + state_fields = cast("dict[str, dict[str, Any]]", case["state"]["fields"]) + state_cls = build_state_cls("LlmCacheFixtureState", state_fields) + + nodes = cast("dict[str, Any]", case["nodes"]) + entry_name = cast("str", case["entry"]) + calls_llm_spec = cast("dict[str, Any]", nodes[entry_name]["calls_llm"]) + stores_in = cast("str", calls_llm_spec.get("stores_response_in", "answer")) + messages_spec = cast("list[dict[str, str]]", calls_llm_spec.get("messages", [])) + messages = [UserMessage(content=m["content"]) for m in messages_spec if m.get("role") == "user"] + + captured_responses: list[Response] = [] + + async def ask_body(_s: Any) -> dict[str, str]: + response = await provider.complete(messages) + captured_responses.append(response) + return {stores_in: response.message.content or ""} + + builder = ( + GraphBuilder(state_cls).add_node(entry_name, ask_body).add_edge(entry_name, END).set_entry(entry_name) + ) + graph = builder.compile() + + exporter = InMemorySpanExporter() + observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) + graph.attach_observer(observer) + try: + await graph.invoke(state_cls()) + finally: + await graph.drain() + observer.shutdown() + # OpenAIProvider owns an httpx.AsyncClient; closing it releases + # the connection pool. Matches the convention used by fixture + # 005 / 038 runners elsewhere in this file. + await provider.aclose() + + expected = cast("dict[str, Any]", case["expected"]) + + # ---- Response.usage assertion + expected_usage = cast("dict[str, Any] | None", expected.get("response_usage")) + if expected_usage is not None: + # The cache-attribute fixtures (040/041/042) are single-LLM-call + # by shape — one ``ask`` node, one mocked response. A future + # fixture extending to multi-call would need this assertion to + # loop over captured_responses rather than indexing [0]. + assert len(captured_responses) == 1, ( + f"response_usage assertion expects exactly one LLM call; captured {len(captured_responses)}" + ) + actual_usage = captured_responses[0].usage + for field_name, expected_value in expected_usage.items(): + actual = getattr(actual_usage, field_name) + assert actual == expected_value, ( + f"response_usage.{field_name}: expected {expected_value!r}, got {actual!r}" + ) + + # ---- LLM span attribute assertions + llm_spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"] + assert len(llm_spans) == 1, f"expected exactly one LLM provider span; got {len(llm_spans)}" + llm_span_attrs = dict(llm_spans[0].attributes or {}) + + expected_attrs = cast("dict[str, Any] | None", expected.get("llm_span_attributes")) + if expected_attrs is not None: + for attr_name, expected_value in expected_attrs.items(): + actual = llm_span_attrs.get(attr_name) + assert actual == expected_value, ( + f"llm_span_attributes[{attr_name!r}]: expected {expected_value!r}, got {actual!r}" + ) + + absent_attrs = cast("list[str] | None", expected.get("llm_span_attributes_absent")) + if absent_attrs is not None: + for attr_name in absent_attrs: + assert attr_name not in llm_span_attrs, ( + f"llm_span_attributes_absent: {attr_name!r} unexpectedly present " + f"with value {llm_span_attrs[attr_name]!r}" + ) + + async def _run_typed_event_fixture_case( case: Mapping[str, Any], *, @@ -3171,36 +3317,44 @@ async def _run_typed_event_fixture_case( the same surface. """ collectors, populate_caller_metadata = _parse_typed_observers(case) - graph, state_cls = _build_simple_llm_graph(case, populate_caller_metadata=populate_caller_metadata) - extra: _AllEventsCollector | None = None - if expect_failure and not any(c.filter_event_type is None for c in collectors.values()): - extra = _AllEventsCollector() - final, exc = await _invoke_typed_fixture(case, collectors, graph, state_cls, extra_observer=extra) - - expected = cast("dict[str, Any]", case.get("expected") or {}) - if expect_failure: - assert exc is not None, "failure-path fixture expected an exception" - node_completed = cast("dict[str, Any] | None", expected.get("node_completed_event_carries_error")) - if node_completed: - # Source for the assertion: an unfiltered named collector - # when present, otherwise the failure-path-only extra - # ``_AllEventsCollector``. - unfiltered_named = next((c for c in collectors.values() if c.filter_event_type is None), None) - source = ( - unfiltered_named.events - if unfiltered_named is not None - else (extra.events if extra is not None else []) - ) - _assert_node_completed_event_carries_error(source, node_completed) - else: - if final is None: - raise AssertionError("expected a non-None final state on success path") - observer_expectations = cast("dict[str, Any]", expected.get("observers") or {}) - for name, expectations in observer_expectations.items(): - collector = collectors.get(name) - if collector is None: - raise AssertionError(f"fixture references unknown observer {name!r}") - _assert_observer_expectations(name, collector, cast("dict[str, Any]", expectations)) + graph, state_cls, provider = _build_simple_llm_graph( + case, populate_caller_metadata=populate_caller_metadata + ) + try: + extra: _AllEventsCollector | None = None + if expect_failure and not any(c.filter_event_type is None for c in collectors.values()): + extra = _AllEventsCollector() + final, exc = await _invoke_typed_fixture(case, collectors, graph, state_cls, extra_observer=extra) + + expected = cast("dict[str, Any]", case.get("expected") or {}) + if expect_failure: + assert exc is not None, "failure-path fixture expected an exception" + node_completed = cast("dict[str, Any] | None", expected.get("node_completed_event_carries_error")) + if node_completed: + # Source for the assertion: an unfiltered named collector + # when present, otherwise the failure-path-only extra + # ``_AllEventsCollector``. + unfiltered_named = next((c for c in collectors.values() if c.filter_event_type is None), None) + source = ( + unfiltered_named.events + if unfiltered_named is not None + else (extra.events if extra is not None else []) + ) + _assert_node_completed_event_carries_error(source, node_completed) + else: + if final is None: + raise AssertionError("expected a non-None final state on success path") + observer_expectations = cast("dict[str, Any]", expected.get("observers") or {}) + for name, expectations in observer_expectations.items(): + collector = collectors.get(name) + if collector is None: + raise AssertionError(f"fixture references unknown observer {name!r}") + _assert_observer_expectations(name, collector, cast("dict[str, Any]", expectations)) + finally: + # _build_simple_llm_graph hands ownership of the provider's + # httpx.AsyncClient to the runner; close it to release the + # connection pool. + await provider.aclose() async def _run_fixture_050(spec: Mapping[str, Any]) -> None: @@ -3375,6 +3529,8 @@ async def _ask_body(_s: Any) -> dict[str, str]: for handle in handles: handle.remove() await outer_compiled.drain() + # Release the underlying httpx.AsyncClient connection pool. + await provider.aclose() expected = cast("dict[str, Any]", case.get("expected") or {}) observer_expectations = cast("dict[str, Any]", expected.get("observers") or {}) @@ -3477,6 +3633,8 @@ async def _body(_s: Any, _msgs: Any = msgs, _stores: str = stores_in) -> dict[st for handle in handles: handle.remove() await outer_compiled.drain() + # Release the underlying httpx.AsyncClient connection pool. + await provider.aclose() expected = cast("dict[str, Any]", case.get("expected") or {}) observer_expectations = cast("dict[str, Any]", expected.get("observers") or {}) diff --git a/tests/unit/test_llm_provider.py b/tests/unit/test_llm_provider.py index 469480a..aba457c 100644 --- a/tests/unit/test_llm_provider.py +++ b/tests/unit/test_llm_provider.py @@ -768,6 +768,74 @@ async def test_complete_negative_cached_tokens_surfaces_as_invalid_response() -> await provider.aclose() +async def test_complete_populates_cached_tokens_on_llm_event_payload() -> None: + # Proposal 0047: _make_llm_event MUST propagate + # Response.usage.cached_tokens onto the sentinel LlmEventPayload + # so observers driving §5.5.3.1 cache attribute emission off the + # sentinel NodeEvent pair have the cache stat available. The + # conformance fixture 040 covers this end-to-end via OTel span + # attribute assertion; this test localizes the assertion to the + # provider-payload boundary so a regression here surfaces + # independently of the observer rendering layer. + from openarmature.observability.llm_event import LlmEventPayload + + events, token = _collecting_dispatch() + transport = _make_openai_response_with_usage( + { + "prompt_tokens": 100, + "completion_tokens": 5, + "total_tokens": 105, + "prompt_tokens_details": {"cached_tokens": 42}, + } + ) + provider = OpenAIProvider(base_url="http://test", model="m", api_key="k", transport=transport) + try: + await provider.complete([UserMessage(content="hi")]) + finally: + await provider.aclose() + _release_dispatch(token) + + completed_payloads = [ + e.pre_state + for e in events + if isinstance(e, NodeEvent) and e.phase == "completed" and isinstance(e.pre_state, LlmEventPayload) + ] + assert len(completed_payloads) == 1 + payload = completed_payloads[0] + assert payload.cached_tokens == 42 + # The OpenAI-compat mapping leaves cache_creation_tokens absent + # per spec §8.1.2; verify the field stays None on the payload. + assert payload.cache_creation_tokens is None + + +async def test_complete_leaves_cached_tokens_none_when_provider_silent() -> None: + # Companion to the populated case: when the wire response omits + # prompt_tokens_details, Response.usage.cached_tokens stays None + # and the LlmEventPayload reflects that. Locks the absent path + # at the provider-payload boundary. + from openarmature.observability.llm_event import LlmEventPayload + + events, token = _collecting_dispatch() + transport = _make_openai_response_with_usage( + {"prompt_tokens": 100, "completion_tokens": 5, "total_tokens": 105} + ) + provider = OpenAIProvider(base_url="http://test", model="m", api_key="k", transport=transport) + try: + await provider.complete([UserMessage(content="hi")]) + finally: + await provider.aclose() + _release_dispatch(token) + + completed_payloads = [ + e.pre_state + for e in events + if isinstance(e, NodeEvent) and e.phase == "completed" and isinstance(e.pre_state, LlmEventPayload) + ] + assert len(completed_payloads) == 1 + assert completed_payloads[0].cached_tokens is None + assert completed_payloads[0].cache_creation_tokens is None + + # RuntimeConfig.from_partial — Python ergonomic introduced alongside # proposal 0032. Wire-layer null-skip already drops Nones; this just # lets callers splat a partial dict without filtering at the call site. diff --git a/tests/unit/test_observability_otel.py b/tests/unit/test_observability_otel.py index 0cbf7c9..bb4c2d4 100644 --- a/tests/unit/test_observability_otel.py +++ b/tests/unit/test_observability_otel.py @@ -643,6 +643,100 @@ async def test_llm_span_has_no_prompt_attributes_when_no_active_prompt() -> None assert not any(k.startswith("openarmature.prompt.") for k in attrs) +async def _drive_llm_span_with_cached_tokens( + *, + cached_tokens: int | None, + cache_creation_tokens: int | None = None, +) -> dict[str, Any]: + """Drive the OTel observer through a sentinel started/completed + NodeEvent pair with the supplied cache-stat fields on the + completed-phase payload. Returns the LLM-span's attribute map. + """ + from openarmature.graph.events import NodeEvent + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from openarmature.observability.llm_event import LlmEventPayload + + exporter = InMemorySpanExporter() + observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) + token = _set_invocation_id("inv-cache") + try: + started = NodeEvent( + node_name="openarmature.llm.complete", + namespace=("openarmature.llm.complete",), + step=-1, + phase="started", + pre_state=LlmEventPayload(call_id="cc-cache", model="test-m"), + post_state=None, + error=None, + parent_states=(), + ) + completed = NodeEvent( + node_name="openarmature.llm.complete", + namespace=("openarmature.llm.complete",), + step=-1, + phase="completed", + pre_state=LlmEventPayload( + call_id="cc-cache", + model="test-m", + finish_reason="stop", + prompt_tokens=100, + completion_tokens=5, + total_tokens=105, + cached_tokens=cached_tokens, + cache_creation_tokens=cache_creation_tokens, + ), + post_state=None, + error=None, + parent_states=(), + ) + await observer(started) + await observer(completed) + finally: + _reset_invocation_id(token) + observer.shutdown() + llm_spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"] + assert len(llm_spans) == 1 + return dict(llm_spans[0].attributes or {}) + + +async def test_llm_span_emits_cache_read_attribute_when_provider_reports_hit() -> None: + # Proposal 0047 §5.5.3.1: openarmature.llm.cache_read.input_tokens + # is set on the LLM span when the payload carries a non-None + # cached_tokens value sourced from Response.usage.cached_tokens. + attrs = await _drive_llm_span_with_cached_tokens(cached_tokens=42) + assert attrs.get("openarmature.llm.cache_read.input_tokens") == 42 + assert "openarmature.llm.cache_creation.input_tokens" not in attrs + + +async def test_llm_span_emits_cache_read_attribute_with_reported_zero() -> None: + # The absent-vs-reported-zero distinction is observable on the + # span: a payload with cached_tokens=0 produces the attribute + # with value 0 (not omitted). + attrs = await _drive_llm_span_with_cached_tokens(cached_tokens=0) + assert attrs.get("openarmature.llm.cache_read.input_tokens") == 0 + + +async def test_llm_span_omits_cache_attribute_when_provider_silent() -> None: + # When the provider doesn't report cache stats (cached_tokens=None + # on the payload), the OTel observer does NOT emit the attribute + # per the §5.5.3 conditional-emission convention. + attrs = await _drive_llm_span_with_cached_tokens(cached_tokens=None) + assert "openarmature.llm.cache_read.input_tokens" not in attrs + assert "openarmature.llm.cache_creation.input_tokens" not in attrs + + +async def test_llm_span_emits_cache_creation_attribute_when_payload_carries_it() -> None: + # The OpenAI-compatible mapping never sources cache_creation_tokens + # (per spec §8.1.2), but the observer side honors the field when + # any future provider populates it. + attrs = await _drive_llm_span_with_cached_tokens(cached_tokens=20, cache_creation_tokens=5) + assert attrs.get("openarmature.llm.cache_read.input_tokens") == 20 + assert attrs.get("openarmature.llm.cache_creation.input_tokens") == 5 + + async def test_disable_llm_spans_skips_llm_provider_span() -> None: """Spec §5.5: ``disable_llm_spans=True`` MUST suppress the LLM-provider span emission while leaving all other spans intact."""