Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion conformance.toml
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,13 @@ since = "0.13.0"
# per-attempt spans require a dedicated within-call sub-event
# (LlmRetryAttemptEvent) scoped to a future cycle. Call-level retry
# ships terminal-only: exactly one LlmCompletionEvent / LlmFailedEvent
# per ``complete()`` call.
# per ``complete()`` call. Failure-isolation conformance fixtures
# (058-063) are wired + passing this cycle EXCEPT 061 (three-piece
# composition): the FailureIsolatedEvent's attempt_index there is the
# node-level baseline (0) while the fixture asserts the final retry
# attempt (1) per §6.3's lineage-correlation rule — RetryMiddleware
# resets the attempt ContextVar before the outer isolation middleware
# catches. That attempt_index reconciliation is pending.
[proposals."0050"]
status = "partial"
since = "0.14.0"
Expand Down
26 changes: 24 additions & 2 deletions tests/conformance/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,36 @@ async def fn(_state: Any) -> Mapping[str, Any]:
return fn


_RAISES_EXCEPTION_KINDS: dict[str, type[Exception]] = {
"ValueError": ValueError,
"RuntimeError": RuntimeError,
"TypeError": TypeError,
"KeyError": KeyError,
}


def _make_raising_fn(
node_name: str,
message: str,
raises_spec: str | Mapping[str, Any],
trace: list[str],
) -> Callable[[Any], Awaitable[Mapping[str, Any]]]:
# Two shapes: a bare message string (fixture 006) raises RuntimeError;
# a ``{message, exception_kind}`` dict (fixture 063) raises the named
# exception type with that message (an uncategorized error, so a
# wrapping failure-isolation event reports a null category).
if isinstance(raises_spec, Mapping):
message = str(raises_spec.get("message", ""))
kind = str(raises_spec.get("exception_kind", "RuntimeError"))
if kind not in _RAISES_EXCEPTION_KINDS:
raise ValueError(f"unsupported raises exception_kind: {kind}")
exc_type = _RAISES_EXCEPTION_KINDS[kind]
else:
message = raises_spec
exc_type = RuntimeError

async def fn(_state: Any) -> Mapping[str, Any]:
trace.append(node_name)
raise RuntimeError(message)
raise exc_type(message)

return fn

Expand Down
24 changes: 22 additions & 2 deletions tests/conformance/harness/directives.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ class NodeSpec(_ForbidExtras):
update_pure: dict[str, Any] | None = None
update_pure_from_state: dict[str, Any] | None = None
update_from_field: UpdateFromFieldSpec | None = None
raises: str | None = None
raises: str | dict[str, Any] | None = None
subgraph: str | None = None
fan_out: FanOutSpec | None = None
parallel_branches: ParallelBranchesSpec | None = None
Expand Down Expand Up @@ -516,12 +516,31 @@ class TraceRecorderMiddleware(_AllowExtras):
type: Literal["trace_recorder"]


class FailureIsolationMiddleware(_AllowExtras):
"""Canonical failure-isolation middleware (proposal 0050 §6.3,
fixtures 058-063). Catches an exception escaping the inner chain and
returns a configured degraded partial update, emitting a distinct
``FailureIsolatedEvent``."""

type: Literal["failure_isolation"]
# Static partial-update mapping, or the callable encoding
# ``{callable: state_derived, template, target_field}`` (fixture 059).
degraded_update: dict[str, Any]
event_name: str
# Optional ``{matches_category: <category>}`` predicate (fixture 060).
predicate: dict[str, Any] | None = None
# Optional on_caught hook
# ``{kind, increment_field, capture_message_field}`` (fixture 062).
on_caught: dict[str, Any] | None = None


MiddlewareSpec = Annotated[
RetryMiddleware
| TimingMiddleware
| ErrorRecoveryMiddleware
| ShortCircuitMiddleware
| TraceRecorderMiddleware,
| TraceRecorderMiddleware
| FailureIsolationMiddleware,
Field(discriminator="type"),
]

Expand Down Expand Up @@ -642,6 +661,7 @@ class LlmCallSpec(_AllowExtras):
"EdgeSpec",
"EmitsLogSpec",
"ErrorRecoveryMiddleware",
"FailureIsolationMiddleware",
"FailureSpec",
"FanOutSpec",
"FlakyByIndexSpec",
Expand Down
10 changes: 10 additions & 0 deletions tests/conformance/harness/expectations.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ class PipelineUtilitiesExpected(_ForbidExtras):
observer_event_invariants: dict[str, Any] | None = None
# Singular form used by 015 — assert one specific event shape.
expected_observer_event: dict[str, Any] | None = None
# Failure-isolation event (proposal 0050 §6.3, fixtures 058-063).
expected_failure_isolation_event: dict[str, Any] | None = None
# 060 negative case: assert NO failure-isolation event fired.
no_failure_isolation_event: bool | None = None
# 061 three-piece: per-attempt NodeEvent assertions (driven by the
# retry path; modeled here so the fixture parses).
expected_attempt_events: list[dict[str, Any]] | None = None
# Checkpointing fixtures (024–031).
checkpoint_saves: list[dict[str, Any]] | None = None
latest_record_assertions: dict[str, Any] | None = None
Expand Down Expand Up @@ -213,6 +220,9 @@ class ObservabilityExpected(_ForbidExtras):
"timing_records",
"trace_records",
"expected_observer_event",
"expected_failure_isolation_event",
"no_failure_isolation_event",
Comment thread
chris-colinsky marked this conversation as resolved.
"expected_attempt_events",
"recoverable_state",
}
)
Expand Down
41 changes: 15 additions & 26 deletions tests/conformance/test_fixture_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,33 +407,22 @@ def _id(case: tuple[str, Path]) -> str:
"graph-engine/038-reducer-error-non-list-update": (
"Proposal 0023 canonical state reducers; impl not yet shipped"
),
# Proposal 0050 (failure-isolation middleware + call-level retry,
# v0.42.0) — llm-provider fixtures 056-058 (call-level retry) and
# pipeline-utilities fixtures 058-063 (failure-isolation
# middleware) require new directive shapes. Queued for v0.14.0
# retry & reliability primitives batch.
"llm-provider/056-call-level-retry-transient": ("Proposal 0050 call-level retry; queued for v0.14.0"),
"llm-provider/057-call-level-retry-exhaustion": ("Proposal 0050 call-level retry; queued for v0.14.0"),
"llm-provider/058-call-level-retry-non-transient-no-retry": (
"Proposal 0050 call-level retry; queued for v0.14.0"
),
"pipeline-utilities/058-failure-isolation-static-degraded": (
"Proposal 0050 failure-isolation middleware; queued for v0.14.0"
),
"pipeline-utilities/059-failure-isolation-callable-degraded": (
"Proposal 0050 failure-isolation middleware; queued for v0.14.0"
),
"pipeline-utilities/060-failure-isolation-predicate-filtering": (
"Proposal 0050 failure-isolation middleware; queued for v0.14.0"
# Proposal 0050 call-level retry — llm-provider fixtures 056-058
# assert the per-attempt LLM span surface (N spans +
# ``openarmature.llm.attempt_index``) that python deferred under
# decision (b); 0050 is marked ``partial`` accordingly. They stay
# deferred until a future LlmRetryAttemptEvent cycle implements
# per-attempt spans. (pipeline-utilities failure-isolation fixtures
# 058-063 now parse + run via test_pipeline_utilities.py; 061 is
# execution-deferred there for the attempt_index reconciliation.)
"llm-provider/056-call-level-retry-transient": (
"Proposal 0050 call-level retry asserts the deferred per-attempt span surface (0050 partial)"
),
"llm-provider/057-call-level-retry-exhaustion": (
"Proposal 0050 call-level retry asserts the deferred per-attempt span surface (0050 partial)"
),
"pipeline-utilities/061-failure-isolation-retry-three-piece-composition": (
"Proposal 0050 failure-isolation middleware; queued for v0.14.0"
),
"pipeline-utilities/062-failure-isolation-on-caught-callback": (
"Proposal 0050 failure-isolation middleware; queued for v0.14.0"
),
"pipeline-utilities/063-failure-isolation-default-predicate-bare-exception": (
"Proposal 0050 failure-isolation middleware; queued for v0.14.0"
"llm-provider/058-call-level-retry-non-transient-no-retry": (
"Proposal 0050 call-level retry asserts the deferred per-attempt span surface (0050 partial)"
),
# Proposal 0052 (implementation attribution attributes, v0.44.0):
# observability/059 is the Langfuse-side mapping fixture; uses the
Expand Down
Loading