diff --git a/conformance.toml b/conformance.toml index 7d8c3a8..4cdd9ac 100644 --- a/conformance.toml +++ b/conformance.toml @@ -386,12 +386,13 @@ since = "0.13.0" # (LlmRetryAttemptEvent) scoped to a future cycle. Call-level retry # ships terminal-only: exactly one LlmCompletionEvent / LlmFailedEvent # per ``complete()`` call. Failure-isolation conformance fixtures -# (058-063) are wired + passing this cycle EXCEPT 061 (three-piece -# composition): the FailureIsolatedEvent's attempt_index there is the -# node-level baseline (0) while the fixture asserts the final retry -# attempt (1) per §6.3's lineage-correlation rule — RetryMiddleware -# resets the attempt ContextVar before the outer isolation middleware -# catches. That attempt_index reconciliation is pending. +# (058-063) are all wired + passing: the FailureIsolatedEvent's +# attempt_index reports the final / exhausting attempt per §6.3's +# lineage-correlation rule (spec ruled this in the attempt-index coord +# thread; RetryMiddleware now records the final attempt in a +# terminal-attempt scope the outer isolation reads, rather than the +# post-reset baseline). ``partial`` is now solely about the +# call-level-retry per-attempt span surface above. [proposals."0050"] status = "partial" since = "0.14.0" diff --git a/src/openarmature/graph/middleware/failure_isolation.py b/src/openarmature/graph/middleware/failure_isolation.py index 914fe56..a47f0f9 100644 --- a/src/openarmature/graph/middleware/failure_isolation.py +++ b/src/openarmature/graph/middleware/failure_isolation.py @@ -40,6 +40,9 @@ from typing import Any from openarmature.observability.correlation import ( + _current_terminal_attempt_index, + _reset_terminal_attempt_index, + _set_terminal_attempt_index, current_attempt_index, current_branch_name, current_dispatch, @@ -131,38 +134,49 @@ def __init__( self.on_caught = on_caught async def __call__(self, state: Any, next_: NextCall) -> Mapping[str, Any]: + # Establish a clean terminal-attempt scope: an inner + # RetryMiddleware records its final / exhausting attempt here on + # give-up, and _emit_event reports it (proposal 0050 §6.3). The + # ``None`` on entry shadows any stale ambient value so this call + # reads correctly; the finally restores the prior value (token + # semantics), and the next isolation call shadows again on entry. + terminal_token = _set_terminal_attempt_index(None) try: - return await next_(state) - except Exception as exc: - # BaseException (cancellation) never enters here — it - # extends BaseException, not Exception. Same rule as - # RetryMiddleware: cancellation MUST propagate. - if self.predicate is not None and not self.predicate(exc): - raise - # Resolve the degraded update once, at catch time, and reuse - # it for the event's post_state and the node return so a - # callable degraded_update is invoked exactly once. The - # observable order the spec prescribes — emit the event, then - # on_caught, then return the update — is preserved below; - # resolving here first only populates post_state. - degraded = self._resolve_degraded(state) - self._emit_event(state, exc, degraded) - if self.on_caught is not None: - try: - await self.on_caught(exc) - except Exception as hook_error: # noqa: BLE001 - # on_caught is caller telemetry; a bug in it MUST NOT - # turn a recovered node back into a crash. Isolate it - # the way the observer-delivery contract isolates - # observer exceptions (warn, don't propagate). - # BaseException (cancellation) still propagates by not - # being caught here. - warnings.warn( - f"FailureIsolationMiddleware on_caught raised " - f"{type(hook_error).__name__}: {hook_error}", - stacklevel=2, - ) - return degraded + try: + return await next_(state) + except Exception as exc: + # BaseException (cancellation) never enters here — it + # extends BaseException, not Exception. Same rule as + # RetryMiddleware: cancellation MUST propagate. + if self.predicate is not None and not self.predicate(exc): + raise + # Resolve the degraded update once, at catch time, and + # reuse it for the event's post_state and the node return + # so a callable degraded_update is invoked exactly once. + # The observable order the spec prescribes — emit the + # event, then on_caught, then return the update — is + # preserved below; resolving here first only populates + # post_state. + degraded = self._resolve_degraded(state) + self._emit_event(state, exc, degraded) + if self.on_caught is not None: + try: + await self.on_caught(exc) + except Exception as hook_error: # noqa: BLE001 + # on_caught is caller telemetry; a bug in it MUST + # NOT turn a recovered node back into a crash. + # Isolate it the way the observer-delivery contract + # isolates observer exceptions (warn, don't + # propagate). BaseException (cancellation) still + # propagates by not being caught here. + warnings.warn( + f"FailureIsolationMiddleware on_caught raised " + f"{type(hook_error).__name__}: {hook_error}", + stacklevel=2, + ) + return degraded + finally: + _reset_terminal_attempt_index(terminal_token) def _resolve_degraded(self, state: Any) -> Mapping[str, Any]: if callable(self.degraded_update): @@ -192,22 +206,26 @@ def _emit_event(self, state: Any, exc: Exception, degraded: Mapping[str, Any]) - cause = _resolve_cause(exc) cause_category = getattr(cause, "category", None) category = cause_category if isinstance(cause_category, str) else None - # ``attempt_index`` here is deliberately the NODE-level baseline, - # not a per-attempt wire index: failure isolation is a node-level - # concern ("the node, across its retries, was isolated"). When - # this middleware is OUTER of RetryMiddleware, retry has already - # reset the attempt ContextVar to that baseline (0) in its - # ``finally`` by the time the terminal exception reaches this - # catch, which is the frame we want (spec-confirmed). Parenting is - # unaffected: the node's attempt spans are already closed by - # delivery time (their completed event precedes this one on the - # serial queue), so observers parent the marker under the - # invocation span and correlate by ``namespace`` + node name. + # ``attempt_index`` is the wrapped node's final / exhausting + # attempt (proposal 0050 §6.3: "the same lineage tuple NodeEvent + # carries, for correlation with the wrapped node's other events"). + # When this middleware is OUTER of RetryMiddleware, retry records + # that index in the terminal-attempt scope on give-up — its own + # ``finally`` has reset the live attempt-index var to the baseline + # by the time the exception reaches this catch, so we read the + # recorded terminal index instead. With no retry, nothing is + # recorded and we fall back to the live attempt index (0 at a node + # body). Parenting is unaffected: the node's attempt spans are + # already closed by delivery time (their completed event precedes + # this one on the serial queue), so observers parent the marker + # under the invocation span and correlate by ``namespace`` + name. + terminal_attempt = _current_terminal_attempt_index() + attempt_index = terminal_attempt if terminal_attempt is not None else current_attempt_index() dispatch( FailureIsolatedEvent( event_name=self.event_name, namespace=current_namespace_prefix(), - attempt_index=current_attempt_index(), + attempt_index=attempt_index, fan_out_index=current_fan_out_index(), branch_name=current_branch_name(), pre_state=state, diff --git a/src/openarmature/graph/middleware/retry.py b/src/openarmature/graph/middleware/retry.py index b90e125..b1883fc 100644 --- a/src/openarmature/graph/middleware/retry.py +++ b/src/openarmature/graph/middleware/retry.py @@ -24,7 +24,11 @@ from typing import Any from openarmature.llm.errors import TRANSIENT_CATEGORIES -from openarmature.observability.correlation import _reset_attempt_index, _set_attempt_index +from openarmature.observability.correlation import ( + _record_terminal_attempt_index, + _reset_attempt_index, + _set_attempt_index, +) from openarmature.observability.metadata import ( _invocation_metadata_var, _reset_invocation_metadata, @@ -202,6 +206,11 @@ async def __call__(self, state: Any, next_: NextCall) -> Mapping[str, Any]: # not the failed attempt's transient state. _reset_invocation_metadata(metadata_token) if attempt + 1 >= self.config.max_attempts or not classifier(exc, state): + # Record the final / exhausting attempt so an OUTER + # FailureIsolationMiddleware reports it rather than + # the post-reset baseline (proposal 0050 §6.3). The + # enclosing isolation scope owns the cleanup. + _record_terminal_attempt_index(attempt) raise if self.config.on_retry is not None: await self.config.on_retry(exc, attempt) diff --git a/src/openarmature/observability/correlation.py b/src/openarmature/observability/correlation.py index a1d4b50..af515d7 100644 --- a/src/openarmature/observability/correlation.py +++ b/src/openarmature/observability/correlation.py @@ -483,6 +483,61 @@ def _reset_attempt_index(token: Token[int]) -> None: _attempt_index_var.reset(token) +# --------------------------------------------------------------------------- +# Terminal attempt index — for FailureIsolationMiddleware (proposal 0050 +# §6.3). RetryMiddleware resets ``attempt_index`` in its per-iteration +# ``finally`` as the exhausted exception unwinds, so an OUTER +# FailureIsolationMiddleware would otherwise read the post-reset baseline +# rather than the final / exhausting attempt the §6.3 lineage-correlation +# rule mandates. On give-up, retry records the final attempt here; the +# enclosing FailureIsolationMiddleware establishes the scope (None on +# entry, reset on exit) and reads it, falling back to ``attempt_index`` +# when no retry exhausted. A retry that records without an enclosing +# isolation scope leaves a non-None value in the ambient context, but it +# is never OBSERVED: the sole reader (FailureIsolationMiddleware) shadows +# any such stale value with its own ``None`` on entry before reading. +# +# Two setters by design: ``_set`` / ``_reset`` bracket the isolation SCOPE +# (token-based, mirroring ``_attempt_index``); ``_record`` is retry's +# fire-and-forget write WITHIN that scope (no token, since the scope owner +# does the reset), keeping retry off ContextVar bookkeeping it doesn't own. +# +# Known limitation: the INNERMOST isolation consumes the record (its reset +# discards it), so a nested OUTER isolation catching an inner-isolation- +# rejected exception after a retry exhaustion reads the live baseline. That +# nesting is contrived and intentionally unguarded. +# --------------------------------------------------------------------------- + + +_terminal_attempt_index_var: ContextVar[int | None] = ContextVar( + "openarmature.terminal_attempt_index", default=None +) + + +def _current_terminal_attempt_index() -> int | None: + """Return the final / exhausting attempt index recorded by a retry + that gave up within the current FailureIsolationMiddleware scope, or + ``None`` when no retry exhausted. Internal.""" + return _terminal_attempt_index_var.get() + + +def _set_terminal_attempt_index(value: int | None) -> Token[int | None]: + """Establish a terminal-attempt scope (FailureIsolationMiddleware sets + ``None`` on entry). Internal.""" + return _terminal_attempt_index_var.set(value) + + +def _reset_terminal_attempt_index(token: Token[int | None]) -> None: + _terminal_attempt_index_var.reset(token) + + +def _record_terminal_attempt_index(value: int) -> None: + """Record the final / exhausting attempt within the current scope + (RetryMiddleware on give-up). The enclosing FailureIsolationMiddleware + scope owns the cleanup, so no token is returned. Internal.""" + _terminal_attempt_index_var.set(value) + + # --------------------------------------------------------------------------- # Active observer span — for engine-side OTel context attach inside # ``innermost``. Populated synchronously by an observer's ``prepare_sync`` @@ -567,6 +622,8 @@ def _reset_active_observer_span(token: Token[object | None]) -> None: # ``openarmature.graph.compiled`` can drive set/reset without # pyright's strict ``reportUnusedFunction`` flagging them as # dead. Underscore-prefixed; not part of the user-facing API. + "_current_terminal_attempt_index", + "_record_terminal_attempt_index", "_reset_active_dispatch", "_reset_active_observer_span", "_reset_active_observers", @@ -578,6 +635,7 @@ def _reset_active_observer_span(token: Token[object | None]) -> None: "_reset_fan_out_index_chain", "_reset_invocation_id", "_reset_namespace_prefix", + "_reset_terminal_attempt_index", "_set_active_dispatch", "_set_active_observer_span", "_set_active_observers", @@ -589,4 +647,5 @@ def _reset_active_observer_span(token: Token[object | None]) -> None: "_set_fan_out_index_chain", "_set_invocation_id", "_set_namespace_prefix", + "_set_terminal_attempt_index", ] diff --git a/tests/conformance/test_pipeline_utilities.py b/tests/conformance/test_pipeline_utilities.py index a8766b9..08bc3bb 100644 --- a/tests/conformance/test_pipeline_utilities.py +++ b/tests/conformance/test_pipeline_utilities.py @@ -133,18 +133,6 @@ def _fixture_id(path: Path) -> str: "029-checkpoint-subgraph-resume": "checkpointing (test_checkpoint.py)", "030-checkpoint-not-found": "checkpointing (test_checkpoint.py)", "031-checkpoint-correlation-id-preserved-across-resume": "checkpointing (test_checkpoint.py)", - # Failure-isolation three-piece composition (proposal 0050 §6.3). The - # FailureIsolatedEvent's attempt_index should reflect the final retry - # attempt (1) per §6.3's "same lineage tuple NodeEvent carries" - # correlation rule, but python emits the node-level baseline (0): - # RetryMiddleware resets the attempt ContextVar in its finally before - # the outer isolation middleware catches. The 0050 impl claimed 0 was - # spec-confirmed via the design thread; the fixture asserts 1. - # Reconcile with spec and fix attempt_index, then un-defer. - "061-failure-isolation-retry-three-piece-composition": ( - "FailureIsolatedEvent.attempt_index baseline (0) vs final-attempt (1) " - "discrepancy; reconcile with spec + fix" - ), } diff --git a/tests/unit/test_failure_isolation_middleware.py b/tests/unit/test_failure_isolation_middleware.py index 821e1ca..d138a97 100644 --- a/tests/unit/test_failure_isolation_middleware.py +++ b/tests/unit/test_failure_isolation_middleware.py @@ -437,6 +437,13 @@ async def _flaky(_s: _DocState) -> Mapping[str, Any]: .compile() ) + isolated: list[FailureIsolatedEvent] = [] + + async def _capture(event: ObserverEvent) -> None: + if isinstance(event, FailureIsolatedEvent): + isolated.append(event) + + graph.attach_observer(_capture) final = await graph.invoke(_DocState()) await graph.drain() @@ -444,6 +451,11 @@ async def _flaky(_s: _DocState) -> Mapping[str, Any]: # propagated exhaustion exception and substituted the degraded value. assert attempts["n"] == 3 assert final.note == "gave_up" + # The event's attempt_index is the final / exhausting attempt (2 after + # attempts 0/1/2), not the post-reset baseline (proposal 0050 §6.3 + # lineage correlation). + assert len(isolated) == 1 + assert isolated[0].attempt_index == 2 # ---------------------------------------------------------------------------