From 10384adb36ff3c61e3de371265ca3481e34ff7dc Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Fri, 12 Jun 2026 08:41:17 -0700 Subject: [PATCH 1/2] Wire 0050 failure-isolation conformance fixtures Wire proposal 0050's failure-isolation conformance fixtures (pipeline-utilities/058-063) into the harness: a failure_isolation middleware directive, the expected_failure_isolation_event assertion with a FailureIsolatedEvent capture observer, callable/predicate/ on_caught variant conventions, and an extended raises directive. 058/059/060/062/063 pass; 061 is execution-deferred for a FailureIsolatedEvent.attempt_index discrepancy being reconciled with spec. The llm-provider/056-058 call-level-retry fixtures stay deferred: they assert the per-attempt span surface 0050 deferred (partial). --- conformance.toml | 8 +- tests/conformance/adapter.py | 26 ++- tests/conformance/harness/directives.py | 24 ++- tests/conformance/harness/expectations.py | 9 + tests/conformance/test_fixture_parsing.py | 41 ++-- tests/conformance/test_pipeline_utilities.py | 188 ++++++++++++++++++- 6 files changed, 264 insertions(+), 32 deletions(-) diff --git a/conformance.toml b/conformance.toml index 1a4b602..7d8c3a8 100644 --- a/conformance.toml +++ b/conformance.toml @@ -385,7 +385,13 @@ since = "0.13.0" # per-attempt spans require a dedicated within-call sub-event # (LlmRetryAttemptEvent) scoped to a future cycle. Call-level retry # ships terminal-only: exactly one LlmCompletionEvent / LlmFailedEvent -# per ``complete()`` call. +# per ``complete()`` call. Failure-isolation conformance fixtures +# (058-063) are wired + passing this cycle EXCEPT 061 (three-piece +# composition): the FailureIsolatedEvent's attempt_index there is the +# node-level baseline (0) while the fixture asserts the final retry +# attempt (1) per §6.3's lineage-correlation rule — RetryMiddleware +# resets the attempt ContextVar before the outer isolation middleware +# catches. That attempt_index reconciliation is pending. 
[proposals."0050"] status = "partial" since = "0.14.0" diff --git a/tests/conformance/adapter.py b/tests/conformance/adapter.py index 89a0d71..d3264e5 100644 --- a/tests/conformance/adapter.py +++ b/tests/conformance/adapter.py @@ -142,14 +142,36 @@ async def fn(_state: Any) -> Mapping[str, Any]: return fn +_RAISES_EXCEPTION_KINDS: dict[str, type[Exception]] = { + "ValueError": ValueError, + "RuntimeError": RuntimeError, + "TypeError": TypeError, + "KeyError": KeyError, +} + + def _make_raising_fn( node_name: str, - message: str, + raises_spec: str | Mapping[str, Any], trace: list[str], ) -> Callable[[Any], Awaitable[Mapping[str, Any]]]: + # Two shapes: a bare message string (fixture 006) raises RuntimeError; + # a ``{message, exception_kind}`` dict (fixture 063) raises the named + # exception type with that message (an uncategorized error, so a + # wrapping failure-isolation event reports a null category). + if isinstance(raises_spec, Mapping): + message = str(raises_spec.get("message", "")) + kind = str(raises_spec.get("exception_kind", "RuntimeError")) + if kind not in _RAISES_EXCEPTION_KINDS: + raise ValueError(f"unsupported raises exception_kind: {kind}") + exc_type = _RAISES_EXCEPTION_KINDS[kind] + else: + message = raises_spec + exc_type = RuntimeError + async def fn(_state: Any) -> Mapping[str, Any]: trace.append(node_name) - raise RuntimeError(message) + raise exc_type(message) return fn diff --git a/tests/conformance/harness/directives.py b/tests/conformance/harness/directives.py index 721542d..b7c22e6 100644 --- a/tests/conformance/harness/directives.py +++ b/tests/conformance/harness/directives.py @@ -415,7 +415,7 @@ class NodeSpec(_ForbidExtras): update_pure: dict[str, Any] | None = None update_pure_from_state: dict[str, Any] | None = None update_from_field: UpdateFromFieldSpec | None = None - raises: str | None = None + raises: str | dict[str, Any] | None = None subgraph: str | None = None fan_out: FanOutSpec | None = None parallel_branches: 
ParallelBranchesSpec | None = None @@ -516,12 +516,31 @@ class TraceRecorderMiddleware(_AllowExtras): type: Literal["trace_recorder"] +class FailureIsolationMiddleware(_AllowExtras): + """Canonical failure-isolation middleware (proposal 0050 §6.3, + fixtures 058-063). Catches an exception escaping the inner chain and + returns a configured degraded partial update, emitting a distinct + ``FailureIsolatedEvent``.""" + + type: Literal["failure_isolation"] + # Static partial-update mapping, or the callable encoding + # ``{callable: state_derived, template, target_field}`` (fixture 059). + degraded_update: dict[str, Any] + event_name: str + # Optional ``{matches_category: <category>}`` predicate (fixture 060). + predicate: dict[str, Any] | None = None + # Optional on_caught hook + # ``{kind, increment_field, capture_message_field}`` (fixture 062). + on_caught: dict[str, Any] | None = None + + MiddlewareSpec = Annotated[ RetryMiddleware | TimingMiddleware | ErrorRecoveryMiddleware | ShortCircuitMiddleware - | TraceRecorderMiddleware, + | TraceRecorderMiddleware + | FailureIsolationMiddleware, Field(discriminator="type"), ] @@ -642,6 +661,7 @@ class LlmCallSpec(_AllowExtras): "EdgeSpec", "EmitsLogSpec", "ErrorRecoveryMiddleware", + "FailureIsolationMiddleware", "FailureSpec", "FanOutSpec", "FlakyByIndexSpec", diff --git a/tests/conformance/harness/expectations.py b/tests/conformance/harness/expectations.py index 1527535..7e610af 100644 --- a/tests/conformance/harness/expectations.py +++ b/tests/conformance/harness/expectations.py @@ -135,6 +135,13 @@ class PipelineUtilitiesExpected(_ForbidExtras): observer_event_invariants: dict[str, Any] | None = None # Singular form used by 015 — assert one specific event shape. expected_observer_event: dict[str, Any] | None = None + # Failure-isolation event (proposal 0050 §6.3, fixtures 058-063). + expected_failure_isolation_event: dict[str, Any] | None = None + # 060 negative case: assert NO failure-isolation event fired. 
+ no_failure_isolation_event: bool | None = None + # 061 three-piece: per-attempt NodeEvent assertions (driven by the + # retry path; modeled here so the fixture parses). + expected_attempt_events: list[dict[str, Any]] | None = None # Checkpointing fixtures (024–031). checkpoint_saves: list[dict[str, Any]] | None = None latest_record_assertions: dict[str, Any] | None = None @@ -213,6 +220,8 @@ class ObservabilityExpected(_ForbidExtras): "timing_records", "trace_records", "expected_observer_event", + "expected_failure_isolation_event", + "no_failure_isolation_event", "recoverable_state", } ) diff --git a/tests/conformance/test_fixture_parsing.py b/tests/conformance/test_fixture_parsing.py index 643e99a..0915f52 100644 --- a/tests/conformance/test_fixture_parsing.py +++ b/tests/conformance/test_fixture_parsing.py @@ -407,33 +407,22 @@ def _id(case: tuple[str, Path]) -> str: "graph-engine/038-reducer-error-non-list-update": ( "Proposal 0023 canonical state reducers; impl not yet shipped" ), - # Proposal 0050 (failure-isolation middleware + call-level retry, - # v0.42.0) — llm-provider fixtures 056-058 (call-level retry) and - # pipeline-utilities fixtures 058-063 (failure-isolation - # middleware) require new directive shapes. Queued for v0.14.0 - # retry & reliability primitives batch. 
- "llm-provider/056-call-level-retry-transient": ("Proposal 0050 call-level retry; queued for v0.14.0"), - "llm-provider/057-call-level-retry-exhaustion": ("Proposal 0050 call-level retry; queued for v0.14.0"), - "llm-provider/058-call-level-retry-non-transient-no-retry": ( - "Proposal 0050 call-level retry; queued for v0.14.0" - ), - "pipeline-utilities/058-failure-isolation-static-degraded": ( - "Proposal 0050 failure-isolation middleware; queued for v0.14.0" - ), - "pipeline-utilities/059-failure-isolation-callable-degraded": ( - "Proposal 0050 failure-isolation middleware; queued for v0.14.0" - ), - "pipeline-utilities/060-failure-isolation-predicate-filtering": ( - "Proposal 0050 failure-isolation middleware; queued for v0.14.0" + # Proposal 0050 call-level retry — llm-provider fixtures 056-058 + # assert the per-attempt LLM span surface (N spans + + # ``openarmature.llm.attempt_index``) that python deferred under + # decision (b); 0050 is marked ``partial`` accordingly. They stay + # deferred until a future LlmRetryAttemptEvent cycle implements + # per-attempt spans. (pipeline-utilities failure-isolation fixtures + # 058-063 now parse + run via test_pipeline_utilities.py; 061 is + # execution-deferred there for the attempt_index reconciliation.) 
+ "llm-provider/056-call-level-retry-transient": ( + "Proposal 0050 call-level retry asserts the deferred per-attempt span surface (0050 partial)" + ), + "llm-provider/057-call-level-retry-exhaustion": ( + "Proposal 0050 call-level retry asserts the deferred per-attempt span surface (0050 partial)" ), - "pipeline-utilities/061-failure-isolation-retry-three-piece-composition": ( - "Proposal 0050 failure-isolation middleware; queued for v0.14.0" - ), - "pipeline-utilities/062-failure-isolation-on-caught-callback": ( - "Proposal 0050 failure-isolation middleware; queued for v0.14.0" - ), - "pipeline-utilities/063-failure-isolation-default-predicate-bare-exception": ( - "Proposal 0050 failure-isolation middleware; queued for v0.14.0" + "llm-provider/058-call-level-retry-non-transient-no-retry": ( + "Proposal 0050 call-level retry asserts the deferred per-attempt span surface (0050 partial)" ), # Proposal 0052 (implementation attribution attributes, v0.44.0): # observability/059 is the Langfuse-side mapping fixture; uses the diff --git a/tests/conformance/test_pipeline_utilities.py b/tests/conformance/test_pipeline_utilities.py index 80acac5..a8766b9 100644 --- a/tests/conformance/test_pipeline_utilities.py +++ b/tests/conformance/test_pipeline_utilities.py @@ -15,6 +15,7 @@ from __future__ import annotations +import re from collections.abc import Callable, Mapping from pathlib import Path from typing import Any, cast @@ -23,11 +24,15 @@ import yaml from openarmature.graph import ( + FailureIsolatedEvent, NodeException, + ObserverEvent, ParallelBranchesBranchFailed, RuntimeGraphError, ) from openarmature.graph.middleware import ( + DegradedUpdate, + FailureIsolationMiddleware, Middleware, OnCompleteCallback, RetryConfig, @@ -78,6 +83,14 @@ def _load(path: Path) -> dict[str, Any]: # the `cases:` shape carries seeded-record + migrations + resume blocks. 
_LAST_DRIVEN_FIXTURE = 38 +# Failure-isolation fixtures (058-063, proposal 0050 §6.3) are middleware +# fixtures this runner handles. They sit past _LAST_DRIVEN_FIXTURE only +# because the 039-057 range (state migration / checkpoint fan-out) is owned +# by dedicated runners (test_state_migration.py / test_checkpoint.py), not +# because this runner can't drive them. Fixture 064 (cause fidelity) joins +# when the spec pin advances to v0.55.0. +_FAILURE_ISOLATION_FIXTURES = frozenset(range(58, 64)) + def _fixture_paths() -> list[Path]: paths = sorted(CONFORMANCE_DIR.glob("[0-9][0-9][0-9]-*.yaml")) @@ -87,7 +100,7 @@ def _fixture_paths() -> list[Path]: number = int(p.stem.split("-", 1)[0]) except ValueError: continue - if number <= _LAST_DRIVEN_FIXTURE: + if number <= _LAST_DRIVEN_FIXTURE or number in _FAILURE_ISOLATION_FIXTURES: out.append(p) return out @@ -120,6 +133,18 @@ def _fixture_id(path: Path) -> str: "029-checkpoint-subgraph-resume": "checkpointing (test_checkpoint.py)", "030-checkpoint-not-found": "checkpointing (test_checkpoint.py)", "031-checkpoint-correlation-id-preserved-across-resume": "checkpointing (test_checkpoint.py)", + # Failure-isolation three-piece composition (proposal 0050 §6.3). The + # FailureIsolatedEvent's attempt_index should reflect the final retry + # attempt (1) per §6.3's "same lineage tuple NodeEvent carries" + # correlation rule, but python emits the node-level baseline (0): + # RetryMiddleware resets the attempt ContextVar in its finally before + # the outer isolation middleware catches. The 0050 impl claimed 0 was + # spec-confirmed via the design thread; the fixture asserts 1. + # Reconcile with spec and fix attempt_index, then un-defer. 
+ "061-failure-isolation-retry-three-piece-composition": ( + "FailureIsolatedEvent.attempt_index baseline (0) vs final-attempt (1) " + "discrepancy; reconcile with spec + fix" + ), } @@ -162,6 +187,7 @@ def _unsupported_middleware(spec: dict[str, Any]) -> str | None: "state_inspector", "retry", "timing", + "failure_isolation", } ) per_graph = cast("list[dict[str, Any]]", middleware_block.get("per_graph") or []) @@ -193,6 +219,10 @@ def __init__(self) -> None: self.trace_records: dict[str, list[TraceRecord]] = {} self.timing_records: dict[str, list[TimingRecord]] = {} self.state_inspector: dict[str, list[bool]] = {} + # Failure-isolation on_caught side channel (fixture 062): each + # entry records {increment_field, capture_message_field, count, + # message}; the harness overlays count/message onto final_state. + self.on_caught: list[dict[str, Any]] = [] # --------------------------------------------------------------------------- @@ -255,6 +285,8 @@ async def on_complete(record: TimingRecord) -> None: on_complete=cb, clock=clock, ) + if mw_type == "failure_isolation": + return _build_failure_isolation(config, sinks) raise ValueError(f"unknown middleware type: {mw_type}") @@ -352,6 +384,88 @@ def classifier(_exc: Exception, state: Any) -> bool: raise ValueError(f"unknown classifier type: {cls_type}") +def _render_state_template(template: str, state: Any) -> str: + """Render a ``{{ state. }}`` template against a State instance + (fixture 059's callable degraded_update). Minimal substitution: the + only template shape the failure-isolation fixtures use.""" + return re.sub( + r"\{\{\s*state\.(\w+)\s*\}\}", + lambda m: str(getattr(state, m.group(1))), + template, + ) + + +def _build_isolation_predicate( + config: Mapping[str, Any] | None, +) -> Callable[[Exception], bool] | None: + """Build a FailureIsolationMiddleware predicate from a fixture + ``predicate`` block. 
Supports ``{matches_category: <category>}`` + (fixture 060): catch only exceptions carrying that category.""" + if config is None: + return None + if "matches_category" in config: + target = cast("str", config["matches_category"]) + + def predicate(exc: Exception) -> bool: + return getattr(exc, "category", None) == target + + return predicate + raise ValueError(f"unsupported failure_isolation predicate: {dict(config)}") + + +def _build_failure_isolation(config: Mapping[str, Any], sinks: CaptureSinks) -> Middleware: + """Build the canonical FailureIsolationMiddleware from a fixture + ``failure_isolation`` config (fixtures 058-063).""" + degraded_raw = config["degraded_update"] + degraded: DegradedUpdate + if isinstance(degraded_raw, dict): + degraded_dict = cast("dict[str, Any]", degraded_raw) + if degraded_dict.get("callable") == "state_derived": + # Callable form (059): the callable receives the pre-merge + # state and renders the template into the target field. + template = cast("str", degraded_dict["template"]) + target = cast("str", degraded_dict["target_field"]) + + def degraded_from_state(state: Any) -> Mapping[str, Any]: + return {target: _render_state_template(template, state)} + + degraded = degraded_from_state + else: + degraded = dict(degraded_dict) + else: + degraded = dict(cast("Mapping[str, Any]", degraded_raw)) + + on_caught = None + on_caught_cfg = cast("Mapping[str, Any] | None", config.get("on_caught")) + if on_caught_cfg is not None: + kind = on_caught_cfg.get("kind") + if kind != "record_to_state_side_channel": + raise ValueError(f"unsupported on_caught kind: {kind}") + # The callback only receives the exception, so it records the + # invocation count + message into a side channel the harness + # overlays onto final_state after the run (fixture 062). 
+ record: dict[str, Any] = { + "increment_field": cast("str", on_caught_cfg["increment_field"]), + "capture_message_field": cast("str", on_caught_cfg["capture_message_field"]), + "count": 0, + "message": "", + } + sinks.on_caught.append(record) + + async def on_caught_cb(_exc: Exception) -> None: + record["count"] += 1 + record["message"] = str(_exc) + + on_caught = on_caught_cb + + return FailureIsolationMiddleware( + degraded_update=degraded, + event_name=cast("str", config["event_name"]), + predicate=_build_isolation_predicate(cast("Mapping[str, Any] | None", config.get("predicate"))), + on_caught=on_caught, + ) + + def _build_clock_stub(config: Mapping[str, Any]) -> Callable[[], float]: """Return a deterministic-monotonic clock function per the fixture's ``clock_stub`` config. Each call advances the counter by a fixed @@ -417,6 +531,17 @@ async def _run_one(spec: Mapping[str, Any], monkeypatch: pytest.MonkeyPatch) -> sinks = CaptureSinks() clock = _build_clock_stub(spec["clock_stub"]) if "clock_stub" in spec else None + + # Capture failure-isolation events (proposal 0050 §6.3, fixtures + # 058-063) for the expected_failure_isolation_event / + # no_failure_isolation_event assertions. Attached to the error-path + # graph and to success-path run 0; only FailureIsolatedEvents are collected. 
+ captured_isolation: list[FailureIsolatedEvent] = [] + + async def _capture_isolation(event: ObserverEvent) -> None: + if isinstance(event, FailureIsolatedEvent): + captured_isolation.append(event) + graph_mw, node_mw = _translate_middleware_block(spec.get("middleware"), sinks, clock) fan_out_inst_mw = _translate_fan_out_instance_middleware(spec, sinks, clock) del monkeypatch # retained in signature for future stubs that need it @@ -478,6 +603,7 @@ async def _run_one(spec: Mapping[str, Any], monkeypatch: pytest.MonkeyPatch) -> parallel_branches_branch_middleware=branch_middleware, ) compiled = built.builder.compile() + compiled.attach_observer(_capture_isolation) initial = built.initial_state(spec.get("initial_state", {})) with pytest.raises(RuntimeGraphError) as excinfo: await compiled.invoke(initial) @@ -512,6 +638,8 @@ async def _run_one(spec: Mapping[str, Any], monkeypatch: pytest.MonkeyPatch) -> cast("Mapping[str, list[Mapping[str, Any]]] | None", expected.get("trace_records")), sinks, ) + if expected.get("no_failure_isolation_event"): + assert captured_isolation == [], f"expected no FailureIsolatedEvent, got {captured_isolation}" return # Per-run state: each run uses its own freshly built middleware so @@ -561,6 +689,8 @@ async def _run_one(spec: Mapping[str, Any], monkeypatch: pytest.MonkeyPatch) -> obs = make_observer_fn(ofx, run_delivery) if ofx.attach == "graph" and ofx.target == "outer": run_compiled.attach_observer(obs, phases=phases) + if run_idx == 0: + run_compiled.attach_observer(_capture_isolation) run_final = await run_compiled.invoke(run_initial) await run_compiled.drain() final_states.append(run_final.model_dump()) @@ -568,6 +698,13 @@ async def _run_one(spec: Mapping[str, Any], monkeypatch: pytest.MonkeyPatch) -> if run_idx == 0: observer_fixtures = run_observer_fixtures + # Overlay the on_caught side channel (fixture 062) onto the final + # state: the callback can't write graph state directly, so the harness + # reflects its recorded count + 
message into the fields the fixture names. + for rec in sinks.on_caught: + final_states[0][rec["increment_field"]] = rec["count"] + final_states[0][rec["capture_message_field"]] = rec["message"] + if "final_state" in expected: _assert_final_state(final_states[0], expected["final_state"], spec) if "execution_order" in expected: @@ -586,6 +723,14 @@ async def _run_one(spec: Mapping[str, Any], monkeypatch: pytest.MonkeyPatch) -> sinks, ) + if "expected_failure_isolation_event" in expected: + _assert_failure_isolation_event( + captured_isolation, + cast("Mapping[str, Any]", expected["expected_failure_isolation_event"]), + ) + if expected.get("no_failure_isolation_event"): + assert captured_isolation == [], f"expected no FailureIsolatedEvent, got {captured_isolation}" + if "observer_event_invariants" in expected: _check_parallel_branches_invariants( cast("Mapping[str, Any]", expected["observer_event_invariants"]), @@ -640,6 +785,47 @@ def _collect_parallel_branches_errors_fields(spec: Mapping[str, Any]) -> set[str return out +def _state_to_dict(state: Any) -> dict[str, Any]: + """Dump a State (or mapping) to a plain dict for comparison.""" + if hasattr(state, "model_dump"): + return cast("dict[str, Any]", state.model_dump()) + return dict(cast("Mapping[str, Any]", state)) + + +def _assert_failure_isolation_event( + captured: list[FailureIsolatedEvent], + expected: Mapping[str, Any], +) -> None: + """Assert the single captured FailureIsolatedEvent against the + fixture's ``expected_failure_isolation_event`` block. 
Only the keys + the fixture supplies are checked (some fixtures assert just + event_name + caught_exception).""" + assert len(captured) == 1, f"expected exactly one FailureIsolatedEvent, got {len(captured)}" + ev = captured[0] + if "event_name" in expected: + assert ev.event_name == expected["event_name"] + lineage = cast("Mapping[str, Any] | None", expected.get("wrapped_node_lineage")) + if lineage is not None: + if "namespace" in lineage: + assert list(ev.namespace) == lineage["namespace"] + if "attempt_index" in lineage: + assert ev.attempt_index == lineage["attempt_index"] + if "fan_out_index" in lineage: + assert ev.fan_out_index == lineage["fan_out_index"] + if "branch_name" in lineage: + assert ev.branch_name == lineage["branch_name"] + if "pre_state" in expected: + assert _state_to_dict(ev.pre_state) == expected["pre_state"] + if "post_state" in expected: + assert dict(ev.post_state) == expected["post_state"] + ce = cast("Mapping[str, Any] | None", expected.get("caught_exception")) + if ce is not None: + if "category" in ce: + assert ev.caught_exception.category == ce["category"] + if "message" in ce: + assert ev.caught_exception.message == ce["message"] + + def _assert_final_state( actual: Mapping[str, Any], expected: Mapping[str, Any], From 825075bbe8be3bbc2b119f7824945e1c9a01d547 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Fri, 12 Jun 2026 08:49:14 -0700 Subject: [PATCH 2/2] Add expected_attempt_events to PU expected keys It is a pipeline-utilities-only field, so the expected-block discriminator should route on it alongside the other PU keys. 
--- tests/conformance/harness/expectations.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/conformance/harness/expectations.py b/tests/conformance/harness/expectations.py index 7e610af..a78db84 100644 --- a/tests/conformance/harness/expectations.py +++ b/tests/conformance/harness/expectations.py @@ -222,6 +222,7 @@ class ObservabilityExpected(_ForbidExtras): "expected_observer_event", "expected_failure_isolation_event", "no_failure_isolation_event", + "expected_attempt_events", "recoverable_state", } )