diff --git a/CHANGELOG.md b/CHANGELOG.md index d6a0243..87d3154 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The ### Changed - **`RetryMiddleware` now takes a `RetryConfig` record** instead of individual constructor kwargs (proposal 0050 prep). The four retry settings (`max_attempts` / `classifier` / `backoff` / `on_retry`, each optional) move onto a frozen `RetryConfig`; construct as `RetryMiddleware(RetryConfig(max_attempts=...))`, while bare `RetryMiddleware()` still applies the defaults. This is a breaking change to the `RetryMiddleware` constructor. The record is the same shape the upcoming call-level `complete(retry=...)` parameter will accept, so one retry config serves both the per-node and per-call layers. `None` fields resolve to the canonical defaults (`default_classifier` / `exponential_jitter_backoff`) at use, preserving the prior behavior. +- **Failure-isolation events report the originating cause's category at non-node placements** (proposal 0065, pipeline-utilities §6.3). When `FailureIsolationMiddleware` runs as instance middleware (§9.7), branch middleware (§11.7), or parent-node middleware on a fan-out / parallel-branches node, the graph engine has already wrapped the originating error as a `node_exception` carrier before the middleware catches it. `FailureIsolatedEvent.caught_exception.category` now resolves through that carrier (and any nested carriers) to the nearest categorized originating cause and reports its category instead of the masking `node_exception`, so the reported category agrees with what the §6.1 retry classifier acted on. For example, an instance whose retries exhaust on `provider_unavailable` now surfaces `provider_unavailable` rather than `node_exception`. The `message` tracks the resolved cause for category/message coherence. Node-level placement was already faithful and is unchanged, and catch/degrade behavior is unchanged at every site (only the event's reported cause changes). The wrapped-instance/branch lineage SHOULD (`fan_out_index` / `branch_name`) is deferred to a follow-up, since it needs the engine to surface per-instance identity to the wrapping-site middleware. ## [0.13.0] — 2026-06-09 diff --git a/src/openarmature/graph/middleware/failure_isolation.py b/src/openarmature/graph/middleware/failure_isolation.py index c501410..914fe56 100644 --- a/src/openarmature/graph/middleware/failure_isolation.py +++ b/src/openarmature/graph/middleware/failure_isolation.py @@ -55,6 +55,43 @@ DegradedUpdate = Mapping[str, Any] | Callable[[Any], Mapping[str, Any]] +def _resolve_cause(exc: Exception) -> BaseException: + # Cause fidelity (proposal 0065 / §6.3, plus the python "nearest + # categorized" refinement). Walk the ``__cause__`` chain to the most + # actionable cause, skipping graph-engine §4 ``node_exception`` carrier + # wrappers (``NodeException`` and subtypes such as + # ``ParallelBranchesBranchFailed``) the engine applies at a non-node + # placement (§9.7 instance, §11.7 branch, §9.6 / §11.6 parent-node + # middleware). Returns the FIRST non-carrier exception that carries a + # string ``category`` — so a deliberately re-categorized surface error + # wins, while an uncategorized surface error resolves to the categorized + # cause beneath it (the same chain §6.1's default classifier consults + # for retryability, so the reported category agrees with what retry + # acted on). When nothing in the chain carries a category, returns the + # originating non-carrier raise (its own message, null category). + # Node-level placement has no carrier, so ``exc`` itself is the + # originating raise. The local import keeps ``errors`` off the + # middleware module-load path, matching the deferred ``events`` import + # in ``_emit_event``. + from openarmature.graph.errors import NodeException + + origin: BaseException | None = None + current: BaseException | None = exc + seen: set[int] = set() + # Traverse only BaseException instances (a non-exception ``__cause__`` + # ends the walk) and guard against a cyclic ``__cause__`` chain so a + # malformed chain can't hang or crash the degrade path. + while isinstance(current, BaseException) and id(current) not in seen: + seen.add(id(current)) + if not isinstance(current, NodeException): + if origin is None: + origin = current + if isinstance(getattr(current, "category", None), str): + return current + current = current.__cause__ + return origin if origin is not None else exc + + class FailureIsolationMiddleware: """Catch exceptions escaping the inner chain; return a degraded partial update. @@ -145,17 +182,16 @@ def _emit_event(self, state: Any, exc: Exception, degraded: Mapping[str, Any]) - # path and defers it until the first catch. from openarmature.graph.events import CaughtException, FailureIsolatedEvent - # A categorized exception (e.g. an llm-provider error) carries a - # string ``category``. When the engine has wrapped the original - # in a graph-engine error before it reached the middleware, the - # category rides on ``__cause__`` — walk it the same way the - # default retry classifier does so the caught failure's category - # survives the wrapping. A bare exception yields ``None``. - category = getattr(exc, "category", None) - if not isinstance(category, str): - cause = getattr(exc, "__cause__", None) - cause_category = getattr(cause, "category", None) if cause is not None else None - category = cause_category if isinstance(cause_category, str) else None + # Cause fidelity (proposal 0065 / §6.3). ``_resolve_cause`` walks + # past graph-engine ``node_exception`` carrier wrappers to the + # nearest categorized originating cause (see its comment); the + # reported ``category`` and ``message`` both come from it so they + # describe one exception — NOT the masking ``node_exception``. A + # bare / uncategorized cause yields a null category. Node-level + # placement has no carrier, so this is the caught exception itself. + cause = _resolve_cause(exc) + cause_category = getattr(cause, "category", None) + category = cause_category if isinstance(cause_category, str) else None # ``attempt_index`` here is deliberately the NODE-level baseline, # not a per-attempt wire index: failure isolation is a node-level # concern ("the node, across its retries, was isolated"). When @@ -176,7 +212,9 @@ def _emit_event(self, state: Any, exc: Exception, degraded: Mapping[str, Any]) - branch_name=current_branch_name(), pre_state=state, post_state=degraded, - caught_exception=CaughtException(category=category, message=str(exc)), + # ``message`` tracks the resolved cause (§6.3 SHOULD) so + # the reported category and message describe one exception. + caught_exception=CaughtException(category=category, message=str(cause)), ) ) diff --git a/tests/unit/test_failure_isolation_middleware.py b/tests/unit/test_failure_isolation_middleware.py index 57f3b08..821e1ca 100644 --- a/tests/unit/test_failure_isolation_middleware.py +++ b/tests/unit/test_failure_isolation_middleware.py @@ -28,6 +28,7 @@ append, deterministic_backoff, ) +from openarmature.graph.errors import NodeException, ParallelBranchesBranchFailed from openarmature.graph.middleware import NextCall from openarmature.observability.correlation import ( _reset_active_dispatch, @@ -41,6 +42,10 @@ class _TransientError(Exception): category = "provider_rate_limit" +class _NonTransientError(Exception): + category = "provider_invalid_request" + + def _raises(exc: BaseException) -> NextCall: """A ``next`` callable that raises ``exc`` when invoked.""" @@ -210,6 +215,139 @@ async def test_bare_exception_has_null_category() -> None: assert events[0].caught_exception.message == "plain" +# --------------------------------------------------------------------------- +# Cause fidelity at carrier-wrapper sites (proposal 0065) +# --------------------------------------------------------------------------- + + +async def test_node_exception_carrier_resolves_to_originating_category() -> None: + # At a non-node placement the engine wraps the originating error as a + # node_exception carrier before the middleware catches it; the event + # reports the originating category, NOT the masking node_exception. + events: list[Any] = [] + disp_token = _set_active_dispatch(lambda e: events.append(e)) + carrier = NodeException(node_name="work", cause=_TransientError("rate limited"), recoverable_state={}) + try: + mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="iso") + out = await mw("s", _raises(carrier)) + finally: + _reset_active_dispatch(disp_token) + + assert out == {"result": []} + assert isinstance(events[0], FailureIsolatedEvent) + assert events[0].caught_exception.category == "provider_rate_limit" + assert events[0].caught_exception.message == "rate limited" + + +async def test_nested_carriers_resolve_to_originating_category() -> None: + # Nested subgraph boundaries stack node_exception carriers; resolution + # walks all of them to the originating cause. + events: list[Any] = [] + disp_token = _set_active_dispatch(lambda e: events.append(e)) + inner = NodeException(node_name="inner", cause=_TransientError("rate limited"), recoverable_state={}) + outer = NodeException(node_name="outer", cause=inner, recoverable_state={}) + try: + mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="iso") + await mw("s", _raises(outer)) + finally: + _reset_active_dispatch(disp_token) + + assert events[0].caught_exception.category == "provider_rate_limit" + + +async def test_branch_carrier_subtype_resolves_to_originating_category() -> None: + # The §11.7 branch site catches a ParallelBranchesBranchFailed (a + # NodeException subtype); resolution still reaches the originating cause. + events: list[Any] = [] + disp_token = _set_active_dispatch(lambda e: events.append(e)) + carrier = ParallelBranchesBranchFailed( + node_name="dispatcher", + cause=_TransientError("rate limited"), + recoverable_state={}, + branch_name="only", + ) + try: + mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="iso") + await mw("s", _raises(carrier)) + finally: + _reset_active_dispatch(disp_token) + + assert events[0].caught_exception.category == "provider_rate_limit" + + +async def test_carrier_over_uncategorized_cause_is_null() -> None: + # Resolving through the carrier reaches a cause with no category, so the + # reported category is null (the existing bare-exception rule). + events: list[Any] = [] + disp_token = _set_active_dispatch(lambda e: events.append(e)) + carrier = NodeException(node_name="work", cause=ValueError("boom"), recoverable_state={}) + try: + mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="iso") + await mw("s", _raises(carrier)) + finally: + _reset_active_dispatch(disp_token) + + assert events[0].caught_exception.category is None + assert events[0].caught_exception.message == "boom" + + +async def test_uncategorized_surface_resolves_to_categorized_cause() -> None: + # A node that wraps a categorized provider error in an uncategorized + # domain error: the event surfaces the underlying provider category + # (agreeing with what §6.1's classifier retries on), and the message + # tracks that same cause for coherence. + events: list[Any] = [] + disp_token = _set_active_dispatch(lambda e: events.append(e)) + surface = ValueError("wrapped") + surface.__cause__ = _TransientError("rate limited") + carrier = NodeException(node_name="work", cause=surface, recoverable_state={}) + try: + mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="iso") + await mw("s", _raises(carrier)) + finally: + _reset_active_dispatch(disp_token) + + assert events[0].caught_exception.category == "provider_rate_limit" + assert events[0].caught_exception.message == "rate limited" + + +async def test_categorized_surface_wins_over_deeper_cause() -> None: + # A node that deliberately re-categorizes (raises a categorized error + # FROM a categorized cause): the nearest category wins, so the node's + # re-categorization is respected rather than the deeper cause. + events: list[Any] = [] + disp_token = _set_active_dispatch(lambda e: events.append(e)) + surface = _NonTransientError("misconfigured") + surface.__cause__ = _TransientError("rate limited") + carrier = NodeException(node_name="work", cause=surface, recoverable_state={}) + try: + mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="iso") + await mw("s", _raises(carrier)) + finally: + _reset_active_dispatch(disp_token) + + assert events[0].caught_exception.category == "provider_invalid_request" + assert events[0].caught_exception.message == "misconfigured" + + +async def test_cyclic_cause_chain_terminates() -> None: + # Defensive: a self-referential __cause__ chain must not hang the + # degrade path. Resolution terminates and the node still degrades. + events: list[Any] = [] + disp_token = _set_active_dispatch(lambda e: events.append(e)) + a = NodeException(node_name="a", cause=ValueError("seed"), recoverable_state={}) + b = NodeException(node_name="b", cause=a, recoverable_state={}) + a.__cause__ = b # cycle: a -> b -> a + try: + mw = FailureIsolationMiddleware(degraded_update={"result": []}, event_name="iso") + out = await mw("s", _raises(a)) + finally: + _reset_active_dispatch(disp_token) + + assert out == {"result": []} + assert len(events) == 1 + + async def test_no_event_outside_invocation() -> None: # current_dispatch() is None outside an invocation; the degrade still # happens, no event is emitted, and nothing raises.