Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,10 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The

## [Unreleased]

### Added

- **`FailureIsolationMiddleware`** (proposal 0050, pipeline-utilities §6.3). A third bundled middleware primitive alongside `RetryMiddleware` and `TimingMiddleware`. It catches exceptions escaping the wrapped node's inner chain and returns a configured degraded partial update, so a non-critical node can fail without aborting the whole invocation. Configuration: `degraded_update` (a static mapping or a `state -> partial_update` callable, resolved at catch time), `event_name` (required, no default, since a generic name makes downstream telemetry strictly worse), an optional `predicate` (`Exception -> bool`; only matching exceptions are caught, others propagate), and an optional async `on_caught` hook. It catches `Exception` only; other `BaseException` subclasses (such as cancellation) propagate, matching `RetryMiddleware`. On a catch it dispatches a new framework-emitted `FailureIsolatedEvent` (a distinct observer-event variant carrying `event_name`, the wrapped node's lineage identity, `pre_state` / `post_state`, and a `CaughtException` record of category plus message) onto the observer delivery queue; the bundled OTel and Langfuse observers render it as a marker span / observation. Compose it as the outer layer around `RetryMiddleware` for the "retry transients, degrade gracefully on exhaustion" pattern. Additive: existing pipelines see no behavior change, and the spec pin is unchanged (0050 is already within the v0.53.0 pin).

## [0.13.0] — 2026-06-09

LLM provider hardening release. The pinned spec advances from v0.46.0 to v0.53.0, absorbing four implemented proposals. Proposal 0049 introduces the first spec-normatively-typed observer event variant, `LlmCompletionEvent`, dispatched on every successful LLM provider call; proposal 0058 adds the failure-side counterpart, `LlmFailedEvent`; proposal 0057 extends the completion variant with eight request-side fields. The bundled `OpenAIProvider` retires its sentinel-namespace `NodeEvent` emission for LLM calls entirely, and the OTel and Langfuse observers now drive their LLM span / Generation from the typed events with back-dated timestamps so durations reflect the adapter boundary. Proposal 0047 closes implicit prefix-cache wire-byte stability: `Response.usage` gains cache-stat fields, the OTel observer emits `openarmature.llm.cache_read` attributes, and the OpenAI Chat Completions request body is byte-stable across equivalent inputs regardless of dict insertion order. Custom observers that filtered LLM calls by sentinel namespace MUST migrate to `isinstance` discrimination; `LLM_NAMESPACE` and `LlmEventPayload` remain as a documented compatibility surface.
90 changes: 90 additions & 0 deletions docs/concepts/middleware.md
@@ -199,6 +199,96 @@ Two implementation details worth knowing:
globally patching `time.monotonic` (which would also distort
asyncio's scheduling).

## Built-in: FailureIsolationMiddleware

```python
from openarmature.graph import FailureIsolationMiddleware

builder.add_node(
    "extract_segments",
    extract_fn,
    middleware=[
        FailureIsolationMiddleware(
            degraded_update={"segments": []},
            event_name="segment_extraction_degraded",
        ),
    ],
)
```

`FailureIsolationMiddleware` catches an exception escaping the wrapped
chain and returns a degraded partial update instead of letting it abort
the invocation. Reach for it when a node is not load-bearing enough to
kill the whole run: a failed enrichment step degrades to an empty list,
the graph continues, and the failure is still visible in your traces.
It is the named, observable form of the "catch and recover" pattern
from [Error semantics](#error-semantics) above.

Configuration:

- **`degraded_update`** (required) is the partial update returned on a
caught exception. It may be a static mapping, or a callable
`state -> partial_update` when the fallback shape depends on the input
state. The callable is resolved once, at catch time.
- **`event_name`** (required, no default) is a stable identifier for
this catch site. It rides on the emitted event (below) and any
downstream logging. There is no default on purpose: a generic name
like `"failure_isolated"` collapses every degraded path into one
indistinguishable bucket in a dashboard, so the name is forced at the
construction site, where the context to name it well is available.
- **`predicate`** is an optional `Exception -> bool`. When supplied,
only exceptions where it returns true are caught; everything else
propagates. The default catches every `Exception`.
- **`on_caught`** is an optional async hook `Exception -> None`, fired
when the middleware catches. Use it to forward the caught exception to
caller-specific telemetry beyond the framework event. It runs inline,
before the degraded update is returned, and any exception it raises is
isolated (logged, not propagated) so a buggy hook cannot defeat the
recovery.
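
The configuration rules above can be sketched as a standalone function. This is a minimal illustrative model, not the library's implementation: the `isolate` function, its parameter layout, and the use of the standard `logging` module for hook failures are all assumptions made for the example.

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable, Mapping
from typing import Any, Optional, Union

logger = logging.getLogger("failure_isolation_sketch")

State = Mapping[str, Any]
Update = Mapping[str, Any]


async def isolate(
    state: State,
    call_inner: Callable[[State], Awaitable[Update]],
    degraded_update: Union[Update, Callable[[State], Update]],
    predicate: Optional[Callable[[Exception], bool]] = None,
    on_caught: Optional[Callable[[Exception], Awaitable[None]]] = None,
) -> Update:
    """Hypothetical model of the catch path; not the real middleware."""
    try:
        return await call_inner(state)
    except Exception as exc:  # cancellation is not an Exception, so it escapes
        if predicate is not None and not predicate(exc):
            raise  # non-matching exceptions propagate unchanged
        if on_caught is not None:
            try:
                await on_caught(exc)
            except Exception:
                # A failing hook is logged and must not defeat the recovery.
                logger.exception("on_caught hook raised")
        # A callable fallback is resolved once, at catch time.
        if callable(degraded_update):
            return degraded_update(state)
        return degraded_update


async def failing_node(state: State) -> Update:
    raise TimeoutError("enrichment backend timed out")


async def broken_hook(exc: Exception) -> None:
    raise RuntimeError("telemetry sink unavailable")


# Static fallback, with a hook that itself fails: recovery still succeeds.
static_result = asyncio.run(
    isolate({"doc_id": 7}, failing_node, {"segments": []}, on_caught=broken_hook)
)

# Callable fallback: the shape depends on the input state.
dynamic_result = asyncio.run(
    isolate(
        {"doc_id": 7},
        failing_node,
        lambda s: {"segments": [], "failed_doc": s["doc_id"]},
    )
)
```

A predicate such as `lambda e: isinstance(e, ValueError)` would let the `TimeoutError` in this sketch propagate instead of degrading.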

Like `RetryMiddleware`, it catches `Exception` only; `BaseException`
(cancellation, keyboard interrupt) propagates so aborts still work.
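
The distinction rests on Python's exception hierarchy rather than anything specific to this library. The snippet below is a small standalone check (the `guarded_sleep` and `main` names are made up for the illustration) showing that an `except Exception` guard does not intercept task cancellation.

```python
import asyncio

# asyncio.CancelledError derives from BaseException (Python 3.8+), as does
# KeyboardInterrupt, so `except Exception` does not match either of them.
cancellation_is_exception = issubclass(asyncio.CancelledError, Exception)
interrupt_is_exception = issubclass(KeyboardInterrupt, Exception)


async def guarded_sleep() -> str:
    try:
        await asyncio.sleep(10)
    except Exception:
        return "degraded"  # not reached when the task is cancelled
    return "completed"


async def main() -> bool:
    task = asyncio.create_task(guarded_sleep())
    await asyncio.sleep(0)  # let the task start and suspend in its sleep
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return True  # cancellation escaped the `except Exception` guard
    return False


cancellation_propagated = asyncio.run(main())
```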

### The failure-isolated event

On a catch, the middleware dispatches a `FailureIsolatedEvent` onto the
observer stream. It is a distinct event variant, not a node event: it
carries the `event_name`, the wrapped node's lineage identity, the input
state (`pre_state`), the degraded partial update (`post_state`), and a
`CaughtException` record holding the exception's `category` (when it has
one) and message. Observers narrow
on it with `isinstance(event, FailureIsolatedEvent)`. The bundled OTel
and Langfuse observers render it as a marker span / observation so the
catch shows up alongside the node's own span. The default emission path
is the observer stream only, with no logging-library dependency;
`on_caught` is the escape hatch for anything else.
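
As a rough illustration of the `isinstance` narrowing, here is a self-contained sketch of an observer that counts catches per `event_name`. The two dataclasses are local stand-ins that copy the field set described above so the example runs on its own; real code would import them from `openarmature.graph`. The observer shape (a plain callable receiving one event) and the `DegradationCounter` class are assumptions, not the framework's documented observer interface.

```python
from collections import Counter
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any, Optional


# Stand-ins so the sketch runs without the library installed.
@dataclass(frozen=True)
class CaughtException:
    category: Optional[str]
    message: str


@dataclass(frozen=True)
class FailureIsolatedEvent:
    event_name: str
    namespace: tuple[str, ...]
    attempt_index: int
    fan_out_index: Optional[int]
    branch_name: Optional[str]
    pre_state: Any
    post_state: Mapping[str, Any]
    caught_exception: CaughtException


class DegradationCounter:
    """Hypothetical observer: counts catches per event_name."""

    def __init__(self) -> None:
        self.counts = Counter()

    def __call__(self, event: object) -> None:
        if not isinstance(event, FailureIsolatedEvent):
            return  # ignore every other event variant
        self.counts[event.event_name] += 1


observer = DegradationCounter()
observer(
    FailureIsolatedEvent(
        event_name="segment_extraction_degraded",
        namespace=("extract_segments",),
        attempt_index=0,
        fan_out_index=None,
        branch_name=None,
        pre_state={"text": "raw input"},
        post_state={"segments": []},
        caught_exception=CaughtException(category=None, message="boom"),
    )
)
observer("some other event")  # silently skipped by the isinstance check
```

Keying the counter on `event_name` is what makes a per-site name useful: each degraded path gets its own bucket.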

### Composing with RetryMiddleware

The two compose into the canonical "retry transients, then give up
gracefully" pattern. The order is load-bearing: failure isolation is the
**outer** layer, retry is **inner**.

```python
from openarmature.graph import FailureIsolationMiddleware, RetryMiddleware

builder.add_node(
    "summarize",
    summarize_fn,
    middleware=[
        FailureIsolationMiddleware(
            degraded_update={"summary": ""},
            event_name="summary_degraded",
        ),
        RetryMiddleware(max_attempts=3),
    ],
)
```

Retry sits closest to the node, so it sees raw transient failures first
and retries them. Only what escapes retry (an exhausted budget, or a
non-transient exception that retry's classifier declines to retry)
reaches the outer failure isolation, which degrades. Reverse the order
and the inner
isolation would swallow transients before retry ever saw them, defeating
the retry entirely.
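
The effect of the ordering can be reproduced with two toy layers. This is a standalone sketch, not the library's middleware protocol: the `retry`, `isolate`, and `count_calls` helpers are hypothetical, and the only convention borrowed from the example above is that the first entry in the list is the outermost layer.

```python
import asyncio
from collections.abc import Awaitable, Callable

Handler = Callable[[dict], Awaitable[dict]]


def retry(max_attempts: int) -> Callable[[Handler], Handler]:
    """Toy retry layer: re-invokes the inner handler on any Exception."""
    def wrap(inner: Handler) -> Handler:
        async def handler(state: dict) -> dict:
            for attempt in range(1, max_attempts + 1):
                try:
                    return await inner(state)
                except Exception:
                    if attempt == max_attempts:
                        raise  # budget exhausted: let the failure escape
            raise ValueError("max_attempts must be at least 1")
        return handler
    return wrap


def isolate(degraded: dict) -> Callable[[Handler], Handler]:
    """Toy isolation layer: swaps any Exception for a degraded update."""
    def wrap(inner: Handler) -> Handler:
        async def handler(state: dict) -> dict:
            try:
                return await inner(state)
            except Exception:
                return degraded
        return handler
    return wrap


def count_calls(layers: list) -> int:
    """Wrap an always-failing node (first layer outermost); count its calls."""
    calls = 0

    async def node(state: dict) -> dict:
        nonlocal calls
        calls += 1
        raise ConnectionError("transient failure")

    chain: Handler = node
    for layer in reversed(layers):  # wrap the innermost layer first
        chain = layer(chain)
    asyncio.run(chain({}))
    return calls


isolation_outer = count_calls([isolate({"summary": ""}), retry(3)])
retry_outer = count_calls([retry(3), isolate({"summary": ""})])
print(isolation_outer, retry_outer)  # 3 1
```

With isolation outermost the node is attempted three times before degrading; with the order reversed it runs once, because the inner isolation turns the first failure into a success from retry's point of view.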

## Related

- [Parallel branches](parallel-branches.md): per-branch middleware
8 changes: 8 additions & 0 deletions src/openarmature/graph/__init__.py
@@ -36,6 +36,8 @@
    UnreachableNode,
)
from .events import (
    CaughtException,
    FailureIsolatedEvent,
    InvocationCompletedEvent,
    InvocationStartedEvent,
    LlmCompletionEvent,
@@ -45,6 +47,8 @@
)
from .fan_out import FanOutConfig, FanOutNode
from .middleware import (
    DegradedUpdate,
    FailureIsolationMiddleware,
    Middleware,
    NextCall,
    RetryMiddleware,
@@ -64,15 +68,19 @@

__all__ = [
    "END",
    "CaughtException",
    "CompileError",
    "CompiledGraph",
    "ConditionalEdge",
    "ConflictingReducers",
    "DanglingEdge",
    "DegradedUpdate",
    "DrainSummary",
    "EdgeException",
    "EndSentinel",
    "ExplicitMapping",
    "FailureIsolatedEvent",
    "FailureIsolationMiddleware",
    "FanOutConfig",
    "FanOutCountModeAmbiguous",
    "FanOutEmpty",
62 changes: 62 additions & 0 deletions src/openarmature/graph/events.py
@@ -659,7 +659,69 @@ class LlmFailedEvent:
    caller_invocation_metadata: Mapping[str, AttributeValue] | None = None


@dataclass(frozen=True)
class CaughtException:
    """Structured record of an exception caught by
    ``FailureIsolationMiddleware``.

    - ``category``: the exception's failure category when it carries
      one (e.g. an llm-provider error's ``category`` attribute), else
      ``None`` for a bare exception that carries no category.
    - ``message``: the human-readable exception message (``str(exc)``);
      the empty string when the exception carried no message.
    """

    category: str | None
    message: str


# Spec: realizes pipeline-utilities §6.3 failure-isolation middleware
# (proposal 0050). Emitted by FailureIsolationMiddleware when it
# catches an exception escaping the inner chain and substitutes a
# degraded partial update. A distinct framework-emitted event kind
# (NOT a NodeEvent — does not reuse node_name / namespace / error),
# mirroring the proposal 0040 MetadataAugmentationEvent mechanism:
# enqueued on the engine's serial observer-delivery queue via
# ``current_dispatch()`` and NOT subject to the observer ``phases``
# filter (matches MetadataAugmentationEvent / InvocationStartedEvent /
# InvocationCompletedEvent / LlmCompletionEvent / LlmFailedEvent
# treatment).
@dataclass(frozen=True)
class FailureIsolatedEvent:
    """A failure-isolation event delivered to observers.

    Reports that ``FailureIsolationMiddleware`` caught an exception at
    a node and substituted a degraded partial update for the node's
    output. Observer code filters by type discrimination
    (``isinstance(event, FailureIsolatedEvent)``).

    Field set:

    - ``event_name``: the caller-supplied identifier for this catch
      site, from the middleware's configuration.
    - ``namespace`` / ``attempt_index`` / ``fan_out_index`` /
      ``branch_name``: the wrapped node's lineage identity, surfaced
      for correlation with the node's other events.
    - ``pre_state``: the state the wrapped node received.
    - ``post_state``: the degraded partial update the middleware
      returned in place of the node's output.
    - ``caught_exception``: a :class:`CaughtException` record of the
      caught exception (category + message).
    """

    event_name: str
    namespace: tuple[str, ...]
    attempt_index: int
    fan_out_index: int | None
    branch_name: str | None
    pre_state: Any
    post_state: Mapping[str, Any]
    caught_exception: CaughtException


__all__ = [
    "CaughtException",
    "FailureIsolatedEvent",
    "FanOutEventConfig",
    "InvocationCompletedEvent",
    "InvocationStartedEvent",
3 changes: 3 additions & 0 deletions src/openarmature/graph/middleware/__init__.py
@@ -18,6 +18,7 @@
"""

from ._core import ChainCall, Middleware, NextCall, compose_chain
from .failure_isolation import DegradedUpdate, FailureIsolationMiddleware
from .retry import (
    TRANSIENT_CATEGORIES,
    BackoffStrategy,
@@ -34,6 +35,8 @@
    "BackoffStrategy",
    "ChainCall",
    "Classifier",
    "DegradedUpdate",
    "FailureIsolationMiddleware",
    "Middleware",
    "NextCall",
    "OnCompleteCallback",