Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions conformance.toml
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,13 @@ since = "0.13.0"
# (LlmRetryAttemptEvent) scoped to a future cycle. Call-level retry
# ships terminal-only: exactly one LlmCompletionEvent / LlmFailedEvent
# per ``complete()`` call. Failure-isolation conformance fixtures
# (058-063) are wired + passing this cycle EXCEPT 061 (three-piece
# composition): the FailureIsolatedEvent's attempt_index there is the
# node-level baseline (0) while the fixture asserts the final retry
# attempt (1) per §6.3's lineage-correlation rule — RetryMiddleware
# resets the attempt ContextVar before the outer isolation middleware
# catches. That attempt_index reconciliation is pending.
# (058-063) are all wired + passing: the FailureIsolatedEvent's
# attempt_index reports the final / exhausting attempt per §6.3's
# lineage-correlation rule (the spec settled this in the attempt-index
# coordination thread; RetryMiddleware now records the final attempt in a
# terminal-attempt scope the outer isolation reads, rather than the
# post-reset baseline). ``partial`` is now solely about the
# call-level-retry per-attempt span surface above.
[proposals."0050"]
status = "partial"
since = "0.14.0"
Expand Down
104 changes: 61 additions & 43 deletions src/openarmature/graph/middleware/failure_isolation.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
from typing import Any

from openarmature.observability.correlation import (
_current_terminal_attempt_index,
_reset_terminal_attempt_index,
_set_terminal_attempt_index,
current_attempt_index,
current_branch_name,
current_dispatch,
Expand Down Expand Up @@ -131,38 +134,49 @@ def __init__(
self.on_caught = on_caught

async def __call__(self, state: Any, next_: NextCall) -> Mapping[str, Any]:
# Establish a clean terminal-attempt scope: an inner
# RetryMiddleware records its final / exhausting attempt here on
# give-up, and _emit_event reports it (proposal 0050 §6.3). The
# ``None`` on entry shadows any stale ambient value so this call
# reads correctly; the finally restores the prior value (token
# semantics), and the next isolation call shadows again on entry.
terminal_token = _set_terminal_attempt_index(None)
try:
return await next_(state)
except Exception as exc:
# BaseException (cancellation) never enters here — it
# extends BaseException, not Exception. Same rule as
# RetryMiddleware: cancellation MUST propagate.
if self.predicate is not None and not self.predicate(exc):
raise
# Resolve the degraded update once, at catch time, and reuse
# it for the event's post_state and the node return so a
# callable degraded_update is invoked exactly once. The
# observable order the spec prescribes — emit the event, then
# on_caught, then return the update — is preserved below;
# resolving here first only populates post_state.
degraded = self._resolve_degraded(state)
self._emit_event(state, exc, degraded)
if self.on_caught is not None:
try:
await self.on_caught(exc)
except Exception as hook_error: # noqa: BLE001
# on_caught is caller telemetry; a bug in it MUST NOT
# turn a recovered node back into a crash. Isolate it
# the way the observer-delivery contract isolates
# observer exceptions (warn, don't propagate).
# BaseException (cancellation) still propagates by not
# being caught here.
warnings.warn(
f"FailureIsolationMiddleware on_caught raised "
f"{type(hook_error).__name__}: {hook_error}",
stacklevel=2,
)
return degraded
try:
return await next_(state)
except Exception as exc:
# BaseException (cancellation) never enters here — it
# extends BaseException, not Exception. Same rule as
# RetryMiddleware: cancellation MUST propagate.
if self.predicate is not None and not self.predicate(exc):
raise
# Resolve the degraded update once, at catch time, and
# reuse it for the event's post_state and the node return
# so a callable degraded_update is invoked exactly once.
# The observable order the spec prescribes — emit the
# event, then on_caught, then return the update — is
# preserved below; resolving here first only populates
# post_state.
degraded = self._resolve_degraded(state)
self._emit_event(state, exc, degraded)
if self.on_caught is not None:
try:
await self.on_caught(exc)
except Exception as hook_error: # noqa: BLE001
# on_caught is caller telemetry; a bug in it MUST
# NOT turn a recovered node back into a crash.
# Isolate it the way the observer-delivery contract
# isolates observer exceptions (warn, don't
# propagate). BaseException (cancellation) still
# propagates by not being caught here.
warnings.warn(
f"FailureIsolationMiddleware on_caught raised "
f"{type(hook_error).__name__}: {hook_error}",
stacklevel=2,
)
return degraded
finally:
_reset_terminal_attempt_index(terminal_token)

def _resolve_degraded(self, state: Any) -> Mapping[str, Any]:
if callable(self.degraded_update):
Expand Down Expand Up @@ -192,22 +206,26 @@ def _emit_event(self, state: Any, exc: Exception, degraded: Mapping[str, Any]) -
cause = _resolve_cause(exc)
cause_category = getattr(cause, "category", None)
category = cause_category if isinstance(cause_category, str) else None
# ``attempt_index`` here is deliberately the NODE-level baseline,
# not a per-attempt wire index: failure isolation is a node-level
# concern ("the node, across its retries, was isolated"). When
# this middleware is OUTER of RetryMiddleware, retry has already
# reset the attempt ContextVar to that baseline (0) in its
# ``finally`` by the time the terminal exception reaches this
# catch, which is the frame we want (spec-confirmed). Parenting is
# unaffected: the node's attempt spans are already closed by
# delivery time (their completed event precedes this one on the
# serial queue), so observers parent the marker under the
# invocation span and correlate by ``namespace`` + node name.
# ``attempt_index`` is the wrapped node's final / exhausting
# attempt (proposal 0050 §6.3: "the same lineage tuple NodeEvent
# carries, for correlation with the wrapped node's other events").
# When this middleware is OUTER of RetryMiddleware, retry records
# that index in the terminal-attempt scope on give-up — its own
# ``finally`` has reset the live attempt-index var to the baseline
# by the time the exception reaches this catch, so we read the
# recorded terminal index instead. With no retry, nothing is
# recorded and we fall back to the live attempt index (0 at a node
# body). Parenting is unaffected: the node's attempt spans are
# already closed by delivery time (their completed event precedes
# this one on the serial queue), so observers parent the marker
# under the invocation span and correlate by ``namespace`` + name.
terminal_attempt = _current_terminal_attempt_index()
attempt_index = terminal_attempt if terminal_attempt is not None else current_attempt_index()
dispatch(
FailureIsolatedEvent(
event_name=self.event_name,
namespace=current_namespace_prefix(),
attempt_index=current_attempt_index(),
attempt_index=attempt_index,
fan_out_index=current_fan_out_index(),
branch_name=current_branch_name(),
pre_state=state,
Expand Down
11 changes: 10 additions & 1 deletion src/openarmature/graph/middleware/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
from typing import Any

from openarmature.llm.errors import TRANSIENT_CATEGORIES
from openarmature.observability.correlation import _reset_attempt_index, _set_attempt_index
from openarmature.observability.correlation import (
_record_terminal_attempt_index,
_reset_attempt_index,
_set_attempt_index,
)
from openarmature.observability.metadata import (
_invocation_metadata_var,
_reset_invocation_metadata,
Expand Down Expand Up @@ -202,6 +206,11 @@ async def __call__(self, state: Any, next_: NextCall) -> Mapping[str, Any]:
# not the failed attempt's transient state.
_reset_invocation_metadata(metadata_token)
if attempt + 1 >= self.config.max_attempts or not classifier(exc, state):
# Record the final / exhausting attempt so an OUTER
# FailureIsolationMiddleware reports it rather than
# the post-reset baseline (proposal 0050 §6.3). The
# enclosing isolation scope owns the cleanup.
_record_terminal_attempt_index(attempt)
raise
if self.config.on_retry is not None:
await self.config.on_retry(exc, attempt)
Expand Down
59 changes: 59 additions & 0 deletions src/openarmature/observability/correlation.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,61 @@ def _reset_attempt_index(token: Token[int]) -> None:
_attempt_index_var.reset(token)


# ---------------------------------------------------------------------------
# Terminal attempt index — for FailureIsolationMiddleware (proposal 0050
# §6.3). RetryMiddleware resets ``attempt_index`` in its per-iteration
# ``finally`` as the exhausted exception unwinds, so an OUTER
# FailureIsolationMiddleware would otherwise read the post-reset baseline
# rather than the final / exhausting attempt the §6.3 lineage-correlation
# rule mandates. On give-up, retry records the final attempt here; the
# enclosing FailureIsolationMiddleware establishes the scope (None on
# entry, reset on exit) and reads it, falling back to ``attempt_index``
# when no retry exhausted. A retry that records without an enclosing
# isolation scope leaves a non-None value in the ambient context, but it
# is never OBSERVED: the sole reader (FailureIsolationMiddleware) shadows
# any such stale value with its own ``None`` on entry before reading.
#
# Two setters by design: ``_set`` / ``_reset`` bracket the isolation SCOPE
# (token-based, mirroring ``_attempt_index``); ``_record`` is retry's
# fire-and-forget write WITHIN that scope (no token, since the scope owner
# does the reset), keeping retry off ContextVar bookkeeping it doesn't own.
#
# Known limitation: the INNERMOST isolation consumes the record (its reset
# discards it), so a nested OUTER isolation catching an inner-isolation-
# rejected exception after a retry exhaustion reads the live baseline. That
# nesting is contrived and intentionally unguarded.
# ---------------------------------------------------------------------------


_terminal_attempt_index_var: ContextVar[int | None] = ContextVar(
"openarmature.terminal_attempt_index", default=None
)


def _current_terminal_attempt_index() -> int | None:
"""Return the final / exhausting attempt index recorded by a retry
that gave up within the current FailureIsolationMiddleware scope, or
``None`` when no retry exhausted. Internal."""
return _terminal_attempt_index_var.get()


def _set_terminal_attempt_index(value: int | None) -> Token[int | None]:
"""Establish a terminal-attempt scope (FailureIsolationMiddleware sets
``None`` on entry). Internal."""
return _terminal_attempt_index_var.set(value)


def _reset_terminal_attempt_index(token: Token[int | None]) -> None:
_terminal_attempt_index_var.reset(token)


def _record_terminal_attempt_index(value: int) -> None:
"""Record the final / exhausting attempt within the current scope
(RetryMiddleware on give-up). The enclosing FailureIsolationMiddleware
scope owns the cleanup, so no token is returned. Internal."""
_terminal_attempt_index_var.set(value)


# ---------------------------------------------------------------------------
# Active observer span — for engine-side OTel context attach inside
# ``innermost``. Populated synchronously by an observer's ``prepare_sync``
Expand Down Expand Up @@ -567,6 +622,8 @@ def _reset_active_observer_span(token: Token[object | None]) -> None:
# ``openarmature.graph.compiled`` can drive set/reset without
# pyright's strict ``reportUnusedFunction`` flagging them as
# dead. Underscore-prefixed; not part of the user-facing API.
"_current_terminal_attempt_index",
"_record_terminal_attempt_index",
"_reset_active_dispatch",
"_reset_active_observer_span",
"_reset_active_observers",
Expand All @@ -578,6 +635,7 @@ def _reset_active_observer_span(token: Token[object | None]) -> None:
"_reset_fan_out_index_chain",
"_reset_invocation_id",
"_reset_namespace_prefix",
"_reset_terminal_attempt_index",
"_set_active_dispatch",
"_set_active_observer_span",
"_set_active_observers",
Expand All @@ -589,4 +647,5 @@ def _reset_active_observer_span(token: Token[object | None]) -> None:
"_set_fan_out_index_chain",
"_set_invocation_id",
"_set_namespace_prefix",
"_set_terminal_attempt_index",
]
12 changes: 0 additions & 12 deletions tests/conformance/test_pipeline_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,6 @@ def _fixture_id(path: Path) -> str:
"029-checkpoint-subgraph-resume": "checkpointing (test_checkpoint.py)",
"030-checkpoint-not-found": "checkpointing (test_checkpoint.py)",
"031-checkpoint-correlation-id-preserved-across-resume": "checkpointing (test_checkpoint.py)",
# Failure-isolation three-piece composition (proposal 0050 §6.3). The
# FailureIsolatedEvent's attempt_index should reflect the final retry
# attempt (1) per §6.3's "same lineage tuple NodeEvent carries"
# correlation rule, but python emits the node-level baseline (0):
# RetryMiddleware resets the attempt ContextVar in its finally before
# the outer isolation middleware catches. The 0050 impl claimed 0 was
# spec-confirmed via the design thread; the fixture asserts 1.
# Reconcile with spec and fix attempt_index, then un-defer.
"061-failure-isolation-retry-three-piece-composition": (
"FailureIsolatedEvent.attempt_index baseline (0) vs final-attempt (1) "
"discrepancy; reconcile with spec + fix"
),
}


Expand Down
12 changes: 12 additions & 0 deletions tests/unit/test_failure_isolation_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,13 +437,25 @@ async def _flaky(_s: _DocState) -> Mapping[str, Any]:
.compile()
)

isolated: list[FailureIsolatedEvent] = []

async def _capture(event: ObserverEvent) -> None:
if isinstance(event, FailureIsolatedEvent):
isolated.append(event)

graph.attach_observer(_capture)
final = await graph.invoke(_DocState())
await graph.drain()

# Retry exhausted its 3 attempts; failure isolation then caught the
# propagated exhaustion exception and substituted the degraded value.
assert attempts["n"] == 3
assert final.note == "gave_up"
# The event's attempt_index is the final / exhausting attempt (2 after
# attempts 0/1/2), not the post-reset baseline (proposal 0050 §6.3
# lineage correlation).
assert len(isolated) == 1
assert isolated[0].attempt_index == 2


# ---------------------------------------------------------------------------
Expand Down