Merged
3 changes: 3 additions & 0 deletions docs/concepts/llms.md
@@ -643,3 +643,6 @@ classifier won't do this for them.
agent-loop pattern with two local tools.
- [Examples: Multimodal prompt](../examples/multimodal-prompt.md)
for content blocks alongside versioned prompts.
+- [Examples: Chat with multimodal](../examples/chat-with-multimodal.md)
+for call-level `complete(retry=...)` on a multi-turn chat node,
+alongside the other LLM-failure-handling placements.
6 changes: 6 additions & 0 deletions docs/concepts/middleware.md
@@ -291,6 +291,12 @@ failure isolation, which degrades. Reverse the order and the inner
isolation would swallow transients before retry ever saw them, defeating
the retry entirely.

+The [fan-out with retry example](../examples/fan-out-with-retry.md)
+applies this composition as `instance_middleware` in its `degrade`
+mode: each fan-out instance is wrapped isolation-outer / retry-inner,
+so an instance whose retries exhaust degrades to a placeholder result
+and the batch finishes instead of aborting.
+
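The ordering argument above can be sketched without the framework. This is a minimal illustration under stated assumptions, not openarmature's implementation: `with_retry`, `with_isolation`, `TransientError`, and `make_failing_node` are hypothetical names invented for the sketch, and the real `RetryMiddleware` / `FailureIsolationMiddleware` may differ in signature and behavior.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[], Awaitable[dict[str, Any]]]


class TransientError(Exception):
    """Hypothetical stand-in for a retryable provider failure."""


def with_retry(handler: Handler, max_attempts: int) -> Handler:
    """Re-run the handler on TransientError, re-raising once attempts exhaust."""
    async def wrapped() -> dict[str, Any]:
        for attempt in range(1, max_attempts + 1):
            try:
                return await handler()
            except TransientError:
                if attempt == max_attempts:
                    raise  # exhausted: let the outer layer decide
        raise AssertionError("max_attempts must be >= 1")
    return wrapped


def with_isolation(handler: Handler, degraded: dict[str, Any]) -> Handler:
    """Catch any failure and substitute a degraded placeholder result."""
    async def wrapped() -> dict[str, Any]:
        try:
            return await handler()
        except Exception:
            return dict(degraded)
    return wrapped


def make_failing_node() -> tuple[Handler, list[int]]:
    """Build a node that always fails, plus a list recording each call."""
    calls: list[int] = []

    async def node() -> dict[str, Any]:
        calls.append(1)
        raise TransientError("provider unavailable")

    return node, calls


DEGRADED = {"summary": "(unavailable)"}


async def run_both_orders() -> tuple[int, int]:
    # Isolation outer, retry inner: retry sees every raw transient first.
    node_a, calls_a = make_failing_node()
    result_a = await with_isolation(with_retry(node_a, 3), DEGRADED)()

    # Reversed: isolation swallows the first failure, so retry never retries.
    node_b, calls_b = make_failing_node()
    result_b = await with_retry(with_isolation(node_b, DEGRADED), 3)()

    assert result_a == result_b == DEGRADED
    return len(calls_a), len(calls_b)
```

Both orderings end with the same degraded result, but only the isolation-outer ordering actually spends the three attempts; in the reversed ordering the node runs once.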
## Related

- [Parallel branches](parallel-branches.md): per-branch middleware
6 changes: 6 additions & 0 deletions docs/examples/chat-with-multimodal.md
@@ -65,6 +65,12 @@ turn's `render()` injects the grown history into the placeholder.
framework dispatching them; this example shows how the
prompt-management layer composes a multi-turn conversation. A
production chat agent often combines both.
+- Handling transient LLM failures. The `respond` node passes
+[`complete(retry=...)`](../concepts/llms.md) for call-level retry
+(retrying just the provider call, not the whole node), and `main()`
+catches `NodeException` at the `invoke()` boundary to surface the
+failure category. The module docstring enumerates the full set of
+placement options.
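
The difference between call-level and node-level retry in the last bullet can be shown with plain `asyncio`. This is a hedged sketch only: `FlakyProvider`, `retrying`, and `compare_retry_placements` are hypothetical helpers made up for illustration, not the `complete(retry=...)` or `RetryMiddleware` implementations.

```python
import asyncio
from typing import Awaitable, Callable


class TransientError(Exception):
    """Hypothetical stand-in for a retryable provider failure."""


class FlakyProvider:
    """Fails the first `failures` calls, then succeeds."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    async def complete(self, prompt: str) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise TransientError("provider unavailable")
        return f"reply to: {prompt}"


async def retrying(fn: Callable[[], Awaitable[str]], max_attempts: int) -> str:
    """Call fn up to max_attempts times, re-raising the last TransientError."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await fn()
        except TransientError:
            if attempt == max_attempts:
                raise
    raise AssertionError("max_attempts must be >= 1")


async def compare_retry_placements() -> dict[str, int]:
    renders = {"call_level": 0, "node_level": 0}

    def render(key: str) -> str:
        renders[key] += 1  # count how often the prompt is re-rendered
        return "hello"

    # Call-level: the node body runs once; only the wire call is retried.
    provider_a = FlakyProvider(failures=2)

    async def node_with_call_retry() -> str:
        prompt = render("call_level")
        return await retrying(lambda: provider_a.complete(prompt), 3)

    await node_with_call_retry()

    # Node-level: the retry wraps the whole body, so render runs every attempt.
    provider_b = FlakyProvider(failures=2)

    async def plain_node() -> str:
        prompt = render("node_level")
        return await provider_b.complete(prompt)

    await retrying(plain_node, 3)
    return renders
```

With two transient failures before success, the call-level placement renders the prompt once while the node-level placement renders it three times.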

## How to run

51 changes: 37 additions & 14 deletions docs/examples/fan-out-with-retry.md
@@ -19,11 +19,15 @@ The per-instance subgraph is small (`summarize → classify`) and
would also run standalone against a single headline. Fan-out
multiplies it out across the batch.

-A second mode, controlled by the `COLLECT_MODE` env var, exercises
-the failure path. With `COLLECT_MODE=1` the demo prepends a
-sentinel headline that always raises `ProviderUnavailable`; under
-`error_policy="collect"` the failure lands in
-`state.instance_errors` and the rest of the batch completes.
+The `MODE` env var selects the per-instance failure posture. The
+default `fail_fast` aborts the batch on the first instance whose
+retries exhaust. `collect` and `degrade` both prepend a sentinel
+headline that always raises `ProviderUnavailable`, then handle it
+differently: `collect` lands the failure in `state.instance_errors`
+and finishes the rest of the batch, while `degrade` wraps each
+instance in `FailureIsolationMiddleware` so an exhausted instance is
+caught and replaced with a placeholder summary, leaving the batch
+intact.

## What it teaches

@@ -42,7 +46,11 @@ sentinel headline that always raises `ProviderUnavailable`; under
- `concurrency=3` capping how many instances run in flight at once.
- `error_policy="fail_fast"` (default, first exhausted-retry
failure aborts the batch) vs `"collect"` (failures land in
-`errors_field` and the batch produces partial results).
+`errors_field` and the batch produces partial results). The
+`degrade` mode keeps `fail_fast` but adds
+`FailureIsolationMiddleware` as the outermost instance middleware,
+so an exhausted instance is caught and degraded to a placeholder
+before the fan-out ever sees the failure.
- A `fan_out_config_observer` reads
`NodeEvent.fan_out_config` on the fan-out node's dispatch event,
recording the resolved `item_count` / `concurrency` /
@@ -77,10 +85,15 @@ uv sync --group examples
LLM_API_KEY=sk-... uv run python examples/fan-out-with-retry/main.py
```

-To exercise the collect path with a synthetic failure:
+To exercise a failure posture with a synthetic failure:

```bash
-COLLECT_MODE=1 LLM_API_KEY=sk-... \
+# record the failure and finish the batch
+MODE=collect LLM_API_KEY=sk-... \
uv run python examples/fan-out-with-retry/main.py
+
+# degrade the failed instance to a placeholder and finish the batch
+MODE=degrade LLM_API_KEY=sk-... \
+uv run python examples/fan-out-with-retry/main.py
```

@@ -103,7 +116,9 @@ flowchart TD

`headline_runs` is the fan-out node. At dispatch time it expands
into N copies of the per-instance subgraph, one per headline.
-`RetryMiddleware` and `TimingMiddleware` wrap each instance.
+`RetryMiddleware` and `TimingMiddleware` wrap each instance (plus
+`FailureIsolationMiddleware` as the outermost layer in `degrade`
+mode).

## Reading the output

@@ -112,7 +127,7 @@ A clean default-mode run (`fail_fast`, all instances succeed):
```
========================================================================
Summarizing 5 headlines in parallel (concurrency=3)
-error_policy='fail_fast'
+mode='fail_fast'
========================================================================

[observer] fan-out node 'headline_runs' dispatching: item_count=5 concurrency=3 error_policy='fail_fast'
@@ -154,8 +169,16 @@ Per-instance timings (in completion order):
a value near 1.0 indicates concurrency didn't help (the upstream
serialized you, or instances themselves are short).

-With `COLLECT_MODE=1`, the output includes the sentinel headline
-at index 0 with a `(failed after retries; ...)` marker, plus a
+With `MODE=collect`, the output includes the sentinel headline at
+index 0 with a `(failed after retries; ...)` marker, plus a
`Captured 1 per-instance error(s):` block listing the failed
-`fan_out_index` and error category. The other instances complete
-as usual.
+`fan_out_index` and error category. The other instances complete as
+usual.
+
+With `MODE=degrade`, the sentinel at index 0 instead shows a
+placeholder result (`summary: (unavailable)`, `topic: other`) and
+there is no error block: `FailureIsolationMiddleware` caught the
+exhausted-retry failure and returned the degraded partial, so the
+fan-out recorded the instance as a (degraded) success. The
+per-instance timings still show the sentinel's failed attempts, so
+you can see the retries happened before the instance was degraded.
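
The three postures described in this file can be approximated with plain `asyncio`. This is a rough sketch under stated assumptions rather than the example's code: `run_batch`, `summarize`, and `InstanceFailed` are hypothetical, the sketch marks a collected failure with `None` instead of the example's output marker, and plain `asyncio.gather` does not cancel sibling tasks the way the fan-out's `fail_fast` policy is described as doing.

```python
import asyncio
from typing import Optional


class InstanceFailed(Exception):
    """Hypothetical stand-in for an instance whose retries exhausted."""


async def summarize(headline: str) -> str:
    # The sentinel always fails, mirroring the [FORCE_FAIL] headline.
    if "[FORCE_FAIL]" in headline:
        raise InstanceFailed(headline)
    return headline.lower()


async def run_batch(
    headlines: list[str], mode: str, concurrency: int = 3
) -> tuple[list[Optional[str]], list[tuple[int, Exception]]]:
    if mode not in ("fail_fast", "collect", "degrade"):
        raise ValueError(f"unknown mode: {mode!r}")
    gate = asyncio.Semaphore(concurrency)  # cap in-flight instances

    async def one(headline: str) -> str:
        async with gate:
            try:
                return await summarize(headline)
            except InstanceFailed:
                if mode == "degrade":
                    return "(unavailable)"  # placeholder replaces the failure
                raise

    # collect: failures come back as values; fail_fast: first failure raises.
    results = await asyncio.gather(
        *(one(h) for h in headlines),
        return_exceptions=(mode == "collect"),
    )
    summaries: list[Optional[str]] = []
    errors: list[tuple[int, Exception]] = []
    for index, result in enumerate(results):
        if isinstance(result, Exception):
            errors.append((index, result))
            summaries.append(None)
        else:
            summaries.append(result)
    return summaries, errors
```

Under `collect` the failure is recorded next to partial results, under `degrade` the batch looks fully successful with a placeholder at the failed index, and under `fail_fast` the first failure propagates to the caller.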
30 changes: 21 additions & 9 deletions examples/chat-with-multimodal/main.py
@@ -51,11 +51,13 @@
(`provider_rate_limit`, `provider_invalid_request`, etc.) in the
error message. The image URL failure mode (OpenAI's
fetcher hitting a CDN that blocks it) lands here as
-`provider_invalid_request`. Three legitimate places to handle
+`provider_invalid_request`. Four legitimate places to handle
this in production: caller-side `try / except NodeException`
-(shown here), `RetryMiddleware` wrapping the respond node for
-transient categories, or a `try / except LlmProviderError`
-inside the node body returning a fallback response.
+(shown here), call-level retry via `complete(retry=...)` for
+transient categories (also shown here, on the respond node),
+`RetryMiddleware` wrapping the whole respond node, or a
+`try / except LlmProviderError` inside the node body returning a
+fallback response.

**Configuration** (env vars; OpenAI defaults shown):

@@ -118,6 +120,7 @@
State,
append,
)
+from openarmature.graph.middleware import RetryConfig
from openarmature.llm import (
AssistantMessage,
LlmProviderError,
Expand Down Expand Up @@ -330,9 +333,17 @@ async def respond(state: ChatState) -> dict[str, Any]:
placeholders={"history": state.history},
)

+# Call-level retry: retry only the provider wire call on transient
+# categories (provider_unavailable, provider_rate_limit, ...), using
+# the default classifier and backoff. It is terminal-only, so the
+# node still sees exactly one completion (or one final failure) even
+# when an attempt was retried underneath. Contrast with a
+# RetryMiddleware on the node, which re-runs the whole node body
+# (re-render + re-send) on each retry.
response = await _get_provider().complete(
rendered.messages,
config=RuntimeConfig(temperature=0.0, max_tokens=400),
+retry=RetryConfig(max_attempts=3),
)

# The rendered messages include [system, *history, current_user]
@@ -496,10 +507,10 @@ async def main() -> None:
# it's an ``LlmProviderError`` we surface the canonical
# ``.category`` string (``provider_rate_limit``,
# ``provider_invalid_request``, etc.) so the failure mode is
-# immediately greppable. This is one of three legitimate places
-# to handle the error; see the docstring for the other two
-# (``RetryMiddleware`` wrapping the node, ``try/except`` inside
-# the node body).
+# immediately greppable. This is one of four legitimate places
+# to handle the error; see the docstring for the others
+# (call-level ``complete(retry=...)``, ``RetryMiddleware`` wrapping
+# the node, ``try/except`` inside the node body).
final: ChatState | None = None
try:
final = await graph.invoke(initial)
@@ -512,8 +523,9 @@ async def main() -> None:
print()
print(f"*** node {exc.node_name!r} failed ({category}): {cause} ***")
print()
-print("Three places to handle this in production code:")
+print("Four places to handle this in production code:")
print(" - Caller-side try/except NodeException (this example).")
+print(" - Call-level complete(retry=...) on the wire call (this example).")
print(" - RetryMiddleware on the node for transient categories.")
print(" - try/except inside the node body returning a fallback.")
finally:
120 changes: 79 additions & 41 deletions examples/fan-out-with-retry/main.py
@@ -25,19 +25,23 @@
- ``instance_middleware=(RetryMiddleware(...), TimingMiddleware(...))``
wraps EACH instance's whole subgraph invocation. Retries are
per-instance: a failure on headline 3 doesn't restart headlines 0-2.
+In ``degrade`` mode a ``FailureIsolationMiddleware`` is prepended as
+the outermost layer (retry stays inner, so it still sees raw
+transients first).
- ``concurrency=3`` caps how many instances run in flight at once. Use
this to be polite to the upstream API.
-- ``error_policy`` defaults to ``"fail_fast"``; the first instance
-failure (after retries exhaust) raises and cancels siblings. Set
-the ``COLLECT_MODE`` env var to switch to ``"collect"``: each
-instance runs independently and per-instance failures land in
-``state.instance_errors`` instead of aborting the batch. The
-``errors_field="instance_errors"`` knob names where the records go.
-Under COLLECT_MODE, the demo prepends a sentinel headline
-(``[FORCE_FAIL] ...``) that ``summarize`` raises
-``ProviderUnavailable`` on; retry exhausts, the error lands in
-``instance_errors``, and the rest of the batch completes. Without
-the sentinel, ``COLLECT_MODE`` would have nothing to capture.
+- The ``MODE`` env var selects the per-instance failure posture.
+``"fail_fast"`` (default) raises on the first instance whose retries
+exhaust and cancels its siblings. ``"collect"`` lets each instance
+run independently and lands per-instance failures in
+``state.instance_errors`` (named by ``errors_field``) instead of
+aborting. ``"degrade"`` wraps each instance in
+``FailureIsolationMiddleware`` (outermost) so an exhausted instance
+is caught and returns a placeholder partial, leaving the batch intact
+with a degraded entry in place. ``collect`` and ``degrade`` both
+prepend a sentinel headline (``[FORCE_FAIL] ...``) that ``summarize``
+raises ``ProviderUnavailable`` on, so there is a failure to handle;
+``fail_fast`` keeps the list clean for the happy path.
- A ``TimingRecord`` is captured per instance via an ``on_complete``
callback. ``TimingRecord`` carries the per-call duration but not the
``fan_out_index``; that index lives on observer NodeEvents instead.
@@ -56,6 +60,8 @@
- ``LLM_BASE_URL`` defaults to ``https://api.openai.com``. **Host root only.**
- ``LLM_MODEL`` defaults to ``gpt-4o-mini``.
- ``LLM_API_KEY`` required (empty for local servers that don't authenticate).
+- ``MODE`` defaults to ``fail_fast``. One of ``fail_fast`` / ``collect`` /
+``degrade`` (see the failure-posture bullet above).

Run with:

@@ -84,6 +90,8 @@
append,
)
from openarmature.graph.middleware import (
+FailureIsolationMiddleware,
+Middleware,
RetryConfig,
RetryMiddleware,
TimingMiddleware,
@@ -160,16 +168,17 @@ class HeadlineState(State):


async def summarize(s: HeadlineState) -> Mapping[str, Any]:
-# Sentinel for the COLLECT_MODE demo. Raising a transient error
-# (ProviderUnavailable carries the ``provider_unavailable``
-# category, which retry's default classifier recognizes as
-# retryable) lets the retry middleware exhaust its 3 attempts;
-# the final failure then surfaces according to the fan-out's
-# error_policy. Under fail_fast (default), the batch aborts.
-# Under collect, the failure lands in instance_errors and the
-# batch produces partial results.
+# Sentinel for the collect / degrade failure-path demos (those modes
+# prepend a [FORCE_FAIL] headline). Raising a transient error
+# (ProviderUnavailable carries the ``provider_unavailable`` category,
+# which retry's default classifier recognizes as retryable) lets the
+# retry middleware exhaust its 3 attempts; the final failure then
+# surfaces according to MODE: under collect it lands in
+# instance_errors and the batch produces partial results; under
+# degrade FailureIsolationMiddleware catches it and substitutes a
+# placeholder so the batch finishes intact.
 if "[FORCE_FAIL]" in s.headline:
-    raise ProviderUnavailable("synthetic failure: provider unavailable (COLLECT_MODE demo)")
+    raise ProviderUnavailable("synthetic failure: provider unavailable (failure-path demo)")
content = await _chat(
system=(
"Rewrite the headline as one short sentence (~15 words) that would work as a lead. No preamble."
@@ -249,16 +258,26 @@ async def present(s: BatchState) -> Mapping[str, Any]:
return {"trace": ["present"]}


-def build_graph(error_policy: str = "fail_fast") -> CompiledGraph[BatchState]:
+def build_graph(mode: str = "fail_fast") -> CompiledGraph[BatchState]:
"""Build the fan-out demo graph.

-``error_policy`` switches between ``"fail_fast"`` (default; first
-exhausted-retry failure raises and cancels the rest) and
-``"collect"`` (each instance runs independently; failures land in
-``state.instance_errors`` and the batch produces partial results).
+``mode`` selects the per-instance failure posture:
+
+- ``"fail_fast"`` (default): the first instance whose retries
+exhaust raises and cancels the rest.
+- ``"collect"``: each instance runs independently; failures land in
+``state.instance_errors`` and the batch produces partial results.
+- ``"degrade"``: each instance is additionally wrapped (outermost)
+in ``FailureIsolationMiddleware``; an instance whose retries
+exhaust is caught and returns a placeholder partial, so the batch
+completes with a degraded entry in place rather than aborting or
+dropping it.

The smoke test calls this with no argument, exercising the default
-path; main() lets the COLLECT_MODE env var flip to collect.
+path; main() lets the MODE env var pick the posture.
"""
+if mode not in ("fail_fast", "collect", "degrade"):
+    raise ValueError(f"mode must be one of fail_fast / collect / degrade; got {mode!r}")
headline_subgraph = build_headline_subgraph()

retry = RetryMiddleware(
@@ -275,6 +294,25 @@ def build_graph(error_policy: str = "fail_fast") -> CompiledGraph[BatchState]:
clock=time.monotonic,
)

+instance_middleware: tuple[Middleware, ...] = (retry, timing)
+error_policy = "fail_fast"
+if mode == "collect":
+    error_policy = "collect"
+elif mode == "degrade":
+    # Outermost instance middleware: catches the exception retry
+    # re-raises once its attempts exhaust and returns a degraded
+    # partial in place of the instance result, so the batch finishes
+    # instead of aborting (fail_fast) or dropping the instance
+    # (collect). Retry stays inner so it still sees raw transients
+    # first. The degraded mapping is keyed the way the fan-out
+    # projects an instance: the collect_field (``summary``) plus
+    # each parent extra_outputs key (``topics``).
+    degrade = FailureIsolationMiddleware(
+        degraded_update={"summary": "(unavailable)", "topics": "other"},
+        event_name="headline_degraded",
+    )
+    instance_middleware = (degrade, retry, timing)

return (
GraphBuilder(BatchState)
.add_node("announce", announce)
@@ -287,7 +325,7 @@ def build_graph(error_policy: str = "fail_fast") -> CompiledGraph[BatchState]:
target_field="summaries",
extra_outputs={"topics": "topic"},
concurrency=3,
-instance_middleware=(retry, timing),
+instance_middleware=instance_middleware,
error_policy=error_policy,
errors_field="instance_errors",
)
@@ -336,23 +374,23 @@ async def main() -> None:
# doesn't accumulate timings across invocations.
_timings.clear()

-# Set COLLECT_MODE=1 to switch the fan-out error policy from the
-# default fail_fast to collect. Under collect, each instance runs
-# independently and per-instance failures (after retries exhaust)
-# land in state.instance_errors instead of aborting the batch.
-error_policy = "collect" if os.environ.get("COLLECT_MODE") else "fail_fast"
-graph = build_graph(error_policy=error_policy)
+# MODE selects the per-instance failure posture: fail_fast (default,
+# abort on the first exhausted-retry failure), collect (record
+# failures in state.instance_errors and finish the batch), or
+# degrade (FailureIsolationMiddleware catches an exhausted instance
+# and substitutes a placeholder so the batch finishes intact).
+mode = os.environ.get("MODE", "fail_fast")
+graph = build_graph(mode=mode)
graph.attach_observer(fan_out_config_observer)

-# Under COLLECT_MODE, prepend a deliberately-failing headline so
-# the collect path is exercised end-to-end: retry middleware
-# exhausts on the sentinel, the failure lands in
-# state.instance_errors, and the rest of the batch completes.
-# Default (fail_fast) keeps the headline list clean so the demo's
+# collect and degrade both need a failure to demonstrate, so prepend
+# a deliberately-failing headline that summarize() always raises on.
+# collect lands it in state.instance_errors; degrade catches it and
+# substitutes a placeholder. fail_fast keeps the list clean so the
# happy path runs to completion.
-if error_policy == "collect":
+if mode in ("collect", "degrade"):
     headlines = [
-        "[FORCE_FAIL] Synthetic failing headline for the COLLECT_MODE demo",
+        "[FORCE_FAIL] Synthetic failing headline for the failure-path demo",
         *HEADLINES,
     ]
 else:
@@ -361,7 +399,7 @@ async def main() -> None:

print("=" * 72)
print(f"Summarizing {len(headlines)} headlines in parallel (concurrency=3)")
-print(f"error_policy={error_policy!r}")
+print(f"mode={mode!r}")
print("=" * 72)
print()
