From c448ce2e9502d9b9ecd778cf7d230187c386bf05 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Mon, 8 Jun 2026 17:24:11 -0700 Subject: [PATCH 1/4] Migrate Langfuse to typed LLM event Drive the openarmature.llm.complete Generation observation lifecycle from LlmCompletionEvent on the success path, mirroring PR 3b's OTel shape. Open and close the Generation in one shot at typed-event arrival with start_time back-dated by latency_ms so the Langfuse UI shows the adapter-boundary measurement rather than dispatcher queue delay. Sentinel pair stays for the failure path until the spec extends LlmCompletionEvent with error semantics (coord thread filed). Widen the LangfuseClient Protocol with optional start_time on generation() and end_time on Span/Generation handle end(). The SDK adapter forwards both to the v4 SDK; the InMemory client stores them on LangfuseObservation for test assertions. Drop the sentinel NodeEvent pair emission on the success path from OpenAIProvider.complete(). The bundled OTel and Langfuse observers consume the typed event directly; external custom observers consuming LLM events MUST migrate to isinstance discrimination. The sentinel completed event still fires on the failure path; sentinel started is no longer emitted. --- CHANGELOG.md | 4 +- src/openarmature/llm/providers/openai.py | 66 +--- .../observability/langfuse/adapter.py | 11 +- .../observability/langfuse/client.py | 23 +- .../observability/langfuse/observer.py | 327 +++++++++++++----- .../observability/otel/observer.py | 19 +- .../integration/test_langfuse_sdk_adapter.py | 95 +++++ tests/unit/test_llm_provider.py | 112 +++--- tests/unit/test_observability_langfuse.py | 258 ++++++++++++++ .../test_observability_langfuse_adapter.py | 64 ++++ 10 files changed, 761 insertions(+), 218 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a14405..8c0bb7f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,9 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The ### Changed -- **OTel observer drives the `openarmature.llm.complete` span lifecycle from the typed `LlmCompletionEvent`** (proposal 0049 + 0057, observability §5.5.7). Successful LLM-provider calls now open + close the span in one shot at typed-event arrival, with `start_time` back-dated by `LlmCompletionEvent.latency_ms` so the span duration reflects the actual adapter-boundary measurement rather than dispatcher queue delay. Failure-path spans continue to fire from the sentinel `NodeEvent` pair (the typed event is success-only per the proposal). The §5.5 attribute set is unchanged. Dual-emit window: the provider still emits both the sentinel pair AND the typed event during v0.13.0; the sentinel pair drops in v0.15.0. +- **OTel and Langfuse observers drive the `openarmature.llm.complete` span / Generation observation lifecycle from the typed `LlmCompletionEvent`** (proposal 0049 + 0057, observability §5.5.7). Successful LLM-provider calls now open + close the OTel span and the Langfuse Generation in one shot at typed-event arrival, with `start_time` back-dated by `LlmCompletionEvent.latency_ms` so duration reflects the adapter-boundary measurement rather than dispatcher queue delay. Failure paths continue to fire from the sentinel `NodeEvent` (the typed event is success-only per the proposal). The §5.5 attribute set and §8.4 Generation metadata are unchanged. +- **`OpenAIProvider.complete()` no longer emits the sentinel `NodeEvent` pair on the success path** (v0.13.0 cleanup). The bundled OTel and Langfuse observers now consume the typed `LlmCompletionEvent` directly; the sentinel pair was kept on the success path through earlier releases for compatibility with pre-typed-event observers. External custom observers that filtered LLM calls by `event.namespace == LLM_NAMESPACE` MUST migrate to `isinstance(event, LlmCompletionEvent)` to continue seeing successful LLM calls. The sentinel `completed` event still fires on the failure path until the spec extends `LlmCompletionEvent` with error semantics; the sentinel `started` event is no longer emitted on either path. +- **`LangfuseClient` Protocol gains optional `start_time` / `end_time` timestamps** on `generation(...)` and the Generation/Span handles' `end(...)`. The Langfuse observer passes back-dated timestamps on the typed-event success path so the Langfuse UI shows the actual adapter-boundary duration. The SDK adapter forwards to the v4 Langfuse SDK's `start_observation(start_time=...)` and `obs.end(end_time=...)` (supported in 4.6+). The `InMemoryLangfuseClient` stores both fields verbatim on `LangfuseObservation` for test assertions. - **`OpenAIProvider(populate_caller_metadata=...)` default flipped from `False` to `True`.** The python implementation now populates `LlmCompletionEvent.caller_invocation_metadata` by default so the bundled OTel and Langfuse observers can emit the §5.6 `openarmature.user.` span-attribute family without a separate opt-in. Pass `populate_caller_metadata=False` to suppress the snapshot when no downstream consumer needs it. The spec-defined opt-in mechanism is unchanged; only the python default flips. ### Added diff --git a/src/openarmature/llm/providers/openai.py b/src/openarmature/llm/providers/openai.py index 7a6a054..5ec57c7 100644 --- a/src/openarmature/llm/providers/openai.py +++ b/src/openarmature/llm/providers/openai.py @@ -441,21 +441,6 @@ async def complete( serialized_messages = _serialize_messages_for_payload(messages) request_params = _request_params_from_config(config) request_extras = _request_extras_from_config(config) - if dispatch is not None: - dispatch( - _make_llm_event( - "started", - call_id=call_id, - model=self.model, - genai_system=self._genai_system, - input_messages=serialized_messages, - request_params=request_params, - request_extras=request_extras, - active_prompt=active_prompt, - active_prompt_group=active_prompt_group, - ) - ) - # Wall-clock latency measured at the adapter boundary per # proposal 0049's LlmCompletionEvent.latency_ms contract. The # boundary spans from "just before _do_complete is called" to @@ -473,13 +458,15 @@ async def complete( response = await self._do_complete(body, schema_dict, schema_class) except Exception as exc: if dispatch is not None: - # Failure path: only the sentinel NodeEvent pair fires. - # Per proposal 0049 §3 (alternative 3): LlmCompletionEvent - # is completion-only; failures flow through the - # llm-provider §7 exception path. The error continues - # to surface through the existing observer chain via - # the sentinel NodeEvent's error_type / error_category - # fields on LlmEventPayload. + # Failure path: the sentinel NodeEvent carries the + # error fields per llm-provider §7. LlmCompletionEvent + # is success-only per proposal 0049 §3 alternative 3, + # so failures continue to surface through the sentinel + # pair until the spec extends the typed event with + # error semantics. Only ``completed`` is emitted on + # failure — no started counterpart, since both bundled + # observers' handlers ignore sentinel-started after the + # v0.13.0 migration. dispatch( _make_llm_event( "completed", @@ -498,33 +485,14 @@ async def complete( latency_ms = (time.perf_counter() - adapter_start) * 1000.0 if dispatch is not None: - # Sentinel NodeEvent pair stays during the dual-emit window - # per proposal 0049 §5.5.7 SHOULD-emit-both transition. The - # window stays open through v0.13.0 with the sentinel - # emission removed in v0.15.0 (CHANGELOG callout pinned to - # the v0.13.0 release notes). - dispatch( - _make_llm_event( - "completed", - call_id=call_id, - model=self.model, - genai_system=self._genai_system, - finish_reason=response.finish_reason, - usage=response.usage, - input_messages=serialized_messages, - output_content=response.message.content or None, - request_params=request_params, - request_extras=request_extras, - response_id=response.response_id, - response_model=response.response_model, - active_prompt=active_prompt, - active_prompt_group=active_prompt_group, - ) - ) - # The new typed LlmCompletionEvent — observers filtering via - # isinstance(event, LlmCompletionEvent) receive this; legacy - # observers filtering on the sentinel namespace see the - # NodeEvent pair above. Failure path doesn't reach here. + # Success path: emit only the typed LlmCompletionEvent. + # The sentinel NodeEvent pair previously emitted on success + # for compatibility with pre-typed-event observers was + # dropped in v0.13.0; bundled observers (OTel + Langfuse) + # consume the typed event directly, and external custom + # observers should migrate to type discrimination via + # ``isinstance(event, LlmCompletionEvent)`` if they need + # LLM call notifications. dispatch( self._build_llm_completion_event( response, diff --git a/src/openarmature/observability/langfuse/adapter.py b/src/openarmature/observability/langfuse/adapter.py index fcbdbc3..2c91e57 100644 --- a/src/openarmature/observability/langfuse/adapter.py +++ b/src/openarmature/observability/langfuse/adapter.py @@ -35,6 +35,7 @@ import json from contextlib import ExitStack +from datetime import datetime from typing import TYPE_CHECKING, Any, cast from .client import LangfuseGenerationHandle, LangfuseSpanHandle, LangfuseUsage, ObservationLevel @@ -129,13 +130,16 @@ def update(self, **fields: Any) -> None: kwargs[key] = value self._obs.update(**kwargs) - def end(self, **fields: Any) -> None: + def end(self, *, end_time: datetime | None = None, **fields: Any) -> None: # Apply any field updates first (so they're set BEFORE the # observation closes), then call end(). v4's end() takes only # an optional ``end_time``; field mutation happens via update(). if fields: self.update(**fields) - self._obs.end() + if end_time is not None: + self._obs.end(end_time=end_time) + else: + self._obs.end() class LangfuseSDKAdapter: @@ -337,6 +341,7 @@ def generation( output: Any = None, usage: LangfuseUsage | None = None, prompt: Any = None, + start_time: datetime | None = None, ) -> LangfuseGenerationHandle: extra_kwargs: dict[str, Any] = { "model": model, @@ -356,6 +361,8 @@ def generation( if usage.total is not None: usage_details["total"] = usage.total extra_kwargs["usage_details"] = usage_details + if start_time is not None: + extra_kwargs["start_time"] = start_time obs = self._start_observation( as_type="generation", trace_id=trace_id, diff --git a/src/openarmature/observability/langfuse/client.py b/src/openarmature/observability/langfuse/client.py index 2132da3..8091b76 100644 --- a/src/openarmature/observability/langfuse/client.py +++ b/src/openarmature/observability/langfuse/client.py @@ -30,6 +30,7 @@ from __future__ import annotations from dataclasses import dataclass, field +from datetime import datetime from typing import Any, Literal, Protocol, runtime_checkable ObservationType = Literal["span", "generation", "event"] @@ -65,6 +66,13 @@ class LangfuseObservation: level: ObservationLevel = "DEFAULT" status_message: str | None = None ended: bool = False + # Optional caller-supplied timestamps. Populated when a typed-event + # consumer (e.g., the LangfuseObserver on the typed LLM completion + # path) back-dates the observation using a wall-clock measurement + # rather than letting the SDK record open/close moments. ``None`` + # means "use the SDK's default"; both fields are independent. + start_time: datetime | None = None + end_time: datetime | None = None # Generation-specific (None / empty on Span and Event observations) model: str | None = None @@ -125,7 +133,7 @@ def id(self) -> str: ... def update(self, **fields: Any) -> None: ... - def end(self, **fields: Any) -> None: ... + def end(self, *, end_time: datetime | None = None, **fields: Any) -> None: ... @runtime_checkable @@ -138,7 +146,7 @@ def id(self) -> str: ... def update(self, **fields: Any) -> None: ... - def end(self, **fields: Any) -> None: ... + def end(self, *, end_time: datetime | None = None, **fields: Any) -> None: ... @runtime_checkable @@ -224,6 +232,7 @@ def generation( output: Any = None, usage: LangfuseUsage | None = None, prompt: Any = None, + start_time: datetime | None = None, ) -> LangfuseGenerationHandle: ... def force_flush(self, timeout_ms: int = 30_000) -> bool: @@ -268,7 +277,9 @@ def id(self) -> str: def update(self, **fields: Any) -> None: _apply_fields(self.observation, fields) - def end(self, **fields: Any) -> None: + def end(self, *, end_time: datetime | None = None, **fields: Any) -> None: + if end_time is not None: + self.observation.end_time = end_time _apply_fields(self.observation, fields) self.observation.ended = True @@ -286,7 +297,9 @@ def id(self) -> str: def update(self, **fields: Any) -> None: _apply_fields(self.observation, fields) - def end(self, **fields: Any) -> None: + def end(self, *, end_time: datetime | None = None, **fields: Any) -> None: + if end_time is not None: + self.observation.end_time = end_time _apply_fields(self.observation, fields) self.observation.ended = True @@ -428,6 +441,7 @@ def generation( output: Any = None, usage: LangfuseUsage | None = None, prompt: Any = None, + start_time: datetime | None = None, ) -> LangfuseGenerationHandle: trace = self._get_trace(trace_id) observation = LangfuseObservation( @@ -444,6 +458,7 @@ def generation( output=output, usage=usage, prompt_entity_link=prompt, + start_time=start_time, ) trace.observations.append(observation) return _InMemoryGenerationHandle(observation=observation) diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index 55016ce..f68f890 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -26,6 +26,7 @@ import uuid from collections.abc import Callable, Mapping from dataclasses import dataclass, field +from datetime import UTC, datetime, timedelta from typing import Any, cast from openarmature.graph.events import ( @@ -191,7 +192,6 @@ class _InvState: open_observations: dict[_StackKey, _OpenObservation] = field( default_factory=dict[_StackKey, _OpenObservation] ) - open_llm_observations: dict[str, _OpenObservation] = field(default_factory=dict[str, _OpenObservation]) # Synthetic subgraph dispatch Span observations, keyed by namespace # prefix. Per spec §8.3 each subgraph wrapper produces a Span # observation in its parent's Trace; descendant node observations @@ -366,23 +366,23 @@ async def __call__( if isinstance(event, InvocationCompletedEvent): self._handle_invocation_completed(event) return - # Proposal 0049 typed LlmCompletionEvent: ignored during the - # dual-emit window — the Langfuse mapping continues to drive - # its §5.5 Generation observation lifecycle off the sentinel - # NodeEvent pair the provider emits alongside the typed event. - # Migration to type discrimination lands in a subsequent PR; - # this early-return keeps the observer Protocol-compatible - # without changing behavior. + # Proposal 0049 typed LlmCompletionEvent (success path). Drives + # the §5.5 Generation observation lifecycle for successful + # provider calls. Failures don't emit this variant; they flow + # through the sentinel-pair error path below. if isinstance(event, LlmCompletionEvent): + if not self.disable_llm_spans: + self._handle_typed_llm_completion(event) return if isinstance(event, MetadataAugmentationEvent): self._handle_metadata_augmentation(event) return - # LLM provider events use a sentinel namespace per §5.5; route - # them to the dedicated Generation path. + # LLM provider sentinel events: failure-path completed opens + + # closes an ERROR-level Generation; everything else is a no-op + # (success-path typed handler above owns the Generation). if event.namespace == LLM_NAMESPACE: if not self.disable_llm_spans: - self._handle_llm_event(event) + self._handle_llm_error_event(event) return if event.phase == "started": self._open_started_observation(event) @@ -1244,13 +1244,12 @@ def close_invocation(self, invocation_id: str) -> None: if inv_state is None: return # Order: deepest leaves first so parents see all children - # closed before they end. LLM observations → leaf nodes - # (sorted deepest-first by namespace length) → per-instance - # fan-out dispatches → subgraph dispatches. - for call_id in list(inv_state.open_llm_observations.keys()): - obs = inv_state.open_llm_observations.pop(call_id, None) - if obs is not None: - obs.handle.end() + # closed before they end. Leaf node observations (sorted + # deepest-first by namespace length) → per-instance fan-out + # dispatches → subgraph dispatches. LLM observations don't + # appear here — both the success and error paths open + close + # the Generation in one shot at handler-time, so there are no + # in-flight LLM observations to drain. for key in sorted( inv_state.open_observations.keys(), key=lambda k: -len(k[0]), @@ -1321,82 +1320,155 @@ def _observation_metadata(self, event: NodeEvent, correlation_id: str | None) -> # Generation observation lifecycle (LLM provider events) # ------------------------------------------------------------------ - def _handle_llm_event(self, event: NodeEvent) -> None: + # v0.13.0 (proposal 0049 + 0057): success-path Generation lifecycle + # is driven by the typed LlmCompletionEvent — opened and closed in + # one shot at typed-event arrival, with start_time back-dated by + # latency_ms so the observation's duration reflects the adapter- + # boundary measurement rather than dispatcher queue delay. Failure + # path keeps the sentinel NodeEvent pair (LlmCompletionEvent is + # success-only per proposal 0049 §3 alternative 3). The provider + # also dropped sentinel-pair emission on the success path in this + # release, so on success the typed event is the only signal the + # Generation observation has to fire from. + def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: + """Open + close the Generation observation from the typed + LlmCompletionEvent (success path).""" + # Mid-call metadata augmentation can't reach this observation: + # the typed event arrives only after complete() returns, and + # the observation is back-dated past any augmentation event + # that fired while the call was in flight. Since complete() + # is awaited, node bodies can't actually run augmentation + # mid-call, so this is theoretical only. from openarmature.observability.correlation import ( current_correlation_id, current_invocation_id, ) - if not isinstance(event.pre_state, LlmEventPayload): - # Defensive — sentinel-namespaced events MUST carry an - # LlmEventPayload per llm-provider / observability §5.5. - return invocation_id = current_invocation_id() if invocation_id is None: return - payload = event.pre_state + correlation_id = current_correlation_id() # The Trace MAY not exist yet if the LLM call fires before any - # node `started` event has hit this observer (race-y under - # tests that prepare via `prepare_sync` only). The in-memory - # client tolerates create-on-demand; production SDK adapters - # should too. + # node `started` event has hit this observer. Create-on-demand + # mirrors the sentinel-pair handler's behavior. if invocation_id not in self._inv_states: - self._open_trace(invocation_id, current_correlation_id(), event) + self._open_trace_for_typed_event(invocation_id, correlation_id, event) inv_state = self._inv_states[invocation_id] - correlation_id = current_correlation_id() + # Back-date start_time using latency_ms; fall back to end_time + # for both when latency is missing (zero-duration observation, + # mirroring the OTel path). + end_time = datetime.now(UTC) + if event.latency_ms is not None: + start_time = end_time - timedelta(milliseconds=event.latency_ms) + else: + start_time = end_time + parent_observation_id = self._resolve_llm_parent_observation_id( + inv_state, + calling_namespace_prefix=event.namespace, + calling_attempt_index=event.attempt_index, + calling_fan_out_index=event.fan_out_index, + calling_branch_name=event.branch_name, + ) + metadata = self._typed_event_metadata(event, correlation_id) + model_parameters: dict[str, Any] = dict(event.request_params or {}) + input_value: Any = None + output_value: Any = None + if not self.disable_llm_payload: + if event.input_messages: + input_value = self._maybe_truncate_for_input(event.input_messages) + if event.output_content is not None: + output_value = self._maybe_truncate_for_output(event.output_content) + if event.request_extras: + metadata["request_extras"] = self._maybe_truncate_for_extras(dict(event.request_extras)) + target_trace_id = self._trace_id_for(inv_state, event.namespace, event.fan_out_index) + handle = self.client.generation( + trace_id=target_trace_id, + name="openarmature.llm.complete", + model=event.model, + model_parameters=model_parameters, + input=input_value, + output=output_value, + metadata=metadata, + parent_observation_id=parent_observation_id, + prompt=self._resolve_prompt_link_from_typed_event(event), + start_time=start_time, + ) + usage = self._usage_from_typed_event(event) + end_kwargs: dict[str, Any] = {} + if usage is not None: + end_kwargs["usage"] = usage + handle.end(end_time=end_time, **end_kwargs) - if event.phase == "started": - parent_observation_id = self._resolve_llm_parent_observation_id(inv_state, payload) - metadata, model_parameters, input_value, output_value = self._llm_metadata_and_payload( - payload, correlation_id, phase="started" - ) - target_trace_id = self._trace_id_for( - inv_state, payload.calling_namespace_prefix, payload.calling_fan_out_index - ) - handle = self.client.generation( - trace_id=target_trace_id, - name="openarmature.llm.complete", - model=payload.model, - model_parameters=model_parameters, - input=input_value, - output=output_value, - metadata=metadata, - parent_observation_id=parent_observation_id, - prompt=self._resolve_prompt_link(payload), - ) - inv_state.open_llm_observations[payload.call_id] = _OpenObservation( - handle=handle, - fan_out_index_chain=event.fan_out_index_chain, - branch_name_chain=event.branch_name_chain, - ) - return + def _handle_llm_error_event(self, event: NodeEvent) -> None: + """Emit an ERROR-level Generation observation from the sentinel + NodeEvent on the failure path. Success-path sentinel completion + is no longer emitted by the provider in v0.13.0; this handler + only fires for failures.""" + from openarmature.observability.correlation import ( + current_correlation_id, + current_invocation_id, + ) - # completed: pop the started handle and finalize. - observation = inv_state.open_llm_observations.pop(payload.call_id, None) - if observation is None: + if event.phase != "completed": + # Sentinel started becomes a no-op once the success-side + # emission drops. Failures only emit the completed half. + return + if not isinstance(event.pre_state, LlmEventPayload): return - metadata, _model_parameters, _input_value, output_value = self._llm_metadata_and_payload( - payload, correlation_id, phase="completed" + payload = event.pre_state + if payload.error_type is None: + # Defensive — success path no longer emits the sentinel + # pair; if a non-error sentinel completion slips through + # (e.g., legacy custom provider not yet migrated), the + # typed event handler owns the Generation. + return + invocation_id = current_invocation_id() + if invocation_id is None: + return + correlation_id = current_correlation_id() + if invocation_id not in self._inv_states: + self._open_trace(invocation_id, correlation_id, event) + inv_state = self._inv_states[invocation_id] + parent_observation_id = self._resolve_llm_parent_observation_id( + inv_state, + calling_namespace_prefix=payload.calling_namespace_prefix, + calling_attempt_index=payload.calling_attempt_index, + calling_fan_out_index=payload.calling_fan_out_index, + calling_branch_name=payload.calling_branch_name, ) - end_kwargs: dict[str, Any] = {"metadata": metadata} - if output_value is not None: - end_kwargs["output"] = output_value - usage = self._usage_from_payload(payload) - if usage is not None: - end_kwargs["usage"] = usage - # Error-category mapping: §8.4.2 + §8.4.3 (an LLM provider - # error_category lands on the Generation observation's level - # and statusMessage the same as on a Span observation). - if payload.error_category is not None: - end_kwargs["level"] = "ERROR" - end_kwargs["status_message"] = payload.error_category - observation.handle.end(**end_kwargs) + metadata, model_parameters, input_value, _ = self._llm_metadata_and_payload( + payload, correlation_id, phase="started" + ) + target_trace_id = self._trace_id_for( + inv_state, payload.calling_namespace_prefix, payload.calling_fan_out_index + ) + handle = self.client.generation( + trace_id=target_trace_id, + name="openarmature.llm.complete", + model=payload.model, + model_parameters=model_parameters, + input=input_value, + metadata=metadata, + parent_observation_id=parent_observation_id, + prompt=self._resolve_prompt_link(payload), + ) + # Error-category mapping: §8.4.2 + §8.4.3. + end_kwargs: dict[str, Any] = { + "level": "ERROR", + "status_message": payload.error_category or payload.error_type, + } + handle.end(**end_kwargs) def _resolve_llm_parent_observation_id( - self, inv_state: _InvState, payload: LlmEventPayload + self, + inv_state: _InvState, + *, + calling_namespace_prefix: tuple[str, ...], + calling_attempt_index: int, + calling_fan_out_index: int | None, + calling_branch_name: str | None, ) -> str | None: - # Calling-node identity comes from the payload (set at - # dispatch time per llm-provider §5.5). Precedence: + # Calling-node identity precedence: # 1. Exact-match leaf node at the calling key. # 2. Per-instance fan-out dispatch observation when the # call originated inside a fan-out instance. @@ -1407,28 +1479,115 @@ def _resolve_llm_parent_observation_id( # exact-match miss would otherwise need a leaf-ancestor walk # to handle. key: _StackKey = ( - payload.calling_namespace_prefix, - payload.calling_attempt_index, - payload.calling_fan_out_index, - payload.calling_branch_name, + calling_namespace_prefix, + calling_attempt_index, + calling_fan_out_index, + calling_branch_name, ) observation = inv_state.open_observations.get(key) if observation is not None: return observation.handle.id # Per-instance fan-out dispatch. - if payload.calling_fan_out_index is not None and payload.calling_namespace_prefix: - instance_key = payload.calling_namespace_prefix[:1] + (str(payload.calling_fan_out_index),) + if calling_fan_out_index is not None and calling_namespace_prefix: + instance_key = calling_namespace_prefix[:1] + (str(calling_fan_out_index),) dispatch = inv_state.fan_out_instance_observations.get(instance_key) if dispatch is not None: return dispatch.handle.id # Subgraph dispatch, longest-prefix-first. - for prefix_len in range(len(payload.calling_namespace_prefix), 0, -1): - prefix = payload.calling_namespace_prefix[:prefix_len] + for prefix_len in range(len(calling_namespace_prefix), 0, -1): + prefix = calling_namespace_prefix[:prefix_len] sg = inv_state.subgraph_observations.get(prefix) if sg is not None: return sg.handle.id return None + def _typed_event_metadata(self, event: LlmCompletionEvent, correlation_id: str | None) -> dict[str, Any]: + """Build the Generation observation's metadata dict from the + typed event. Mirrors _llm_metadata_and_payload's metadata + construction but reads from LlmCompletionEvent fields, and + combines started + completed phases into a single populated + dict (the typed event carries everything at once).""" + metadata: dict[str, Any] = {} + if correlation_id is not None: + metadata["correlation_id"] = correlation_id + metadata["system"] = event.provider + active_prompt = event.active_prompt + if active_prompt is not None: + metadata["prompt"] = { + "name": active_prompt.name, + "version": active_prompt.version, + "label": active_prompt.label, + "template_hash": active_prompt.template_hash, + "rendered_hash": active_prompt.rendered_hash, + } + active_group = event.active_prompt_group + if active_group is not None: + metadata["prompt_group_name"] = active_group.group_name + # Asymmetric guard with _llm_metadata_and_payload below: the + # typed event types caller_invocation_metadata as Mapping | None + # while LlmEventPayload defaults to an empty mapping (never + # None). Don't "normalize" the two paths without normalizing + # the source types. + if event.caller_invocation_metadata is not None: + _apply_caller_metadata(metadata, event.caller_invocation_metadata) + if event.finish_reason is not None: + metadata["finish_reason"] = event.finish_reason + if event.response_model is not None: + metadata["response_model"] = event.response_model + if event.response_id is not None: + metadata["response_id"] = event.response_id + return metadata + + def _usage_from_typed_event(self, event: LlmCompletionEvent) -> LangfuseUsage | None: + """Map the typed event's Usage onto the Langfuse Usage record + per §8.4.3. Returns None when no usage was reported.""" + usage = event.usage + if usage is None: + return None + if usage.prompt_tokens is None and usage.completion_tokens is None and usage.total_tokens is None: + return None + return LangfuseUsage( + input=usage.prompt_tokens, + output=usage.completion_tokens, + total=usage.total_tokens, + ) + + def _resolve_prompt_link_from_typed_event(self, event: LlmCompletionEvent) -> Any: + """§8.4.4 case discrimination on the typed event's active_prompt + snapshot. Same logic as _resolve_prompt_link but reads from + LlmCompletionEvent instead of LlmEventPayload.""" + active_prompt = event.active_prompt + if active_prompt is None: + return None + entities = getattr(active_prompt, "observability_entities", None) + if not isinstance(entities, dict): + return None + return cast("dict[str, Any]", entities).get("langfuse_prompt") + + def _open_trace_for_typed_event( + self, invocation_id: str, correlation_id: str | None, event: LlmCompletionEvent + ) -> None: + """Trace open path for a typed LlmCompletionEvent arriving + before any node-started event reached this observer. + Synthesizes the minimal trace shape from the typed event's + scoping fields.""" + if event.namespace: + entry_node = event.namespace[0] + else: + entry_node = event.node_name or "openarmature.llm.complete" + metadata: dict[str, Any] = { + "entry_node": entry_node, + "spec_version": self.spec_version, + "implementation_name": self.implementation_name, + "implementation_version": self.implementation_version, + } + if correlation_id is not None: + metadata["correlation_id"] = correlation_id + if event.caller_invocation_metadata is not None: + _apply_caller_metadata(metadata, event.caller_invocation_metadata) + self.client.trace(id=invocation_id, name=entry_node, metadata=metadata) + self._inv_states[invocation_id] = _InvState(trace_id=invocation_id) + def _llm_metadata_and_payload( self, payload: LlmEventPayload, diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index 18bdb63..5265876 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -595,11 +595,12 @@ async def __call__( if isinstance(event, MetadataAugmentationEvent): self._handle_metadata_augmentation(event) return - # LLM provider sentinel events: success-path is a no-op (the - # typed event handler above owns the span). Failure-path - # completed events open + close an error span; sentinel-started - # and successful-completed are no-ops here. Dual-emit window - # closes in v0.15.0 per the CHANGELOG note pinned to v0.13.0. + # LLM provider sentinel events: the typed event handler above + # owns the success-path span. Failure-path ``completed`` events + # open + close an error span. v0.13.0 dropped sentinel-pair + # emission on the success path entirely; the only remaining + # sentinel emission is failure-path ``completed`` (until the + # spec extends the typed event with error semantics). if event.namespace == _LLM_NAMESPACE: if not self.disable_llm_spans: self._handle_llm_error_event(event) @@ -1086,9 +1087,11 @@ def _emit_checkpoint_save_span(self, event: NodeEvent) -> None: # shot at typed-event arrival, with start_time back-dated by # latency_ms so the span duration matches the adapter-boundary # measurement. The error-path span is still driven by the sentinel - # NodeEvent pair the provider emits (LlmCompletionEvent is success- - # only per proposal 0049 §3 alternative 3). Dual-emit window closes - # in v0.15.0 — the sentinel pair will then drop entirely. + # NodeEvent (LlmCompletionEvent is success-only per proposal 0049 + # §3 alternative 3). Success-side sentinel emission was dropped + # from the provider in v0.13.0; the failure-side sentinel emission + # stays until the spec extends LlmCompletionEvent with error + # semantics (coord-thread tracked). # # v0.17.0 attribute set (proposal 0024) preserved unchanged: # - Baseline openarmature.llm.* attributes diff --git a/tests/integration/test_langfuse_sdk_adapter.py b/tests/integration/test_langfuse_sdk_adapter.py index 5018d14..f3e2a17 100644 --- a/tests/integration/test_langfuse_sdk_adapter.py +++ b/tests/integration/test_langfuse_sdk_adapter.py @@ -138,3 +138,98 @@ async def test_sdk_adapter_handles_invocation_with_no_real_observation() -> None assert trace.output == expected_output assert len(trace.observations) == 1 assert trace.observations[0].name == "openarmature.trace_io" + + +@pytest.mark.integration +async def test_sdk_adapter_generation_timestamps_round_trip_through_langfuse() -> None: + """End-to-end verification that explicit start_time / end_time on + the adapter's generation(...) / handle.end(...) calls actually + land on the Langfuse-side observation. The unit tests cover the + SDK call-site (start_observation receives start_time= kwarg); this + test closes the loop by reading the projected timestamps back from + the REST API and asserting they reflect the back-dated values. + + Catches the failure mode the v0.13.0 Langfuse migration is + susceptible to: if the v4 SDK silently drops start_time as an + unrecognized kwarg on a particular version, the Langfuse UI shows + call-time timestamps instead of the back-dated latency_ms-based + ones, with no error to surface the misconfiguration. + """ + from datetime import UTC, datetime, timedelta + + from langfuse import Langfuse + + from openarmature.observability.langfuse.adapter import LangfuseSDKAdapter + + client = Langfuse() + adapter = LangfuseSDKAdapter(client) + + invocation_id = str(uuid.uuid4()) + adapter.trace(id=invocation_id, name="test_sdk_adapter_generation_timestamps") + # Back-date by 250 ms — a value far enough above the SDK's own + # call-time jitter that a passthrough failure would show up + # clearly in the projected start/end timestamps. + end_time = datetime.now(UTC) + start_time = end_time - timedelta(milliseconds=250) + handle = adapter.generation( + trace_id=invocation_id, + name="openarmature.llm.complete", + model="test-model", + start_time=start_time, + ) + handle.end(end_time=end_time) + + adapter.force_flush() + time.sleep(2) + + hex_id = invocation_id.replace("-", "") + try: + _poll_trace_with_retry(client, hex_id) + # Pull observations directly: the trace.observations list lags + # the headline trace fields in the REST projection, so query + # the observations endpoint with a retry budget similar to the + # trace poll. Filter by name rather than indexing the first + # entry so future synthetic observations (Langfuse adds + # ``openarmature.trace_io`` carriers for trace input/output + # under some flows) don't shadow the target Generation. + observation: Any = None + last_exc: Exception | None = None + for _ in range(12): + try: + response = client.api.observations.get_many(trace_id=hex_id) + observation = next( + (o for o in response.data if o.name == "openarmature.llm.complete"), + None, + ) + if observation is not None: + break + except Exception as exc: # noqa: BLE001 + last_exc = exc + time.sleep(5) + assert observation is not None, ( + f"openarmature.llm.complete observation for trace {hex_id} did not appear via REST " + f"after retry budget; last error: {last_exc!r}" + ) + + # The REST projection's start/end timestamps MUST match the + # back-dated values within a small tolerance (the SDK rounds + # to microseconds; Langfuse's REST projection may round + # further). + assert observation.start_time is not None + assert observation.end_time is not None + start_delta = abs((observation.start_time - start_time).total_seconds()) + end_delta = abs((observation.end_time - end_time).total_seconds()) + assert start_delta < 0.01, ( + f"observation.start_time drift {start_delta * 1000:.3f}ms exceeds 10ms tolerance — " + f"sent {start_time.isoformat()}, got {observation.start_time.isoformat()}" + ) + assert end_delta < 0.01, ( + f"observation.end_time drift {end_delta * 1000:.3f}ms exceeds 10ms tolerance — " + f"sent {end_time.isoformat()}, got {observation.end_time.isoformat()}" + ) + finally: + # Match the existing module's clean-exit pattern: shutdown + # releases the SDK's background OTel exporter + ingestion + # queues. Without this, a long-running pytest process could + # accumulate background threads across integration tests. + client.shutdown() diff --git a/tests/unit/test_llm_provider.py b/tests/unit/test_llm_provider.py index a604442..db985ba 100644 --- a/tests/unit/test_llm_provider.py +++ b/tests/unit/test_llm_provider.py @@ -768,17 +768,13 @@ async def test_complete_negative_cached_tokens_surfaces_as_invalid_response() -> await provider.aclose() -async def test_complete_populates_cached_tokens_on_llm_event_payload() -> None: - # Proposal 0047: _make_llm_event MUST propagate - # Response.usage.cached_tokens onto the sentinel LlmEventPayload - # so observers driving §5.5.3.1 cache attribute emission off the - # sentinel NodeEvent pair have the cache stat available. The - # conformance fixture 040 covers this end-to-end via OTel span - # attribute assertion; this test localizes the assertion to the - # provider-payload boundary so a regression here surfaces - # independently of the observer rendering layer. - from openarmature.observability.llm_event import LlmEventPayload - +async def test_complete_populates_cached_tokens_on_typed_event() -> None: + # Proposal 0047: Response.usage.cached_tokens MUST flow onto the + # typed LlmCompletionEvent's usage record so observers driving + # §5.5.3.1 cache attribute emission have the cache stat available. + # This locks the field at the provider-event boundary; the + # conformance fixture 040 covers the end-to-end OTel span + # attribute path. events, token = _collecting_dispatch() transport = _make_openai_response_with_usage( { @@ -795,26 +791,18 @@ async def test_complete_populates_cached_tokens_on_llm_event_payload() -> None: await provider.aclose() _release_dispatch(token) - completed_payloads = [ - e.pre_state - for e in events - if isinstance(e, NodeEvent) and e.phase == "completed" and isinstance(e.pre_state, LlmEventPayload) - ] - assert len(completed_payloads) == 1 - payload = completed_payloads[0] - assert payload.cached_tokens == 42 + typed = next(e for e in events if isinstance(e, LlmCompletionEvent)) + assert typed.usage is not None + assert typed.usage.cached_tokens == 42 # The OpenAI-compat mapping leaves cache_creation_tokens absent - # per spec §8.1.2; verify the field stays None on the payload. - assert payload.cache_creation_tokens is None + # per spec §8.1.2; verify the field stays None on the typed event. + assert typed.usage.cache_creation_tokens is None async def test_complete_leaves_cached_tokens_none_when_provider_silent() -> None: # Companion to the populated case: when the wire response omits # prompt_tokens_details, Response.usage.cached_tokens stays None - # and the LlmEventPayload reflects that. Locks the absent path - # at the provider-payload boundary. - from openarmature.observability.llm_event import LlmEventPayload - + # and the typed event's Usage record reflects that. events, token = _collecting_dispatch() transport = _make_openai_response_with_usage( {"prompt_tokens": 100, "completion_tokens": 5, "total_tokens": 105} @@ -826,14 +814,10 @@ async def test_complete_leaves_cached_tokens_none_when_provider_silent() -> None await provider.aclose() _release_dispatch(token) - completed_payloads = [ - e.pre_state - for e in events - if isinstance(e, NodeEvent) and e.phase == "completed" and isinstance(e.pre_state, LlmEventPayload) - ] - assert len(completed_payloads) == 1 - assert completed_payloads[0].cached_tokens is None - assert completed_payloads[0].cache_creation_tokens is None + typed = next(e for e in events if isinstance(e, LlmCompletionEvent)) + assert typed.usage is not None + assert typed.usage.cached_tokens is None + assert typed.usage.cache_creation_tokens is None # RuntimeConfig.from_partial — Python ergonomic introduced alongside @@ -1290,11 +1274,13 @@ def _release_dispatch(token: _DispatchToken) -> None: _reset_active_dispatch(token) -async def test_complete_success_emits_both_sentinel_and_typed_event() -> None: - # Dual-emit window per proposal 0049 §5.5.7 SHOULD-emit-both - # transition: the provider emits BOTH the existing sentinel - # NodeEvent pair (started + completed) AND the new typed - # LlmCompletionEvent on success. +async def test_complete_success_emits_only_typed_event() -> None: + # v0.13.0 dropped the success-side sentinel emission. The provider + # now emits a single typed LlmCompletionEvent on success; no + # sentinel NodeEvent pair (started or completed) fires for + # successful calls. External observers consuming LLM events on + # the success path MUST filter via isinstance(event, + # LlmCompletionEvent). events, token = _collecting_dispatch() transport = _make_openai_response_with_usage( {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15} @@ -1308,20 +1294,17 @@ async def test_complete_success_emits_both_sentinel_and_typed_event() -> None: node_events = [e for e in events if isinstance(e, NodeEvent)] typed_events = [e for e in events if isinstance(e, LlmCompletionEvent)] - # Sentinel pair: started + completed (the existing pattern). - assert len(node_events) == 2 - assert [e.phase for e in node_events] == ["started", "completed"] - # One typed event (success-only emission). + assert node_events == [] assert len(typed_events) == 1 -async def test_complete_failure_emits_only_sentinel_no_typed_event() -> None: +async def test_complete_failure_emits_only_sentinel_completed_no_typed_event() -> None: # Per proposal 0049 §3 alternative 3: LlmCompletionEvent fires on # successful structured-response completion only. Provider - # exceptions (provider_unavailable etc.) flow through the - # existing exception path; the sentinel NodeEvent(completed, - # error=...) keeps firing for backwards-compat failure - # observability. + # exceptions (provider_unavailable etc.) flow through the existing + # exception path; the sentinel NodeEvent(completed, error=...) + # carries the error fields. v0.13.0 dropped the sentinel ``started`` + # event entirely — only ``completed`` fires on failure. from openarmature.graph.events import LlmCompletionEvent, NodeEvent def _503(_req: httpx.Request) -> httpx.Response: @@ -1340,11 +1323,8 @@ def _503(_req: httpx.Request) -> httpx.Response: node_events = [e for e in events if isinstance(e, NodeEvent)] typed_events = [e for e in events if isinstance(e, LlmCompletionEvent)] - # Sentinel pair fires; the completed event carries error details - # on its LlmEventPayload. - assert len(node_events) == 2 - assert [e.phase for e in node_events] == ["started", "completed"] - # No typed event on the failure path. + assert len(node_events) == 1 + assert node_events[0].phase == "completed" assert typed_events == [] @@ -1705,16 +1685,14 @@ def _handler(_req: httpx.Request) -> httpx.Response: assert typed.response_id is None -async def test_llm_completion_event_arrives_after_sentinel_completed_within_provider_emission() -> None: - # Within the provider's own emission window (sentinel started → - # sentinel completed → typed event), the typed LlmCompletionEvent - # MUST arrive after the sentinel NodeEvent(completed). Spec - # fixture 056 pins the broader contract (typed event arrives - # between the CALLING NODE's started/completed pair); this test - # locks down the provider-internal sub-ordering that contract - # depends on. The full bracketing (calling-node started → ... → - # calling-node completed) is covered by the conformance fixture - # which exercises a real CompiledGraph. +async def test_complete_success_emits_typed_event_as_single_emission() -> None: + # v0.13.0 dropped the success-side sentinel emission, so the + # provider's success-path emission window is now a single typed + # event — no within-emission ordering question remains. This test + # locks the single-emission shape so a regression that re-adds a + # sentinel NodeEvent on success would surface here. Spec fixture + # 056 pins the broader bracketing (typed event arrives between + # the CALLING NODE's started/completed pair) end-to-end. events, token = _collecting_dispatch() transport = _make_openai_response_with_usage( {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2} @@ -1726,14 +1704,8 @@ async def test_llm_completion_event_arrives_after_sentinel_completed_within_prov await provider.aclose() _release_dispatch(token) - # Build a type+phase tag for each event in arrival order. - sequence: list[str] = [] - for ev in events: - if isinstance(ev, NodeEvent): - sequence.append(f"sentinel:{ev.phase}") - elif isinstance(ev, LlmCompletionEvent): - sequence.append("typed:completed") - assert sequence == ["sentinel:started", "sentinel:completed", "typed:completed"] + assert len(events) == 1 + assert isinstance(events[0], LlmCompletionEvent) async def test_llm_completion_event_sources_node_identity_from_calling_context() -> None: diff --git a/tests/unit/test_observability_langfuse.py b/tests/unit/test_observability_langfuse.py index 59a4697..9f467f3 100644 --- a/tests/unit/test_observability_langfuse.py +++ b/tests/unit/test_observability_langfuse.py @@ -1205,3 +1205,261 @@ async def test_implementation_attribution_rows_emit_on_every_trace() -> None: assert trace.metadata.get("implementation_name") == "openarmature-python" assert isinstance(trace.metadata.get("implementation_version"), str) assert trace.metadata["implementation_version"] + + +# --------------------------------------------------------------------------- +# Typed LlmCompletionEvent handling (proposal 0049 + 0057, PR 3c) +# --------------------------------------------------------------------------- + + +async def test_typed_llm_event_emits_generation_with_expected_fields() -> None: + # Happy-path: a single LlmCompletionEvent produces exactly one + # Generation observation under the typed event's invocation_id + # Trace, with model / usage / metadata sourced from the event. + from openarmature.llm.response import Usage + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from tests._helpers.typed_event import make_typed_event + + client = InMemoryLangfuseClient() + # disable_llm_payload defaults to True per §8.9; flip it off here + # so the test can also assert the payload (output) makes it through. + observer = LangfuseObserver(client=client, disable_llm_payload=False) + token = _set_invocation_id("inv-typed-1") + try: + await observer( + make_typed_event( + invocation_id="inv-typed-1", + model="m-test", + provider="vllm", + usage=Usage(prompt_tokens=10, completion_tokens=4, total_tokens=14), + finish_reason="stop", + response_id="resp-abc", + response_model="m-test-001", + output_content="hello", + request_params={"temperature": 0.7}, + ) + ) + finally: + _reset_invocation_id(token) + + assert "inv-typed-1" in client.traces + trace = client.traces["inv-typed-1"] + generations = [o for o in trace.observations if o.type == "generation"] + assert len(generations) == 1 + obs = generations[0] + assert obs.model == "m-test" + assert obs.usage == LangfuseUsage(input=10, output=4, total=14) + assert obs.model_parameters == {"temperature": 0.7} + assert obs.output == "hello" + assert obs.metadata.get("system") == "vllm" + assert obs.metadata.get("finish_reason") == "stop" + assert obs.metadata.get("response_id") == "resp-abc" + assert obs.metadata.get("response_model") == "m-test-001" + assert obs.ended is True + + +async def test_typed_llm_event_back_dates_generation_using_latency_ms() -> None: + # Generation observation's start/end timestamps reflect the + # adapter-boundary latency rather than the typed event's arrival + # moment. Verify end - start matches latency_ms within tolerance. + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from tests._helpers.typed_event import make_typed_event + + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client) + latency_ms = 250.0 + token = _set_invocation_id("inv-typed-dur") + try: + await observer(make_typed_event(invocation_id="inv-typed-dur", latency_ms=latency_ms)) + finally: + _reset_invocation_id(token) + + trace = client.traces["inv-typed-dur"] + obs = next(o for o in trace.observations if o.type == "generation") + assert obs.start_time is not None and obs.end_time is not None + duration_ms = (obs.end_time - obs.start_time).total_seconds() * 1000 + # Float arithmetic tolerance; the back-date should be near-exact + # apart from microsecond rounding. + assert abs(duration_ms - latency_ms) < 1.0 + + +async def test_typed_llm_event_zero_duration_when_latency_missing() -> None: + # latency_ms=None falls back to a zero-duration Generation at + # end_time. Mirrors the OTel path. + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from tests._helpers.typed_event import make_typed_event + + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client) + token = _set_invocation_id("inv-typed-no-latency") + try: + await observer(make_typed_event(invocation_id="inv-typed-no-latency", latency_ms=None)) + finally: + _reset_invocation_id(token) + + trace = client.traces["inv-typed-no-latency"] + obs = next(o for o in trace.observations if o.type == "generation") + assert obs.start_time is not None and obs.end_time is not None + assert obs.start_time == obs.end_time + + +async def test_typed_llm_event_drops_silently_outside_invocation() -> None: + # Without an invocation id ContextVar set, the typed handler + # MUST early-return without emitting a Generation. Symmetric with + # the OTel observer. + from tests._helpers.typed_event import make_typed_event + + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client) + await observer(make_typed_event()) + assert client.traces == {} + + +async def test_disable_llm_spans_skips_typed_event_path() -> None: + # disable_llm_spans MUST gate the typed-event handler. + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from tests._helpers.typed_event import make_typed_event + + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client, disable_llm_spans=True) + token = _set_invocation_id("inv-disabled") + try: + await observer(make_typed_event(invocation_id="inv-disabled")) + finally: + _reset_invocation_id(token) + assert client.traces == {} + + +async def test_llm_error_path_emits_error_generation_from_sentinel_completed() -> None: + # Failure-path provider exceptions still flow through the sentinel + # NodeEvent(completed, error=...). Verify the observer emits an + # ERROR-level Generation with the canonical error_category as + # status_message. + from openarmature.graph.events import NodeEvent + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from openarmature.observability.llm_event import LlmEventPayload + + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client) + token = _set_invocation_id("inv-err") + try: + completed = NodeEvent( + node_name="openarmature.llm.complete", + namespace=("openarmature.llm.complete",), + step=-1, + phase="completed", + pre_state=LlmEventPayload( + call_id="cc-err", + model="m-test", + error_type="ProviderRateLimit", + error_category="provider_rate_limited", + error_message="429 from upstream", + ), + post_state=None, + error=None, + parent_states=(), + ) + await observer(completed) + finally: + _reset_invocation_id(token) + + trace = client.traces["inv-err"] + obs = next(o for o in trace.observations if o.type == "generation") + assert obs.level == "ERROR" + assert obs.status_message == "provider_rate_limited" + + +async def test_llm_error_event_parents_under_branch_calling_node() -> None: + # Regression cover for the _resolve_llm_parent_observation_id + # keyword-only refactor: when an LLM failure fires inside a + # parallel-branches branch, the resulting ERROR Generation MUST + # parent under THAT branch's calling node observation, not under + # a sibling branch's same-named node. Pre-populates the observer's + # internal state with two open node observations that differ only + # by branch_name, then dispatches a sentinel-completed-error with + # a matching calling_branch_name and asserts the parent_observation + # _id points at the right one. + # + # Note: the same _resolve_llm_parent_observation_id call also + # serves the success-path typed event handler (with + # calling_branch_name = event.branch_name); error- and success- + # paths share the resolver, so this test transitively covers the + # success-path branch_name handling. + from openarmature.graph.events import NodeEvent + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from openarmature.observability.langfuse.observer import ( + _InvState, + _OpenObservation, + ) + from openarmature.observability.llm_event import LlmEventPayload + + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client) + invocation_id = "inv-pb-err" + token = _set_invocation_id(invocation_id) + try: + # Bootstrap the Trace + two branch-distinguished node + # observations directly. _InvState's open_observations map is + # keyed by (namespace, attempt_index, fan_out_index, + # branch_name); the calling node identity on the error payload + # is (("dispatcher", "ask"), 0, None, "fast"). + client.trace(id=invocation_id, name="dispatcher") + observer._inv_states[invocation_id] = _InvState(trace_id=invocation_id) # noqa: SLF001 + inv_state = observer._inv_states[invocation_id] # noqa: SLF001 + # Open two observations under the trace — one per branch. + fast_handle = client.generation(trace_id=invocation_id, name="ask", model="m-test") + slow_handle = client.generation(trace_id=invocation_id, name="ask", model="m-test") + fast_key = (("dispatcher", "ask"), 0, None, "fast") + slow_key = (("dispatcher", "ask"), 0, None, "slow") + inv_state.open_observations[fast_key] = _OpenObservation(handle=fast_handle) + inv_state.open_observations[slow_key] = _OpenObservation(handle=slow_handle) + completed = NodeEvent( + node_name="openarmature.llm.complete", + namespace=("openarmature.llm.complete",), + step=-1, + phase="completed", + pre_state=LlmEventPayload( + call_id="cc-pb", + model="m-test", + error_type="ProviderUnavailable", + error_category="provider_unavailable", + error_message="503 from upstream", + calling_namespace_prefix=("dispatcher", "ask"), + calling_attempt_index=0, + calling_fan_out_index=None, + calling_branch_name="fast", + ), + post_state=None, + error=None, + parent_states=(), + ) + await observer(completed) + finally: + _reset_invocation_id(token) + + trace = client.traces[invocation_id] + # Three observations now: two synthetic "ask" + one error + # Generation. The error Generation MUST parent under fast_handle, + # not slow_handle. + error_gens = [o for o in trace.observations if o.type == "generation" and o.level == "ERROR"] + assert len(error_gens) == 1 + assert error_gens[0].parent_observation_id == fast_handle.id + assert error_gens[0].parent_observation_id != slow_handle.id diff --git a/tests/unit/test_observability_langfuse_adapter.py b/tests/unit/test_observability_langfuse_adapter.py index ae53406..f4bfb89 100644 --- a/tests/unit/test_observability_langfuse_adapter.py +++ b/tests/unit/test_observability_langfuse_adapter.py @@ -161,6 +161,70 @@ def test_adapter_force_flush_delegates_to_client() -> None: assert wrapped.flush.call_count == 2 +def test_adapter_generation_forwards_start_time_to_sdk(monkeypatch: pytest.MonkeyPatch) -> None: + # The typed-event LLM path back-dates the Generation observation + # using ``start_time = end_time - timedelta(milliseconds= + # latency_ms)``. The adapter MUST flow ``start_time`` through to + # the v4 SDK's ``start_observation(start_time=...)`` kwarg. + # Without this passthrough, Langfuse would show the call-time + # timestamp instead of the back-dated one — the verification gap + # the InMemory client can't catch on its own. + from datetime import UTC, datetime + from unittest.mock import MagicMock + + client = _dummy_client() + captured_kwargs: dict[str, Any] = {} + + def _spy(**kwargs: Any) -> MagicMock: + captured_kwargs.update(kwargs) + return MagicMock(id="obs-spy", end=MagicMock()) + + monkeypatch.setattr(client, "start_observation", _spy) + adapter = LangfuseSDKAdapter(client) + adapter.trace(id="trace-ts", name="t") + + start = datetime(2026, 6, 8, 12, 0, 0, tzinfo=UTC) + adapter.generation(trace_id="trace-ts", name="g", model="m", start_time=start) + + assert captured_kwargs.get("start_time") == start + + +def test_adapter_generation_handle_end_forwards_end_time_to_sdk() -> None: + # Companion to the start_time test: the handle's end() MUST flow + # ``end_time`` through to the underlying v4 obs's ``end(end_time + # =...)`` call so the Langfuse UI shows the back-dated duration. + from datetime import UTC, datetime + from unittest.mock import MagicMock + + from openarmature.observability.langfuse.adapter import _SpanHandle + + sdk_obs = MagicMock(id="obs-e") + sdk_obs.end = MagicMock() + handle = _SpanHandle(sdk_obs) + + end = datetime(2026, 6, 8, 12, 0, 1, tzinfo=UTC) + handle.end(end_time=end) + + sdk_obs.end.assert_called_once_with(end_time=end) + + +def test_adapter_generation_handle_end_omits_end_time_when_unspecified() -> None: + # When no end_time is supplied, the handle MUST call the SDK obs's + # end() without the kwarg so the SDK uses its default + # (call-time). Locks the "default-respecting" branch. + from unittest.mock import MagicMock + + from openarmature.observability.langfuse.adapter import _SpanHandle + + sdk_obs = MagicMock(id="obs-e") + sdk_obs.end = MagicMock() + handle = _SpanHandle(sdk_obs) + + handle.end() + + sdk_obs.end.assert_called_once_with() + + # --------------------------------------------------------------------------- # Integration test against real Langfuse Cloud (opt-in) # --------------------------------------------------------------------------- From 6bf1b5b78e7b73e4e91f95c1a058ede3a54ec2f7 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Mon, 8 Jun 2026 17:59:03 -0700 Subject: [PATCH 2/4] Route Langfuse timestamps through OTel tracer The v4.7 Langfuse SDK exposes timestamp control only on its internal OTel tracer, not on the public start_observation() API that the adapter was calling. Two quirks the original Protocol widening got wrong, both surfaced by live-account verification: start_observation() rejects a start_time kwarg with TypeError. When start_time is supplied, the adapter now mirrors the SDK's own create_event precedent: open the OTel span directly via _otel_tracer.start_span(name=, start_time=int_ns) within a trace context and wrap the result in LangfuseGeneration. The existing no-start_time path still uses the public API. LangfuseSpan.end(end_time) is typed Optional[int] (nanoseconds), not datetime. The adapter now converts the Protocol's datetime surface to nanoseconds before forwarding. Without the conversion the OTel span_processor's formatter crashes with TypeError on end_time / 1e9 deep in the SDK. Strengthen the unit tests: spy on both _otel_tracer.start_span and start_observation so the back-dated path asserts the OTel route is taken and the public-API path asserts the OTel tracer is NOT touched. The previous monkeypatch test accepted **kwargs and would have passed even with the broken implementation. Widen the integration test's REST poll budget to 180s and use server-side name+type filters; add a diagnostic that lists observation names actually projected under the trace_id when the target Generation doesn't appear, so a future name-mismatch SDK change surfaces explicitly. --- .../observability/langfuse/adapter.py | 106 ++++++++++++++++-- .../integration/test_langfuse_sdk_adapter.py | 39 ++++--- .../test_observability_langfuse_adapter.py | 88 ++++++++++++--- 3 files changed, 190 insertions(+), 43 deletions(-) diff --git a/src/openarmature/observability/langfuse/adapter.py b/src/openarmature/observability/langfuse/adapter.py index 2c91e57..ed2964b 100644 --- a/src/openarmature/observability/langfuse/adapter.py +++ b/src/openarmature/observability/langfuse/adapter.py @@ -134,10 +134,14 @@ def end(self, *, end_time: datetime | None = None, **fields: Any) -> None: # Apply any field updates first (so they're set BEFORE the # observation closes), then call end(). v4's end() takes only # an optional ``end_time``; field mutation happens via update(). + # The SDK's end_time is typed Optional[int] nanoseconds — + # convert from the Protocol's datetime surface before passing + # through. Without the conversion the OTel span_processor's + # formatter raises TypeError when it tries ``end_time / 1e9``. if fields: self.update(**fields) if end_time is not None: - self._obs.end(end_time=end_time) + self._obs.end(end_time=int(end_time.timestamp() * 1_000_000_000)) else: self._obs.end() @@ -362,19 +366,97 @@ def generation( usage_details["total"] = usage.total extra_kwargs["usage_details"] = usage_details if start_time is not None: - extra_kwargs["start_time"] = start_time - obs = self._start_observation( - as_type="generation", - trace_id=trace_id, - name=name, - metadata=metadata, - parent_observation_id=parent_observation_id, - level=level, - status_message=status_message, - **{k: v for k, v in extra_kwargs.items() if v is not None}, - ) + # v4's public ``start_observation`` does NOT accept a + # ``start_time`` kwarg — only the internal OTel tracer + # does. Mirror the SDK's own ``create_event`` precedent + # (langfuse/_client/client.py:1518-1551): open the OTel + # span directly via the private ``_otel_tracer`` with the + # back-dated timestamp, then wrap it in LangfuseGeneration. + # This is the only path to a back-dated Generation in + # v4.7; the live-account integration test catches a future + # SDK break. + obs = self._start_back_dated_generation( + trace_id=trace_id, + name=name, + metadata=metadata, + parent_observation_id=parent_observation_id, + level=level, + status_message=status_message, + start_time=start_time, + **{k: v for k, v in extra_kwargs.items() if v is not None}, + ) + else: + obs = self._start_observation( + as_type="generation", + trace_id=trace_id, + name=name, + metadata=metadata, + parent_observation_id=parent_observation_id, + level=level, + status_message=status_message, + **{k: v for k, v in extra_kwargs.items() if v is not None}, + ) return _SpanHandle(obs) + def _start_back_dated_generation( + self, + *, + trace_id: str, + name: str | None, + metadata: dict[str, Any] | None, + parent_observation_id: str | None, + level: ObservationLevel, + status_message: str | None, + start_time: datetime, + **extra: Any, + ) -> Any: + """Open a LangfuseGeneration at a back-dated timestamp by going + through the private OTel tracer rather than the public + ``start_observation`` API (which doesn't accept ``start_time`` + in v4.7). Mirrors the SDK's ``create_event`` precedent.""" + from langfuse._client.span import LangfuseGeneration + from opentelemetry import trace as otel_trace_api + + trace_entry = self._trace_info.get(trace_id) + trace_context: TraceContext = {"trace_id": _to_otel_trace_id(trace_id)} + if parent_observation_id is not None: + trace_context["parent_span_id"] = parent_observation_id + + # OTel's ``start_span(start_time=...)`` takes int nanoseconds + # since epoch. The SDK uses ``time_ns()`` for its instant + # events; for back-dating, convert from the supplied datetime. + start_time_ns = int(start_time.timestamp() * 1_000_000_000) + + remote_parent_span = self._client._create_remote_parent_span( # pyright: ignore[reportPrivateUsage] # noqa: SLF001 + trace_id=trace_context["trace_id"], + parent_span_id=trace_context.get("parent_span_id"), + ) + + with ExitStack() as stack: + if trace_entry is not None: + stack.enter_context( + propagate_attributes( + trace_name=trace_entry["name"], + metadata=_stringify_metadata(trace_entry["metadata"]), + ) + ) + stack.enter_context(otel_trace_api.use_span(remote_parent_span)) + otel_span = self._client._otel_tracer.start_span( # pyright: ignore[reportPrivateUsage] # noqa: SLF001 + name=name or "observation", + start_time=start_time_ns, + ) + generation_kwargs: dict[str, Any] = { + "otel_span": otel_span, + "langfuse_client": self._client, + "metadata": metadata, + } + if level != "DEFAULT": + generation_kwargs["level"] = level + if status_message is not None: + generation_kwargs["status_message"] = status_message + generation_kwargs.update(extra) + return LangfuseGeneration(**generation_kwargs) + def force_flush(self, timeout_ms: int = 30_000) -> bool: """Best-effort flush of the underlying Langfuse client. diff --git a/tests/integration/test_langfuse_sdk_adapter.py b/tests/integration/test_langfuse_sdk_adapter.py index f3e2a17..24d914a 100644 --- a/tests/integration/test_langfuse_sdk_adapter.py +++ b/tests/integration/test_langfuse_sdk_adapter.py @@ -185,30 +185,39 @@ async def test_sdk_adapter_generation_timestamps_round_trip_through_langfuse() - hex_id = invocation_id.replace("-", "") try: _poll_trace_with_retry(client, hex_id) - # Pull observations directly: the trace.observations list lags - # the headline trace fields in the REST projection, so query - # the observations endpoint with a retry budget similar to the - # trace poll. Filter by name rather than indexing the first - # entry so future synthetic observations (Langfuse adds - # ``openarmature.trace_io`` carriers for trace input/output - # under some flows) don't shadow the target Generation. + # Pull observations via REST. The observations-list endpoint + # lags the headline trace fields by an "indeterminate window" + # per Langfuse's own caveat (mirrored in the trace_io test + # above), so the retry budget here is wider (180s vs the + # trace_io test's 60s). Filter server-side by name + type so + # the response is small + scoped; track seen names across + # polls so a name-mismatch failure surfaces with diagnostics + # rather than a generic "not found". observation: Any = None last_exc: Exception | None = None - for _ in range(12): + seen_names: set[str] = set() + for _ in range(36): try: - response = client.api.observations.get_many(trace_id=hex_id) - observation = next( - (o for o in response.data if o.name == "openarmature.llm.complete"), - None, + response = client.api.observations.get_many( + trace_id=hex_id, + name="openarmature.llm.complete", + type="GENERATION", ) - if observation is not None: + if response.data: + observation = response.data[0] break + # Fall back to an unfiltered query so we know whether + # ANY observations have projected — distinguishes "REST + # lag" from "observation projected with unexpected name". + fallback = client.api.observations.get_many(trace_id=hex_id) + seen_names.update(o.name or "" for o in fallback.data) except Exception as exc: # noqa: BLE001 last_exc = exc time.sleep(5) assert observation is not None, ( - f"openarmature.llm.complete observation for trace {hex_id} did not appear via REST " - f"after retry budget; last error: {last_exc!r}" + f"openarmature.llm.complete Generation for trace {hex_id} did not appear via REST " + f"after 180s retry budget. Observations seen under trace (any name): {seen_names or 'none'}. " + f"Last error: {last_exc!r}" ) # The REST projection's start/end timestamps MUST match the diff --git a/tests/unit/test_observability_langfuse_adapter.py b/tests/unit/test_observability_langfuse_adapter.py index f4bfb89..6f5c452 100644 --- a/tests/unit/test_observability_langfuse_adapter.py +++ b/tests/unit/test_observability_langfuse_adapter.py @@ -161,17 +161,62 @@ def test_adapter_force_flush_delegates_to_client() -> None: assert wrapped.flush.call_count == 2 -def test_adapter_generation_forwards_start_time_to_sdk(monkeypatch: pytest.MonkeyPatch) -> None: - # The typed-event LLM path back-dates the Generation observation - # using ``start_time = end_time - timedelta(milliseconds= - # latency_ms)``. The adapter MUST flow ``start_time`` through to - # the v4 SDK's ``start_observation(start_time=...)`` kwarg. - # Without this passthrough, Langfuse would show the call-time - # timestamp instead of the back-dated one — the verification gap - # the InMemory client can't catch on its own. +def test_adapter_generation_routes_back_dated_calls_via_otel_tracer(monkeypatch: pytest.MonkeyPatch) -> None: + # v4.7's public ``Langfuse.start_observation`` does NOT accept + # ``start_time`` — only the internal ``_otel_tracer.start_span`` + # does. The adapter MUST route back-dated generation() calls via + # the OTel tracer path (mirroring the SDK's own ``create_event`` + # precedent). This test spies on BOTH paths: ``start_observation`` + # is patched to fail loudly if the back-dated path ever falls + # through to it (the prior monkeypatch test's gap), and the OTel + # tracer's ``start_span`` is spied to assert the back-dated + # nanosecond timestamp lands on the right surface. from datetime import UTC, datetime from unittest.mock import MagicMock + client = _dummy_client() + captured_otel_kwargs: dict[str, Any] = {} + + def _otel_spy(**kwargs: Any) -> MagicMock: + captured_otel_kwargs.update(kwargs) + # The SDK calls get_span_context(), set_attribute(), etc. on + # the returned span during LangfuseGeneration construction. + # Plain MagicMock auto-creates most attrs, but trace_id / + # span_id MUST be real ints because the SDK formats them as + # 32 / 16-char hex internally. + span = MagicMock() + span.get_span_context.return_value = MagicMock( + trace_id=int("a" * 32, 16), + span_id=int("b" * 16, 16), + ) + return span + + def _start_observation_should_not_be_called(**_kwargs: Any) -> None: + raise AssertionError( + "start_observation MUST NOT be called on the back-dated path; " + "v4 SDK rejects start_time= and the adapter should route via _otel_tracer" + ) + + monkeypatch.setattr(client._otel_tracer, "start_span", _otel_spy) # noqa: SLF001 + monkeypatch.setattr(client, "start_observation", _start_observation_should_not_be_called) + adapter = LangfuseSDKAdapter(client) + adapter.trace(id="trace-ts", name="t") + + start = datetime(2026, 6, 8, 12, 0, 0, tzinfo=UTC) + adapter.generation(trace_id="trace-ts", name="g", model="m", start_time=start) + + expected_ns = int(start.timestamp() * 1_000_000_000) + assert captured_otel_kwargs.get("start_time") == expected_ns + assert captured_otel_kwargs.get("name") == "g" + + +def test_adapter_generation_without_start_time_uses_public_api(monkeypatch: pytest.MonkeyPatch) -> None: + # Companion to the back-dated test: when ``start_time`` is NOT + # supplied, the adapter falls back to the v4 SDK's public + # ``start_observation`` API and does NOT touch the private OTel + # tracer. + from unittest.mock import MagicMock + client = _dummy_client() captured_kwargs: dict[str, Any] = {} @@ -179,20 +224,30 @@ def _spy(**kwargs: Any) -> MagicMock: captured_kwargs.update(kwargs) return MagicMock(id="obs-spy", end=MagicMock()) + def _otel_tracer_should_not_be_called(**_kwargs: Any) -> None: + raise AssertionError( + "_otel_tracer.start_span MUST NOT be called when start_time is None; " + "the public start_observation API should handle this path" + ) + monkeypatch.setattr(client, "start_observation", _spy) + monkeypatch.setattr(client._otel_tracer, "start_span", _otel_tracer_should_not_be_called) # noqa: SLF001 adapter = LangfuseSDKAdapter(client) adapter.trace(id="trace-ts", name="t") - start = datetime(2026, 6, 8, 12, 0, 0, tzinfo=UTC) - adapter.generation(trace_id="trace-ts", name="g", model="m", start_time=start) + adapter.generation(trace_id="trace-ts", name="g", model="m") - assert captured_kwargs.get("start_time") == start + assert captured_kwargs.get("name") == "g" + assert "start_time" not in captured_kwargs -def test_adapter_generation_handle_end_forwards_end_time_to_sdk() -> None: - # Companion to the start_time test: the handle's end() MUST flow - # ``end_time`` through to the underlying v4 obs's ``end(end_time - # =...)`` call so the Langfuse UI shows the back-dated duration. +def test_adapter_generation_handle_end_converts_end_time_to_nanoseconds() -> None: + # Companion to the start_time test: the handle's end() MUST + # convert the datetime to int nanoseconds before forwarding to + # the underlying v4 obs's end(). LangfuseSpan.end is typed + # ``Optional[int]`` (nanoseconds); passing a datetime through + # crashes the OTel span_processor's formatter with TypeError on + # ``end_time / 1e9`` deep in the SDK. from datetime import UTC, datetime from unittest.mock import MagicMock @@ -205,7 +260,8 @@ def test_adapter_generation_handle_end_forwards_end_time_to_sdk() -> None: end = datetime(2026, 6, 8, 12, 0, 1, tzinfo=UTC) handle.end(end_time=end) - sdk_obs.end.assert_called_once_with(end_time=end) + expected_ns = int(end.timestamp() * 1_000_000_000) + sdk_obs.end.assert_called_once_with(end_time=expected_ns) def test_adapter_generation_handle_end_omits_end_time_when_unspecified() -> None: From fd5f7d18c834242963bc672b1d79de54b33c8d8a Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Mon, 8 Jun 2026 19:01:29 -0700 Subject: [PATCH 3/4] Address PR 143 review Tighten three stale comments that still referred to a sentinel "pair" on the failure path. The provider now emits only a single sentinel completed event on failure (no started counterpart); the comments in langfuse/observer.py (dispatch site + handler docstring) and openai.py (failure-emission site) didn't catch up with the v0.13.0 emission change in the same PR. --- src/openarmature/llm/providers/openai.py | 12 ++++++------ .../observability/langfuse/observer.py | 15 +++++++++------ 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/openarmature/llm/providers/openai.py b/src/openarmature/llm/providers/openai.py index 5ec57c7..9c2d310 100644 --- a/src/openarmature/llm/providers/openai.py +++ b/src/openarmature/llm/providers/openai.py @@ -461,12 +461,12 @@ async def complete( # Failure path: the sentinel NodeEvent carries the # error fields per llm-provider §7. LlmCompletionEvent # is success-only per proposal 0049 §3 alternative 3, - # so failures continue to surface through the sentinel - # pair until the spec extends the typed event with - # error semantics. Only ``completed`` is emitted on - # failure — no started counterpart, since both bundled - # observers' handlers ignore sentinel-started after the - # v0.13.0 migration. + # so failures continue to surface through a sentinel + # ``completed`` event until the spec extends the typed + # event with error semantics. Only ``completed`` fires + # — no started counterpart, since both bundled + # observers' handlers ignore sentinel-started after + # the v0.13.0 migration. dispatch( _make_llm_event( "completed", diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index f68f890..d239121 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -369,7 +369,8 @@ async def __call__( # Proposal 0049 typed LlmCompletionEvent (success path). Drives # the §5.5 Generation observation lifecycle for successful # provider calls. Failures don't emit this variant; they flow - # through the sentinel-pair error path below. + # through the sentinel error path below (a single sentinel + # ``completed`` event — no started counterpart in v0.13.0+). if isinstance(event, LlmCompletionEvent): if not self.disable_llm_spans: self._handle_typed_llm_completion(event) @@ -1325,11 +1326,13 @@ def _observation_metadata(self, event: NodeEvent, correlation_id: str | None) -> # one shot at typed-event arrival, with start_time back-dated by # latency_ms so the observation's duration reflects the adapter- # boundary measurement rather than dispatcher queue delay. Failure - # path keeps the sentinel NodeEvent pair (LlmCompletionEvent is - # success-only per proposal 0049 §3 alternative 3). The provider - # also dropped sentinel-pair emission on the success path in this - # release, so on success the typed event is the only signal the - # Generation observation has to fire from. + # path keeps a single sentinel NodeEvent (``completed`` phase + # carrying error fields on its LlmEventPayload — LlmCompletionEvent + # is success-only per proposal 0049 §3 alternative 3). The provider + # dropped success-path sentinel emission entirely in this release, + # so on success the typed event is the only signal the Generation + # observation has to fire from; the failure path's sentinel + # ``started`` was also dropped, leaving only ``completed``. def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: """Open + close the Generation observation from the typed LlmCompletionEvent (success path).""" From 4df3c0a9ba597aa8e3593c037984fbc4961075f9 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Mon, 8 Jun 2026 19:11:43 -0700 Subject: [PATCH 4/4] Align CHANGELOG + integration docstring with SDK routing Both descriptions were written before live-account verification revealed that v4.7 Langfuse SDK's start_observation rejects start_time with TypeError. The CHANGELOG entry claimed the adapter forwards via start_observation(start_time=...); the integration test docstring said unit tests validate that surface. Rewrote both to describe the actual routing: the back-dated path bypasses start_observation and goes through the private _otel_tracer.start_ span, wrapping the OTel span in LangfuseGeneration directly. The guarded failure mode shifts accordingly: not "SDK silently drops start_time" but "future SDK renames _otel_tracer or moves LangfuseGeneration", breaking the private-API path silently. --- CHANGELOG.md | 2 +- .../integration/test_langfuse_sdk_adapter.py | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c0bb7f..d751cdd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The - **OTel and Langfuse observers drive the `openarmature.llm.complete` span / Generation observation lifecycle from the typed `LlmCompletionEvent`** (proposal 0049 + 0057, observability §5.5.7). Successful LLM-provider calls now open + close the OTel span and the Langfuse Generation in one shot at typed-event arrival, with `start_time` back-dated by `LlmCompletionEvent.latency_ms` so duration reflects the adapter-boundary measurement rather than dispatcher queue delay. Failure paths continue to fire from the sentinel `NodeEvent` (the typed event is success-only per the proposal). The §5.5 attribute set and §8.4 Generation metadata are unchanged. - **`OpenAIProvider.complete()` no longer emits the sentinel `NodeEvent` pair on the success path** (v0.13.0 cleanup). The bundled OTel and Langfuse observers now consume the typed `LlmCompletionEvent` directly; the sentinel pair was kept on the success path through earlier releases for compatibility with pre-typed-event observers. External custom observers that filtered LLM calls by `event.namespace == LLM_NAMESPACE` MUST migrate to `isinstance(event, LlmCompletionEvent)` to continue seeing successful LLM calls. The sentinel `completed` event still fires on the failure path until the spec extends `LlmCompletionEvent` with error semantics; the sentinel `started` event is no longer emitted on either path. -- **`LangfuseClient` Protocol gains optional `start_time` / `end_time` timestamps** on `generation(...)` and the Generation/Span handles' `end(...)`. The Langfuse observer passes back-dated timestamps on the typed-event success path so the Langfuse UI shows the actual adapter-boundary duration. The SDK adapter forwards to the v4 Langfuse SDK's `start_observation(start_time=...)` and `obs.end(end_time=...)` (supported in 4.6+). The `InMemoryLangfuseClient` stores both fields verbatim on `LangfuseObservation` for test assertions. +- **`LangfuseClient` Protocol gains optional `start_time` / `end_time` timestamps** on `generation(...)` and the Generation/Span handles' `end(...)`. The Langfuse observer passes back-dated timestamps on the typed-event success path so the Langfuse UI shows the actual adapter-boundary duration. The SDK adapter handles v4 Langfuse SDK quirks transparently: `Langfuse.start_observation()` does NOT accept `start_time`, so back-dated generations are routed through the private `_otel_tracer.start_span(name=..., start_time=int_ns)` API (mirroring the SDK's own `create_event` precedent) and the resulting OTel span is wrapped in `LangfuseGeneration` directly; the non-back-dated path still uses `start_observation`. `LangfuseSpan.end()` is typed `Optional[int]` (nanoseconds), so the adapter converts the Protocol's `datetime` surface to int nanoseconds before forwarding. The `InMemoryLangfuseClient` stores both fields verbatim on `LangfuseObservation` for test assertions. - **`OpenAIProvider(populate_caller_metadata=...)` default flipped from `False` to `True`.** The python implementation now populates `LlmCompletionEvent.caller_invocation_metadata` by default so the bundled OTel and Langfuse observers can emit the §5.6 `openarmature.user.` span-attribute family without a separate opt-in. Pass `populate_caller_metadata=False` to suppress the snapshot when no downstream consumer needs it. The spec-defined opt-in mechanism is unchanged; only the python default flips. ### Added diff --git a/tests/integration/test_langfuse_sdk_adapter.py b/tests/integration/test_langfuse_sdk_adapter.py index 24d914a..972cda6 100644 --- a/tests/integration/test_langfuse_sdk_adapter.py +++ b/tests/integration/test_langfuse_sdk_adapter.py @@ -145,15 +145,20 @@ async def test_sdk_adapter_generation_timestamps_round_trip_through_langfuse() - """End-to-end verification that explicit start_time / end_time on the adapter's generation(...) / handle.end(...) calls actually land on the Langfuse-side observation. The unit tests cover the - SDK call-site (start_observation receives start_time= kwarg); this - test closes the loop by reading the projected timestamps back from - the REST API and asserting they reflect the back-dated values. + SDK call-site shape (the back-dated path bypasses the public + start_observation API and routes through the private + _otel_tracer.start_span instead, because v4.7's start_observation + rejects start_time with TypeError); this test closes the loop by + reading the projected timestamps back from the REST API and + asserting they reflect the back-dated values. Catches the failure mode the v0.13.0 Langfuse migration is - susceptible to: if the v4 SDK silently drops start_time as an - unrecognized kwarg on a particular version, the Langfuse UI shows - call-time timestamps instead of the back-dated latency_ms-based - ones, with no error to surface the misconfiguration. + susceptible to: if a future SDK release renames _otel_tracer, + moves LangfuseGeneration, or otherwise breaks the private-API + surface the adapter relies on, the back-dating routing fails + silently — the Langfuse UI shows call-time timestamps instead + of the back-dated latency_ms-based ones, with no error to + surface the misconfiguration. """ from datetime import UTC, datetime, timedelta