Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The
### Changed

- **`RetryMiddleware` now takes a `RetryConfig` record** instead of individual constructor kwargs (proposal 0050 prep). The four retry settings (`max_attempts` / `classifier` / `backoff` / `on_retry`, each optional) move onto a frozen `RetryConfig`; construct as `RetryMiddleware(RetryConfig(max_attempts=...))`, while bare `RetryMiddleware()` still applies the defaults. This is a breaking change to the `RetryMiddleware` constructor. The record is the same shape the upcoming call-level `complete(retry=...)` parameter will accept, so one retry config serves both the per-node and per-call layers. `None` fields resolve to the canonical defaults (`default_classifier` / `exponential_jitter_backoff`) at use, preserving the prior behavior.
- **Failure-isolation events report the originating cause's category at non-node placements** (proposal 0065, pipeline-utilities §6.3). When `FailureIsolationMiddleware` runs as instance middleware (§9.7), branch middleware (§11.7), or parent-node middleware on a fan-out / parallel-branches node, the graph engine has already wrapped the originating error as a `node_exception` carrier before the middleware catches it. `FailureIsolatedEvent.caught_exception.category` now resolves through that carrier (and any nested carriers) to the nearest categorized originating cause and reports its category instead of the masking `node_exception`, so the reported category agrees with what the §6.1 retry classifier acted on. For example, an instance whose retries exhaust on `provider_unavailable` now surfaces `provider_unavailable` rather than `node_exception`. The `message` tracks the resolved cause for category/message coherence. Node-level placement was already faithful and is unchanged, and catch/degrade behavior is unchanged at every site (only the event's reported cause changes). The wrapped-instance/branch lineage SHOULD (`fan_out_index` / `branch_name`) is deferred to a follow-up, since it needs the engine to surface per-instance identity to the wrapping-site middleware.

## [0.13.0] — 2026-06-09

Expand Down
62 changes: 50 additions & 12 deletions src/openarmature/graph/middleware/failure_isolation.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,43 @@
DegradedUpdate = Mapping[str, Any] | Callable[[Any], Mapping[str, Any]]


def _resolve_cause(exc: Exception) -> BaseException:
# Cause fidelity (proposal 0065 / §6.3, plus the python "nearest
# categorized" refinement). Walk the ``__cause__`` chain to the most
# actionable cause, skipping graph-engine §4 ``node_exception`` carrier
# wrappers (``NodeException`` and subtypes such as
# ``ParallelBranchesBranchFailed``) the engine applies at a non-node
# placement (§9.7 instance, §11.7 branch, §9.6 / §11.6 parent-node
# middleware). Returns the FIRST non-carrier exception that carries a
# string ``category`` — so a deliberately re-categorized surface error
# wins, while an uncategorized surface error resolves to the categorized
# cause beneath it (the same chain §6.1's default classifier consults
# for retryability, so the reported category agrees with what retry
# acted on). When nothing in the chain carries a category, returns the
# originating non-carrier raise (its own message, null category).
# Node-level placement has no carrier, so ``exc`` itself is the
# originating raise. The local import keeps ``errors`` off the
# middleware module-load path, matching the deferred ``events`` import
# in ``_emit_event``.
from openarmature.graph.errors import NodeException

origin: BaseException | None = None
current: BaseException | None = exc
seen: set[int] = set()
# Traverse only BaseException instances (a non-exception ``__cause__``
# ends the walk) and guard against a cyclic ``__cause__`` chain so a
# malformed chain can't hang or crash the degrade path.
while isinstance(current, BaseException) and id(current) not in seen:
seen.add(id(current))
if not isinstance(current, NodeException):
if origin is None:
origin = current
if isinstance(getattr(current, "category", None), str):
return current
current = current.__cause__
return origin if origin is not None else exc


class FailureIsolationMiddleware:
"""Catch exceptions escaping the inner chain; return a degraded
partial update.
Expand Down Expand Up @@ -145,17 +182,16 @@ def _emit_event(self, state: Any, exc: Exception, degraded: Mapping[str, Any]) -
# path and defers it until the first catch.
from openarmature.graph.events import CaughtException, FailureIsolatedEvent

# A categorized exception (e.g. an llm-provider error) carries a
# string ``category``. When the engine has wrapped the original
# in a graph-engine error before it reached the middleware, the
# category rides on ``__cause__`` — walk it the same way the
# default retry classifier does so the caught failure's category
# survives the wrapping. A bare exception yields ``None``.
category = getattr(exc, "category", None)
if not isinstance(category, str):
cause = getattr(exc, "__cause__", None)
cause_category = getattr(cause, "category", None) if cause is not None else None
category = cause_category if isinstance(cause_category, str) else None
# Cause fidelity (proposal 0065 / §6.3). ``_resolve_cause`` walks
# past graph-engine ``node_exception`` carrier wrappers to the
# nearest categorized originating cause (see its comment); the
# reported ``category`` and ``message`` both come from it so they
# describe one exception — NOT the masking ``node_exception``. A
# bare / uncategorized cause yields a null category. Node-level
# placement has no carrier, so this is the caught exception itself.
cause = _resolve_cause(exc)
cause_category = getattr(cause, "category", None)
category = cause_category if isinstance(cause_category, str) else None
# ``attempt_index`` here is deliberately the NODE-level baseline,
# not a per-attempt wire index: failure isolation is a node-level
# concern ("the node, across its retries, was isolated"). When
Expand All @@ -176,7 +212,9 @@ def _emit_event(self, state: Any, exc: Exception, degraded: Mapping[str, Any]) -
branch_name=current_branch_name(),
pre_state=state,
post_state=degraded,
caught_exception=CaughtException(category=category, message=str(exc)),
# ``message`` tracks the resolved cause (§6.3 SHOULD) so
# the reported category and message describe one exception.
caught_exception=CaughtException(category=category, message=str(cause)),
)
)

Expand Down
138 changes: 138 additions & 0 deletions tests/unit/test_failure_isolation_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
append,
deterministic_backoff,
)
from openarmature.graph.errors import NodeException, ParallelBranchesBranchFailed
from openarmature.graph.middleware import NextCall
from openarmature.observability.correlation import (
_reset_active_dispatch,
Expand All @@ -41,6 +42,10 @@ class _TransientError(Exception):
category = "provider_rate_limit"


class _NonTransientError(Exception):
category = "provider_invalid_request"


def _raises(exc: BaseException) -> NextCall:
"""A ``next`` callable that raises ``exc`` when invoked."""

Expand Down Expand Up @@ -210,6 +215,139 @@ async def test_bare_exception_has_null_category() -> None:
assert events[0].caught_exception.message == "plain"


# ---------------------------------------------------------------------------
# Cause fidelity at carrier-wrapper sites (proposal 0065)
# ---------------------------------------------------------------------------


async def test_node_exception_carrier_resolves_to_originating_category() -> None:
# At a non-node placement the engine wraps the originating error as a
# node_exception carrier before the middleware catches it; the event
# reports the originating category, NOT the masking node_exception.
events: list[Any] = []
disp_token = _set_active_dispatch(lambda e: events.append(e))
carrier = NodeException(node_name="work", cause=_TransientError("rate limited"), recoverable_state={})
try:
mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="iso")
out = await mw("s", _raises(carrier))
finally:
_reset_active_dispatch(disp_token)

assert out == {"result": []}
assert isinstance(events[0], FailureIsolatedEvent)
assert events[0].caught_exception.category == "provider_rate_limit"
assert events[0].caught_exception.message == "rate limited"


async def test_nested_carriers_resolve_to_originating_category() -> None:
# Nested subgraph boundaries stack node_exception carriers; resolution
# walks all of them to the originating cause.
events: list[Any] = []
disp_token = _set_active_dispatch(lambda e: events.append(e))
inner = NodeException(node_name="inner", cause=_TransientError("rate limited"), recoverable_state={})
outer = NodeException(node_name="outer", cause=inner, recoverable_state={})
try:
mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="iso")
await mw("s", _raises(outer))
finally:
_reset_active_dispatch(disp_token)

assert events[0].caught_exception.category == "provider_rate_limit"


async def test_branch_carrier_subtype_resolves_to_originating_category() -> None:
# The §11.7 branch site catches a ParallelBranchesBranchFailed (a
# NodeException subtype); resolution still reaches the originating cause.
events: list[Any] = []
disp_token = _set_active_dispatch(lambda e: events.append(e))
carrier = ParallelBranchesBranchFailed(
node_name="dispatcher",
cause=_TransientError("rate limited"),
recoverable_state={},
branch_name="only",
)
try:
mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="iso")
await mw("s", _raises(carrier))
finally:
_reset_active_dispatch(disp_token)

assert events[0].caught_exception.category == "provider_rate_limit"


async def test_carrier_over_uncategorized_cause_is_null() -> None:
# Resolving through the carrier reaches a cause with no category, so the
# reported category is null (the existing bare-exception rule).
events: list[Any] = []
disp_token = _set_active_dispatch(lambda e: events.append(e))
carrier = NodeException(node_name="work", cause=ValueError("boom"), recoverable_state={})
try:
mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="iso")
await mw("s", _raises(carrier))
finally:
_reset_active_dispatch(disp_token)

assert events[0].caught_exception.category is None
assert events[0].caught_exception.message == "boom"


async def test_uncategorized_surface_resolves_to_categorized_cause() -> None:
# A node that wraps a categorized provider error in an uncategorized
# domain error: the event surfaces the underlying provider category
# (agreeing with what §6.1's classifier retries on), and the message
# tracks that same cause for coherence.
events: list[Any] = []
disp_token = _set_active_dispatch(lambda e: events.append(e))
surface = ValueError("wrapped")
surface.__cause__ = _TransientError("rate limited")
carrier = NodeException(node_name="work", cause=surface, recoverable_state={})
try:
mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="iso")
await mw("s", _raises(carrier))
finally:
_reset_active_dispatch(disp_token)

assert events[0].caught_exception.category == "provider_rate_limit"
assert events[0].caught_exception.message == "rate limited"


async def test_categorized_surface_wins_over_deeper_cause() -> None:
# A node that deliberately re-categorizes (raises a categorized error
# FROM a categorized cause): the nearest category wins, so the node's
# re-categorization is respected rather than the deeper cause.
events: list[Any] = []
disp_token = _set_active_dispatch(lambda e: events.append(e))
surface = _NonTransientError("misconfigured")
surface.__cause__ = _TransientError("rate limited")
carrier = NodeException(node_name="work", cause=surface, recoverable_state={})
try:
mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="iso")
await mw("s", _raises(carrier))
finally:
_reset_active_dispatch(disp_token)

assert events[0].caught_exception.category == "provider_invalid_request"
assert events[0].caught_exception.message == "misconfigured"


async def test_cyclic_cause_chain_terminates() -> None:
# Defensive: a self-referential __cause__ chain must not hang the
# degrade path. Resolution terminates and the node still degrades.
events: list[Any] = []
disp_token = _set_active_dispatch(lambda e: events.append(e))
a = NodeException(node_name="a", cause=ValueError("seed"), recoverable_state={})
b = NodeException(node_name="b", cause=a, recoverable_state={})
a.__cause__ = b # cycle: a -> b -> a
try:
mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="iso")
out = await mw("s", _raises(a))
finally:
_reset_active_dispatch(disp_token)

assert out == {"result": []}
assert len(events) == 1


async def test_no_event_outside_invocation() -> None:
# current_dispatch() is None outside an invocation; the degrade still
# happens, no event is emitted, and nothing raises.
Expand Down