Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/openarmature/llm/providers/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,8 @@ def _make_llm_event(
prompt_tokens=usage.prompt_tokens if usage is not None else None,
completion_tokens=usage.completion_tokens if usage is not None else None,
total_tokens=usage.total_tokens if usage is not None else None,
cached_tokens=usage.cached_tokens if usage is not None else None,
cache_creation_tokens=usage.cache_creation_tokens if usage is not None else None,
error_type=error_type,
error_message=error_message,
error_category=error_category,
Expand Down
10 changes: 10 additions & 0 deletions src/openarmature/observability/llm_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ class LlmEventPayload(BaseModel):
prompt_tokens: int | None = None
completion_tokens: int | None = None
total_tokens: int | None = None
# Cache-stat fields sourced from Response.usage per spec proposal
# 0047 (§5.5.3.1 OA-namespace cache attributes). Absent (None)
# when the provider does not report cache stats; set to 0 when
# the provider reports zero hits (the "reported miss" case,
# distinct from absent). The OTel observer emits the
# openarmature.llm.cache_read.input_tokens span attribute when
# cached_tokens is not None, and the
# openarmature.llm.cache_creation.input_tokens span attribute
# when cache_creation_tokens is not None.
cached_tokens: int | None = None
cache_creation_tokens: int | None = None
# error_category is the canonical llm-provider §7 category
# (provider_unavailable, etc.) when the failed exception carried
# one — the provider caller doesn't have a graph-engine §4
Expand Down
14 changes: 14 additions & 0 deletions src/openarmature/observability/otel/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,20 @@ def _handle_llm_event(self, event: NodeEvent) -> None:
span.set_attribute("openarmature.llm.usage.completion_tokens", payload.completion_tokens)
if payload.total_tokens is not None:
span.set_attribute("openarmature.llm.usage.total_tokens", payload.total_tokens)
# Spec proposal 0047 §5.5.3.1: OA-namespace cache attributes.
# Conditional emission per the §5.5.3 convention — the
# absent-vs-zero distinction is preserved: absent (None)
# means the provider did not report cache stats; 0 means
# the provider reported zero hits. OA-namespace per the
# stable-only upstream adoption policy because the upstream
# OTel GenAI cache attributes are at Development status.
if payload.cached_tokens is not None:
span.set_attribute("openarmature.llm.cache_read.input_tokens", payload.cached_tokens)
if payload.cache_creation_tokens is not None:
span.set_attribute(
"openarmature.llm.cache_creation.input_tokens",
payload.cache_creation_tokens,
)
# §5.5.3 GenAI semconv response attributes (gated by
# ``disable_genai_semconv``). Tokens mirror the baseline
# OA-prefixed usage attributes; finish_reasons wraps the
Expand Down
226 changes: 192 additions & 34 deletions tests/conformance/test_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
# gen_ai.*) MUST raise at the ``invoke()`` boundary before
# any work begins. Two cases (one per reserved prefix).
"028-caller-metadata-namespace-rejection",
# v0.41.0 — proposal 0047 (§5.5.3.1 OA-namespace cache
# attributes). Three fixtures cover cache-hit emission (040),
# absence (041 — no prompt_tokens_details on the wire), and
# reported-zero (042 — distinct from absent).
"040-llm-cache-attribute-emission",
"041-llm-cache-attribute-absence",
"042-llm-cache-attribute-reported-zero",
# v0.41.0 — proposal 0049 (typed LlmCompletionEvent variant on
# the observer event union). Seven fixtures exercise dispatch
# shape (050), type discrimination (051), opt-in caller
Expand Down Expand Up @@ -205,6 +212,12 @@ async def test_observability_fixture(fixture_path: Path) -> None:
await _run_fixture_028(spec)
elif fixture_id == "038-otel-parallel-branches-dispatch-span":
await _run_fixture_038(spec)
elif fixture_id in {
"040-llm-cache-attribute-emission",
"041-llm-cache-attribute-absence",
"042-llm-cache-attribute-reported-zero",
}:
await _run_llm_cache_fixture(spec)
elif fixture_id == "050-llm-completion-event-dispatch":
await _run_fixture_050(spec)
elif fixture_id == "051-llm-completion-event-type-discrimination":
Expand Down Expand Up @@ -2776,12 +2789,13 @@ def _build_simple_llm_graph(
case: Mapping[str, Any],
*,
populate_caller_metadata: bool,
) -> tuple[Any, type[Any]]:
) -> tuple[Any, type[Any], Any]:
"""Build a single-node graph that calls the LLM provider against a
mock transport. Matches the simple entry → ask → END pattern used
by fixtures 050, 051, 052, 053, 056. Returns ``(compiled_graph,
state_cls)`` so the caller can construct State instances without
re-deriving the class.
state_cls, provider)`` — the caller owns the provider's lifecycle
and MUST call ``await provider.aclose()`` after invoke completes
to release the underlying httpx.AsyncClient connection pool.
"""
import json

Expand Down Expand Up @@ -2837,7 +2851,7 @@ async def ask_body(_s: Any) -> dict[str, str]:
builder = (
GraphBuilder(state_cls).add_node(entry_name, ask_body).add_edge(entry_name, END).set_entry(entry_name)
)
return builder.compile(), state_cls
return builder.compile(), state_cls, provider


def _make_state_instance(case: Mapping[str, Any], state_cls: type[Any]) -> Any:
Expand Down Expand Up @@ -3154,6 +3168,138 @@ async def __call__(self, event: Any) -> None:
self.events.append(event)


async def _run_llm_cache_fixture(spec: Mapping[str, Any]) -> None:
    """Drive every case of a proposal 0047 §5.5.3.1 cache-attribute
    fixture (040, 041, 042) through ``_run_llm_cache_fixture_case``.

    Each entry of ``spec["cases"]`` is executed in order. When a case
    fails with an ``AssertionError`` the error is re-raised with the
    case's ``name`` prefixed to the message (chained to the original),
    so a failing fixture reports which case tripped.
    """
    for fixture_case in cast("list[dict[str, Any]]", spec["cases"]):
        label = cast("str", fixture_case["name"])
        try:
            await _run_llm_cache_fixture_case(fixture_case)
        except AssertionError as failure:
            raise AssertionError(f"case {label!r}: {failure}") from failure


async def _run_llm_cache_fixture_case(case: Mapping[str, Any]) -> None:
    """Run one case of the proposal 0047 §5.5.3.1 cache-attribute
    fixtures (040, 041, 042).

    Builds a single-node graph whose entry node calls ``OpenAIProvider``
    against an ``httpx.MockTransport`` fed from the case's ``mock_llm``
    queue, invokes it with an ``OTelObserver`` attached, then checks the
    case's ``expected`` block:

    * ``response_usage`` — field-by-field equality against the captured
      ``Response.usage`` (exactly one LLM call is required).
    * ``llm_span_attributes`` — expected attribute values on the single
      ``openarmature.llm.complete`` span.
    * ``llm_span_attributes_absent`` — attribute names that must NOT be
      present on that span.

    Raises:
        AssertionError: when any of the expectations above fails.
    """
    import json
    from contextlib import AsyncExitStack

    import httpx
    from opentelemetry.sdk.trace.export import SimpleSpanProcessor  # noqa: PLC0415
    from opentelemetry.sdk.trace.export.in_memory_span_exporter import (  # noqa: PLC0415
        InMemorySpanExporter,
    )

    from openarmature.graph import END, GraphBuilder
    from openarmature.llm import OpenAIProvider, UserMessage
    from openarmature.llm.response import Response
    from openarmature.observability.otel import OTelObserver

    from .adapter import build_state_cls

    # Copy the fixture's response list so popping from it never mutates
    # the fixture spec itself.
    mock_responses = list(cast("list[dict[str, Any]]", case.get("mock_llm") or []))

    def _handler(_request: httpx.Request) -> httpx.Response:
        # Serve the queued mock responses in order, one per request.
        if not mock_responses:
            raise AssertionError("mock_llm queue exhausted")
        spec_resp = mock_responses.pop(0)
        body = cast("dict[str, Any]", spec_resp.get("body") or {})
        return httpx.Response(
            int(spec_resp.get("status", 200)),
            content=json.dumps(body).encode("utf-8"),
            headers={"Content-Type": "application/json"},
        )

    provider = OpenAIProvider(
        base_url="http://mock-llm.test",
        model=_mock_model_from_first_response(case) or "test-model",
        api_key="test",
        transport=httpx.MockTransport(_handler),
    )

    state_fields = cast("dict[str, dict[str, Any]]", case["state"]["fields"])
    state_cls = build_state_cls("LlmCacheFixtureState", state_fields)

    nodes = cast("dict[str, Any]", case["nodes"])
    entry_name = cast("str", case["entry"])
    calls_llm_spec = cast("dict[str, Any]", nodes[entry_name]["calls_llm"])
    stores_in = cast("str", calls_llm_spec.get("stores_response_in", "answer"))
    messages_spec = cast("list[dict[str, str]]", calls_llm_spec.get("messages", []))
    # Only ``user``-role entries are forwarded to the provider.
    messages = [UserMessage(content=m["content"]) for m in messages_spec if m.get("role") == "user"]

    captured_responses: list[Response] = []

    async def ask_body(_s: Any) -> dict[str, str]:
        # Keep the full Response so the usage assertions below can
        # inspect it; the graph state only stores the message text.
        response = await provider.complete(messages)
        captured_responses.append(response)
        return {stores_in: response.message.content or ""}

    builder = (
        GraphBuilder(state_cls).add_node(entry_name, ask_body).add_edge(entry_name, END).set_entry(entry_name)
    )
    graph = builder.compile()

    exporter = InMemorySpanExporter()
    observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter))
    graph.attach_observer(observer)
    # Teardown is registered on an exit stack so that every step runs
    # even when an earlier one (or the invoke itself) raises. Callbacks
    # unwind LIFO, so the effective order is drain -> shutdown -> aclose.
    async with AsyncExitStack() as cleanup:
        # OpenAIProvider owns an httpx.AsyncClient; closing it releases
        # the connection pool. Matches the convention used by fixture
        # 005 / 038 runners elsewhere in this file.
        cleanup.push_async_callback(provider.aclose)
        cleanup.callback(observer.shutdown)
        cleanup.push_async_callback(graph.drain)
        await graph.invoke(state_cls())

    expected = cast("dict[str, Any]", case["expected"])

    # ---- Response.usage assertion
    expected_usage = cast("dict[str, Any] | None", expected.get("response_usage"))
    if expected_usage is not None:
        # The cache-attribute fixtures (040/041/042) are single-LLM-call
        # by shape — one ``ask`` node, one mocked response. A future
        # fixture extending to multi-call would need this assertion to
        # loop over captured_responses rather than indexing [0].
        assert len(captured_responses) == 1, (
            f"response_usage assertion expects exactly one LLM call; captured {len(captured_responses)}"
        )
        actual_usage = captured_responses[0].usage
        for field_name, expected_value in expected_usage.items():
            actual = getattr(actual_usage, field_name)
            assert actual == expected_value, (
                f"response_usage.{field_name}: expected {expected_value!r}, got {actual!r}"
            )

    # ---- LLM span attribute assertions
    llm_spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"]
    assert len(llm_spans) == 1, f"expected exactly one LLM provider span; got {len(llm_spans)}"
    llm_span_attrs = dict(llm_spans[0].attributes or {})

    expected_attrs = cast("dict[str, Any] | None", expected.get("llm_span_attributes"))
    if expected_attrs is not None:
        for attr_name, expected_value in expected_attrs.items():
            actual = llm_span_attrs.get(attr_name)
            assert actual == expected_value, (
                f"llm_span_attributes[{attr_name!r}]: expected {expected_value!r}, got {actual!r}"
            )

    absent_attrs = cast("list[str] | None", expected.get("llm_span_attributes_absent"))
    if absent_attrs is not None:
        for attr_name in absent_attrs:
            assert attr_name not in llm_span_attrs, (
                f"llm_span_attributes_absent: {attr_name!r} unexpectedly present "
                f"with value {llm_span_attrs[attr_name]!r}"
            )


async def _run_typed_event_fixture_case(
case: Mapping[str, Any],
*,
Expand All @@ -3171,36 +3317,44 @@ async def _run_typed_event_fixture_case(
the same surface.
"""
collectors, populate_caller_metadata = _parse_typed_observers(case)
graph, state_cls = _build_simple_llm_graph(case, populate_caller_metadata=populate_caller_metadata)
extra: _AllEventsCollector | None = None
if expect_failure and not any(c.filter_event_type is None for c in collectors.values()):
extra = _AllEventsCollector()
final, exc = await _invoke_typed_fixture(case, collectors, graph, state_cls, extra_observer=extra)

expected = cast("dict[str, Any]", case.get("expected") or {})
if expect_failure:
assert exc is not None, "failure-path fixture expected an exception"
node_completed = cast("dict[str, Any] | None", expected.get("node_completed_event_carries_error"))
if node_completed:
# Source for the assertion: an unfiltered named collector
# when present, otherwise the failure-path-only extra
# ``_AllEventsCollector``.
unfiltered_named = next((c for c in collectors.values() if c.filter_event_type is None), None)
source = (
unfiltered_named.events
if unfiltered_named is not None
else (extra.events if extra is not None else [])
)
_assert_node_completed_event_carries_error(source, node_completed)
else:
if final is None:
raise AssertionError("expected a non-None final state on success path")
observer_expectations = cast("dict[str, Any]", expected.get("observers") or {})
for name, expectations in observer_expectations.items():
collector = collectors.get(name)
if collector is None:
raise AssertionError(f"fixture references unknown observer {name!r}")
_assert_observer_expectations(name, collector, cast("dict[str, Any]", expectations))
graph, state_cls, provider = _build_simple_llm_graph(
case, populate_caller_metadata=populate_caller_metadata
)
try:
extra: _AllEventsCollector | None = None
if expect_failure and not any(c.filter_event_type is None for c in collectors.values()):
extra = _AllEventsCollector()
final, exc = await _invoke_typed_fixture(case, collectors, graph, state_cls, extra_observer=extra)

expected = cast("dict[str, Any]", case.get("expected") or {})
if expect_failure:
assert exc is not None, "failure-path fixture expected an exception"
node_completed = cast("dict[str, Any] | None", expected.get("node_completed_event_carries_error"))
if node_completed:
# Source for the assertion: an unfiltered named collector
# when present, otherwise the failure-path-only extra
# ``_AllEventsCollector``.
unfiltered_named = next((c for c in collectors.values() if c.filter_event_type is None), None)
source = (
unfiltered_named.events
if unfiltered_named is not None
else (extra.events if extra is not None else [])
)
_assert_node_completed_event_carries_error(source, node_completed)
else:
if final is None:
raise AssertionError("expected a non-None final state on success path")
observer_expectations = cast("dict[str, Any]", expected.get("observers") or {})
for name, expectations in observer_expectations.items():
collector = collectors.get(name)
if collector is None:
raise AssertionError(f"fixture references unknown observer {name!r}")
_assert_observer_expectations(name, collector, cast("dict[str, Any]", expectations))
finally:
# _build_simple_llm_graph hands ownership of the provider's
# httpx.AsyncClient to the runner; close it to release the
# connection pool.
await provider.aclose()


async def _run_fixture_050(spec: Mapping[str, Any]) -> None:
Expand Down Expand Up @@ -3375,6 +3529,8 @@ async def _ask_body(_s: Any) -> dict[str, str]:
for handle in handles:
handle.remove()
await outer_compiled.drain()
# Release the underlying httpx.AsyncClient connection pool.
await provider.aclose()

expected = cast("dict[str, Any]", case.get("expected") or {})
observer_expectations = cast("dict[str, Any]", expected.get("observers") or {})
Expand Down Expand Up @@ -3477,6 +3633,8 @@ async def _body(_s: Any, _msgs: Any = msgs, _stores: str = stores_in) -> dict[st
for handle in handles:
handle.remove()
await outer_compiled.drain()
# Release the underlying httpx.AsyncClient connection pool.
await provider.aclose()

expected = cast("dict[str, Any]", case.get("expected") or {})
observer_expectations = cast("dict[str, Any]", expected.get("observers") or {})
Expand Down
Loading