diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c621fb..05af67e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The ## [Unreleased] +### Added + +- **`FailureIsolationMiddleware`** (proposal 0050, pipeline-utilities §6.3). A third bundled middleware primitive alongside `RetryMiddleware` and `TimingMiddleware`. It catches exceptions escaping the wrapped node's inner chain and returns a configured degraded partial update, so a non-critical node can fail without aborting the whole invocation. Configuration: `degraded_update` (a static mapping or a `state -> partial_update` callable, resolved at catch time), `event_name` (required, no default, since a generic name makes downstream telemetry strictly worse), an optional `predicate` (`Exception -> bool`; only matching exceptions are caught, others propagate), and an optional async `on_caught` hook. It catches `Exception`; `BaseException` (cancellation) propagates, matching `RetryMiddleware`. On a catch it dispatches a new framework-emitted `FailureIsolatedEvent` (a distinct observer-event variant carrying `event_name`, the wrapped node's lineage identity, `pre_state` / `post_state`, and a `CaughtException` record of category plus message) onto the observer delivery queue; the bundled OTel and Langfuse observers render it as a marker span / observation. Compose it OUTER of `RetryMiddleware` for the "retry transients, degrade gracefully on exhaustion" pattern. Additive: existing pipelines see no behavior change, and the spec pin is unchanged (0050 is already within the v0.53.0 pin). + ## [0.13.0] — 2026-06-09 LLM provider hardening release. The pinned spec advances from v0.46.0 to v0.53.0, absorbing four implemented proposals. Proposal 0049 introduces the first spec-normatively-typed observer event variant, `LlmCompletionEvent`, dispatched on every successful LLM provider call; proposal 0058 adds the failure-side counterpart, `LlmFailedEvent`; proposal 0057 extends the completion variant with eight request-side fields. The bundled `OpenAIProvider` retires its sentinel-namespace `NodeEvent` emission for LLM calls entirely, and the OTel and Langfuse observers now drive their LLM span / Generation from the typed events with back-dated timestamps so durations reflect the adapter boundary. Proposal 0047 closes implicit prefix-cache wire-byte stability: `Response.usage` gains cache-stat fields, the OTel observer emits `openarmature.llm.cache_read` attributes, and the OpenAI Chat Completions request body is byte-stable across equivalent inputs regardless of dict insertion order. Custom observers that filtered LLM calls by sentinel namespace MUST migrate to `isinstance` discrimination; `LLM_NAMESPACE` and `LlmEventPayload` remain as a documented compatibility surface. diff --git a/docs/concepts/middleware.md b/docs/concepts/middleware.md index e650c59..1d15c4c 100644 --- a/docs/concepts/middleware.md +++ b/docs/concepts/middleware.md @@ -199,6 +199,96 @@ Two implementation details worth knowing: globally patching `time.monotonic` (which would also distort asyncio's scheduling). +## Built-in: FailureIsolationMiddleware + +```python +from openarmature.graph import FailureIsolationMiddleware + +builder.add_node( + "extract_segments", + extract_fn, + middleware=[ + FailureIsolationMiddleware( + degraded_update={"segments": []}, + event_name="segment_extraction_degraded", + ), + ], +) +``` + +`FailureIsolationMiddleware` catches an exception escaping the wrapped +chain and returns a degraded partial update instead of letting it abort +the invocation. Reach for it when a node is not load-bearing enough to +kill the whole run: a failed enrichment step degrades to an empty list, +the graph continues, and the failure is still visible in your traces. +It is the named, observable form of the "catch and recover" pattern +from [Error semantics](#error-semantics) above. + +Configuration: + +- **`degraded_update`** (required) is the partial update returned on a + caught exception. It may be a static mapping, or a callable + `state -> partial_update` when the fallback shape depends on the input + state. The callable is resolved once, at catch time. +- **`event_name`** (required, no default) is a stable identifier for + this catch site. It rides on the emitted event (below) and any + downstream logging. There is no default on purpose: a generic name + like `"failure_isolated"` collapses every degraded path into one + indistinguishable bucket in a dashboard, so the name is forced at the + construction site, where the context to name it well is available. +- **`predicate`** is an optional `Exception -> bool`. When supplied, + only exceptions where it returns true are caught; everything else + propagates. The default catches every `Exception`. +- **`on_caught`** is an optional async hook `Exception -> None`, fired + when the middleware catches. Use it to pump the caught exception to + caller-specific telemetry beyond the framework event. It fires inline + before the degraded update returns, and an exception it raises is + isolated (logged, not propagated) so a buggy hook cannot defeat the + recovery. + +Like `RetryMiddleware`, it catches `Exception` only; `BaseException` +(cancellation, keyboard interrupt) propagates so aborts still work. + +### The failure-isolated event + +On a catch, the middleware dispatches a `FailureIsolatedEvent` onto the +observer stream. It is a distinct event variant, not a node event: it +carries the `event_name`, the wrapped node's lineage identity, the input +and degraded states, and a `CaughtException` record holding the +exception's `category` (when it has one) and message. Observers narrow +on it with `isinstance(event, FailureIsolatedEvent)`. The bundled OTel +and Langfuse observers render it as a marker span / observation so the +catch shows up alongside the node's own span. The default emission path +is the observer stream only, with no logging-library dependency; +`on_caught` is the escape hatch for anything else. + +### Composing with RetryMiddleware + +The two compose into the canonical "retry transients, then give up +gracefully" pattern. The order is load-bearing: failure isolation is the +**outer** layer, retry is **inner**. + +```python +builder.add_node( + "summarize", + summarize_fn, + middleware=[ + FailureIsolationMiddleware( + degraded_update={"summary": ""}, + event_name="summary_degraded", + ), + RetryMiddleware(max_attempts=3), + ], +) +``` + +Retry sits closest to the node, so it sees raw transient failures first +and retries them. Only what escapes retry (an exhausted budget, or a +non-transient exception retry's classifier declines) reaches the outer +failure isolation, which degrades. Reverse the order and the inner +isolation would swallow transients before retry ever saw them, defeating +the retry entirely. + ## Related - [Parallel branches](parallel-branches.md): per-branch middleware diff --git a/src/openarmature/graph/__init__.py b/src/openarmature/graph/__init__.py index 60a10ff..21d3984 100644 --- a/src/openarmature/graph/__init__.py +++ b/src/openarmature/graph/__init__.py @@ -36,6 +36,8 @@ UnreachableNode, ) from .events import ( + CaughtException, + FailureIsolatedEvent, InvocationCompletedEvent, InvocationStartedEvent, LlmCompletionEvent, @@ -45,6 +47,8 @@ ) from .fan_out import FanOutConfig, FanOutNode from .middleware import ( + DegradedUpdate, + FailureIsolationMiddleware, Middleware, NextCall, RetryMiddleware, @@ -64,15 +68,19 @@ __all__ = [ "END", + "CaughtException", "CompileError", "CompiledGraph", "ConditionalEdge", "ConflictingReducers", "DanglingEdge", + "DegradedUpdate", "DrainSummary", "EdgeException", "EndSentinel", "ExplicitMapping", + "FailureIsolatedEvent", + "FailureIsolationMiddleware", "FanOutConfig", "FanOutCountModeAmbiguous", "FanOutEmpty", diff --git a/src/openarmature/graph/events.py b/src/openarmature/graph/events.py index 6e602ec..807c2ac 100644 --- a/src/openarmature/graph/events.py +++ b/src/openarmature/graph/events.py @@ -659,7 +659,69 @@ class LlmFailedEvent: caller_invocation_metadata: Mapping[str, AttributeValue] | None = None +@dataclass(frozen=True) +class CaughtException: + """Structured record of an exception caught by + ``FailureIsolationMiddleware``. + + - ``category``: the exception's failure category when it carries + one (e.g. an llm-provider error's ``category`` attribute), else + ``None`` for a bare exception that carries no category. + - ``message``: the human-readable exception message (``str(exc)``); + the empty string when the exception carried no message. + """ + + category: str | None + message: str + + +# Spec: realizes pipeline-utilities §6.3 failure-isolation middleware +# (proposal 0050). Emitted by FailureIsolationMiddleware when it +# catches an exception escaping the inner chain and substitutes a +# degraded partial update. A distinct framework-emitted event kind +# (NOT a NodeEvent — does not reuse node_name / namespace / error), +# mirroring the proposal 0040 MetadataAugmentationEvent mechanism: +# enqueued on the engine's serial observer-delivery queue via +# ``current_dispatch()`` and NOT subject to the observer ``phases`` +# filter (matches MetadataAugmentationEvent / InvocationStartedEvent / +# InvocationCompletedEvent / LlmCompletionEvent / LlmFailedEvent +# treatment). +@dataclass(frozen=True) +class FailureIsolatedEvent: + """A failure-isolation event delivered to observers. + + Reports that ``FailureIsolationMiddleware`` caught an exception at + a node and substituted a degraded partial update for the node's + output. Observer code filters by type discrimination + (``isinstance(event, FailureIsolatedEvent)``). + + Field set: + + - ``event_name``: the caller-supplied identifier for this catch + site, from the middleware's configuration. + - ``namespace`` / ``attempt_index`` / ``fan_out_index`` / + ``branch_name``: the wrapped node's lineage identity, surfaced + for correlation with the node's other events. + - ``pre_state``: the state the wrapped node received. + - ``post_state``: the degraded partial update the middleware + returned in place of the node's output. + - ``caught_exception``: a :class:`CaughtException` record of the + caught exception (category + message). + """ + + event_name: str + namespace: tuple[str, ...] + attempt_index: int + fan_out_index: int | None + branch_name: str | None + pre_state: Any + post_state: Mapping[str, Any] + caught_exception: CaughtException + + __all__ = [ + "CaughtException", + "FailureIsolatedEvent", "FanOutEventConfig", "InvocationCompletedEvent", "InvocationStartedEvent", diff --git a/src/openarmature/graph/middleware/__init__.py b/src/openarmature/graph/middleware/__init__.py index e416477..80f2e99 100644 --- a/src/openarmature/graph/middleware/__init__.py +++ b/src/openarmature/graph/middleware/__init__.py @@ -18,6 +18,7 @@ """ from ._core import ChainCall, Middleware, NextCall, compose_chain +from .failure_isolation import DegradedUpdate, FailureIsolationMiddleware from .retry import ( TRANSIENT_CATEGORIES, BackoffStrategy, @@ -34,6 +35,8 @@ "BackoffStrategy", "ChainCall", "Classifier", + "DegradedUpdate", + "FailureIsolationMiddleware", "Middleware", "NextCall", "OnCompleteCallback", diff --git a/src/openarmature/graph/middleware/failure_isolation.py b/src/openarmature/graph/middleware/failure_isolation.py new file mode 100644 index 0000000..c501410 --- /dev/null +++ b/src/openarmature/graph/middleware/failure_isolation.py @@ -0,0 +1,187 @@ +# Spec: canonical failure-isolation middleware per pipeline-utilities +# §6.3 (proposal 0050). Packages the §2 third-MAY-bullet +# catch-and-recover pattern as a named primitive alongside §6.1 retry +# and §6.2 timing. + +"""Failure-isolation middleware (canonical). + +Wraps a node's chain so an exception escaping the inner chain becomes a +configured degraded partial update instead of propagating. The +companion to ``RetryMiddleware`` for the "retry transients, give up +gracefully on exhaustion" pattern. + +Catches ``Exception`` by default; ``BaseException`` +(``asyncio.CancelledError``, ``KeyboardInterrupt``) propagates so +cancellation works as expected — the same rule as ``RetryMiddleware``. + +On a caught exception the middleware first resolves ``degraded_update`` +(a static mapping, or a callable taking the pre-call state; invoked +once, at catch time, which is also what populates the dispatched +event's ``post_state``), then in order: + +1. Dispatches a ``FailureIsolatedEvent`` onto the engine's serial + observer-delivery queue (a framework-emitted event; the bundled + OTel and Langfuse observers render the catch). The default emission + path is the observer event, with no logging-library dependency. +2. Awaits the optional ``on_caught`` hook. +3. Returns the resolved degraded update as the node's partial update. + +Composition with ``RetryMiddleware``: failure isolation MUST be the +OUTER middleware (it only sees what escapes retry); retry MUST be INNER +(it sees raw transients first and retries them). Reversing the order +lets the inner isolation swallow transients before retry can see them, +defeating retry entirely. +""" + +from __future__ import annotations + +import warnings +from collections.abc import Awaitable, Callable, Mapping +from typing import Any + +from openarmature.observability.correlation import ( + current_attempt_index, + current_branch_name, + current_dispatch, + current_fan_out_index, + current_namespace_prefix, +) + +from ._core import NextCall + +# A degraded update is either a static partial-update mapping or a +# callable resolving one from the pre-call state. Resolved at catch +# time; the callable form covers input-state-dependent degraded shapes. +DegradedUpdate = Mapping[str, Any] | Callable[[Any], Mapping[str, Any]] + + +class FailureIsolationMiddleware: + """Catch exceptions escaping the inner chain; return a degraded + partial update. + + Configuration: + + - ``degraded_update`` (required): the partial update returned on a + caught exception, OR a callable ``state -> partial_update`` for + input-state-dependent degraded shapes. + - ``event_name`` (required): a stable identifier for this catch + site; surfaces on the ``FailureIsolatedEvent``. No default — + useful values are node-specific, and a generic default would make + downstream telemetry strictly worse. + - ``predicate`` (optional): ``Exception -> bool``. When supplied, + only exceptions where ``predicate(exc)`` is true are caught; + others propagate. Defaults to catching every ``Exception``. + - ``on_caught`` (optional): an async ``Exception -> Awaitable[None]`` + hook fired on a caught exception, for caller-specific telemetry + beyond the framework event. It runs inline before the degraded + update is returned, so a slow hook delays the node's return; an + exception raised by the hook is isolated (logged via + ``warnings.warn``, not propagated) so a telemetry bug cannot turn + a recovered node back into a failure. + """ + + def __init__( + self, + *, + degraded_update: DegradedUpdate, + event_name: str, + predicate: Callable[[Exception], bool] | None = None, + on_caught: Callable[[Exception], Awaitable[None]] | None = None, + ) -> None: + self.degraded_update = degraded_update + self.event_name = event_name + self.predicate = predicate + self.on_caught = on_caught + + async def __call__(self, state: Any, next_: NextCall) -> Mapping[str, Any]: + try: + return await next_(state) + except Exception as exc: + # BaseException (cancellation) never enters here — it + # extends BaseException, not Exception. Same rule as + # RetryMiddleware: cancellation MUST propagate. + if self.predicate is not None and not self.predicate(exc): + raise + # Resolve the degraded update once, at catch time, and reuse + # it for the event's post_state and the node return so a + # callable degraded_update is invoked exactly once. The + # observable order the spec prescribes — emit the event, then + # on_caught, then return the update — is preserved below; + # resolving here first only populates post_state. + degraded = self._resolve_degraded(state) + self._emit_event(state, exc, degraded) + if self.on_caught is not None: + try: + await self.on_caught(exc) + except Exception as hook_error: # noqa: BLE001 + # on_caught is caller telemetry; a bug in it MUST NOT + # turn a recovered node back into a crash. Isolate it + # the way the observer-delivery contract isolates + # observer exceptions (warn, don't propagate). + # BaseException (cancellation) still propagates by not + # being caught here. + warnings.warn( + f"FailureIsolationMiddleware on_caught raised " + f"{type(hook_error).__name__}: {hook_error}", + stacklevel=2, + ) + return degraded + + def _resolve_degraded(self, state: Any) -> Mapping[str, Any]: + if callable(self.degraded_update): + return self.degraded_update(state) + return self.degraded_update + + def _emit_event(self, state: Any, exc: Exception, degraded: Mapping[str, Any]) -> None: + dispatch = current_dispatch() + # current_dispatch() is None outside an invocation (no observers + # in scope, e.g. unit-testing the middleware directly) — the + # degraded return still happens; there is just no delivery queue + # to enqueue onto. + if dispatch is None: + return + # Local import mirrors set_invocation_metadata's 0040 emit: it + # keeps the event-type import off the middleware module-load + # path and defers it until the first catch. + from openarmature.graph.events import CaughtException, FailureIsolatedEvent + + # A categorized exception (e.g. an llm-provider error) carries a + # string ``category``. When the engine has wrapped the original + # in a graph-engine error before it reached the middleware, the + # category rides on ``__cause__`` — walk it the same way the + # default retry classifier does so the caught failure's category + # survives the wrapping. A bare exception yields ``None``. + category = getattr(exc, "category", None) + if not isinstance(category, str): + cause = getattr(exc, "__cause__", None) + cause_category = getattr(cause, "category", None) if cause is not None else None + category = cause_category if isinstance(cause_category, str) else None + # ``attempt_index`` here is deliberately the NODE-level baseline, + # not a per-attempt wire index: failure isolation is a node-level + # concern ("the node, across its retries, was isolated"). When + # this middleware is OUTER of RetryMiddleware, retry has already + # reset the attempt ContextVar to that baseline (0) in its + # ``finally`` by the time the terminal exception reaches this + # catch, which is the frame we want (spec-confirmed). Parenting is + # unaffected: the node's attempt spans are already closed by + # delivery time (their completed event precedes this one on the + # serial queue), so observers parent the marker under the + # invocation span and correlate by ``namespace`` + node name. + dispatch( + FailureIsolatedEvent( + event_name=self.event_name, + namespace=current_namespace_prefix(), + attempt_index=current_attempt_index(), + fan_out_index=current_fan_out_index(), + branch_name=current_branch_name(), + pre_state=state, + post_state=degraded, + caught_exception=CaughtException(category=category, message=str(exc)), + ) + ) + + +__all__ = [ + "DegradedUpdate", + "FailureIsolationMiddleware", +] diff --git a/src/openarmature/graph/observer.py b/src/openarmature/graph/observer.py index 75ece1c..283c76f 100644 --- a/src/openarmature/graph/observer.py +++ b/src/openarmature/graph/observer.py @@ -35,6 +35,7 @@ from typing import Any, Literal, Protocol from .events import ( + FailureIsolatedEvent, InvocationCompletedEvent, InvocationStartedEvent, LlmCompletionEvent, @@ -52,9 +53,11 @@ # InvocationStartedEvent / InvocationCompletedEvent (proposal 0043 # trace.input/output sourcing), LlmCompletionEvent (proposal 0049 # typed LLM provider call event, dispatched on every successful LLM -# completion), and LlmFailedEvent (proposal 0058 typed LLM failure -# event, dispatched alongside the §7 exception when provider.complete -# raises). +# completion), LlmFailedEvent (proposal 0058 typed LLM failure event, +# dispatched alongside the §7 exception when provider.complete raises), +# and FailureIsolatedEvent (proposal 0050 §6.3 framework-emitted event, +# dispatched by FailureIsolationMiddleware when it catches an exception +# escaping the inner chain and substitutes a degraded partial update). ObserverEvent = ( NodeEvent | MetadataAugmentationEvent @@ -62,6 +65,7 @@ | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | FailureIsolatedEvent ) @@ -91,9 +95,9 @@ async def log_observer(event: NodeEvent | MetadataAugmentationEvent) -> None: conformance doesn't pin you to that name; any of `event`, `_event`, `e`, etc. matches. - Four event variants reach observers (graph-engine §6 + proposals - 0040, 0043). The signature is the union; observers ``isinstance``- - narrow on the first line and choose which variants they handle. + Seven event variants reach observers. The signature is the union; + observers ``isinstance``-narrow on the first line and choose which + variants they handle. - :class:`NodeEvent` — the started/completed/checkpoint phase events. Subject to the ``phases`` filter on @@ -124,6 +128,17 @@ async def log_observer(event: NodeEvent | MetadataAugmentationEvent) -> None: backends can populate ``trace.output``. NOT subject to the ``phases`` filter; OTel-only observers ignore it via the isinstance gate. + - :class:`LlmCompletionEvent` — dispatched on every successful LLM + provider call. Carries the typed identity / request / response + field set for LLM-aware backends. NOT subject to the ``phases`` + filter; non-LLM observers ignore it via the isinstance gate. + - :class:`LlmFailedEvent` — the failure-side counterpart, + dispatched alongside the provider exception when an LLM call + raises. NOT subject to the ``phases`` filter. + - :class:`FailureIsolatedEvent` — dispatched by + ``FailureIsolationMiddleware`` when it catches an exception and + substitutes a degraded partial update. NOT subject to the + ``phases`` filter. Optional ``prepare_sync`` extension ----------------------------------- diff --git a/src/openarmature/observability/correlation.py b/src/openarmature/observability/correlation.py index 65394f1..a1d4b50 100644 --- a/src/openarmature/observability/correlation.py +++ b/src/openarmature/observability/correlation.py @@ -37,6 +37,7 @@ if TYPE_CHECKING: from openarmature.graph.events import ( + FailureIsolatedEvent, InvocationCompletedEvent, InvocationStartedEvent, LlmCompletionEvent, @@ -224,6 +225,7 @@ def _reset_active_observers(token: Token[tuple[SubscribedObserver, ...]]) -> Non | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | FailureIsolatedEvent ], None, ] @@ -240,6 +242,7 @@ def current_dispatch() -> ( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | FailureIsolatedEvent ], None, ] @@ -267,6 +270,7 @@ def _set_active_dispatch( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | FailureIsolatedEvent ], None, ], @@ -279,6 +283,7 @@ def _set_active_dispatch( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | FailureIsolatedEvent ], None, ] @@ -299,6 +304,7 @@ def _reset_active_dispatch( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | FailureIsolatedEvent ], None, ] diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index 4525a41..445a6c7 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -30,6 +30,7 @@ from typing import Any, cast from openarmature.graph.events import ( + FailureIsolatedEvent, InvocationCompletedEvent, InvocationStartedEvent, LlmCompletionEvent, @@ -359,6 +360,7 @@ async def __call__( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | FailureIsolatedEvent ), ) -> None: if isinstance(event, InvocationStartedEvent): @@ -381,6 +383,10 @@ async def __call__( if not self.disable_llm_spans: self._handle_typed_llm_failed(event) return + # Proposal 0050 §6.3 framework-emitted failure-isolation event. + if isinstance(event, FailureIsolatedEvent): + self._handle_failure_isolated(event) + return if isinstance(event, MetadataAugmentationEvent): self._handle_metadata_augmentation(event) return @@ -615,6 +621,51 @@ def _handle_metadata_augmentation(self, event: MetadataAugmentationEvent) -> Non if _observation_chain_on_path(observation, aug_fi_chain, aug_bn_chain): observation.handle.update(metadata=metadata_delta) + # ------------------------------------------------------------------ + # Failure-isolation event (proposal 0050 §6.3) + # ------------------------------------------------------------------ + + def _handle_failure_isolated(self, event: FailureIsolatedEvent) -> None: + # Render the FailureIsolationMiddleware catch as a marker + # observation. Parented under the wrapped node's observation when + # it is still open; otherwise trace-level (the node observation + # is typically already closed-with-error by delivery time, since + # the node-body raise fires the node's completed event before the + # middleware recovers). The wrapped node's name rides on + # ``metadata.failure_isolation_node`` for correlation regardless. + from openarmature.observability.correlation import ( + current_correlation_id, + current_invocation_id, + ) + + invocation_id = current_invocation_id() + if invocation_id is None: + return + inv_state = self._inv_states.get(invocation_id) + if inv_state is None: + return + key: _StackKey = (event.namespace, event.attempt_index, event.fan_out_index, event.branch_name) + parent = inv_state.open_observations.get(key) + parent_observation_id = parent.handle.id if parent is not None else None + metadata: dict[str, Any] = { + "failure_isolation_event_name": event.event_name, + "error_message": event.caught_exception.message, + } + if event.namespace: + metadata["failure_isolation_node"] = event.namespace[-1] + if event.caught_exception.category is not None: + metadata["error_category"] = event.caught_exception.category + correlation_id = current_correlation_id() + if correlation_id is not None: + metadata["correlation_id"] = correlation_id + handle = self.client.span( + trace_id=inv_state.trace_id, + name="openarmature.failure_isolated", + metadata=metadata, + parent_observation_id=parent_observation_id, + ) + handle.end() + # ------------------------------------------------------------------ # Invocation-boundary events (proposal 0043 §8.4.1 sourcing) # ------------------------------------------------------------------ diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index 6f8f146..eb66372 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -98,6 +98,7 @@ from opentelemetry.trace.propagation import set_span_in_context from openarmature.graph.events import ( + FailureIsolatedEvent, InvocationCompletedEvent, InvocationStartedEvent, LlmCompletionEvent, @@ -572,6 +573,7 @@ async def __call__( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | FailureIsolatedEvent ), ) -> None: # Proposal 0043 invocation-boundary events: OTel has no @@ -596,6 +598,10 @@ async def __call__( if not self.disable_llm_spans: self._handle_typed_llm_failed(event) return + # Proposal 0050 §6.3 framework-emitted failure-isolation event. + if isinstance(event, FailureIsolatedEvent): + self._handle_failure_isolated(event) + return if isinstance(event, MetadataAugmentationEvent): self._handle_metadata_augmentation(event) return @@ -1325,6 +1331,61 @@ def _handle_typed_llm_failed(self, event: LlmFailedEvent) -> None: self._run_enrichers(span, event) span.end(end_time=end_time_ns) + def _handle_failure_isolated(self, event: FailureIsolatedEvent) -> None: + """Emit a zero-duration ``openarmature.failure_isolated`` span for + a FailureIsolationMiddleware catch. + + Parented under the calling node when its span is still open; + ``_resolve_llm_parent`` falls back to the invocation span + otherwise. The wrapped node's span is typically already + closed-with-error by the time this event is delivered — the + node-body raise dispatches the node's completed event before the + middleware recovers — so the marker most often parents directly + under the invocation span. The wrapped node's name rides on the + ``openarmature.failure_isolation.node`` attribute for + correlation regardless of parenting.""" + from openarmature.observability.correlation import ( + current_correlation_id, + current_invocation_id, + ) + + invocation_id = current_invocation_id() + if invocation_id is None: + return + inv_state = self._inv_state_for(invocation_id) + parent_ctx = self._resolve_llm_parent( + inv_state, + invocation_id, + calling_namespace_prefix=event.namespace, + calling_attempt_index=event.attempt_index, + calling_fan_out_index=event.fan_out_index, + calling_branch_name=event.branch_name, + ) + attrs: dict[str, Any] = { + "openarmature.failure_isolation.event_name": event.event_name, + "openarmature.failure_isolation.message": event.caught_exception.message, + } + if event.namespace: + attrs["openarmature.failure_isolation.node"] = event.namespace[-1] + if event.caught_exception.category is not None: + attrs["openarmature.error.category"] = event.caught_exception.category + cid = current_correlation_id() + if cid is not None: + attrs["openarmature.correlation_id"] = cid + span = self._tracer.start_span( + name="openarmature.failure_isolated", + context=cast("Any", parent_ctx), + kind=SpanKind.INTERNAL, + attributes=attrs, + ) + # The failure was caught and the node degraded gracefully, so the + # marker span itself is OK; the caught failure surfaces via the + # attributes (event_name / category / message), queryable without + # painting the span red. + span.set_status(Status(StatusCode.OK)) + self._run_enrichers(span, None) + span.end() + def _resolve_llm_parent( self, inv_state: _InvState, diff --git a/tests/unit/test_failure_isolation_middleware.py b/tests/unit/test_failure_isolation_middleware.py new file mode 100644 index 0000000..b29a303 --- /dev/null +++ b/tests/unit/test_failure_isolation_middleware.py @@ -0,0 +1,360 @@ +"""Unit + integration tests for FailureIsolationMiddleware (proposal 0050 §6.3). + +Covers the middleware's catch / degrade / predicate / on_caught +contract, the framework-emitted ``FailureIsolatedEvent`` and its field +population, the three-piece composition with ``RetryMiddleware``, and +rendering by the bundled OTel + Langfuse observers. +""" + +from __future__ import annotations + +import asyncio +from collections.abc import Mapping +from typing import Annotated, Any + +import pytest +from pydantic import Field + +from openarmature.graph import ( + END, + CaughtException, + FailureIsolatedEvent, + FailureIsolationMiddleware, + GraphBuilder, + ObserverEvent, + RetryMiddleware, + State, + append, + deterministic_backoff, +) +from openarmature.graph.middleware import NextCall +from openarmature.observability.correlation import ( + _reset_active_dispatch, + _reset_namespace_prefix, + _set_active_dispatch, + _set_namespace_prefix, +) + + +class _TransientError(Exception): + category = "provider_rate_limit" + + +def _raises(exc: BaseException) -> NextCall: + """A ``next`` callable that raises ``exc`` when invoked.""" + + async def _next(_state: Any) -> Mapping[str, Any]: + raise exc + + return _next + + +# --------------------------------------------------------------------------- +# Catch + degrade +# --------------------------------------------------------------------------- + + +async def test_static_degraded_update_returned_on_catch() -> None: + mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="x_failed") + out = await mw("state-in", _raises(ValueError("boom"))) + assert out == {"result": []} + + +async def test_callable_degraded_update_receives_pre_state() -> None: + seen: list[Any] = [] + + def degrade(state: Any) -> Mapping[str, Any]: + seen.append(state) + return {"result": ["fallback"]} + + mw = FailureIsolationMiddleware(degraded_update=degrade, event_name="x_failed") + out = await mw("state-in", _raises(ValueError("boom"))) + assert out == {"result": ["fallback"]} + assert seen == ["state-in"] + + +async def test_success_passes_through_untouched() -> None: + async def _ok(_state: Any) -> Mapping[str, Any]: + return {"result": ["real"]} + + mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="x_failed") + out = await mw("state-in", _ok) + assert out == {"result": ["real"]} + + +# --------------------------------------------------------------------------- +# Predicate filtering +# --------------------------------------------------------------------------- + + +async def test_predicate_true_catches() -> None: + mw = FailureIsolationMiddleware( + degraded_update={"result": []}, + event_name="x_failed", + predicate=lambda exc: isinstance(exc, ValueError), + ) + out = await mw("s", _raises(ValueError("boom"))) + assert out == {"result": []} + + +async def test_predicate_false_propagates() -> None: + mw = FailureIsolationMiddleware( + degraded_update={"result": []}, + event_name="x_failed", + predicate=lambda exc: isinstance(exc, ValueError), + ) + with pytest.raises(KeyError): + await mw("s", _raises(KeyError("nope"))) + + +# --------------------------------------------------------------------------- +# on_caught hook +# --------------------------------------------------------------------------- + + +async def test_on_caught_fires_once_and_degrade_still_returned() -> None: + caught: list[BaseException] = [] + + async def on_caught(exc: Exception) -> None: + caught.append(exc) + + exc = ValueError("boom") + mw = FailureIsolationMiddleware( + degraded_update={"result": []}, + event_name="x_failed", + on_caught=on_caught, + ) + out = await mw("s", _raises(exc)) + assert out == {"result": []} + assert caught == [exc] + + +async def test_on_caught_raise_is_isolated_event_emitted_degrade_returned() -> None: + events: list[Any] = [] + disp_token = _set_active_dispatch(lambda e: events.append(e)) + + async def bad_hook(_exc: Exception) -> None: + raise RuntimeError("hook boom") + + mw = FailureIsolationMiddleware( + degraded_update={"result": []}, + event_name="x_failed", + on_caught=bad_hook, + ) + try: + with pytest.warns(UserWarning, match="on_caught raised RuntimeError"): + out = await mw("s", _raises(ValueError("boom"))) + finally: + _reset_active_dispatch(disp_token) + + # A buggy on_caught is isolated: the degrade still happens, and the + # event was already dispatched (emit precedes on_caught). + assert out == {"result": []} + assert len(events) == 1 + assert isinstance(events[0], FailureIsolatedEvent) + + +# --------------------------------------------------------------------------- +# Cancellation propagates +# --------------------------------------------------------------------------- + + +async def test_base_exception_propagates() -> None: + mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="x_failed") + with pytest.raises(asyncio.CancelledError): + await mw("s", _raises(asyncio.CancelledError())) + + +# --------------------------------------------------------------------------- +# Framework event emission (via the dispatch ContextVar, no engine) +# --------------------------------------------------------------------------- + + +async def test_emits_failure_isolated_event_with_fields() -> None: + events: list[Any] = [] + disp_token = _set_active_dispatch(lambda e: events.append(e)) + ns_token = _set_namespace_prefix(("segment",)) + try: + mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="segment_failed") + out = await mw("state-in", _raises(_TransientError("rate limited"))) + finally: + _reset_namespace_prefix(ns_token) + _reset_active_dispatch(disp_token) + + assert out == {"result": []} + assert len(events) == 1 + ev = events[0] + assert isinstance(ev, FailureIsolatedEvent) + assert ev.event_name == "segment_failed" + assert ev.namespace == ("segment",) + assert ev.attempt_index == 0 + assert ev.pre_state == "state-in" + assert ev.post_state == {"result": []} + assert isinstance(ev.caught_exception, CaughtException) + assert ev.caught_exception.category == "provider_rate_limit" + assert ev.caught_exception.message == "rate limited" + + +async def test_bare_exception_has_null_category() -> None: + events: list[Any] = [] + disp_token = _set_active_dispatch(lambda e: events.append(e)) + try: + mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="x_failed") + await mw("s", _raises(ValueError("plain"))) + finally: + _reset_active_dispatch(disp_token) + + assert len(events) == 1 + assert events[0].caught_exception.category is None + assert events[0].caught_exception.message == "plain" + + +async def test_no_event_outside_invocation() -> None: + # current_dispatch() is None outside an invocation; the degrade still + # happens, no event is emitted, and nothing raises. + mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="x_failed") + out = await mw("s", _raises(ValueError("boom"))) + assert out == {"result": []} + + +# --------------------------------------------------------------------------- +# Engine integration +# --------------------------------------------------------------------------- + + +class _DocState(State): + points: Annotated[list[str], append] = Field(default_factory=list) + note: str = "" + + +async def _extract_raises(_s: _DocState) -> Mapping[str, Any]: + raise _TransientError("provider down") + + +def _isolated_extract_graph() -> Any: + return ( + GraphBuilder(_DocState) + .add_node( + "extract", + _extract_raises, + middleware=[ + FailureIsolationMiddleware( + degraded_update={"note": "degraded"}, + event_name="extract_failed", + ) + ], + ) + .add_edge("extract", END) + .set_entry("extract") + .compile() + ) + + +async def test_degrade_dont_crash_via_invoke() -> None: + graph = _isolated_extract_graph() + events: list[ObserverEvent] = [] + + async def rec(event: ObserverEvent) -> None: + events.append(event) + + final = await graph.invoke(_DocState(), observers=[rec]) + await graph.drain() + + # The node degraded gracefully; the invocation succeeded with the + # configured fallback applied rather than raising. + assert final.note == "degraded" + + isolated = [e for e in events if isinstance(e, FailureIsolatedEvent)] + assert len(isolated) == 1 + assert isolated[0].event_name == "extract_failed" + assert isolated[0].namespace == ("extract",) + assert isolated[0].caught_exception.category == "provider_rate_limit" + assert isolated[0].post_state == {"note": "degraded"} + + +async def test_three_piece_composition_with_retry() -> None: + attempts = {"n": 0} + + async def _flaky(_s: _DocState) -> Mapping[str, Any]: + attempts["n"] += 1 + raise _TransientError("still down") + + graph = ( + GraphBuilder(_DocState) + .add_node( + "flaky", + _flaky, + # Outer-to-inner: failure isolation OUTER, retry INNER. + middleware=[ + FailureIsolationMiddleware( + degraded_update={"note": "gave_up"}, + event_name="flaky_failed", + ), + RetryMiddleware(max_attempts=3, backoff=deterministic_backoff(0.0)), + ], + ) + .add_edge("flaky", END) + .set_entry("flaky") + .compile() + ) + + final = await graph.invoke(_DocState()) + await graph.drain() + + # Retry exhausted its 3 attempts; failure isolation then caught the + # propagated exhaustion exception and substituted the degraded value. + assert attempts["n"] == 3 + assert final.note == "gave_up" + + +# --------------------------------------------------------------------------- +# Bundled-observer rendering +# --------------------------------------------------------------------------- + + +async def test_otel_renders_failure_isolated_span() -> None: + pytest.importorskip("opentelemetry.sdk.trace") + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + from openarmature.observability.otel.observer import OTelObserver + + exporter = InMemorySpanExporter() + observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) + graph = _isolated_extract_graph() + graph.attach_observer(observer) + + await graph.invoke(_DocState()) + await graph.drain() + observer.shutdown() + + spans = exporter.get_finished_spans() + marker = next((s for s in spans if s.name == "openarmature.failure_isolated"), None) + assert marker is not None, f"no failure_isolated span; got {[s.name for s in spans]}" + attrs = dict(marker.attributes or {}) + assert attrs.get("openarmature.failure_isolation.event_name") == "extract_failed" + assert attrs.get("openarmature.failure_isolation.node") == "extract" + assert attrs.get("openarmature.error.category") == "provider_rate_limit" + + +async def test_langfuse_renders_failure_isolated_observation() -> None: + from openarmature.observability.langfuse.client import InMemoryLangfuseClient + from openarmature.observability.langfuse.observer import LangfuseObserver + + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client) + graph = _isolated_extract_graph() + graph.attach_observer(observer) + + await graph.invoke(_DocState()) + await graph.drain() + + trace = next(iter(client.traces.values())) + marker = next( + (o for o in trace.observations if o.name == "openarmature.failure_isolated"), + None, + ) + assert marker is not None, f"no failure_isolated observation; got {[o.name for o in trace.observations]}" + assert marker.metadata.get("failure_isolation_event_name") == "extract_failed" + assert marker.metadata.get("failure_isolation_node") == "extract" + assert marker.metadata.get("error_category") == "provider_rate_limit"