From 4912d6909b7f2e649983e946b4802a768e7b1db4 Mon Sep 17 00:00:00 2001 From: Mickael Farina Date: Sun, 31 May 2026 17:58:42 +0200 Subject: [PATCH] =?UTF-8?q?fix(agents):=20L1=20=E2=80=94=20crew/agent=20ru?= =?UTF-8?q?ntime=20reliability=20(review=20sweep)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four findings from the codec_agents.py review: 1. Parallel crews no longer sink on one agent's failure (codec_agents.py:872) asyncio.gather(*coros) had no return_exceptions=True, so if ONE agent raised (e.g. a stuck-detection abort), every other agent's result was discarded and the whole crew failed. Sequential mode isolates per-agent; parallel now does too — a failed agent contributes an error marker, the rest still return + are joined. Each failure gets its own agent_finish audit (outcome=error), matching sequential. 2. Per-tool wall-clock budget (codec_agents.py:_execute_tool_with_hooks) A skill tool with no internal timeout (input(), a no-timeout network call, a deadlock) would hang the agent — and its default-thread-pool worker — forever. The executor call is now wrapped in asyncio.wait_for(120s); on timeout the agent gets a recoverable error string (+ a tool_result/timeout audit) and its ReAct loop continues. Documented residual: the abandoned worker thread can't be killed and stays parked until the blocking call returns — the generous budget keeps that rare. 3. Dead tautology removed (codec_agents.py:838) `if agent.tools != agent.tools or before != len(agent.tools)` — the first operand compares a list to itself (always False); the condition collapsed to the inner `if stripped:`. Simplified; allowlist scoping behavior is unchanged (pinned by test). 4. Dead eager module global removed (codec_agents.py:72) `SERPER_API_KEY = _serper_api_key()` was never read (web_search routes through codec_search.search, which fetches the key itself) yet did a Keychain shellout at every import. Dropped; the _serper_api_key() getter stays for live reads (+ satisfies the test_remaining_secrets source scan). Test surface: tests/test_agents_reliability_l1.py (7 tests) — parallel degrade-vs-all-succeed, hanging-tool-bounded (coroutine returns at the budget, not the orphaned thread's finish), fast-tool-still-works, allowlist-strips + no-allowlist-keeps-all, dead-global-gone. Existing 236 agent/crew tests still pass. Full suite: 2,062 passed / 77 skipped. ruff clean. Co-Authored-By: Claude Opus 4.8 --- codec_agents.py | 51 ++++++++++-- tests/test_agents_reliability_l1.py | 124 ++++++++++++++++++++++++++++ 2 files changed, 168 insertions(+), 7 deletions(-) create mode 100644 tests/test_agents_reliability_l1.py diff --git a/codec_agents.py b/codec_agents.py index e0f0234..2a673cb 100644 --- a/codec_agents.py +++ b/codec_agents.py @@ -28,6 +28,13 @@ _VALID_TOOL_NAME_RE = re.compile(r'^[A-Za-z0-9_.\-]+$') _MAX_TOOL_NAME_LEN = 100 _MAX_TOOL_INPUT_LEN = 50000 +# L1 / SR-61: per-tool wall-clock budget. A skill tool with no internal timeout +# (input(), a no-timeout network call, a deadlock) would otherwise hang the +# agent — and its default-thread-pool worker — forever. wait_for returns control +# to the agent as a recoverable error string. (Residual: the abandoned worker +# thread can't be killed and stays parked until the blocking call returns — a +# generous budget keeps that rare; tools doing real work finish well inside it.) +_TOOL_CALL_TIMEOUT_SECONDS = 120 # ── CONFIG ── CONFIG_PATH = os.path.expanduser("~/.codec/config.json") @@ -62,7 +69,10 @@ def _serper_api_key() -> str: except Exception: return os.environ.get("SERPER_API_KEY", "") -SERPER_API_KEY = _serper_api_key() +# L1 / SR-61: dropped the eager module global `SERPER_API_KEY = _serper_api_key()` +# — it was never read (web_search routes through codec_search.search, which +# fetches the key itself) yet did a Keychain shellout at every import. The +# getter above stays for callers that want a live read. # ── HTTP connection pools (reuse TCP connections across calls) ── _HTTP_HEADERS = { @@ -470,7 +480,19 @@ def _inner(t, _c): ) ctx = contextvars.copy_context() - result = await loop.run_in_executor(None, ctx.run, _run_tool_with_hooks) + try: + result = await asyncio.wait_for( + loop.run_in_executor(None, ctx.run, _run_tool_with_hooks), + timeout=_TOOL_CALL_TIMEOUT_SECONDS, + ) + except asyncio.TimeoutError: + # L1 / SR-61: surface as a tool-error string so the agent's ReAct + # loop can recover (try a different tool / FINAL) instead of hanging. + _audit("tool_result", tool=tool_name, agent=self.name, outcome="timeout", + error=f"tool exceeded {_TOOL_CALL_TIMEOUT_SECONDS}s budget") + return (f"Tool '{tool_name}' timed out after " + f"{_TOOL_CALL_TIMEOUT_SECONDS}s. Try a different approach or " + f"give your FINAL answer.") if isinstance(result, HookVeto): result = (f"Tool '{tool_name}' was vetoed by plugin " f"'{result.plugin_name}': {result.reason}") @@ -816,10 +838,12 @@ def __post_init__(self): for agent in self.agents: before = len(agent.tools) agent.tools = [t for t in agent.tools if t.name in allowed] - if agent.tools != agent.tools or before != len(agent.tools): - stripped = before - len(agent.tools) - if stripped: - print(f"[Crew] Scoped {agent.name}: removed {stripped} tool(s) outside allowlist") + # L1 / SR-61: was `if agent.tools != agent.tools or ...` — the + # first operand is always False (list compared to itself); the + # condition reduced to the `if stripped:` below. Simplified. + stripped = before - len(agent.tools) + if stripped: + print(f"[Crew] Scoped {agent.name}: removed {stripped} tool(s) outside allowlist") async def run(self, callback: Optional[Callable] = None) -> str: # One correlation_id per crew run. All nested agent_start / agent_finish / @@ -869,7 +893,20 @@ async def run(self, callback: Optional[Callable] = None) -> str: # spawned N concurrent agent.run coroutines unbounded. pairs = list(zip(self.agents, self.tasks))[:self.max_steps] coros = [a.run(t, callback=callback) for a, t in pairs] - results = await asyncio.gather(*coros) + # L1 / SR-61: return_exceptions=True so ONE agent's failure + # (e.g. a stuck-detection abort raising) doesn't discard every + # other agent's result. Sequential mode isolates per-agent + # (try/raise above); parallel now degrades gracefully too — a + # failed agent contributes an error marker, the rest still return. + raw = await asyncio.gather(*coros, return_exceptions=True) + results = [] + for (agent, _task), r in zip(pairs, raw): + if isinstance(r, Exception): + _audit("agent_finish", agent=agent.name, outcome="error", + error_type=type(r).__name__, error=str(r)[:500]) + results.append(f"[{agent.name} failed: {type(r).__name__}: {r}]") + else: + results.append(r) final = "\n\n---\n\n".join(results) else: final = f"Unknown crew mode: {self.mode}" diff --git a/tests/test_agents_reliability_l1.py b/tests/test_agents_reliability_l1.py new file mode 100644 index 0000000..ecefe9f --- /dev/null +++ b/tests/test_agents_reliability_l1.py @@ -0,0 +1,124 @@ +"""L1 — regression tests for codec_agents.py reliability fixes (review sweep). + + - parallel crews degrade gracefully (one agent's exception no longer sinks all) + - a hanging tool is bounded by a per-tool wall-clock budget (no infinite hang) + - the Crew allowlist still strips out-of-allowlist tools (tautology simplification) + - the dead eager SERPER_API_KEY module global is gone +""" +from __future__ import annotations + +import asyncio +import sys +import time +from pathlib import Path + +_REPO = Path(__file__).resolve().parents[1] +if str(_REPO) not in sys.path: + sys.path.insert(0, str(_REPO)) + +import codec_agents # noqa: E402 +from codec_agents import Agent, Crew, Tool # noqa: E402 + + +# ── parallel crew graceful degradation ───────────────────────────────────── +def test_parallel_crew_survives_one_agent_exception(): + good = Agent(name="Good", role="r", tools=[]) + bad = Agent(name="Bad", role="r", tools=[]) + + async def _good_run(task, context="", callback=None): + return "good-result" + + async def _bad_run(task, context="", callback=None): + raise RuntimeError("boom") + + good.run = _good_run + bad.run = _bad_run + + crew = Crew(agents=[good, bad], tasks=["t1", "t2"], mode="parallel") + final = asyncio.run(crew.run()) + + # the good agent's result survives; the bad one becomes an error marker + assert "good-result" in final + assert "Bad failed" in final + assert "RuntimeError" in final + + +def test_parallel_crew_all_succeed_still_joins(): + a = Agent(name="A", role="r", tools=[]) + b = Agent(name="B", role="r", tools=[]) + + async def _ra(task, context="", callback=None): + return "ra" + + async def _rb(task, context="", callback=None): + return "rb" + + a.run, b.run = _ra, _rb + crew = Crew(agents=[a, b], tasks=["t", "t"], mode="parallel") + final = asyncio.run(crew.run()) + assert "ra" in final and "rb" in final + assert "failed" not in final + + +# ── per-tool timeout ─────────────────────────────────────────────────────── +def test_hanging_tool_is_bounded(monkeypatch): + # shrink the budget so the test is fast; the tool blocks longer than that + monkeypatch.setattr(codec_agents, "_TOOL_CALL_TIMEOUT_SECONDS", 0.3) + + def _slow(_s): + time.sleep(2.0) # exceeds the 0.3s budget + return "never" + + tool = Tool(name="slow", description="blocks", fn=_slow) + agent = Agent(name="T", role="r", tools=[tool]) + + # Measure the COROUTINE's own return time, not asyncio.run() — the latter + # also waits for the abandoned (non-killable) executor thread to finish its + # 2s sleep at loop teardown. The fix's contract is that the *agent* regains + # control at the budget, which is what run_until_complete returning proves. + loop = asyncio.new_event_loop() + try: + t0 = time.time() + out = loop.run_until_complete(agent._execute_tool_with_hooks(tool, "slow", "x")) + elapsed = time.time() - t0 # captured BEFORE loop.close() joins the thread + finally: + loop.close() + + assert "timed out" in out.lower(), f"expected timeout message, got: {out!r}" + assert elapsed < 1.5, f"wait_for did not bound the hang (took {elapsed:.1f}s)" + + +def test_fast_tool_returns_normally(monkeypatch): + monkeypatch.setattr(codec_agents, "_TOOL_CALL_TIMEOUT_SECONDS", 5) + tool = Tool(name="echo", description="echo", fn=lambda s: f"got:{s}") + agent = Agent(name="T", role="r", tools=[tool]) + out = asyncio.run(agent._execute_tool_with_hooks(tool, "echo", "hi")) + assert out == "got:hi" + + +# ── Crew allowlist still strips (tautology simplification) ────────────────── +def test_crew_allowlist_strips_out_of_scope_tools(): + keep = Tool(name="keep", description="", fn=lambda s: s) + drop = Tool(name="drop", description="", fn=lambda s: s) + agent = Agent(name="A", role="r", tools=[keep, drop]) + Crew(agents=[agent], tasks=["t"], allowed_tools=["keep"]) + names = {t.name for t in agent.tools} + assert names == {"keep"}, f"allowlist scoping broken: {names}" + + +def test_crew_no_allowlist_keeps_all_tools(): + a_tool = Tool(name="a", description="", fn=lambda s: s) + b_tool = Tool(name="b", description="", fn=lambda s: s) + agent = Agent(name="A", role="r", tools=[a_tool, b_tool]) + Crew(agents=[agent], tasks=["t"], allowed_tools=None) + assert {t.name for t in agent.tools} == {"a", "b"} + + +# ── dead global removed ───────────────────────────────────────────────────── +def test_dead_serper_global_removed(): + assert not hasattr(codec_agents, "SERPER_API_KEY"), ( + "the eager SERPER_API_KEY module global should be gone (it was unused " + "and did a Keychain shellout at import)" + ) + # the live getter stays + assert hasattr(codec_agents, "_serper_api_key")