Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,10 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The

- **`FailureIsolationMiddleware`** (proposal 0050, pipeline-utilities §6.3). A third bundled middleware primitive alongside `RetryMiddleware` and `TimingMiddleware`. It catches exceptions escaping the wrapped node's inner chain and returns a configured degraded partial update, so a non-critical node can fail without aborting the whole invocation. Configuration: `degraded_update` (a static mapping or a `state -> partial_update` callable, resolved at catch time), `event_name` (required, no default, since a generic name makes downstream telemetry strictly worse), an optional `predicate` (`Exception -> bool`; only matching exceptions are caught, others propagate), and an optional async `on_caught` hook. It catches `Exception`; `BaseException` (cancellation) propagates, matching `RetryMiddleware`. On a catch it dispatches a new framework-emitted `FailureIsolatedEvent` (a distinct observer-event variant carrying `event_name`, the wrapped node's lineage identity, `pre_state` / `post_state`, and a `CaughtException` record of category plus message) onto the observer delivery queue; the bundled OTel and Langfuse observers render it as a marker span / observation. Compose it OUTER of `RetryMiddleware` for the "retry transients, degrade gracefully on exhaustion" pattern. Additive: existing pipelines see no behavior change, and the spec pin is unchanged (0050 is already within the v0.53.0 pin).

+### Changed
+
+- **`RetryMiddleware` now takes a `RetryConfig` record** instead of individual constructor kwargs (proposal 0050 prep). The four retry settings (`max_attempts` / `classifier` / `backoff` / `on_retry`, each optional) move onto a frozen `RetryConfig`; construct as `RetryMiddleware(RetryConfig(max_attempts=...))`, while bare `RetryMiddleware()` still applies the defaults. This is a breaking change to the `RetryMiddleware` constructor. The record is the same shape the upcoming call-level `complete(retry=...)` parameter will accept, so one retry config serves both the per-node and per-call layers. `None` fields resolve to the canonical defaults (`default_classifier` / `exponential_jitter_backoff`) at use, preserving the prior behavior.

## [0.13.0] — 2026-06-09

LLM provider hardening release. The pinned spec advances from v0.46.0 to v0.53.0, absorbing four implemented proposals. Proposal 0049 introduces the first spec-normatively-typed observer event variant, `LlmCompletionEvent`, dispatched on every successful LLM provider call; proposal 0058 adds the failure-side counterpart, `LlmFailedEvent`; proposal 0057 extends the completion variant with eight request-side fields. The bundled `OpenAIProvider` retires its sentinel-namespace `NodeEvent` emission for LLM calls entirely, and the OTel and Langfuse observers now drive their LLM span / Generation from the typed events with back-dated timestamps so durations reflect the adapter boundary. Proposal 0047 closes implicit prefix-cache wire-byte stability: `Response.usage` gains cache-stat fields, the OTel observer emits `openarmature.llm.cache_read` attributes, and the OpenAI Chat Completions request body is byte-stable across equivalent inputs regardless of dict insertion order. Custom observers that filtered LLM calls by sentinel namespace MUST migrate to `isinstance` discrimination; `LLM_NAMESPACE` and `LlmEventPayload` remain as a documented compatibility surface.
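
Review note on the `### Changed` entry in this hunk: the constructor migration can be tried out without installing the package. The sketch below uses local stand-in classes that copy only the field names, defaults, and guards shown in this PR's `retry.py` diff; they are not the `openarmature` classes, and the retry loop itself is left out. It shows that the old keyword call shape now fails at construction time, while the bare call and the `RetryConfig` call both work.

```python
from __future__ import annotations

from collections.abc import Awaitable, Callable
from dataclasses import FrozenInstanceError, dataclass
from typing import Any

# Type aliases named after the ones in the diff; the shapes are taken from
# the RetryConfig docstring.
Classifier = Callable[[Exception, Any], bool]
BackoffStrategy = Callable[[int], float]
OnRetryCallback = Callable[[Exception, int], Awaitable[None]]


@dataclass(frozen=True)
class RetryConfig:
    """Stand-in for the record added in this PR (fields and guard only)."""

    max_attempts: int = 3
    classifier: Classifier | None = None
    backoff: BackoffStrategy | None = None
    on_retry: OnRetryCallback | None = None

    def __post_init__(self) -> None:
        if self.max_attempts < 1:
            raise ValueError("max_attempts must be >= 1")


class RetryMiddleware:
    """Stand-in for the constructor only; the retry loop is omitted."""

    def __init__(self, config: RetryConfig | None = None) -> None:
        if config is None:
            config = RetryConfig()
        if not isinstance(config, RetryConfig):
            raise TypeError(
                f"RetryMiddleware expects a RetryConfig (or None); got "
                f"{type(config).__name__}."
            )
        self.config = config


# New call shapes: bare construction and explicit config.
default_mw = RetryMiddleware()
tuned_mw = RetryMiddleware(RetryConfig(max_attempts=5))

# Old call shape: keyword settings are no longer accepted.
try:
    RetryMiddleware(max_attempts=3)  # type: ignore[call-arg]
    old_shape_error = None
except TypeError as exc:
    old_shape_error = exc

# The record is frozen, so settings cannot be mutated after construction.
try:
    tuned_mw.config.max_attempts = 1  # type: ignore[misc]
    was_frozen = False
except FrozenInstanceError:
    was_frozen = True
```

Because the stand-in takes a single positional `config`, a leftover `RetryMiddleware(max_attempts=3)` call site fails with Python's own unexpected-keyword `TypeError` before the `isinstance` guard is ever reached; the guard only matters for positional mistakes such as `RetryMiddleware(3)`.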
14 changes: 8 additions & 6 deletions docs/concepts/middleware.md
@@ -126,21 +126,23 @@ hand a transformed state down the chain, pass a new state instance to
 ## Built-in: RetryMiddleware

 ```python
-from openarmature.graph import RetryMiddleware, exponential_jitter_backoff
+from openarmature.graph import RetryConfig, RetryMiddleware, exponential_jitter_backoff


 async def on_retry(exc: Exception, attempt: int) -> None:
     log.warning("retrying after %r (attempt %d)", exc, attempt)


 retry = RetryMiddleware(
-    max_attempts=3,
-    backoff=exponential_jitter_backoff,
-    on_retry=on_retry,
+    RetryConfig(
+        max_attempts=3,
+        backoff=exponential_jitter_backoff,
+        on_retry=on_retry,
+    )
 )
 ```

-Four plug points, all optional:
+Configured with a `RetryConfig`; four fields, all optional:

 - **`max_attempts`** is the total attempt count including the first
   call. `1` disables retry. Default `3`.
@@ -277,7 +279,7 @@ builder.add_node(
             degraded_update={"summary": ""},
             event_name="summary_degraded",
         ),
-        RetryMiddleware(max_attempts=3),
+        RetryMiddleware(RetryConfig(max_attempts=3)),
     ],
 )
 ```
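
Review note on the composition in this hunk: the CHANGELOG entry describes `FailureIsolationMiddleware` as catching `Exception` from the inner chain, applying an optional `predicate`, and returning a `degraded_update` that is either a static mapping or a `state -> partial_update` callable resolved at catch time. The sketch below is a self-contained illustration of that behavior under stated assumptions: `IsolationSketch` is a hypothetical stand-in, not the library class; it records catches in a plain list instead of dispatching a `FailureIsolatedEvent`; and it omits the `on_caught` hook. The `(state, next_)` call shape follows the `__call__` signature visible in this PR's `retry.py` diff.

```python
import asyncio
from collections.abc import Awaitable, Callable, Mapping
from typing import Any, Optional, Union

NextCall = Callable[[Any], Awaitable[Mapping[str, Any]]]
DegradedUpdate = Union[Mapping[str, Any], Callable[[Any], Mapping[str, Any]]]


class IsolationSketch:
    """Hypothetical stand-in, not openarmature's FailureIsolationMiddleware."""

    def __init__(
        self,
        degraded_update: DegradedUpdate,
        event_name: str,
        predicate: Optional[Callable[[Exception], bool]] = None,
    ) -> None:
        self.degraded_update = degraded_update
        self.event_name = event_name
        self.predicate = predicate
        # Stand-in for the observer queue: (event_name, exception type name).
        self.caught: list = []

    async def __call__(self, state: Any, next_: NextCall) -> Mapping[str, Any]:
        try:
            return await next_(state)
        except Exception as exc:  # BaseException (cancellation) still propagates
            if self.predicate is not None and not self.predicate(exc):
                raise  # non-matching exceptions are not isolated
            self.caught.append((self.event_name, type(exc).__name__))
            update = self.degraded_update
            # A callable update is resolved here, at catch time.
            return update(state) if callable(update) else update


async def failing_node(state: Any) -> Mapping[str, Any]:
    raise RuntimeError("summarizer unavailable")


# Static degraded update: the failure is absorbed.
isolate = IsolationSketch({"summary": ""}, "summary_degraded")
degraded = asyncio.run(isolate({"text": "hello"}, failing_node))

# Predicate that does not match RuntimeError: the failure propagates.
picky = IsolationSketch(
    {"summary": ""},
    "summary_degraded",
    predicate=lambda exc: isinstance(exc, TimeoutError),
)
try:
    asyncio.run(picky({"text": "hello"}, failing_node))
    propagated = False
except RuntimeError:
    propagated = True
```

Placed outside a retry layer, a wrapper like this only sees the exception the retry layer finally re-raises, which is the "retry transients, degrade gracefully on exhaustion" ordering the CHANGELOG entry recommends.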
11 changes: 7 additions & 4 deletions examples/fan-out-with-retry/main.py
@@ -84,6 +84,7 @@
     append,
 )
 from openarmature.graph.middleware import (
+    RetryConfig,
     RetryMiddleware,
     TimingMiddleware,
     TimingRecord,
@@ -261,10 +262,12 @@ def build_graph(error_policy: str = "fail_fast") -> CompiledGraph[BatchState]:
     headline_subgraph = build_headline_subgraph()

     retry = RetryMiddleware(
-        max_attempts=3,
-        # Short fixed delay so the demo isn't slow. A production app would
-        # use exponential_jitter_backoff (the default).
-        backoff=deterministic_backoff(0.2),
+        RetryConfig(
+            max_attempts=3,
+            # Short fixed delay so the demo isn't slow. A production app would
+            # use exponential_jitter_backoff (the default).
+            backoff=deterministic_backoff(0.2),
+        )
     )
     timing = TimingMiddleware(
         node_name="headline_run",
7 changes: 5 additions & 2 deletions examples/parallel-branches/main.py
@@ -76,6 +76,7 @@
     append,
 )
 from openarmature.graph.middleware import (
+    RetryConfig,
     RetryMiddleware,
     deterministic_backoff,
 )
@@ -268,8 +269,10 @@ def build_graph() -> CompiledGraph[ArticleState]:
     # the same policy on a longer summarize call (where a retry doubles
     # cost) or on a topic-extract that has different transient profile.
     sentiment_retry = RetryMiddleware(
-        max_attempts=3,
-        backoff=deterministic_backoff(0.2),
+        RetryConfig(
+            max_attempts=3,
+            backoff=deterministic_backoff(0.2),
+        )
     )

     return (
2 changes: 2 additions & 0 deletions src/openarmature/graph/__init__.py
@@ -51,6 +51,7 @@
     FailureIsolationMiddleware,
     Middleware,
     NextCall,
+    RetryConfig,
     RetryMiddleware,
     TimingMiddleware,
     TimingRecord,
@@ -115,6 +116,7 @@
     "Reducer",
     "ReducerError",
     "RemoveHandle",
+    "RetryConfig",
     "RetryMiddleware",
     "RoutingError",
     "RuntimeGraphError",
2 changes: 2 additions & 0 deletions src/openarmature/graph/middleware/__init__.py
@@ -24,6 +24,7 @@
     BackoffStrategy,
     Classifier,
     OnRetryCallback,
+    RetryConfig,
     RetryMiddleware,
     default_classifier,
     deterministic_backoff,
@@ -41,6 +42,7 @@
     "NextCall",
     "OnCompleteCallback",
     "OnRetryCallback",
+    "RetryConfig",
     "RetryMiddleware",
     "TRANSIENT_CATEGORIES",
     "TimingMiddleware",
78 changes: 52 additions & 26 deletions src/openarmature/graph/middleware/retry.py
@@ -20,6 +20,7 @@
 import asyncio
 import random
 from collections.abc import Awaitable, Callable, Mapping
+from dataclasses import dataclass
 from typing import Any

 from openarmature.llm.errors import TRANSIENT_CATEGORIES
@@ -100,39 +101,63 @@ def fn(_attempt: int) -> float:
 OnRetryCallback = Callable[[Exception, int], Awaitable[None]]


-class RetryMiddleware:
-    """Canonical retry middleware.
-
-    Configuration:
+@dataclass(frozen=True)
+class RetryConfig:
+    """Canonical retry configuration record consumed by
+    :class:`RetryMiddleware`.

     - ``max_attempts``: total attempts including the first call. ``1``
       disables retry. Default ``3``.
-    - ``classifier``: predicate ``(exception, state) -> bool``. Default
-      :func:`default_classifier` (matches ``category`` against
+    - ``classifier``: predicate ``(exception, state) -> bool`` deciding
+      whether a failure is retry-eligible. ``None`` (the default)
+      selects :func:`default_classifier` (matches ``category`` against
       ``TRANSIENT_CATEGORIES``).
-    - ``backoff``: callable ``(attempt_index) -> seconds``. Default
-      :func:`exponential_jitter_backoff` (base 1s, cap 30s, full jitter).
+    - ``backoff``: callable ``(attempt_index) -> seconds``. ``None``
+      (the default) selects :func:`exponential_jitter_backoff` (base
+      1s, cap 30s, full jitter).
     - ``on_retry``: optional async callback ``(exception, attempt_index)
-      -> None``. Fires before each sleep.
+      -> None`` fired before each backoff sleep.
     """

-    def __init__(
-        self,
-        *,
-        max_attempts: int = 3,
-        classifier: Classifier | None = None,
-        backoff: BackoffStrategy | None = None,
-        on_retry: OnRetryCallback | None = None,
-    ) -> None:
-        if max_attempts < 1:
+    max_attempts: int = 3
+    classifier: Classifier | None = None
+    backoff: BackoffStrategy | None = None
+    on_retry: OnRetryCallback | None = None
+
+    def __post_init__(self) -> None:
+        if self.max_attempts < 1:
             raise ValueError("max_attempts must be >= 1")
-        self.max_attempts = max_attempts
-        self.classifier: Classifier = classifier or default_classifier
-        self.backoff: BackoffStrategy = backoff or exponential_jitter_backoff
-        self.on_retry: OnRetryCallback | None = on_retry
+
+
+class RetryMiddleware:
+    """Canonical retry middleware.
+
+    Configured with a :class:`RetryConfig` (or the default
+    ``RetryConfig()`` when omitted). Construct as
+    ``RetryMiddleware(RetryConfig(max_attempts=...))``.
+    """
+
+    def __init__(self, config: RetryConfig | None = None) -> None:
+        if config is None:
+            config = RetryConfig()
+        # Defensive guard for untyped callers: the static type already
+        # rules a non-RetryConfig out (pyright flags this as redundant),
+        # but an eager TypeError beats a cryptic AttributeError when a
+        # mistyped value (e.g. ``RetryMiddleware(3)``) reaches ``.config``.
+        if not isinstance(config, RetryConfig):  # pyright: ignore[reportUnnecessaryIsInstance]
+            raise TypeError(
+                f"RetryMiddleware expects a RetryConfig (or None); got "
+                f"{type(config).__name__}. Construct as "
+                f"RetryMiddleware(RetryConfig(max_attempts=...))."
+            )
+        self.config = config

     async def __call__(self, state: Any, next_: NextCall) -> Mapping[str, Any]:
         attempt = 0
+        # ``None`` config fields select the canonical defaults; resolve
+        # once here so the loop works against concrete callables.
+        classifier = self.config.classifier or default_classifier
+        backoff = self.config.backoff or exponential_jitter_backoff
         # Spec observability §3.4 per-attempt scoping: each retry
         # attempt sees only the metadata in scope at retry-loop entry
         # ("pre-attempt baseline") plus that attempt's own writes;
@@ -176,11 +201,11 @@ async def __call__(self, state: Any, next_: NextCall) -> Mapping[str, Any]:
                 # metadata for the error span) sees the baseline,
                 # not the failed attempt's transient state.
                 _reset_invocation_metadata(metadata_token)
-                if attempt + 1 >= self.max_attempts or not self.classifier(exc, state):
+                if attempt + 1 >= self.config.max_attempts or not classifier(exc, state):
                     raise
-                if self.on_retry is not None:
-                    await self.on_retry(exc, attempt)
-                await asyncio.sleep(self.backoff(attempt))
+                if self.config.on_retry is not None:
+                    await self.config.on_retry(exc, attempt)
+                await asyncio.sleep(backoff(attempt))
                 attempt += 1
             except BaseException:
                 # Cancellation path. `CancelledError` (or other
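
Review note on the loop in this hunk: the control flow after the refactor can be exercised in isolation. The sketch below is a simplified, self-contained rendering of the lines shown in the diff: it resolves `None` fields once, re-raises when attempts are exhausted or the classifier rejects the exception, and otherwise awaits `on_retry` and sleeps before the next attempt. The names `ConfigSketch`, `run_with_retry`, `retry_everything`, and `no_delay` are hypothetical. The fallbacks here retry every exception with zero delay, unlike the library's `default_classifier` and `exponential_jitter_backoff`, and the metadata reset and cancellation handling are left out.

```python
import asyncio
from collections.abc import Awaitable, Callable, Mapping
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class ConfigSketch:
    """Hypothetical stand-in carrying the four fields named in the diff."""

    max_attempts: int = 3
    classifier: Optional[Callable[[Exception, Any], bool]] = None
    backoff: Optional[Callable[[int], float]] = None
    on_retry: Optional[Callable[[Exception, int], Awaitable[None]]] = None


def retry_everything(exc: Exception, state: Any) -> bool:
    """Sketch-only fallback; the library default matches transient categories."""
    return True


def no_delay(attempt: int) -> float:
    """Sketch-only fallback; the library default is exponential with jitter."""
    return 0.0


async def run_with_retry(
    config: ConfigSketch,
    state: Any,
    next_: Callable[[Any], Awaitable[Mapping[str, Any]]],
) -> Mapping[str, Any]:
    # Resolve None fields once, before the loop, as the diff does.
    classifier = config.classifier or retry_everything
    backoff = config.backoff or no_delay
    attempt = 0
    while True:
        try:
            return await next_(state)
        except Exception as exc:
            # Give up when attempts are exhausted or the failure is not eligible.
            if attempt + 1 >= config.max_attempts or not classifier(exc, state):
                raise
            if config.on_retry is not None:
                await config.on_retry(exc, attempt)
            await asyncio.sleep(backoff(attempt))
            attempt += 1


calls: list = []
retried: list = []


async def flaky(state: Any) -> Mapping[str, Any]:
    # Fails on the first two calls, succeeds on the third.
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("transient")
    return {"ok": True}


async def record_retry(exc: Exception, attempt: int) -> None:
    retried.append(attempt)


result = asyncio.run(run_with_retry(ConfigSketch(on_retry=record_retry), {}, flaky))
calls_on_success = len(calls)

# With max_attempts=2 the same node exhausts its budget and the error escapes.
calls.clear()
try:
    asyncio.run(run_with_retry(ConfigSketch(max_attempts=2), {}, flaky))
    exhausted = False
except ConnectionError:
    exhausted = True
```

The `attempt` passed to `on_retry` and `backoff` is zero-based, so a run that succeeds on its third call reports retries `0` and `1`.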
Expand All @@ -202,6 +227,7 @@ async def __call__(self, state: Any, next_: NextCall) -> Mapping[str, Any]:
"BackoffStrategy",
"Classifier",
"OnRetryCallback",
"RetryConfig",
"RetryMiddleware",
"TRANSIENT_CATEGORIES",
"default_classifier",
10 changes: 6 additions & 4 deletions tests/conformance/test_observability.py
@@ -678,7 +678,7 @@ async def _run_fixture_007_case(case: Mapping[str, Any]) -> None:
     from opentelemetry.trace import StatusCode

     from openarmature.graph import RuntimeGraphError
-    from openarmature.graph.middleware import RetryMiddleware
+    from openarmature.graph.middleware import RetryConfig, RetryMiddleware
     from openarmature.graph.middleware.retry import deterministic_backoff

     observer, exporter = _build_observer()
@@ -725,9 +725,11 @@ def _classifier(exc: Exception, _state: Any, _transient: frozenset[str] = transi
                 classifier_fn = None
             node_middleware.setdefault(flaky_node_name, []).append(
                 RetryMiddleware(
-                    max_attempts=int(mw_spec.get("max_attempts", 3)),
-                    backoff=backoff,
-                    classifier=classifier_fn,
+                    RetryConfig(
+                        max_attempts=int(mw_spec.get("max_attempts", 3)),
+                        backoff=backoff,
+                        classifier=classifier_fn,
+                    )
                 )
             )

9 changes: 6 additions & 3 deletions tests/conformance/test_pipeline_utilities.py
@@ -30,6 +30,7 @@
 from openarmature.graph.middleware import (
     Middleware,
     OnCompleteCallback,
+    RetryConfig,
     RetryMiddleware,
     TimingMiddleware,
     TimingRecord,
@@ -234,9 +235,11 @@ def _build_middleware(
         classifier_cfg = config.get("classifier")
         classifier = _build_classifier(classifier_cfg) if classifier_cfg is not None else None
         return RetryMiddleware(
-            max_attempts=int(config.get("max_attempts", 3)),
-            backoff=backoff,
-            classifier=classifier,
+            RetryConfig(
+                max_attempts=int(config.get("max_attempts", 3)),
+                backoff=backoff,
+                classifier=classifier,
+            )
         )
     if mw_type == "timing":
         on_complete_cfg = cast("dict[str, Any]", config.get("on_complete") or {})
3 changes: 2 additions & 1 deletion tests/unit/test_failure_isolation_middleware.py
@@ -22,6 +22,7 @@
     FailureIsolationMiddleware,
     GraphBuilder,
     ObserverEvent,
+    RetryConfig,
     RetryMiddleware,
     State,
     append,
@@ -290,7 +291,7 @@ async def _flaky(_s: _DocState) -> Mapping[str, Any]:
                     degraded_update={"note": "gave_up"},
                     event_name="flaky_failed",
                 ),
-                RetryMiddleware(max_attempts=3, backoff=deterministic_backoff(0.0)),
+                RetryMiddleware(RetryConfig(max_attempts=3, backoff=deterministic_backoff(0.0))),
             ],
         )
         .add_edge("flaky", END)
3 changes: 2 additions & 1 deletion tests/unit/test_fan_out.py
@@ -37,6 +37,7 @@
     FanOutFieldNotList,
     GraphBuilder,
     NodeException,
+    RetryConfig,
     RetryMiddleware,
     State,
     append,
@@ -578,7 +579,7 @@ async def maybe_fail(state: WorkerState) -> Mapping[str, Any]:
     inner_builder.add_edge("compute", END)
     inner = inner_builder.compile()

-    retry = RetryMiddleware(max_attempts=3, backoff=deterministic_backoff(0))
+    retry = RetryMiddleware(RetryConfig(max_attempts=3, backoff=deterministic_backoff(0)))

     builder: GraphBuilder[InstanceMwParentState] = GraphBuilder(InstanceMwParentState)
     builder.set_entry("process")