From 251eb975ebcabaf415c6b8c8237b21b442a16945 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Thu, 11 Jun 2026 15:37:42 -0700 Subject: [PATCH 1/3] Add degrade mode to fan-out-with-retry example Demonstrate FailureIsolationMiddleware as a third per-instance failure posture beside fail_fast and collect: wrapped outermost in the instance middleware (retry stays inner), an instance whose retries exhaust is caught and degraded to a placeholder so the batch finishes intact. Rename the demo env knob COLLECT_MODE to MODE (fail_fast/collect/degrade) and update the example and middleware concept docs. --- docs/concepts/middleware.md | 6 ++ docs/examples/fan-out-with-retry.md | 51 ++++++++---- examples/fan-out-with-retry/main.py | 118 ++++++++++++++++++---------- 3 files changed, 120 insertions(+), 55 deletions(-) diff --git a/docs/concepts/middleware.md b/docs/concepts/middleware.md index 64d1743..c009fed 100644 --- a/docs/concepts/middleware.md +++ b/docs/concepts/middleware.md @@ -291,6 +291,12 @@ failure isolation, which degrades. Reverse the order and the inner isolation would swallow transients before retry ever saw them, defeating the retry entirely. +The [fan-out with retry example](../examples/fan-out-with-retry.md) +applies this composition as `instance_middleware` in its `degrade` +mode: each fan-out instance is wrapped isolation-outer / retry-inner, +so an instance whose retries exhaust degrades to a placeholder result +and the batch finishes instead of aborting. + ## Related - [Parallel branches](parallel-branches.md): per-branch middleware diff --git a/docs/examples/fan-out-with-retry.md b/docs/examples/fan-out-with-retry.md index a945509..77ad84f 100644 --- a/docs/examples/fan-out-with-retry.md +++ b/docs/examples/fan-out-with-retry.md @@ -19,11 +19,15 @@ The per-instance subgraph is small (`summarize → classify`) and would also run standalone against a single headline. Fan-out multiplies it out across the batch. -A second mode, controlled by the `COLLECT_MODE` env var, exercises -the failure path. With `COLLECT_MODE=1` the demo prepends a -sentinel headline that always raises `ProviderUnavailable`; under -`error_policy="collect"` the failure lands in -`state.instance_errors` and the rest of the batch completes. +The `MODE` env var selects the per-instance failure posture. The +default `fail_fast` aborts the batch on the first instance whose +retries exhaust. `collect` and `degrade` both prepend a sentinel +headline that always raises `ProviderUnavailable`, then handle it +differently: `collect` lands the failure in `state.instance_errors` +and finishes the rest of the batch, while `degrade` wraps each +instance in `FailureIsolationMiddleware` so an exhausted instance is +caught and replaced with a placeholder summary, leaving the batch +intact. ## What it teaches @@ -42,7 +46,11 @@ sentinel headline that always raises `ProviderUnavailable`; under - `concurrency=3` capping how many instances run in flight at once. - `error_policy="fail_fast"` (default, first exhausted-retry failure aborts the batch) vs `"collect"` (failures land in - `errors_field` and the batch produces partial results). + `errors_field` and the batch produces partial results). The + `degrade` mode keeps `fail_fast` but adds + `FailureIsolationMiddleware` as the outermost instance middleware, + so an exhausted instance is caught and degraded to a placeholder + before the fan-out ever sees the failure. - A `fan_out_config_observer` reads `NodeEvent.fan_out_config` on the fan-out node's dispatch event, recording the resolved `item_count` / `concurrency` / @@ -77,10 +85,15 @@ uv sync --group examples LLM_API_KEY=sk-... uv run python examples/fan-out-with-retry/main.py ``` -To exercise the collect path with a synthetic failure: +To exercise a failure posture with a synthetic failure: ```bash -COLLECT_MODE=1 LLM_API_KEY=sk-... \ +# record the failure and finish the batch +MODE=collect LLM_API_KEY=sk-... \ + uv run python examples/fan-out-with-retry/main.py + +# degrade the failed instance to a placeholder and finish the batch +MODE=degrade LLM_API_KEY=sk-... \ uv run python examples/fan-out-with-retry/main.py ``` @@ -103,7 +116,9 @@ flowchart TD `headline_runs` is the fan-out node. At dispatch time it expands into N copies of the per-instance subgraph, one per headline. -`RetryMiddleware` and `TimingMiddleware` wrap each instance. +`RetryMiddleware` and `TimingMiddleware` wrap each instance (plus +`FailureIsolationMiddleware` as the outermost layer in `degrade` +mode). ## Reading the output @@ -112,7 +127,7 @@ A clean default-mode run (`fail_fast`, all instances succeed): ``` ======================================================================== Summarizing 5 headlines in parallel (concurrency=3) -error_policy='fail_fast' +mode='fail_fast' ======================================================================== [observer] fan-out node 'headline_runs' dispatching: item_count=5 concurrency=3 error_policy='fail_fast' @@ -154,8 +169,16 @@ Per-instance timings (in completion order): a value near 1.0 indicates concurrency didn't help (the upstream serialized you, or instances themselves are short). -With `COLLECT_MODE=1`, the output includes the sentinel headline -at index 0 with a `(failed after retries; ...)` marker, plus a +With `MODE=collect`, the output includes the sentinel headline at +index 0 with a `(failed after retries; ...)` marker, plus a `Captured 1 per-instance error(s):` block listing the failed -`fan_out_index` and error category. The other instances complete -as usual. +`fan_out_index` and error category. The other instances complete as +usual. + +With `MODE=degrade`, the sentinel at index 0 instead shows a +placeholder result (`summary: (unavailable)`, `topic: other`) and +there is no error block: `FailureIsolationMiddleware` caught the +exhausted-retry failure and returned the degraded partial, so the +fan-out recorded the instance as a (degraded) success. The +per-instance timings still show the sentinel's failed attempts, so +you can see the retries happened before the instance was degraded. diff --git a/examples/fan-out-with-retry/main.py b/examples/fan-out-with-retry/main.py index c777cf7..35b6194 100644 --- a/examples/fan-out-with-retry/main.py +++ b/examples/fan-out-with-retry/main.py @@ -25,19 +25,23 @@ - ``instance_middleware=(RetryMiddleware(...), TimingMiddleware(...))`` wraps EACH instance's whole subgraph invocation. Retries are per-instance: a failure on headline 3 doesn't restart headlines 0-2. + In ``degrade`` mode a ``FailureIsolationMiddleware`` is prepended as + the outermost layer (retry stays inner, so it still sees raw + transients first). - ``concurrency=3`` caps how many instances run in flight at once. Use this to be polite to the upstream API. -- ``error_policy`` defaults to ``"fail_fast"``; the first instance - failure (after retries exhaust) raises and cancels siblings. Set - the ``COLLECT_MODE`` env var to switch to ``"collect"``: each - instance runs independently and per-instance failures land in - ``state.instance_errors`` instead of aborting the batch. The - ``errors_field="instance_errors"`` knob names where the records go. - Under COLLECT_MODE, the demo prepends a sentinel headline - (``[FORCE_FAIL] ...``) that ``summarize`` raises - ``ProviderUnavailable`` on; retry exhausts, the error lands in - ``instance_errors``, and the rest of the batch completes. Without - the sentinel, ``COLLECT_MODE`` would have nothing to capture. +- The ``MODE`` env var selects the per-instance failure posture. + ``"fail_fast"`` (default) raises on the first instance whose retries + exhaust and cancels its siblings. ``"collect"`` lets each instance + run independently and lands per-instance failures in + ``state.instance_errors`` (named by ``errors_field``) instead of + aborting. ``"degrade"`` wraps each instance in + ``FailureIsolationMiddleware`` (outermost) so an exhausted instance + is caught and returns a placeholder partial, leaving the batch intact + with a degraded entry in place. ``collect`` and ``degrade`` both + prepend a sentinel headline (``[FORCE_FAIL] ...``) that ``summarize`` + raises ``ProviderUnavailable`` on, so there is a failure to handle; + ``fail_fast`` keeps the list clean for the happy path. - A ``TimingRecord`` is captured per instance via an ``on_complete`` callback. ``TimingRecord`` carries the per-call duration but not the ``fan_out_index``; that index lives on observer NodeEvents instead. @@ -56,6 +60,8 @@ - ``LLM_BASE_URL`` defaults to ``https://api.openai.com``. **Host root only.** - ``LLM_MODEL`` defaults to ``gpt-4o-mini``. - ``LLM_API_KEY`` required (empty for local servers that don't authenticate). +- ``MODE`` defaults to ``fail_fast``. One of ``fail_fast`` / ``collect`` / + ``degrade`` (see the failure-posture bullet above). Run with: @@ -84,6 +90,8 @@ append, ) from openarmature.graph.middleware import ( + FailureIsolationMiddleware, + Middleware, RetryConfig, RetryMiddleware, TimingMiddleware, @@ -160,16 +168,17 @@ class HeadlineState(State): async def summarize(s: HeadlineState) -> Mapping[str, Any]: - # Sentinel for the COLLECT_MODE demo. Raising a transient error - # (ProviderUnavailable carries the ``provider_unavailable`` - # category, which retry's default classifier recognizes as - # retryable) lets the retry middleware exhaust its 3 attempts; - # the final failure then surfaces according to the fan-out's - # error_policy. Under fail_fast (default), the batch aborts. - # Under collect, the failure lands in instance_errors and the - # batch produces partial results. + # Sentinel for the collect / degrade failure-path demos (those modes + # prepend a [FORCE_FAIL] headline). Raising a transient error + # (ProviderUnavailable carries the ``provider_unavailable`` category, + # which retry's default classifier recognizes as retryable) lets the + # retry middleware exhaust its 3 attempts; the final failure then + # surfaces according to MODE: under collect it lands in + # instance_errors and the batch produces partial results; under + # degrade FailureIsolationMiddleware catches it and substitutes a + # placeholder so the batch finishes intact. if "[FORCE_FAIL]" in s.headline: - raise ProviderUnavailable("synthetic failure: provider unavailable (COLLECT_MODE demo)") + raise ProviderUnavailable("synthetic failure: provider unavailable (failure-path demo)") content = await _chat( system=( "Rewrite the headline as one short sentence (~15 words) that would work as a lead. No preamble." @@ -249,15 +258,23 @@ async def present(s: BatchState) -> Mapping[str, Any]: return {"trace": ["present"]} -def build_graph(error_policy: str = "fail_fast") -> CompiledGraph[BatchState]: +def build_graph(mode: str = "fail_fast") -> CompiledGraph[BatchState]: """Build the fan-out demo graph. - ``error_policy`` switches between ``"fail_fast"`` (default; first - exhausted-retry failure raises and cancels the rest) and - ``"collect"`` (each instance runs independently; failures land in - ``state.instance_errors`` and the batch produces partial results). + ``mode`` selects the per-instance failure posture: + + - ``"fail_fast"`` (default): the first instance whose retries + exhaust raises and cancels the rest. + - ``"collect"``: each instance runs independently; failures land in + ``state.instance_errors`` and the batch produces partial results. + - ``"degrade"``: each instance is additionally wrapped (outermost) + in ``FailureIsolationMiddleware``; an instance whose retries + exhaust is caught and returns a placeholder partial, so the batch + completes with a degraded entry in place rather than aborting or + dropping it. + The smoke test calls this with no argument, exercising the default - path; main() lets the COLLECT_MODE env var flip to collect. + path; main() lets the MODE env var pick the posture. """ headline_subgraph = build_headline_subgraph() @@ -275,6 +292,25 @@ def build_graph(error_policy: str = "fail_fast") -> CompiledGraph[BatchState]: clock=time.monotonic, ) + instance_middleware: tuple[Middleware, ...] = (retry, timing) + error_policy = "fail_fast" + if mode == "collect": + error_policy = "collect" + elif mode == "degrade": + # Outermost instance middleware: catches the exception retry + # re-raises once its attempts exhaust and returns a degraded + # partial in place of the instance result, so the batch finishes + # instead of aborting (fail_fast) or dropping the instance + # (collect). Retry stays inner so it still sees raw transients + # first. The degraded mapping is keyed the way the fan-out + # projects an instance: the collect_field (``summary``) plus + # each parent extra_outputs key (``topics``). + degrade = FailureIsolationMiddleware( + degraded_update={"summary": "(unavailable)", "topics": "other"}, + event_name="headline_degraded", + ) + instance_middleware = (degrade, retry, timing) + return ( GraphBuilder(BatchState) .add_node("announce", announce) @@ -287,7 +323,7 @@ def build_graph(error_policy: str = "fail_fast") -> CompiledGraph[BatchState]: target_field="summaries", extra_outputs={"topics": "topic"}, concurrency=3, - instance_middleware=(retry, timing), + instance_middleware=instance_middleware, error_policy=error_policy, errors_field="instance_errors", ) @@ -336,23 +372,23 @@ async def main() -> None: # doesn't accumulate timings across invocations. _timings.clear() - # Set COLLECT_MODE=1 to switch the fan-out error policy from the - # default fail_fast to collect. Under collect, each instance runs - # independently and per-instance failures (after retries exhaust) - # land in state.instance_errors instead of aborting the batch. - error_policy = "collect" if os.environ.get("COLLECT_MODE") else "fail_fast" - graph = build_graph(error_policy=error_policy) + # MODE selects the per-instance failure posture: fail_fast (default, + # abort on the first exhausted-retry failure), collect (record + # failures in state.instance_errors and finish the batch), or + # degrade (FailureIsolationMiddleware catches an exhausted instance + # and substitutes a placeholder so the batch finishes intact). + mode = os.environ.get("MODE", "fail_fast") + graph = build_graph(mode=mode) graph.attach_observer(fan_out_config_observer) - # Under COLLECT_MODE, prepend a deliberately-failing headline so - # the collect path is exercised end-to-end: retry middleware - # exhausts on the sentinel, the failure lands in - # state.instance_errors, and the rest of the batch completes. - # Default (fail_fast) keeps the headline list clean so the demo's + # collect and degrade both need a failure to demonstrate, so prepend + # a deliberately-failing headline that summarize() always raises on. + # collect lands it in state.instance_errors; degrade catches it and + # substitutes a placeholder. fail_fast keeps the list clean so the # happy path runs to completion. - if error_policy == "collect": + if mode in ("collect", "degrade"): headlines = [ - "[FORCE_FAIL] Synthetic failing headline for the COLLECT_MODE demo", + "[FORCE_FAIL] Synthetic failing headline for the failure-path demo", *HEADLINES, ] else: @@ -361,7 +397,7 @@ async def main() -> None: print("=" * 72) print(f"Summarizing {len(headlines)} headlines in parallel (concurrency=3)") - print(f"error_policy={error_policy!r}") + print(f"mode={mode!r}") print("=" * 72) print() From cbbd6c7fed585b3c6d3bbce92d489274d1987219 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Thu, 11 Jun 2026 15:38:00 -0700 Subject: [PATCH 2/3] Add call-level retry to chat-with-multimodal Pass complete(retry=RetryConfig(...)) on the respond node to retry the LLM wire call on transient categories, and grow the example's failure-handling rundown from three placements to four. Update the example and LLMs concept docs. --- docs/concepts/llms.md | 3 +++ docs/examples/chat-with-multimodal.md | 6 ++++++ examples/chat-with-multimodal/main.py | 30 +++++++++++++++++++-------- 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/docs/concepts/llms.md b/docs/concepts/llms.md index be9c2f7..ebcf1df 100644 --- a/docs/concepts/llms.md +++ b/docs/concepts/llms.md @@ -643,3 +643,6 @@ classifier won't do this for them. agent-loop pattern with two local tools. - [Examples: Multimodal prompt](../examples/multimodal-prompt.md) for content blocks alongside versioned prompts. +- [Examples: Chat with multimodal](../examples/chat-with-multimodal.md) + for call-level `complete(retry=...)` on a multi-turn chat node, + alongside the other LLM-failure-handling placements. diff --git a/docs/examples/chat-with-multimodal.md b/docs/examples/chat-with-multimodal.md index feda8f9..579a431 100644 --- a/docs/examples/chat-with-multimodal.md +++ b/docs/examples/chat-with-multimodal.md @@ -65,6 +65,12 @@ turn's `render()` injects the grown history into the placeholder. framework dispatching them; this example shows how the prompt-management layer composes a multi-turn conversation. A production chat agent often combines both. +- Handling transient LLM failures. The `respond` node passes + [`complete(retry=...)`](../concepts/llms.md) for call-level retry + (retrying just the provider call, not the whole node), and `main()` + catches `NodeException` at the `invoke()` boundary to surface the + failure category. The module docstring enumerates the full set of + placement options. ## How to run diff --git a/examples/chat-with-multimodal/main.py b/examples/chat-with-multimodal/main.py index 1dbdaae..57c82c0 100644 --- a/examples/chat-with-multimodal/main.py +++ b/examples/chat-with-multimodal/main.py @@ -51,11 +51,13 @@ (`provider_rate_limit`, `provider_invalid_request`, etc.) in the error message. The image URL failure mode (OpenAI's fetcher hitting a CDN that blocks it) lands here as - `provider_invalid_request`. Three legitimate places to handle + `provider_invalid_request`. Four legitimate places to handle this in production: caller-side `try / except NodeException` - (shown here), `RetryMiddleware` wrapping the respond node for - transient categories, or a `try / except LlmProviderError` - inside the node body returning a fallback response. + (shown here), call-level retry via `complete(retry=...)` for + transient categories (also shown here, on the respond node), + `RetryMiddleware` wrapping the whole respond node, or a + `try / except LlmProviderError` inside the node body returning a + fallback response. **Configuration** (env vars; OpenAI defaults shown): @@ -118,6 +120,7 @@ State, append, ) +from openarmature.graph.middleware import RetryConfig from openarmature.llm import ( AssistantMessage, LlmProviderError, @@ -330,9 +333,17 @@ async def respond(state: ChatState) -> dict[str, Any]: placeholders={"history": state.history}, ) + # Call-level retry: retry only the provider wire call on transient + # categories (provider_unavailable, provider_rate_limit, ...), using + # the default classifier and backoff. It is terminal-only, so the + # node still sees exactly one completion (or one final failure) even + # when an attempt was retried underneath. Contrast with a + # RetryMiddleware on the node, which re-runs the whole node body + # (re-render + re-send) on each retry. response = await _get_provider().complete( rendered.messages, config=RuntimeConfig(temperature=0.0, max_tokens=400), + retry=RetryConfig(max_attempts=3), ) # The rendered messages include [system, *history, current_user] @@ -496,10 +507,10 @@ async def main() -> None: # it's an ``LlmProviderError`` we surface the canonical # ``.category`` string (``provider_rate_limit``, # ``provider_invalid_request``, etc.) so the failure mode is - # immediately greppable. This is one of three legitimate places - # to handle the error; see the docstring for the other two - # (``RetryMiddleware`` wrapping the node, ``try/except`` inside - # the node body). + # immediately greppable. This is one of four legitimate places + # to handle the error; see the docstring for the others + # (call-level ``complete(retry=...)``, ``RetryMiddleware`` wrapping + # the node, ``try/except`` inside the node body). final: ChatState | None = None try: final = await graph.invoke(initial) @@ -512,8 +523,9 @@ async def main() -> None: print() print(f"*** node {exc.node_name!r} failed ({category}): {cause} ***") print() - print("Three places to handle this in production code:") + print("Four places to handle this in production code:") print(" - Caller-side try/except NodeException (this example).") + print(" - Call-level complete(retry=...) on the wire call (this example).") print(" - RetryMiddleware on the node for transient categories.") print(" - try/except inside the node body returning a fallback.") finally: From 040a5de603280d0c14767c9f4c525f96d46c5eb3 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Thu, 11 Jun 2026 16:20:51 -0700 Subject: [PATCH 3/3] Validate mode in fan-out-with-retry build_graph Raise ValueError on an unknown mode instead of silently falling back to fail_fast, so a mistyped MODE fails loudly. The docs assert the three valid values. --- examples/fan-out-with-retry/main.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/fan-out-with-retry/main.py b/examples/fan-out-with-retry/main.py index 35b6194..37843fb 100644 --- a/examples/fan-out-with-retry/main.py +++ b/examples/fan-out-with-retry/main.py @@ -276,6 +276,8 @@ def build_graph(mode: str = "fail_fast") -> CompiledGraph[BatchState]: The smoke test calls this with no argument, exercising the default path; main() lets the MODE env var pick the posture. """ + if mode not in ("fail_fast", "collect", "degrade"): + raise ValueError(f"mode must be one of fail_fast / collect / degrade; got {mode!r}") headline_subgraph = build_headline_subgraph() retry = RetryMiddleware(