Merged
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -8,7 +8,9 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The

 ### Changed

-- **OTel observer drives the `openarmature.llm.complete` span lifecycle from the typed `LlmCompletionEvent`** (proposal 0049 + 0057, observability §5.5.7). Successful LLM-provider calls now open + close the span in one shot at typed-event arrival, with `start_time` back-dated by `LlmCompletionEvent.latency_ms` so the span duration reflects the actual adapter-boundary measurement rather than dispatcher queue delay. Failure-path spans continue to fire from the sentinel `NodeEvent` pair (the typed event is success-only per the proposal). The §5.5 attribute set is unchanged. Dual-emit window: the provider still emits both the sentinel pair AND the typed event during v0.13.0; the sentinel pair drops in v0.15.0.
+- **OTel and Langfuse observers drive the `openarmature.llm.complete` span / Generation observation lifecycle from the typed `LlmCompletionEvent`** (proposal 0049 + 0057, observability §5.5.7). Successful LLM-provider calls now open + close the OTel span and the Langfuse Generation in one shot at typed-event arrival, with `start_time` back-dated by `LlmCompletionEvent.latency_ms` so duration reflects the adapter-boundary measurement rather than dispatcher queue delay. Failure paths continue to fire from the sentinel `NodeEvent` (the typed event is success-only per the proposal). The §5.5 attribute set and §8.4 Generation metadata are unchanged.
+- **`OpenAIProvider.complete()` no longer emits the sentinel `NodeEvent` pair on the success path** (v0.13.0 cleanup). The bundled OTel and Langfuse observers now consume the typed `LlmCompletionEvent` directly; the sentinel pair was kept on the success path through earlier releases for compatibility with pre-typed-event observers. External custom observers that filtered LLM calls by `event.namespace == LLM_NAMESPACE` MUST migrate to `isinstance(event, LlmCompletionEvent)` to continue seeing successful LLM calls. The sentinel `completed` event still fires on the failure path until the spec extends `LlmCompletionEvent` with error semantics; the sentinel `started` event is no longer emitted on either path.
+- **`LangfuseClient` Protocol gains optional `start_time` / `end_time` timestamps** on `generation(...)` and the Generation/Span handles' `end(...)`. The Langfuse observer passes back-dated timestamps on the typed-event success path so the Langfuse UI shows the actual adapter-boundary duration. The SDK adapter handles v4 Langfuse SDK quirks transparently: `Langfuse.start_observation()` does NOT accept `start_time`, so back-dated generations are routed through the private `_otel_tracer.start_span(name=..., start_time=int_ns)` API (mirroring the SDK's own `create_event` precedent) and the resulting OTel span is wrapped in `LangfuseGeneration` directly; the non-back-dated path still uses `start_observation`. `LangfuseSpan.end()` is typed `Optional[int]` (nanoseconds), so the adapter converts the Protocol's `datetime` surface to int nanoseconds before forwarding. The `InMemoryLangfuseClient` stores both fields verbatim on `LangfuseObservation` for test assertions.
 - **`OpenAIProvider(populate_caller_metadata=...)` default flipped from `False` to `True`.** The python implementation now populates `LlmCompletionEvent.caller_invocation_metadata` by default so the bundled OTel and Langfuse observers can emit the §5.6 `openarmature.user.<key>` span-attribute family without a separate opt-in. Pass `populate_caller_metadata=False` to suppress the snapshot when no downstream consumer needs it. The spec-defined opt-in mechanism is unchanged; only the python default flips.

 ### Added
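The migration that the changelog asks of external custom observers can be sketched roughly as below. This is an illustration only, not the library's code: `NodeEvent`, `LlmCompletionEvent`, and `LLM_NAMESPACE` are simplified stand-ins declared locally, and the namespace value and the `phase` field name are assumptions made for the example.

```python
from dataclasses import dataclass

# Stand-ins for illustration only; the real openarmature types carry more fields.
LLM_NAMESPACE = "llm"  # assumed value, hypothetical


@dataclass
class NodeEvent:
    namespace: str
    phase: str  # "started" or "completed" (hypothetical field name)


@dataclass
class LlmCompletionEvent:
    latency_ms: float


def is_llm_call_legacy(event: object) -> bool:
    """Pre-migration filter: matches only sentinel NodeEvents."""
    return isinstance(event, NodeEvent) and event.namespace == LLM_NAMESPACE


def is_llm_call(event: object) -> bool:
    """Migrated filter: typed event for successes, sentinel for failures."""
    if isinstance(event, LlmCompletionEvent):
        return True
    return is_llm_call_legacy(event)


# After this change a successful call produces only the typed event,
# while a failed call still produces a sentinel "completed" NodeEvent.
success_stream = [LlmCompletionEvent(latency_ms=12.5)]
failure_stream = [NodeEvent(namespace=LLM_NAMESPACE, phase="completed")]

legacy_hits = [e for e in success_stream if is_llm_call_legacy(e)]
migrated_hits = [e for e in success_stream + failure_stream if is_llm_call(e)]
```

With the legacy filter alone, `legacy_hits` is empty, which is the silent loss of successful calls that the changelog warns about; keeping the sentinel check as a fallback preserves failure notifications.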
66 changes: 17 additions & 49 deletions src/openarmature/llm/providers/openai.py
@@ -441,21 +441,6 @@ async def complete(
         serialized_messages = _serialize_messages_for_payload(messages)
         request_params = _request_params_from_config(config)
         request_extras = _request_extras_from_config(config)
-        if dispatch is not None:
-            dispatch(
-                _make_llm_event(
-                    "started",
-                    call_id=call_id,
-                    model=self.model,
-                    genai_system=self._genai_system,
-                    input_messages=serialized_messages,
-                    request_params=request_params,
-                    request_extras=request_extras,
-                    active_prompt=active_prompt,
-                    active_prompt_group=active_prompt_group,
-                )
-            )
-
         # Wall-clock latency measured at the adapter boundary per
         # proposal 0049's LlmCompletionEvent.latency_ms contract. The
         # boundary spans from "just before _do_complete is called" to
@@ -473,13 +458,15 @@ async def complete(
             response = await self._do_complete(body, schema_dict, schema_class)
         except Exception as exc:
             if dispatch is not None:
-                # Failure path: only the sentinel NodeEvent pair fires.
-                # Per proposal 0049 §3 (alternative 3): LlmCompletionEvent
-                # is completion-only; failures flow through the
-                # llm-provider §7 exception path. The error continues
-                # to surface through the existing observer chain via
-                # the sentinel NodeEvent's error_type / error_category
-                # fields on LlmEventPayload.
+                # Failure path: the sentinel NodeEvent carries the
+                # error fields per llm-provider §7. LlmCompletionEvent
+                # is success-only per proposal 0049 §3 alternative 3,
+                # so failures continue to surface through a sentinel
+                # ``completed`` event until the spec extends the typed
+                # event with error semantics. Only ``completed`` fires
+                # — no started counterpart, since both bundled
+                # observers' handlers ignore sentinel-started after
+                # the v0.13.0 migration.
                 dispatch(
                     _make_llm_event(
                         "completed",
@@ -498,33 +485,14 @@ async def complete(
         latency_ms = (time.perf_counter() - adapter_start) * 1000.0

         if dispatch is not None:
-            # Sentinel NodeEvent pair stays during the dual-emit window
-            # per proposal 0049 §5.5.7 SHOULD-emit-both transition. The
-            # window stays open through v0.13.0 with the sentinel
-            # emission removed in v0.15.0 (CHANGELOG callout pinned to
-            # the v0.13.0 release notes).
-            dispatch(
-                _make_llm_event(
-                    "completed",
-                    call_id=call_id,
-                    model=self.model,
-                    genai_system=self._genai_system,
-                    finish_reason=response.finish_reason,
-                    usage=response.usage,
-                    input_messages=serialized_messages,
-                    output_content=response.message.content or None,
-                    request_params=request_params,
-                    request_extras=request_extras,
-                    response_id=response.response_id,
-                    response_model=response.response_model,
-                    active_prompt=active_prompt,
-                    active_prompt_group=active_prompt_group,
-                )
-            )
-            # The new typed LlmCompletionEvent — observers filtering via
-            # isinstance(event, LlmCompletionEvent) receive this; legacy
-            # observers filtering on the sentinel namespace see the
-            # NodeEvent pair above. Failure path doesn't reach here.
+            # Success path: emit only the typed LlmCompletionEvent.
+            # The sentinel NodeEvent pair previously emitted on success
+            # for compatibility with pre-typed-event observers was
+            # dropped in v0.13.0; bundled observers (OTel + Langfuse)
+            # consume the typed event directly, and external custom
+            # observers should migrate to type discrimination via
+            # ``isinstance(event, LlmCompletionEvent)`` if they need
+            # LLM call notifications.
             dispatch(
                 self._build_llm_completion_event(
                     response,
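The resulting emission pattern in `complete()` (one typed event on success, one sentinel `completed` event on failure, latency measured around the adapter call) might be condensed as in the sketch below. Everything here is a stand-in: `SentinelCompleted` and `TypedCompletion` are hypothetical substitutes for the real event types, the `do_complete` callable replaces `self._do_complete(...)`, and the re-raise after dispatch is assumed because the diff elides the rest of the `except` block.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class SentinelCompleted:  # hypothetical stand-in for the sentinel NodeEvent
    error_type: str


@dataclass
class TypedCompletion:  # hypothetical stand-in for LlmCompletionEvent
    latency_ms: float


async def complete(
    do_complete: Callable[[], Awaitable[Any]],
    dispatch: Optional[Callable[[object], None]],
) -> Any:
    # The measurement boundary starts just before the adapter call.
    adapter_start = time.perf_counter()
    try:
        response = await do_complete()
    except Exception as exc:
        if dispatch is not None:
            # Failure path: a single sentinel "completed" event, no "started".
            dispatch(SentinelCompleted(error_type=type(exc).__name__))
        raise  # assumed: the exception still propagates to the caller
    latency_ms = (time.perf_counter() - adapter_start) * 1000.0
    if dispatch is not None:
        # Success path: only the typed event.
        dispatch(TypedCompletion(latency_ms=latency_ms))
    return response


async def _ok() -> str:
    return "hello"


async def _boom() -> str:
    raise TimeoutError("upstream timed out")


ok_events: list = []
ok_result = asyncio.run(complete(_ok, ok_events.append))

fail_events: list = []
try:
    asyncio.run(complete(_boom, fail_events.append))
except TimeoutError:
    pass
```

In this sketch each call dispatches exactly one event, so an observer no longer has to pair `started` and `completed` events by `call_id` to recover a duration.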
113 changes: 101 additions & 12 deletions src/openarmature/observability/langfuse/adapter.py
@@ -35,6 +35,7 @@

 import json
 from contextlib import ExitStack
+from datetime import datetime
 from typing import TYPE_CHECKING, Any, cast

 from .client import LangfuseGenerationHandle, LangfuseSpanHandle, LangfuseUsage, ObservationLevel
@@ -129,13 +130,20 @@ def update(self, **fields: Any) -> None:
             kwargs[key] = value
         self._obs.update(**kwargs)

-    def end(self, **fields: Any) -> None:
+    def end(self, *, end_time: datetime | None = None, **fields: Any) -> None:
         # Apply any field updates first (so they're set BEFORE the
         # observation closes), then call end(). v4's end() takes only
         # an optional ``end_time``; field mutation happens via update().
+        # The SDK's end_time is typed Optional[int] nanoseconds —
+        # convert from the Protocol's datetime surface before passing
+        # through. Without the conversion the OTel span_processor's
+        # formatter raises TypeError when it tries ``end_time / 1e9``.
         if fields:
             self.update(**fields)
-        self._obs.end()
+        if end_time is not None:
+            self._obs.end(end_time=int(end_time.timestamp() * 1_000_000_000))
+        else:
+            self._obs.end()


 class LangfuseSDKAdapter:
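The `datetime` to integer-nanosecond conversion used in `end()` can be examined in isolation. The sketch below reproduces the expression from the diff and compares it with an integer-only variant; `to_epoch_ns_exact` is a hypothetical helper written for this illustration, not part of the adapter. Because `datetime.timestamp()` returns a float number of seconds, the float-based result can drift from the exact value by a few hundred nanoseconds, which is usually negligible for LLM call durations.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ns_float(dt: datetime) -> int:
    """Conversion as written in the diff: goes through float seconds."""
    return int(dt.timestamp() * 1_000_000_000)


def to_epoch_ns_exact(dt: datetime) -> int:
    """Integer-only alternative (hypothetical helper); needs an aware datetime."""
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # timedelta // timedelta yields an exact integer count of microseconds.
    return ((dt - _EPOCH) // timedelta(microseconds=1)) * 1_000


moment = datetime(2024, 1, 1, 0, 0, 0, 123456, tzinfo=timezone.utc)
exact_ns = to_epoch_ns_exact(moment)
float_ns = to_epoch_ns_float(moment)
drift_ns = abs(float_ns - exact_ns)
```

One further point to keep in mind: `timestamp()` interprets a naive `datetime` as local time, so callers of the Protocol would probably want to pass timezone-aware values.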
@@ -337,6 +345,7 @@ def generation(
         output: Any = None,
         usage: LangfuseUsage | None = None,
         prompt: Any = None,
+        start_time: datetime | None = None,
     ) -> LangfuseGenerationHandle:
         extra_kwargs: dict[str, Any] = {
             "model": model,
@@ -356,18 +365,98 @@ def generation(
             if usage.total is not None:
                 usage_details["total"] = usage.total
             extra_kwargs["usage_details"] = usage_details
-        obs = self._start_observation(
-            as_type="generation",
-            trace_id=trace_id,
-            name=name,
-            metadata=metadata,
-            parent_observation_id=parent_observation_id,
-            level=level,
-            status_message=status_message,
-            **{k: v for k, v in extra_kwargs.items() if v is not None},
-        )
+        if start_time is not None:
+            # v4's public ``start_observation`` does NOT accept a
+            # ``start_time`` kwarg — only the internal OTel tracer
+            # does. Mirror the SDK's own ``create_event`` precedent
+            # (langfuse/_client/client.py:1518-1551): open the OTel
+            # span directly via the private ``_otel_tracer`` with the
+            # back-dated timestamp, then wrap it in LangfuseGeneration.
+            # This is the only path to a back-dated Generation in
+            # v4.7; the live-account integration test catches a future
+            # SDK break.
+            obs = self._start_back_dated_generation(
+                trace_id=trace_id,
+                name=name,
+                metadata=metadata,
+                parent_observation_id=parent_observation_id,
+                level=level,
+                status_message=status_message,
+                start_time=start_time,
+                **{k: v for k, v in extra_kwargs.items() if v is not None},
+            )
+        else:
+            obs = self._start_observation(
+                as_type="generation",
+                trace_id=trace_id,
+                name=name,
+                metadata=metadata,
+                parent_observation_id=parent_observation_id,
+                level=level,
+                status_message=status_message,
+                **{k: v for k, v in extra_kwargs.items() if v is not None},
+            )
         return _SpanHandle(obs)

+    def _start_back_dated_generation(
+        self,
+        *,
+        trace_id: str,
+        name: str | None,
+        metadata: dict[str, Any] | None,
+        parent_observation_id: str | None,
+        level: ObservationLevel,
+        status_message: str | None,
+        start_time: datetime,
+        **extra: Any,
+    ) -> Any:
+        """Open a LangfuseGeneration at a back-dated timestamp by going
+        through the private OTel tracer rather than the public
+        ``start_observation`` API (which doesn't accept ``start_time``
+        in v4.7). Mirrors the SDK's ``create_event`` precedent."""
+        from langfuse._client.span import LangfuseGeneration
+        from opentelemetry import trace as otel_trace_api
+
+        trace_entry = self._trace_info.get(trace_id)
+        trace_context: TraceContext = {"trace_id": _to_otel_trace_id(trace_id)}
+        if parent_observation_id is not None:
+            trace_context["parent_span_id"] = parent_observation_id
+
+        # OTel's ``start_span(start_time=...)`` takes int nanoseconds
+        # since epoch. The SDK uses ``time_ns()`` for its instant
+        # events; for back-dating, convert from the supplied datetime.
+        start_time_ns = int(start_time.timestamp() * 1_000_000_000)
+
+        remote_parent_span = self._client._create_remote_parent_span(  # pyright: ignore[reportPrivateUsage]  # noqa: SLF001
+            trace_id=trace_context["trace_id"],
+            parent_span_id=trace_context.get("parent_span_id"),
+        )
+
+        with ExitStack() as stack:
+            if trace_entry is not None:
+                stack.enter_context(
+                    propagate_attributes(
+                        trace_name=trace_entry["name"],
+                        metadata=_stringify_metadata(trace_entry["metadata"]),
+                    )
+                )
+            stack.enter_context(otel_trace_api.use_span(remote_parent_span))
+            otel_span = self._client._otel_tracer.start_span(  # pyright: ignore[reportPrivateUsage]  # noqa: SLF001
+                name=name or "observation",
+                start_time=start_time_ns,
+            )
+            generation_kwargs: dict[str, Any] = {
+                "otel_span": otel_span,
+                "langfuse_client": self._client,
+                "metadata": metadata,
+            }
+            if level != "DEFAULT":
+                generation_kwargs["level"] = level
+            if status_message is not None:
+                generation_kwargs["status_message"] = status_message
+            generation_kwargs.update(extra)
+            return LangfuseGeneration(**generation_kwargs)
+
     def force_flush(self, timeout_ms: int = 30_000) -> bool:
         """Best-effort flush of the underlying Langfuse client.

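The observer side of this change, which computes the back-dated `start_time` passed into `generation(...)`, is not part of the diff shown here. A minimal sketch of the arithmetic, assuming the observer treats the typed event's arrival time as the end of the call and subtracts `latency_ms` to obtain the start, could look like this; `back_dated_window` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def back_dated_window(arrival: datetime, latency_ms: float) -> Tuple[datetime, datetime]:
    """Return (start_time, end_time), treating event arrival as the end.

    Assumption for this sketch: the real observer may pick a different
    end anchor; only the subtraction of latency_ms is taken from the changelog.
    """
    end_time = arrival
    start_time = end_time - timedelta(milliseconds=latency_ms)
    return start_time, end_time


arrival = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
start_time, end_time = back_dated_window(arrival, latency_ms=250.0)
duration_ms = (end_time - start_time) / timedelta(milliseconds=1)
```

The two values would then be handed to `generation(start_time=...)` and the handle's `end(end_time=...)`, so the recorded duration equals the adapter-boundary measurement regardless of when the observer actually ran.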
23 changes: 19 additions & 4 deletions src/openarmature/observability/langfuse/client.py
@@ -30,6 +30,7 @@
 from __future__ import annotations

 from dataclasses import dataclass, field
+from datetime import datetime
 from typing import Any, Literal, Protocol, runtime_checkable

 ObservationType = Literal["span", "generation", "event"]
@@ -65,6 +66,13 @@ class LangfuseObservation:
     level: ObservationLevel = "DEFAULT"
     status_message: str | None = None
     ended: bool = False
+    # Optional caller-supplied timestamps. Populated when a typed-event
+    # consumer (e.g., the LangfuseObserver on the typed LLM completion
+    # path) back-dates the observation using a wall-clock measurement
+    # rather than letting the SDK record open/close moments. ``None``
+    # means "use the SDK's default"; both fields are independent.
+    start_time: datetime | None = None
+    end_time: datetime | None = None

     # Generation-specific (None / empty on Span and Event observations)
     model: str | None = None
@@ -125,7 +133,7 @@ def id(self) -> str: ...

     def update(self, **fields: Any) -> None: ...

-    def end(self, **fields: Any) -> None: ...
+    def end(self, *, end_time: datetime | None = None, **fields: Any) -> None: ...


 @runtime_checkable
@@ -138,7 +146,7 @@ def id(self) -> str: ...

     def update(self, **fields: Any) -> None: ...

-    def end(self, **fields: Any) -> None: ...
+    def end(self, *, end_time: datetime | None = None, **fields: Any) -> None: ...


 @runtime_checkable
@@ -224,6 +232,7 @@ def generation(
         output: Any = None,
         usage: LangfuseUsage | None = None,
         prompt: Any = None,
+        start_time: datetime | None = None,
     ) -> LangfuseGenerationHandle: ...

     def force_flush(self, timeout_ms: int = 30_000) -> bool:
@@ -268,7 +277,9 @@ def id(self) -> str:
     def update(self, **fields: Any) -> None:
         _apply_fields(self.observation, fields)

-    def end(self, **fields: Any) -> None:
+    def end(self, *, end_time: datetime | None = None, **fields: Any) -> None:
+        if end_time is not None:
+            self.observation.end_time = end_time
         _apply_fields(self.observation, fields)
         self.observation.ended = True

@@ -286,7 +297,9 @@ def id(self) -> str:
     def update(self, **fields: Any) -> None:
         _apply_fields(self.observation, fields)

-    def end(self, **fields: Any) -> None:
+    def end(self, *, end_time: datetime | None = None, **fields: Any) -> None:
+        if end_time is not None:
+            self.observation.end_time = end_time
         _apply_fields(self.observation, fields)
         self.observation.ended = True

@@ -428,6 +441,7 @@ def generation(
         output: Any = None,
         usage: LangfuseUsage | None = None,
         prompt: Any = None,
+        start_time: datetime | None = None,
     ) -> LangfuseGenerationHandle:
         trace = self._get_trace(trace_id)
         observation = LangfuseObservation(
@@ -444,6 +458,7 @@ def generation(
             output=output,
             usage=usage,
             prompt_entity_link=prompt,
+            start_time=start_time,
         )
         trace.observations.append(observation)
         return _InMemoryGenerationHandle(observation=observation)
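One consequence of the Protocol signature change for third-party handle implementations can be shown with plain Python semantics. In the sketch below, `HandleProtocol` is a trimmed stand-in for the handle Protocols in `client.py`, and `OldStyleHandle` / `NewStyleHandle` are hypothetical classes written for this illustration. A handle that still declares `end(self, **fields)` continues to satisfy a `runtime_checkable` `isinstance` check, since that check only looks at member names, but it receives `end_time` inside `**fields` rather than as a dedicated parameter.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Protocol, runtime_checkable


@runtime_checkable
class HandleProtocol(Protocol):  # trimmed stand-in, not the real Protocol
    def update(self, **fields: Any) -> None: ...

    def end(self, *, end_time: Optional[datetime] = None, **fields: Any) -> None: ...


class OldStyleHandle:
    """Hypothetical handle still using the pre-change signature."""

    def __init__(self) -> None:
        self.fields: Dict[str, Any] = {}
        self.end_time: Optional[datetime] = None

    def update(self, **fields: Any) -> None:
        self.fields.update(fields)

    def end(self, **fields: Any) -> None:
        # end_time, if passed, lands in ``fields`` here.
        self.fields.update(fields)


class NewStyleHandle(OldStyleHandle):
    """Hypothetical handle updated to the new keyword-only parameter."""

    def end(self, *, end_time: Optional[datetime] = None, **fields: Any) -> None:
        if end_time is not None:
            self.end_time = end_time
        self.fields.update(fields)


stamp = datetime(2024, 1, 1, tzinfo=timezone.utc)
old, new = OldStyleHandle(), NewStyleHandle()
old.end(end_time=stamp, output="done")
new.end(end_time=stamp, output="done")
```

Here `old.fields` ends up containing an `end_time` key while `old.end_time` stays `None`, so an implementation that forwards `fields` to an SDK `update()` call would likely want to adopt the new signature explicitly.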