Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 189 additions & 7 deletions veadk/runtime/codex/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import asyncio
import json
import os
from typing import Any, AsyncIterator

import litellm
Expand Down Expand Up @@ -62,6 +63,111 @@
)


def _shim_num_retries() -> int:
"""Backend retry count for transient errors (429/5xx/overloaded/timeout).

Previously the backend call used ``num_retries=0`` with no timeout, so a
transient Ark error or a stalled connection failed the turn outright and
the eval client's read timeout (default 300s) fired before any recovery.
Retrying lets litellm apply its built-in exponential backoff. Env-tunable
via ``CODEX_SHIM_NUM_RETRIES`` (default 2).
"""
try:
return max(0, int(os.getenv("CODEX_SHIM_NUM_RETRIES", "2")))
except ValueError:
return 2


def _shim_timeout() -> float:
"""Per-backend-call timeout (seconds) so a hung connection cannot exhaust
the whole client budget. ``0``/unset keeps litellm's default. Env-tunable
via ``CODEX_SHIM_TIMEOUT``.
"""
try:
return max(0.0, float(os.getenv("CODEX_SHIM_TIMEOUT", "0")))
except ValueError:
return 0.0


def _shim_max_tool_iters() -> int:
"""Max shim-internal web tool round-trips per request (``0`` = disabled).

Codex only speaks the Responses API and its hosted ``web_search`` tool is
stripped before Ark (Ark rejects its schema), so a Codex+Ark agent has no
web capability. When this is > 0, the shim translates the hosted web tools
into Ark-accepted ``function`` tools, executes the veADK builtins itself,
and loops until the model stops calling them. Default ``0`` keeps the path
byte-identical to before. Env-tunable via ``CODEX_SHIM_MAX_TOOL_ITERS``
(recommended 6 when enabling).
"""
try:
return max(0, int(os.getenv("CODEX_SHIM_MAX_TOOL_ITERS", "0")))
except ValueError:
return 0


# Hosted web tools (Codex emits these; Ark rejects their schema) translated to
# plain Responses ``function`` tools that the shim executes against veADK's own
# builtins. Kept minimal — the builtins own the real parameter handling.
_EXECUTABLE_TOOLS: list[dict[str, Any]] = [
{
"type": "function",
"name": "web_search",
"description": "Search the web for a query and return result documents.",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "The search query."}
},
"required": ["query"],
},
},
{
"type": "function",
"name": "web_fetch",
"description": (
"Fetch a public web page or PDF over HTTP(S) and return its "
"readable main content."
),
"parameters": {
"type": "object",
"properties": {
"url": {"type": "string", "description": "The http(s) URL to fetch."},
"extract_mode": {
"type": "string",
"enum": ["markdown", "text"],
"description": "Output format; defaults to markdown.",
},
"max_chars": {
"type": "integer",
"description": "Truncate content to at most this many chars.",
},
},
"required": ["url"],
},
},
]

_EXECUTABLE_TOOL_NAMES = {t["name"] for t in _EXECUTABLE_TOOLS}


async def _run_tool(name: str, args: dict[str, Any]) -> str:
"""Execute a veADK built-in web tool and return a JSON string.

The web builtins use blocking ``requests``, so they run in a worker thread
to avoid stalling the shim's event loop. Any error is returned as a JSON
payload (never raised) so the model can recover on the next turn.
"""
try:
from veadk.tools import get_builtin_tool

fn = get_builtin_tool(name)
result = await asyncio.to_thread(fn, **args)
return json.dumps(result, ensure_ascii=False, default=str)
except Exception as e: # noqa: BLE001 - surfaced to the model, not raised
return json.dumps({"error": str(e)})


class ResponsesShim:
"""In-process Responses ``/v1/responses`` server backed by a chat endpoint.

Expand Down Expand Up @@ -98,11 +204,22 @@ async def responses(request: Request) -> Any:
# Codex injects its built-in tools (e.g. `web_search`) whose schema
# carries fields like `external_web_access` that non-OpenAI Responses
# backends (Ark) reject. Keep only standard `function` tools, which
# the bridged chat backend understands.
# the bridged chat backend understands. When the tool loop is enabled
# (CODEX_SHIM_MAX_TOOL_ITERS > 0), additionally translate the stripped
# hosted web tools into Ark-accepted `function` tools that the shim
# executes itself (see the tool loop below), so a Codex+Ark agent
# regains web search/fetch.
if isinstance(call_kwargs.get("tools"), list):
call_kwargs["tools"] = [
t for t in call_kwargs["tools"] if t.get("type") == "function"
]
inbound = call_kwargs["tools"]
wants_web = any(
t.get("type") in ("web_search", "web_search_preview")
for t in inbound
)
kept = [t for t in inbound if t.get("type") == "function"]
if wants_web and _shim_max_tool_iters() > 0:
have = {t.get("name") for t in kept}
kept.extend(t for t in _EXECUTABLE_TOOLS if t["name"] not in have)
call_kwargs["tools"] = kept
# On multi-step turns Codex replays prior assistant messages in
# `input` without a `status` field, but Ark's Responses API
# requires `status` on assistant messages (MissingParameter:
Expand All @@ -123,18 +240,83 @@ async def responses(request: Request) -> Any:
api_key=self.api_key,
custom_llm_provider="openai",
drop_params=True,
num_retries=0,
num_retries=_shim_num_retries(),
stream=False,
)
timeout = _shim_timeout()
if timeout:
call_kwargs["timeout"] = timeout

# Always call the backend non-streaming. litellm's chat->Responses
# bridge can only emit a single degenerate `response.completed`
# event when streaming a chat backend, which Codex's strict SSE
# parser rejects (surfaced as a generic "high demand" error). So we
# fetch the full result and, when Codex asked for a stream,
# synthesize the canonical Responses event sequence ourselves.
result = await litellm.aresponses(**call_kwargs)
resp = _to_dict(result)
# Bounded shim-internal tool loop: call the backend, and while it
# asks for an executable web tool, run the veADK builtin and feed
# the result back as a paired function_call + function_call_output
# (append BOTH so the chat bridge sees [user, assistant(tool_calls),
# tool] regardless of its internal cache). Exit purely on the absence
# of executable function_calls in the fresh output — the always-empty
# `message` item is ignored. The loop is invisible to Codex: only the
# final, tool-free turn is returned/synthesized.
max_iters = _shim_max_tool_iters()
iters = 0
while True:
result = await litellm.aresponses(**call_kwargs)
resp = _to_dict(result)
if max_iters <= 0 or iters >= max_iters:
break
conv = call_kwargs.get("input")
if not isinstance(conv, list):
break
calls = [
it
for it in (resp.get("output") or [])
if it.get("type") == "function_call"
and it.get("name") in _EXECUTABLE_TOOL_NAMES
]
if not calls:
break
for fc in calls:
cid = fc.get("call_id") or fc.get("id")
try:
args = json.loads(fc.get("arguments") or "{}")
except json.JSONDecodeError:
args = {}
out = await _run_tool(fc["name"], args)
conv.append(
{
"type": "function_call",
"call_id": cid,
"id": fc.get("id") or cid,
"name": fc["name"],
"arguments": fc.get("arguments") or "{}",
"status": "completed",
}
)
conv.append(
{
"type": "function_call_output",
"call_id": cid,
"output": out,
}
)
iters += 1

# If we broke at the iteration cap with an executable function_call
# still pending, strip it so it never leaks to Codex (which cannot
# run it and would desync the next turn / emit a null delta).
if max_iters > 0 and isinstance(resp.get("output"), list):
resp["output"] = [
it
for it in resp["output"]
if not (
it.get("type") == "function_call"
and it.get("name") in _EXECUTABLE_TOOL_NAMES
)
]

if stream:
return StreamingResponse(
Expand Down
Loading