diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a14405..d751cdd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,9 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The ### Changed -- **OTel observer drives the `openarmature.llm.complete` span lifecycle from the typed `LlmCompletionEvent`** (proposal 0049 + 0057, observability §5.5.7). Successful LLM-provider calls now open + close the span in one shot at typed-event arrival, with `start_time` back-dated by `LlmCompletionEvent.latency_ms` so the span duration reflects the actual adapter-boundary measurement rather than dispatcher queue delay. Failure-path spans continue to fire from the sentinel `NodeEvent` pair (the typed event is success-only per the proposal). The §5.5 attribute set is unchanged. Dual-emit window: the provider still emits both the sentinel pair AND the typed event during v0.13.0; the sentinel pair drops in v0.15.0. +- **OTel and Langfuse observers drive the `openarmature.llm.complete` span / Generation observation lifecycle from the typed `LlmCompletionEvent`** (proposal 0049 + 0057, observability §5.5.7). Successful LLM-provider calls now open + close the OTel span and the Langfuse Generation in one shot at typed-event arrival, with `start_time` back-dated by `LlmCompletionEvent.latency_ms` so duration reflects the adapter-boundary measurement rather than dispatcher queue delay. Failure paths continue to fire from the sentinel `NodeEvent` (the typed event is success-only per the proposal). The §5.5 attribute set and §8.4 Generation metadata are unchanged. +- **`OpenAIProvider.complete()` no longer emits the sentinel `NodeEvent` pair on the success path** (v0.13.0 cleanup). The bundled OTel and Langfuse observers now consume the typed `LlmCompletionEvent` directly; the sentinel pair was kept on the success path through earlier releases for compatibility with pre-typed-event observers. External custom observers that filtered LLM calls by `event.namespace == LLM_NAMESPACE` MUST migrate to `isinstance(event, LlmCompletionEvent)` to continue seeing successful LLM calls. The sentinel `completed` event still fires on the failure path until the spec extends `LlmCompletionEvent` with error semantics; the sentinel `started` event is no longer emitted on either path. +- **`LangfuseClient` Protocol gains optional `start_time` / `end_time` timestamps** on `generation(...)` and the Generation/Span handles' `end(...)`. The Langfuse observer passes back-dated timestamps on the typed-event success path so the Langfuse UI shows the actual adapter-boundary duration. The SDK adapter handles v4 Langfuse SDK quirks transparently: `Langfuse.start_observation()` does NOT accept `start_time`, so back-dated generations are routed through the private `_otel_tracer.start_span(name=..., start_time=int_ns)` API (mirroring the SDK's own `create_event` precedent) and the resulting OTel span is wrapped in `LangfuseGeneration` directly; the non-back-dated path still uses `start_observation`. `LangfuseSpan.end()` is typed `Optional[int]` (nanoseconds), so the adapter converts the Protocol's `datetime` surface to int nanoseconds before forwarding. The `InMemoryLangfuseClient` stores both fields verbatim on `LangfuseObservation` for test assertions. - **`OpenAIProvider(populate_caller_metadata=...)` default flipped from `False` to `True`.** The python implementation now populates `LlmCompletionEvent.caller_invocation_metadata` by default so the bundled OTel and Langfuse observers can emit the §5.6 `openarmature.user.` span-attribute family without a separate opt-in. Pass `populate_caller_metadata=False` to suppress the snapshot when no downstream consumer needs it. The spec-defined opt-in mechanism is unchanged; only the python default flips. ### Added diff --git a/src/openarmature/llm/providers/openai.py b/src/openarmature/llm/providers/openai.py index 7a6a054..9c2d310 100644 --- a/src/openarmature/llm/providers/openai.py +++ b/src/openarmature/llm/providers/openai.py @@ -441,21 +441,6 @@ async def complete( serialized_messages = _serialize_messages_for_payload(messages) request_params = _request_params_from_config(config) request_extras = _request_extras_from_config(config) - if dispatch is not None: - dispatch( - _make_llm_event( - "started", - call_id=call_id, - model=self.model, - genai_system=self._genai_system, - input_messages=serialized_messages, - request_params=request_params, - request_extras=request_extras, - active_prompt=active_prompt, - active_prompt_group=active_prompt_group, - ) - ) - # Wall-clock latency measured at the adapter boundary per # proposal 0049's LlmCompletionEvent.latency_ms contract. The # boundary spans from "just before _do_complete is called" to @@ -473,13 +458,15 @@ async def complete( response = await self._do_complete(body, schema_dict, schema_class) except Exception as exc: if dispatch is not None: - # Failure path: only the sentinel NodeEvent pair fires. - # Per proposal 0049 §3 (alternative 3): LlmCompletionEvent - # is completion-only; failures flow through the - # llm-provider §7 exception path. The error continues - # to surface through the existing observer chain via - # the sentinel NodeEvent's error_type / error_category - # fields on LlmEventPayload. + # Failure path: the sentinel NodeEvent carries the + # error fields per llm-provider §7. LlmCompletionEvent + # is success-only per proposal 0049 §3 alternative 3, + # so failures continue to surface through a sentinel + # ``completed`` event until the spec extends the typed + # event with error semantics. Only ``completed`` fires + # — no started counterpart, since both bundled + # observers' handlers ignore sentinel-started after + # the v0.13.0 migration. dispatch( _make_llm_event( "completed", @@ -498,33 +485,14 @@ async def complete( latency_ms = (time.perf_counter() - adapter_start) * 1000.0 if dispatch is not None: - # Sentinel NodeEvent pair stays during the dual-emit window - # per proposal 0049 §5.5.7 SHOULD-emit-both transition. The - # window stays open through v0.13.0 with the sentinel - # emission removed in v0.15.0 (CHANGELOG callout pinned to - # the v0.13.0 release notes). - dispatch( - _make_llm_event( - "completed", - call_id=call_id, - model=self.model, - genai_system=self._genai_system, - finish_reason=response.finish_reason, - usage=response.usage, - input_messages=serialized_messages, - output_content=response.message.content or None, - request_params=request_params, - request_extras=request_extras, - response_id=response.response_id, - response_model=response.response_model, - active_prompt=active_prompt, - active_prompt_group=active_prompt_group, - ) - ) - # The new typed LlmCompletionEvent — observers filtering via - # isinstance(event, LlmCompletionEvent) receive this; legacy - # observers filtering on the sentinel namespace see the - # NodeEvent pair above. Failure path doesn't reach here. + # Success path: emit only the typed LlmCompletionEvent. + # The sentinel NodeEvent pair previously emitted on success + # for compatibility with pre-typed-event observers was + # dropped in v0.13.0; bundled observers (OTel + Langfuse) + # consume the typed event directly, and external custom + # observers should migrate to type discrimination via + # ``isinstance(event, LlmCompletionEvent)`` if they need + # LLM call notifications. dispatch( self._build_llm_completion_event( response, diff --git a/src/openarmature/observability/langfuse/adapter.py b/src/openarmature/observability/langfuse/adapter.py index fcbdbc3..ed2964b 100644 --- a/src/openarmature/observability/langfuse/adapter.py +++ b/src/openarmature/observability/langfuse/adapter.py @@ -35,6 +35,7 @@ import json from contextlib import ExitStack +from datetime import datetime from typing import TYPE_CHECKING, Any, cast from .client import LangfuseGenerationHandle, LangfuseSpanHandle, LangfuseUsage, ObservationLevel @@ -129,13 +130,20 @@ def update(self, **fields: Any) -> None: kwargs[key] = value self._obs.update(**kwargs) - def end(self, **fields: Any) -> None: + def end(self, *, end_time: datetime | None = None, **fields: Any) -> None: # Apply any field updates first (so they're set BEFORE the # observation closes), then call end(). v4's end() takes only # an optional ``end_time``; field mutation happens via update(). + # The SDK's end_time is typed Optional[int] nanoseconds — + # convert from the Protocol's datetime surface before passing + # through. Without the conversion the OTel span_processor's + # formatter raises TypeError when it tries ``end_time / 1e9``. if fields: self.update(**fields) - self._obs.end() + if end_time is not None: + self._obs.end(end_time=int(end_time.timestamp() * 1_000_000_000)) + else: + self._obs.end() class LangfuseSDKAdapter: @@ -337,6 +345,7 @@ def generation( output: Any = None, usage: LangfuseUsage | None = None, prompt: Any = None, + start_time: datetime | None = None, ) -> LangfuseGenerationHandle: extra_kwargs: dict[str, Any] = { "model": model, @@ -356,18 +365,98 @@ def generation( if usage.total is not None: usage_details["total"] = usage.total extra_kwargs["usage_details"] = usage_details - obs = self._start_observation( - as_type="generation", - trace_id=trace_id, - name=name, - metadata=metadata, - parent_observation_id=parent_observation_id, - level=level, - status_message=status_message, - **{k: v for k, v in extra_kwargs.items() if v is not None}, - ) + if start_time is not None: + # v4's public ``start_observation`` does NOT accept a + # ``start_time`` kwarg — only the internal OTel tracer + # does. Mirror the SDK's own ``create_event`` precedent + # (langfuse/_client/client.py:1518-1551): open the OTel + # span directly via the private ``_otel_tracer`` with the + # back-dated timestamp, then wrap it in LangfuseGeneration. + # This is the only path to a back-dated Generation in + # v4.7; the live-account integration test catches a future + # SDK break. + obs = self._start_back_dated_generation( + trace_id=trace_id, + name=name, + metadata=metadata, + parent_observation_id=parent_observation_id, + level=level, + status_message=status_message, + start_time=start_time, + **{k: v for k, v in extra_kwargs.items() if v is not None}, + ) + else: + obs = self._start_observation( + as_type="generation", + trace_id=trace_id, + name=name, + metadata=metadata, + parent_observation_id=parent_observation_id, + level=level, + status_message=status_message, + **{k: v for k, v in extra_kwargs.items() if v is not None}, + ) return _SpanHandle(obs) + def _start_back_dated_generation( + self, + *, + trace_id: str, + name: str | None, + metadata: dict[str, Any] | None, + parent_observation_id: str | None, + level: ObservationLevel, + status_message: str | None, + start_time: datetime, + **extra: Any, + ) -> Any: + """Open a LangfuseGeneration at a back-dated timestamp by going + through the private OTel tracer rather than the public + ``start_observation`` API (which doesn't accept ``start_time`` + in v4.7). Mirrors the SDK's ``create_event`` precedent.""" + from langfuse._client.span import LangfuseGeneration + from opentelemetry import trace as otel_trace_api + + trace_entry = self._trace_info.get(trace_id) + trace_context: TraceContext = {"trace_id": _to_otel_trace_id(trace_id)} + if parent_observation_id is not None: + trace_context["parent_span_id"] = parent_observation_id + + # OTel's ``start_span(start_time=...)`` takes int nanoseconds + # since epoch. The SDK uses ``time_ns()`` for its instant + # events; for back-dating, convert from the supplied datetime. + start_time_ns = int(start_time.timestamp() * 1_000_000_000) + + remote_parent_span = self._client._create_remote_parent_span( # pyright: ignore[reportPrivateUsage] # noqa: SLF001 + trace_id=trace_context["trace_id"], + parent_span_id=trace_context.get("parent_span_id"), + ) + + with ExitStack() as stack: + if trace_entry is not None: + stack.enter_context( + propagate_attributes( + trace_name=trace_entry["name"], + metadata=_stringify_metadata(trace_entry["metadata"]), + ) + ) + stack.enter_context(otel_trace_api.use_span(remote_parent_span)) + otel_span = self._client._otel_tracer.start_span( # pyright: ignore[reportPrivateUsage] # noqa: SLF001 + name=name or "observation", + start_time=start_time_ns, + ) + generation_kwargs: dict[str, Any] = { + "otel_span": otel_span, + "langfuse_client": self._client, + "metadata": metadata, + } + if level != "DEFAULT": + generation_kwargs["level"] = level + if status_message is not None: + generation_kwargs["status_message"] = status_message + generation_kwargs.update(extra) + return LangfuseGeneration(**generation_kwargs) + def force_flush(self, timeout_ms: int = 30_000) -> bool: """Best-effort flush of the underlying Langfuse client. diff --git a/src/openarmature/observability/langfuse/client.py b/src/openarmature/observability/langfuse/client.py index 2132da3..8091b76 100644 --- a/src/openarmature/observability/langfuse/client.py +++ b/src/openarmature/observability/langfuse/client.py @@ -30,6 +30,7 @@ from __future__ import annotations from dataclasses import dataclass, field +from datetime import datetime from typing import Any, Literal, Protocol, runtime_checkable ObservationType = Literal["span", "generation", "event"] @@ -65,6 +66,13 @@ class LangfuseObservation: level: ObservationLevel = "DEFAULT" status_message: str | None = None ended: bool = False + # Optional caller-supplied timestamps. Populated when a typed-event + # consumer (e.g., the LangfuseObserver on the typed LLM completion + # path) back-dates the observation using a wall-clock measurement + # rather than letting the SDK record open/close moments. ``None`` + # means "use the SDK's default"; both fields are independent. + start_time: datetime | None = None + end_time: datetime | None = None # Generation-specific (None / empty on Span and Event observations) model: str | None = None @@ -125,7 +133,7 @@ def id(self) -> str: ... def update(self, **fields: Any) -> None: ... - def end(self, **fields: Any) -> None: ... + def end(self, *, end_time: datetime | None = None, **fields: Any) -> None: ... @runtime_checkable @@ -138,7 +146,7 @@ def id(self) -> str: ... def update(self, **fields: Any) -> None: ... - def end(self, **fields: Any) -> None: ... + def end(self, *, end_time: datetime | None = None, **fields: Any) -> None: ... @runtime_checkable @@ -224,6 +232,7 @@ def generation( output: Any = None, usage: LangfuseUsage | None = None, prompt: Any = None, + start_time: datetime | None = None, ) -> LangfuseGenerationHandle: ... def force_flush(self, timeout_ms: int = 30_000) -> bool: @@ -268,7 +277,9 @@ def id(self) -> str: def update(self, **fields: Any) -> None: _apply_fields(self.observation, fields) - def end(self, **fields: Any) -> None: + def end(self, *, end_time: datetime | None = None, **fields: Any) -> None: + if end_time is not None: + self.observation.end_time = end_time _apply_fields(self.observation, fields) self.observation.ended = True @@ -286,7 +297,9 @@ def id(self) -> str: def update(self, **fields: Any) -> None: _apply_fields(self.observation, fields) - def end(self, **fields: Any) -> None: + def end(self, *, end_time: datetime | None = None, **fields: Any) -> None: + if end_time is not None: + self.observation.end_time = end_time _apply_fields(self.observation, fields) self.observation.ended = True @@ -428,6 +441,7 @@ def generation( output: Any = None, usage: LangfuseUsage | None = None, prompt: Any = None, + start_time: datetime | None = None, ) -> LangfuseGenerationHandle: trace = self._get_trace(trace_id) observation = LangfuseObservation( @@ -444,6 +458,7 @@ def generation( output=output, usage=usage, prompt_entity_link=prompt, + start_time=start_time, ) trace.observations.append(observation) return _InMemoryGenerationHandle(observation=observation) diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index 55016ce..d239121 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -26,6 +26,7 @@ import uuid from collections.abc import Callable, Mapping from dataclasses import dataclass, field +from datetime import UTC, datetime, timedelta from typing import Any, cast from openarmature.graph.events import ( @@ -191,7 +192,6 @@ class _InvState: open_observations: dict[_StackKey, _OpenObservation] = field( default_factory=dict[_StackKey, _OpenObservation] ) - open_llm_observations: dict[str, _OpenObservation] = field(default_factory=dict[str, _OpenObservation]) # Synthetic subgraph dispatch Span observations, keyed by namespace # prefix. Per spec §8.3 each subgraph wrapper produces a Span # observation in its parent's Trace; descendant node observations @@ -366,23 +366,24 @@ async def __call__( if isinstance(event, InvocationCompletedEvent): self._handle_invocation_completed(event) return - # Proposal 0049 typed LlmCompletionEvent: ignored during the - # dual-emit window — the Langfuse mapping continues to drive - # its §5.5 Generation observation lifecycle off the sentinel - # NodeEvent pair the provider emits alongside the typed event. - # Migration to type discrimination lands in a subsequent PR; - # this early-return keeps the observer Protocol-compatible - # without changing behavior. + # Proposal 0049 typed LlmCompletionEvent (success path). Drives + # the §5.5 Generation observation lifecycle for successful + # provider calls. Failures don't emit this variant; they flow + # through the sentinel error path below (a single sentinel + # ``completed`` event — no started counterpart in v0.13.0+). if isinstance(event, LlmCompletionEvent): + if not self.disable_llm_spans: + self._handle_typed_llm_completion(event) return if isinstance(event, MetadataAugmentationEvent): self._handle_metadata_augmentation(event) return - # LLM provider events use a sentinel namespace per §5.5; route - # them to the dedicated Generation path. + # LLM provider sentinel events: failure-path completed opens + + # closes an ERROR-level Generation; everything else is a no-op + # (success-path typed handler above owns the Generation). if event.namespace == LLM_NAMESPACE: if not self.disable_llm_spans: - self._handle_llm_event(event) + self._handle_llm_error_event(event) return if event.phase == "started": self._open_started_observation(event) @@ -1244,13 +1245,12 @@ def close_invocation(self, invocation_id: str) -> None: if inv_state is None: return # Order: deepest leaves first so parents see all children - # closed before they end. LLM observations → leaf nodes - # (sorted deepest-first by namespace length) → per-instance - # fan-out dispatches → subgraph dispatches. - for call_id in list(inv_state.open_llm_observations.keys()): - obs = inv_state.open_llm_observations.pop(call_id, None) - if obs is not None: - obs.handle.end() + # closed before they end. Leaf node observations (sorted + # deepest-first by namespace length) → per-instance fan-out + # dispatches → subgraph dispatches. LLM observations don't + # appear here — both the success and error paths open + close + # the Generation in one shot at handler-time, so there are no + # in-flight LLM observations to drain. for key in sorted( inv_state.open_observations.keys(), key=lambda k: -len(k[0]), @@ -1321,82 +1321,157 @@ def _observation_metadata(self, event: NodeEvent, correlation_id: str | None) -> # Generation observation lifecycle (LLM provider events) # ------------------------------------------------------------------ - def _handle_llm_event(self, event: NodeEvent) -> None: + # v0.13.0 (proposal 0049 + 0057): success-path Generation lifecycle + # is driven by the typed LlmCompletionEvent — opened and closed in + # one shot at typed-event arrival, with start_time back-dated by + # latency_ms so the observation's duration reflects the adapter- + # boundary measurement rather than dispatcher queue delay. Failure + # path keeps a single sentinel NodeEvent (``completed`` phase + # carrying error fields on its LlmEventPayload — LlmCompletionEvent + # is success-only per proposal 0049 §3 alternative 3). The provider + # dropped success-path sentinel emission entirely in this release, + # so on success the typed event is the only signal the Generation + # observation has to fire from; the failure path's sentinel + # ``started`` was also dropped, leaving only ``completed``. + def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: + """Open + close the Generation observation from the typed + LlmCompletionEvent (success path).""" + # Mid-call metadata augmentation can't reach this observation: + # the typed event arrives only after complete() returns, and + # the observation is back-dated past any augmentation event + # that fired while the call was in flight. Since complete() + # is awaited, node bodies can't actually run augmentation + # mid-call, so this is theoretical only. from openarmature.observability.correlation import ( current_correlation_id, current_invocation_id, ) - if not isinstance(event.pre_state, LlmEventPayload): - # Defensive — sentinel-namespaced events MUST carry an - # LlmEventPayload per llm-provider / observability §5.5. - return invocation_id = current_invocation_id() if invocation_id is None: return - payload = event.pre_state + correlation_id = current_correlation_id() # The Trace MAY not exist yet if the LLM call fires before any - # node `started` event has hit this observer (race-y under - # tests that prepare via `prepare_sync` only). The in-memory - # client tolerates create-on-demand; production SDK adapters - # should too. + # node `started` event has hit this observer. Create-on-demand + # mirrors the sentinel-pair handler's behavior. if invocation_id not in self._inv_states: - self._open_trace(invocation_id, current_correlation_id(), event) + self._open_trace_for_typed_event(invocation_id, correlation_id, event) inv_state = self._inv_states[invocation_id] - correlation_id = current_correlation_id() + # Back-date start_time using latency_ms; fall back to end_time + # for both when latency is missing (zero-duration observation, + # mirroring the OTel path). + end_time = datetime.now(UTC) + if event.latency_ms is not None: + start_time = end_time - timedelta(milliseconds=event.latency_ms) + else: + start_time = end_time + parent_observation_id = self._resolve_llm_parent_observation_id( + inv_state, + calling_namespace_prefix=event.namespace, + calling_attempt_index=event.attempt_index, + calling_fan_out_index=event.fan_out_index, + calling_branch_name=event.branch_name, + ) + metadata = self._typed_event_metadata(event, correlation_id) + model_parameters: dict[str, Any] = dict(event.request_params or {}) + input_value: Any = None + output_value: Any = None + if not self.disable_llm_payload: + if event.input_messages: + input_value = self._maybe_truncate_for_input(event.input_messages) + if event.output_content is not None: + output_value = self._maybe_truncate_for_output(event.output_content) + if event.request_extras: + metadata["request_extras"] = self._maybe_truncate_for_extras(dict(event.request_extras)) + target_trace_id = self._trace_id_for(inv_state, event.namespace, event.fan_out_index) + handle = self.client.generation( + trace_id=target_trace_id, + name="openarmature.llm.complete", + model=event.model, + model_parameters=model_parameters, + input=input_value, + output=output_value, + metadata=metadata, + parent_observation_id=parent_observation_id, + prompt=self._resolve_prompt_link_from_typed_event(event), + start_time=start_time, + ) + usage = self._usage_from_typed_event(event) + end_kwargs: dict[str, Any] = {} + if usage is not None: + end_kwargs["usage"] = usage + handle.end(end_time=end_time, **end_kwargs) - if event.phase == "started": - parent_observation_id = self._resolve_llm_parent_observation_id(inv_state, payload) - metadata, model_parameters, input_value, output_value = self._llm_metadata_and_payload( - payload, correlation_id, phase="started" - ) - target_trace_id = self._trace_id_for( - inv_state, payload.calling_namespace_prefix, payload.calling_fan_out_index - ) - handle = self.client.generation( - trace_id=target_trace_id, - name="openarmature.llm.complete", - model=payload.model, - model_parameters=model_parameters, - input=input_value, - output=output_value, - metadata=metadata, - parent_observation_id=parent_observation_id, - prompt=self._resolve_prompt_link(payload), - ) - inv_state.open_llm_observations[payload.call_id] = _OpenObservation( - handle=handle, - fan_out_index_chain=event.fan_out_index_chain, - branch_name_chain=event.branch_name_chain, - ) - return + def _handle_llm_error_event(self, event: NodeEvent) -> None: + """Emit an ERROR-level Generation observation from the sentinel + NodeEvent on the failure path. Success-path sentinel completion + is no longer emitted by the provider in v0.13.0; this handler + only fires for failures.""" + from openarmature.observability.correlation import ( + current_correlation_id, + current_invocation_id, + ) - # completed: pop the started handle and finalize. - observation = inv_state.open_llm_observations.pop(payload.call_id, None) - if observation is None: + if event.phase != "completed": + # Sentinel started becomes a no-op once the success-side + # emission drops. Failures only emit the completed half. + return + if not isinstance(event.pre_state, LlmEventPayload): return - metadata, _model_parameters, _input_value, output_value = self._llm_metadata_and_payload( - payload, correlation_id, phase="completed" + payload = event.pre_state + if payload.error_type is None: + # Defensive — success path no longer emits the sentinel + # pair; if a non-error sentinel completion slips through + # (e.g., legacy custom provider not yet migrated), the + # typed event handler owns the Generation. + return + invocation_id = current_invocation_id() + if invocation_id is None: + return + correlation_id = current_correlation_id() + if invocation_id not in self._inv_states: + self._open_trace(invocation_id, correlation_id, event) + inv_state = self._inv_states[invocation_id] + parent_observation_id = self._resolve_llm_parent_observation_id( + inv_state, + calling_namespace_prefix=payload.calling_namespace_prefix, + calling_attempt_index=payload.calling_attempt_index, + calling_fan_out_index=payload.calling_fan_out_index, + calling_branch_name=payload.calling_branch_name, ) - end_kwargs: dict[str, Any] = {"metadata": metadata} - if output_value is not None: - end_kwargs["output"] = output_value - usage = self._usage_from_payload(payload) - if usage is not None: - end_kwargs["usage"] = usage - # Error-category mapping: §8.4.2 + §8.4.3 (an LLM provider - # error_category lands on the Generation observation's level - # and statusMessage the same as on a Span observation). - if payload.error_category is not None: - end_kwargs["level"] = "ERROR" - end_kwargs["status_message"] = payload.error_category - observation.handle.end(**end_kwargs) + metadata, model_parameters, input_value, _ = self._llm_metadata_and_payload( + payload, correlation_id, phase="started" + ) + target_trace_id = self._trace_id_for( + inv_state, payload.calling_namespace_prefix, payload.calling_fan_out_index + ) + handle = self.client.generation( + trace_id=target_trace_id, + name="openarmature.llm.complete", + model=payload.model, + model_parameters=model_parameters, + input=input_value, + metadata=metadata, + parent_observation_id=parent_observation_id, + prompt=self._resolve_prompt_link(payload), + ) + # Error-category mapping: §8.4.2 + §8.4.3. + end_kwargs: dict[str, Any] = { + "level": "ERROR", + "status_message": payload.error_category or payload.error_type, + } + handle.end(**end_kwargs) def _resolve_llm_parent_observation_id( - self, inv_state: _InvState, payload: LlmEventPayload + self, + inv_state: _InvState, + *, + calling_namespace_prefix: tuple[str, ...], + calling_attempt_index: int, + calling_fan_out_index: int | None, + calling_branch_name: str | None, ) -> str | None: - # Calling-node identity comes from the payload (set at - # dispatch time per llm-provider §5.5). Precedence: + # Calling-node identity precedence: # 1. Exact-match leaf node at the calling key. # 2. Per-instance fan-out dispatch observation when the # call originated inside a fan-out instance. @@ -1407,28 +1482,115 @@ def _resolve_llm_parent_observation_id( # exact-match miss would otherwise need a leaf-ancestor walk # to handle. key: _StackKey = ( - payload.calling_namespace_prefix, - payload.calling_attempt_index, - payload.calling_fan_out_index, - payload.calling_branch_name, + calling_namespace_prefix, + calling_attempt_index, + calling_fan_out_index, + calling_branch_name, ) observation = inv_state.open_observations.get(key) if observation is not None: return observation.handle.id # Per-instance fan-out dispatch. - if payload.calling_fan_out_index is not None and payload.calling_namespace_prefix: - instance_key = payload.calling_namespace_prefix[:1] + (str(payload.calling_fan_out_index),) + if calling_fan_out_index is not None and calling_namespace_prefix: + instance_key = calling_namespace_prefix[:1] + (str(calling_fan_out_index),) dispatch = inv_state.fan_out_instance_observations.get(instance_key) if dispatch is not None: return dispatch.handle.id # Subgraph dispatch, longest-prefix-first. - for prefix_len in range(len(payload.calling_namespace_prefix), 0, -1): - prefix = payload.calling_namespace_prefix[:prefix_len] + for prefix_len in range(len(calling_namespace_prefix), 0, -1): + prefix = calling_namespace_prefix[:prefix_len] sg = inv_state.subgraph_observations.get(prefix) if sg is not None: return sg.handle.id return None + def _typed_event_metadata(self, event: LlmCompletionEvent, correlation_id: str | None) -> dict[str, Any]: + """Build the Generation observation's metadata dict from the + typed event. Mirrors _llm_metadata_and_payload's metadata + construction but reads from LlmCompletionEvent fields, and + combines started + completed phases into a single populated + dict (the typed event carries everything at once).""" + metadata: dict[str, Any] = {} + if correlation_id is not None: + metadata["correlation_id"] = correlation_id + metadata["system"] = event.provider + active_prompt = event.active_prompt + if active_prompt is not None: + metadata["prompt"] = { + "name": active_prompt.name, + "version": active_prompt.version, + "label": active_prompt.label, + "template_hash": active_prompt.template_hash, + "rendered_hash": active_prompt.rendered_hash, + } + active_group = event.active_prompt_group + if active_group is not None: + metadata["prompt_group_name"] = active_group.group_name + # Asymmetric guard with _llm_metadata_and_payload below: the + # typed event types caller_invocation_metadata as Mapping | None + # while LlmEventPayload defaults to an empty mapping (never + # None). Don't "normalize" the two paths without normalizing + # the source types. + if event.caller_invocation_metadata is not None: + _apply_caller_metadata(metadata, event.caller_invocation_metadata) + if event.finish_reason is not None: + metadata["finish_reason"] = event.finish_reason + if event.response_model is not None: + metadata["response_model"] = event.response_model + if event.response_id is not None: + metadata["response_id"] = event.response_id + return metadata + + def _usage_from_typed_event(self, event: LlmCompletionEvent) -> LangfuseUsage | None: + """Map the typed event's Usage onto the Langfuse Usage record + per §8.4.3. Returns None when no usage was reported.""" + usage = event.usage + if usage is None: + return None + if usage.prompt_tokens is None and usage.completion_tokens is None and usage.total_tokens is None: + return None + return LangfuseUsage( + input=usage.prompt_tokens, + output=usage.completion_tokens, + total=usage.total_tokens, + ) + + def _resolve_prompt_link_from_typed_event(self, event: LlmCompletionEvent) -> Any: + """§8.4.4 case discrimination on the typed event's active_prompt + snapshot. Same logic as _resolve_prompt_link but reads from + LlmCompletionEvent instead of LlmEventPayload.""" + active_prompt = event.active_prompt + if active_prompt is None: + return None + entities = getattr(active_prompt, "observability_entities", None) + if not isinstance(entities, dict): + return None + return cast("dict[str, Any]", entities).get("langfuse_prompt") + + def _open_trace_for_typed_event( + self, invocation_id: str, correlation_id: str | None, event: LlmCompletionEvent + ) -> None: + """Trace open path for a typed LlmCompletionEvent arriving + before any node-started event reached this observer. + Synthesizes the minimal trace shape from the typed event's + scoping fields.""" + if event.namespace: + entry_node = event.namespace[0] + else: + entry_node = event.node_name or "openarmature.llm.complete" + metadata: dict[str, Any] = { + "entry_node": entry_node, + "spec_version": self.spec_version, + "implementation_name": self.implementation_name, + "implementation_version": self.implementation_version, + } + if correlation_id is not None: + metadata["correlation_id"] = correlation_id + if event.caller_invocation_metadata is not None: + _apply_caller_metadata(metadata, event.caller_invocation_metadata) + self.client.trace(id=invocation_id, name=entry_node, metadata=metadata) + self._inv_states[invocation_id] = _InvState(trace_id=invocation_id) + def _llm_metadata_and_payload( self, payload: LlmEventPayload, diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index 18bdb63..5265876 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -595,11 +595,12 @@ async def __call__( if isinstance(event, MetadataAugmentationEvent): self._handle_metadata_augmentation(event) return - # LLM provider sentinel events: success-path is a no-op (the - # typed event handler above owns the span). Failure-path - # completed events open + close an error span; sentinel-started - # and successful-completed are no-ops here. Dual-emit window - # closes in v0.15.0 per the CHANGELOG note pinned to v0.13.0. + # LLM provider sentinel events: the typed event handler above + # owns the success-path span. Failure-path ``completed`` events + # open + close an error span. v0.13.0 dropped sentinel-pair + # emission on the success path entirely; the only remaining + # sentinel emission is failure-path ``completed`` (until the + # spec extends the typed event with error semantics). if event.namespace == _LLM_NAMESPACE: if not self.disable_llm_spans: self._handle_llm_error_event(event) @@ -1086,9 +1087,11 @@ def _emit_checkpoint_save_span(self, event: NodeEvent) -> None: # shot at typed-event arrival, with start_time back-dated by # latency_ms so the span duration matches the adapter-boundary # measurement. The error-path span is still driven by the sentinel - # NodeEvent pair the provider emits (LlmCompletionEvent is success- - # only per proposal 0049 §3 alternative 3). Dual-emit window closes - # in v0.15.0 — the sentinel pair will then drop entirely. + # NodeEvent (LlmCompletionEvent is success-only per proposal 0049 + # §3 alternative 3). Success-side sentinel emission was dropped + # from the provider in v0.13.0; the failure-side sentinel emission + # stays until the spec extends LlmCompletionEvent with error + # semantics (coord-thread tracked). # # v0.17.0 attribute set (proposal 0024) preserved unchanged: # - Baseline openarmature.llm.* attributes diff --git a/tests/integration/test_langfuse_sdk_adapter.py b/tests/integration/test_langfuse_sdk_adapter.py index 5018d14..972cda6 100644 --- a/tests/integration/test_langfuse_sdk_adapter.py +++ b/tests/integration/test_langfuse_sdk_adapter.py @@ -138,3 +138,112 @@ async def test_sdk_adapter_handles_invocation_with_no_real_observation() -> None assert trace.output == expected_output assert len(trace.observations) == 1 assert trace.observations[0].name == "openarmature.trace_io" + + +@pytest.mark.integration +async def test_sdk_adapter_generation_timestamps_round_trip_through_langfuse() -> None: + """End-to-end verification that explicit start_time / end_time on + the adapter's generation(...) / handle.end(...) calls actually + land on the Langfuse-side observation. The unit tests cover the + SDK call-site shape (the back-dated path bypasses the public + start_observation API and routes through the private + _otel_tracer.start_span instead, because v4.7's start_observation + rejects start_time with TypeError); this test closes the loop by + reading the projected timestamps back from the REST API and + asserting they reflect the back-dated values. + + Catches the failure mode the v0.13.0 Langfuse migration is + susceptible to: if a future SDK release renames _otel_tracer, + moves LangfuseGeneration, or otherwise breaks the private-API + surface the adapter relies on, the back-dating routing fails + silently — the Langfuse UI shows call-time timestamps instead + of the back-dated latency_ms-based ones, with no error to + surface the misconfiguration. + """ + from datetime import UTC, datetime, timedelta + + from langfuse import Langfuse + + from openarmature.observability.langfuse.adapter import LangfuseSDKAdapter + + client = Langfuse() + adapter = LangfuseSDKAdapter(client) + + invocation_id = str(uuid.uuid4()) + adapter.trace(id=invocation_id, name="test_sdk_adapter_generation_timestamps") + # Back-date by 250 ms — a value far enough above the SDK's own + # call-time jitter that a passthrough failure would show up + # clearly in the projected start/end timestamps. + end_time = datetime.now(UTC) + start_time = end_time - timedelta(milliseconds=250) + handle = adapter.generation( + trace_id=invocation_id, + name="openarmature.llm.complete", + model="test-model", + start_time=start_time, + ) + handle.end(end_time=end_time) + + adapter.force_flush() + time.sleep(2) + + hex_id = invocation_id.replace("-", "") + try: + _poll_trace_with_retry(client, hex_id) + # Pull observations via REST. The observations-list endpoint + # lags the headline trace fields by an "indeterminate window" + # per Langfuse's own caveat (mirrored in the trace_io test + # above), so the retry budget here is wider (180s vs the + # trace_io test's 60s). Filter server-side by name + type so + # the response is small + scoped; track seen names across + # polls so a name-mismatch failure surfaces with diagnostics + # rather than a generic "not found". + observation: Any = None + last_exc: Exception | None = None + seen_names: set[str] = set() + for _ in range(36): + try: + response = client.api.observations.get_many( + trace_id=hex_id, + name="openarmature.llm.complete", + type="GENERATION", + ) + if response.data: + observation = response.data[0] + break + # Fall back to an unfiltered query so we know whether + # ANY observations have projected — distinguishes "REST + # lag" from "observation projected with unexpected name". + fallback = client.api.observations.get_many(trace_id=hex_id) + seen_names.update(o.name or "" for o in fallback.data) + except Exception as exc: # noqa: BLE001 + last_exc = exc + time.sleep(5) + assert observation is not None, ( + f"openarmature.llm.complete Generation for trace {hex_id} did not appear via REST " + f"after 180s retry budget. Observations seen under trace (any name): {seen_names or 'none'}. " + f"Last error: {last_exc!r}" + ) + + # The REST projection's start/end timestamps MUST match the + # back-dated values within a small tolerance (the SDK rounds + # to microseconds; Langfuse's REST projection may round + # further). + assert observation.start_time is not None + assert observation.end_time is not None + start_delta = abs((observation.start_time - start_time).total_seconds()) + end_delta = abs((observation.end_time - end_time).total_seconds()) + assert start_delta < 0.01, ( + f"observation.start_time drift {start_delta * 1000:.3f}ms exceeds 10ms tolerance — " + f"sent {start_time.isoformat()}, got {observation.start_time.isoformat()}" + ) + assert end_delta < 0.01, ( + f"observation.end_time drift {end_delta * 1000:.3f}ms exceeds 10ms tolerance — " + f"sent {end_time.isoformat()}, got {observation.end_time.isoformat()}" + ) + finally: + # Match the existing module's clean-exit pattern: shutdown + # releases the SDK's background OTel exporter + ingestion + # queues. Without this, a long-running pytest process could + # accumulate background threads across integration tests. + client.shutdown() diff --git a/tests/unit/test_llm_provider.py b/tests/unit/test_llm_provider.py index a604442..db985ba 100644 --- a/tests/unit/test_llm_provider.py +++ b/tests/unit/test_llm_provider.py @@ -768,17 +768,13 @@ async def test_complete_negative_cached_tokens_surfaces_as_invalid_response() -> await provider.aclose() -async def test_complete_populates_cached_tokens_on_llm_event_payload() -> None: - # Proposal 0047: _make_llm_event MUST propagate - # Response.usage.cached_tokens onto the sentinel LlmEventPayload - # so observers driving §5.5.3.1 cache attribute emission off the - # sentinel NodeEvent pair have the cache stat available. The - # conformance fixture 040 covers this end-to-end via OTel span - # attribute assertion; this test localizes the assertion to the - # provider-payload boundary so a regression here surfaces - # independently of the observer rendering layer. - from openarmature.observability.llm_event import LlmEventPayload - +async def test_complete_populates_cached_tokens_on_typed_event() -> None: + # Proposal 0047: Response.usage.cached_tokens MUST flow onto the + # typed LlmCompletionEvent's usage record so observers driving + # §5.5.3.1 cache attribute emission have the cache stat available. + # This locks the field at the provider-event boundary; the + # conformance fixture 040 covers the end-to-end OTel span + # attribute path. events, token = _collecting_dispatch() transport = _make_openai_response_with_usage( { @@ -795,26 +791,18 @@ async def test_complete_populates_cached_tokens_on_llm_event_payload() -> None: await provider.aclose() _release_dispatch(token) - completed_payloads = [ - e.pre_state - for e in events - if isinstance(e, NodeEvent) and e.phase == "completed" and isinstance(e.pre_state, LlmEventPayload) - ] - assert len(completed_payloads) == 1 - payload = completed_payloads[0] - assert payload.cached_tokens == 42 + typed = next(e for e in events if isinstance(e, LlmCompletionEvent)) + assert typed.usage is not None + assert typed.usage.cached_tokens == 42 # The OpenAI-compat mapping leaves cache_creation_tokens absent - # per spec §8.1.2; verify the field stays None on the payload. - assert payload.cache_creation_tokens is None + # per spec §8.1.2; verify the field stays None on the typed event. + assert typed.usage.cache_creation_tokens is None async def test_complete_leaves_cached_tokens_none_when_provider_silent() -> None: # Companion to the populated case: when the wire response omits # prompt_tokens_details, Response.usage.cached_tokens stays None - # and the LlmEventPayload reflects that. Locks the absent path - # at the provider-payload boundary. - from openarmature.observability.llm_event import LlmEventPayload - + # and the typed event's Usage record reflects that. events, token = _collecting_dispatch() transport = _make_openai_response_with_usage( {"prompt_tokens": 100, "completion_tokens": 5, "total_tokens": 105} @@ -826,14 +814,10 @@ async def test_complete_leaves_cached_tokens_none_when_provider_silent() -> None await provider.aclose() _release_dispatch(token) - completed_payloads = [ - e.pre_state - for e in events - if isinstance(e, NodeEvent) and e.phase == "completed" and isinstance(e.pre_state, LlmEventPayload) - ] - assert len(completed_payloads) == 1 - assert completed_payloads[0].cached_tokens is None - assert completed_payloads[0].cache_creation_tokens is None + typed = next(e for e in events if isinstance(e, LlmCompletionEvent)) + assert typed.usage is not None + assert typed.usage.cached_tokens is None + assert typed.usage.cache_creation_tokens is None # RuntimeConfig.from_partial — Python ergonomic introduced alongside @@ -1290,11 +1274,13 @@ def _release_dispatch(token: _DispatchToken) -> None: _reset_active_dispatch(token) -async def test_complete_success_emits_both_sentinel_and_typed_event() -> None: - # Dual-emit window per proposal 0049 §5.5.7 SHOULD-emit-both - # transition: the provider emits BOTH the existing sentinel - # NodeEvent pair (started + completed) AND the new typed - # LlmCompletionEvent on success. +async def test_complete_success_emits_only_typed_event() -> None: + # v0.13.0 dropped the success-side sentinel emission. The provider + # now emits a single typed LlmCompletionEvent on success; no + # sentinel NodeEvent pair (started or completed) fires for + # successful calls. External observers consuming LLM events on + # the success path MUST filter via isinstance(event, + # LlmCompletionEvent). events, token = _collecting_dispatch() transport = _make_openai_response_with_usage( {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15} @@ -1308,20 +1294,17 @@ async def test_complete_success_emits_both_sentinel_and_typed_event() -> None: node_events = [e for e in events if isinstance(e, NodeEvent)] typed_events = [e for e in events if isinstance(e, LlmCompletionEvent)] - # Sentinel pair: started + completed (the existing pattern). - assert len(node_events) == 2 - assert [e.phase for e in node_events] == ["started", "completed"] - # One typed event (success-only emission). + assert node_events == [] assert len(typed_events) == 1 -async def test_complete_failure_emits_only_sentinel_no_typed_event() -> None: +async def test_complete_failure_emits_only_sentinel_completed_no_typed_event() -> None: # Per proposal 0049 §3 alternative 3: LlmCompletionEvent fires on # successful structured-response completion only. Provider - # exceptions (provider_unavailable etc.) flow through the - # existing exception path; the sentinel NodeEvent(completed, - # error=...) keeps firing for backwards-compat failure - # observability. + # exceptions (provider_unavailable etc.) flow through the existing + # exception path; the sentinel NodeEvent(completed, error=...) + # carries the error fields. v0.13.0 dropped the sentinel ``started`` + # event entirely — only ``completed`` fires on failure. from openarmature.graph.events import LlmCompletionEvent, NodeEvent def _503(_req: httpx.Request) -> httpx.Response: @@ -1340,11 +1323,8 @@ def _503(_req: httpx.Request) -> httpx.Response: node_events = [e for e in events if isinstance(e, NodeEvent)] typed_events = [e for e in events if isinstance(e, LlmCompletionEvent)] - # Sentinel pair fires; the completed event carries error details - # on its LlmEventPayload. - assert len(node_events) == 2 - assert [e.phase for e in node_events] == ["started", "completed"] - # No typed event on the failure path. + assert len(node_events) == 1 + assert node_events[0].phase == "completed" assert typed_events == [] @@ -1705,16 +1685,14 @@ def _handler(_req: httpx.Request) -> httpx.Response: assert typed.response_id is None -async def test_llm_completion_event_arrives_after_sentinel_completed_within_provider_emission() -> None: - # Within the provider's own emission window (sentinel started → - # sentinel completed → typed event), the typed LlmCompletionEvent - # MUST arrive after the sentinel NodeEvent(completed). Spec - # fixture 056 pins the broader contract (typed event arrives - # between the CALLING NODE's started/completed pair); this test - # locks down the provider-internal sub-ordering that contract - # depends on. The full bracketing (calling-node started → ... → - # calling-node completed) is covered by the conformance fixture - # which exercises a real CompiledGraph. +async def test_complete_success_emits_typed_event_as_single_emission() -> None: + # v0.13.0 dropped the success-side sentinel emission, so the + # provider's success-path emission window is now a single typed + # event — no within-emission ordering question remains. This test + # locks the single-emission shape so a regression that re-adds a + # sentinel NodeEvent on success would surface here. Spec fixture + # 056 pins the broader bracketing (typed event arrives between + # the CALLING NODE's started/completed pair) end-to-end. events, token = _collecting_dispatch() transport = _make_openai_response_with_usage( {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2} @@ -1726,14 +1704,8 @@ async def test_llm_completion_event_arrives_after_sentinel_completed_within_prov await provider.aclose() _release_dispatch(token) - # Build a type+phase tag for each event in arrival order. - sequence: list[str] = [] - for ev in events: - if isinstance(ev, NodeEvent): - sequence.append(f"sentinel:{ev.phase}") - elif isinstance(ev, LlmCompletionEvent): - sequence.append("typed:completed") - assert sequence == ["sentinel:started", "sentinel:completed", "typed:completed"] + assert len(events) == 1 + assert isinstance(events[0], LlmCompletionEvent) async def test_llm_completion_event_sources_node_identity_from_calling_context() -> None: diff --git a/tests/unit/test_observability_langfuse.py b/tests/unit/test_observability_langfuse.py index 59a4697..9f467f3 100644 --- a/tests/unit/test_observability_langfuse.py +++ b/tests/unit/test_observability_langfuse.py @@ -1205,3 +1205,261 @@ async def test_implementation_attribution_rows_emit_on_every_trace() -> None: assert trace.metadata.get("implementation_name") == "openarmature-python" assert isinstance(trace.metadata.get("implementation_version"), str) assert trace.metadata["implementation_version"] + + +# --------------------------------------------------------------------------- +# Typed LlmCompletionEvent handling (proposal 0049 + 0057, PR 3c) +# --------------------------------------------------------------------------- + + +async def test_typed_llm_event_emits_generation_with_expected_fields() -> None: + # Happy-path: a single LlmCompletionEvent produces exactly one + # Generation observation under the typed event's invocation_id + # Trace, with model / usage / metadata sourced from the event. + from openarmature.llm.response import Usage + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from tests._helpers.typed_event import make_typed_event + + client = InMemoryLangfuseClient() + # disable_llm_payload defaults to True per §8.9; flip it off here + # so the test can also assert the payload (output) makes it through. + observer = LangfuseObserver(client=client, disable_llm_payload=False) + token = _set_invocation_id("inv-typed-1") + try: + await observer( + make_typed_event( + invocation_id="inv-typed-1", + model="m-test", + provider="vllm", + usage=Usage(prompt_tokens=10, completion_tokens=4, total_tokens=14), + finish_reason="stop", + response_id="resp-abc", + response_model="m-test-001", + output_content="hello", + request_params={"temperature": 0.7}, + ) + ) + finally: + _reset_invocation_id(token) + + assert "inv-typed-1" in client.traces + trace = client.traces["inv-typed-1"] + generations = [o for o in trace.observations if o.type == "generation"] + assert len(generations) == 1 + obs = generations[0] + assert obs.model == "m-test" + assert obs.usage == LangfuseUsage(input=10, output=4, total=14) + assert obs.model_parameters == {"temperature": 0.7} + assert obs.output == "hello" + assert obs.metadata.get("system") == "vllm" + assert obs.metadata.get("finish_reason") == "stop" + assert obs.metadata.get("response_id") == "resp-abc" + assert obs.metadata.get("response_model") == "m-test-001" + assert obs.ended is True + + +async def test_typed_llm_event_back_dates_generation_using_latency_ms() -> None: + # Generation observation's start/end timestamps reflect the + # adapter-boundary latency rather than the typed event's arrival + # moment. Verify end - start matches latency_ms within tolerance. + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from tests._helpers.typed_event import make_typed_event + + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client) + latency_ms = 250.0 + token = _set_invocation_id("inv-typed-dur") + try: + await observer(make_typed_event(invocation_id="inv-typed-dur", latency_ms=latency_ms)) + finally: + _reset_invocation_id(token) + + trace = client.traces["inv-typed-dur"] + obs = next(o for o in trace.observations if o.type == "generation") + assert obs.start_time is not None and obs.end_time is not None + duration_ms = (obs.end_time - obs.start_time).total_seconds() * 1000 + # Float arithmetic tolerance; the back-date should be near-exact + # apart from microsecond rounding. + assert abs(duration_ms - latency_ms) < 1.0 + + +async def test_typed_llm_event_zero_duration_when_latency_missing() -> None: + # latency_ms=None falls back to a zero-duration Generation at + # end_time. Mirrors the OTel path. + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from tests._helpers.typed_event import make_typed_event + + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client) + token = _set_invocation_id("inv-typed-no-latency") + try: + await observer(make_typed_event(invocation_id="inv-typed-no-latency", latency_ms=None)) + finally: + _reset_invocation_id(token) + + trace = client.traces["inv-typed-no-latency"] + obs = next(o for o in trace.observations if o.type == "generation") + assert obs.start_time is not None and obs.end_time is not None + assert obs.start_time == obs.end_time + + +async def test_typed_llm_event_drops_silently_outside_invocation() -> None: + # Without an invocation id ContextVar set, the typed handler + # MUST early-return without emitting a Generation. Symmetric with + # the OTel observer. + from tests._helpers.typed_event import make_typed_event + + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client) + await observer(make_typed_event()) + assert client.traces == {} + + +async def test_disable_llm_spans_skips_typed_event_path() -> None: + # disable_llm_spans MUST gate the typed-event handler. + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from tests._helpers.typed_event import make_typed_event + + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client, disable_llm_spans=True) + token = _set_invocation_id("inv-disabled") + try: + await observer(make_typed_event(invocation_id="inv-disabled")) + finally: + _reset_invocation_id(token) + assert client.traces == {} + + +async def test_llm_error_path_emits_error_generation_from_sentinel_completed() -> None: + # Failure-path provider exceptions still flow through the sentinel + # NodeEvent(completed, error=...). Verify the observer emits an + # ERROR-level Generation with the canonical error_category as + # status_message. + from openarmature.graph.events import NodeEvent + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from openarmature.observability.llm_event import LlmEventPayload + + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client) + token = _set_invocation_id("inv-err") + try: + completed = NodeEvent( + node_name="openarmature.llm.complete", + namespace=("openarmature.llm.complete",), + step=-1, + phase="completed", + pre_state=LlmEventPayload( + call_id="cc-err", + model="m-test", + error_type="ProviderRateLimit", + error_category="provider_rate_limited", + error_message="429 from upstream", + ), + post_state=None, + error=None, + parent_states=(), + ) + await observer(completed) + finally: + _reset_invocation_id(token) + + trace = client.traces["inv-err"] + obs = next(o for o in trace.observations if o.type == "generation") + assert obs.level == "ERROR" + assert obs.status_message == "provider_rate_limited" + + +async def test_llm_error_event_parents_under_branch_calling_node() -> None: + # Regression cover for the _resolve_llm_parent_observation_id + # keyword-only refactor: when an LLM failure fires inside a + # parallel-branches branch, the resulting ERROR Generation MUST + # parent under THAT branch's calling node observation, not under + # a sibling branch's same-named node. Pre-populates the observer's + # internal state with two open node observations that differ only + # by branch_name, then dispatches a sentinel-completed-error with + # a matching calling_branch_name and asserts the parent_observation + # _id points at the right one. + # + # Note: the same _resolve_llm_parent_observation_id call also + # serves the success-path typed event handler (with + # calling_branch_name = event.branch_name); error- and success- + # paths share the resolver, so this test transitively covers the + # success-path branch_name handling. + from openarmature.graph.events import NodeEvent + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from openarmature.observability.langfuse.observer import ( + _InvState, + _OpenObservation, + ) + from openarmature.observability.llm_event import LlmEventPayload + + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client) + invocation_id = "inv-pb-err" + token = _set_invocation_id(invocation_id) + try: + # Bootstrap the Trace + two branch-distinguished node + # observations directly. _InvState's open_observations map is + # keyed by (namespace, attempt_index, fan_out_index, + # branch_name); the calling node identity on the error payload + # is (("dispatcher", "ask"), 0, None, "fast"). + client.trace(id=invocation_id, name="dispatcher") + observer._inv_states[invocation_id] = _InvState(trace_id=invocation_id) # noqa: SLF001 + inv_state = observer._inv_states[invocation_id] # noqa: SLF001 + # Open two observations under the trace — one per branch. + fast_handle = client.generation(trace_id=invocation_id, name="ask", model="m-test") + slow_handle = client.generation(trace_id=invocation_id, name="ask", model="m-test") + fast_key = (("dispatcher", "ask"), 0, None, "fast") + slow_key = (("dispatcher", "ask"), 0, None, "slow") + inv_state.open_observations[fast_key] = _OpenObservation(handle=fast_handle) + inv_state.open_observations[slow_key] = _OpenObservation(handle=slow_handle) + completed = NodeEvent( + node_name="openarmature.llm.complete", + namespace=("openarmature.llm.complete",), + step=-1, + phase="completed", + pre_state=LlmEventPayload( + call_id="cc-pb", + model="m-test", + error_type="ProviderUnavailable", + error_category="provider_unavailable", + error_message="503 from upstream", + calling_namespace_prefix=("dispatcher", "ask"), + calling_attempt_index=0, + calling_fan_out_index=None, + calling_branch_name="fast", + ), + post_state=None, + error=None, + parent_states=(), + ) + await observer(completed) + finally: + _reset_invocation_id(token) + + trace = client.traces[invocation_id] + # Three observations now: two synthetic "ask" + one error + # Generation. The error Generation MUST parent under fast_handle, + # not slow_handle. + error_gens = [o for o in trace.observations if o.type == "generation" and o.level == "ERROR"] + assert len(error_gens) == 1 + assert error_gens[0].parent_observation_id == fast_handle.id + assert error_gens[0].parent_observation_id != slow_handle.id diff --git a/tests/unit/test_observability_langfuse_adapter.py b/tests/unit/test_observability_langfuse_adapter.py index ae53406..6f5c452 100644 --- a/tests/unit/test_observability_langfuse_adapter.py +++ b/tests/unit/test_observability_langfuse_adapter.py @@ -161,6 +161,126 @@ def test_adapter_force_flush_delegates_to_client() -> None: assert wrapped.flush.call_count == 2 +def test_adapter_generation_routes_back_dated_calls_via_otel_tracer(monkeypatch: pytest.MonkeyPatch) -> None: + # v4.7's public ``Langfuse.start_observation`` does NOT accept + # ``start_time`` — only the internal ``_otel_tracer.start_span`` + # does. The adapter MUST route back-dated generation() calls via + # the OTel tracer path (mirroring the SDK's own ``create_event`` + # precedent). This test spies on BOTH paths: ``start_observation`` + # is patched to fail loudly if the back-dated path ever falls + # through to it (the prior monkeypatch test's gap), and the OTel + # tracer's ``start_span`` is spied to assert the back-dated + # nanosecond timestamp lands on the right surface. + from datetime import UTC, datetime + from unittest.mock import MagicMock + + client = _dummy_client() + captured_otel_kwargs: dict[str, Any] = {} + + def _otel_spy(**kwargs: Any) -> MagicMock: + captured_otel_kwargs.update(kwargs) + # The SDK calls get_span_context(), set_attribute(), etc. on + # the returned span during LangfuseGeneration construction. + # Plain MagicMock auto-creates most attrs, but trace_id / + # span_id MUST be real ints because the SDK formats them as + # 32 / 16-char hex internally. + span = MagicMock() + span.get_span_context.return_value = MagicMock( + trace_id=int("a" * 32, 16), + span_id=int("b" * 16, 16), + ) + return span + + def _start_observation_should_not_be_called(**_kwargs: Any) -> None: + raise AssertionError( + "start_observation MUST NOT be called on the back-dated path; " + "v4 SDK rejects start_time= and the adapter should route via _otel_tracer" + ) + + monkeypatch.setattr(client._otel_tracer, "start_span", _otel_spy) # noqa: SLF001 + monkeypatch.setattr(client, "start_observation", _start_observation_should_not_be_called) + adapter = LangfuseSDKAdapter(client) + adapter.trace(id="trace-ts", name="t") + + start = datetime(2026, 6, 8, 12, 0, 0, tzinfo=UTC) + adapter.generation(trace_id="trace-ts", name="g", model="m", start_time=start) + + expected_ns = int(start.timestamp() * 1_000_000_000) + assert captured_otel_kwargs.get("start_time") == expected_ns + assert captured_otel_kwargs.get("name") == "g" + + +def test_adapter_generation_without_start_time_uses_public_api(monkeypatch: pytest.MonkeyPatch) -> None: + # Companion to the back-dated test: when ``start_time`` is NOT + # supplied, the adapter falls back to the v4 SDK's public + # ``start_observation`` API and does NOT touch the private OTel + # tracer. + from unittest.mock import MagicMock + + client = _dummy_client() + captured_kwargs: dict[str, Any] = {} + + def _spy(**kwargs: Any) -> MagicMock: + captured_kwargs.update(kwargs) + return MagicMock(id="obs-spy", end=MagicMock()) + + def _otel_tracer_should_not_be_called(**_kwargs: Any) -> None: + raise AssertionError( + "_otel_tracer.start_span MUST NOT be called when start_time is None; " + "the public start_observation API should handle this path" + ) + + monkeypatch.setattr(client, "start_observation", _spy) + monkeypatch.setattr(client._otel_tracer, "start_span", _otel_tracer_should_not_be_called) # noqa: SLF001 + adapter = LangfuseSDKAdapter(client) + adapter.trace(id="trace-ts", name="t") + + adapter.generation(trace_id="trace-ts", name="g", model="m") + + assert captured_kwargs.get("name") == "g" + assert "start_time" not in captured_kwargs + + +def test_adapter_generation_handle_end_converts_end_time_to_nanoseconds() -> None: + # Companion to the start_time test: the handle's end() MUST + # convert the datetime to int nanoseconds before forwarding to + # the underlying v4 obs's end(). LangfuseSpan.end is typed + # ``Optional[int]`` (nanoseconds); passing a datetime through + # crashes the OTel span_processor's formatter with TypeError on + # ``end_time / 1e9`` deep in the SDK. + from datetime import UTC, datetime + from unittest.mock import MagicMock + + from openarmature.observability.langfuse.adapter import _SpanHandle + + sdk_obs = MagicMock(id="obs-e") + sdk_obs.end = MagicMock() + handle = _SpanHandle(sdk_obs) + + end = datetime(2026, 6, 8, 12, 0, 1, tzinfo=UTC) + handle.end(end_time=end) + + expected_ns = int(end.timestamp() * 1_000_000_000) + sdk_obs.end.assert_called_once_with(end_time=expected_ns) + + +def test_adapter_generation_handle_end_omits_end_time_when_unspecified() -> None: + # When no end_time is supplied, the handle MUST call the SDK obs's + # end() without the kwarg so the SDK uses its default + # (call-time). Locks the "default-respecting" branch. + from unittest.mock import MagicMock + + from openarmature.observability.langfuse.adapter import _SpanHandle + + sdk_obs = MagicMock(id="obs-e") + sdk_obs.end = MagicMock() + handle = _SpanHandle(sdk_obs) + + handle.end() + + sdk_obs.end.assert_called_once_with() + + # --------------------------------------------------------------------------- # Integration test against real Langfuse Cloud (opt-in) # ---------------------------------------------------------------------------