Reference for everything importable from the top-level eden package — every entry-point, dataclass, Protocol, factory, hook type, abort primitive, and error class that eden.__all__ re-exports.
Eden's surface is a single module. Import what you need from eden; nothing private is part of the contract.
from eden import (
    run, create_worktree,
    RunResult, Iteration, Commit, Usage, Timeouts, Mount, FinalizeResult,
    BranchStrategy, StreamEvent, Logging,
    AbortController, AbortSignal, Aborted,
    Agent, IterationContext,
    simulated_agent, claude_code, codex, opencode, pi, cli_agent,
    Hook, HookPhase, Hooks, HostHooks, SandboxHooks,
    IsolatedSandboxHandle,
    EdenError, ConfigError, CwdError, EdenTimeoutError, EnvMergeError,
    HookError, HookFailed, HookTimeout, IdleTimeout, InvalidOptions,
    PromptError, RestAuthError, RestError, RestNotFoundError,
    RestRateLimited, SessionCaptureFailed, StepTimeout,
    __version__,
)

Sandbox providers live alongside the public surface but are imported from eden.providers (see sandbox-providers.md); they are passed into run(sandbox=...).
A 15-line minimum-viable example using simulated_agent and no_sandbox:
from eden import run, simulated_agent
from eden.sandboxes.no_sandbox import provider as no_sandbox

result = run(
    agent=simulated_agent(output="<promise>COMPLETE</promise>\n"),
    sandbox=no_sandbox(),
    prompt="say hi",
    max_iterations=1,
)
print(result.completion_signal)  # "<promise>COMPLETE</promise>"
print(result.branch)  # generated worktree branch
print(len(result.iterations))  # 1

Runs an agent against a sandbox in a managed worktree and returns a RunResult. Keyword-only.
def run(
    *,
    agent: Agent,
    sandbox: SandboxProvider,
    prompt: str | None = None,
    prompt_file: str | Path | None = None,
    prompt_args: Mapping[str, str] | None = None,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    branch_strategy: BranchStrategy | None = None,
    max_iterations: int = 1,
    completion_signal: str | list[str] = "<promise>COMPLETE</promise>",
    idle_timeout: float | timedelta = 600.0,
    idle_warning_interval: float | timedelta | None = None,
    name: str | None = None,
    hooks: Hooks | None = None,
    timeouts: Timeouts | None = None,
    on_event: Callable[[StreamEvent], None] | None = None,
    logging: Logging | None = None,
    signal: AbortSignal | None = None,
    output: OutputDefinition | None = None,
    resume_session: str | None = None,
    copy_to_worktree: list[str] | None = None,
    throw_on_duplicate_worktree: bool = True,
) -> RunResult: ...

Parameters:

- agent: anything satisfying the Agent Protocol (see Agents below).
- sandbox: a SandboxProvider (no_sandbox(), docker(...), daytona(...), etc.).
- prompt / prompt_file / prompt_args: supply the iteration prompt inline, from a file path, or with {name} substitutions; mutually-aware (see prompts.md). Exactly one of prompt or prompt_file is required.
- cwd: host path that will be the worktree root's source. Defaults to Path.cwd().
- env: extra environment variables forwarded into the agent process.
- branch_strategy: one of BranchStrategy.head(), merge_to_head(base), named(branch, base). Defaults to a generated eden/<slug> branch.
- max_iterations: maximum agent loop iterations. Default 1.
- completion_signal: string or list of strings whose appearance in agent output stops the loop early. Default "<promise>COMPLETE</promise>".
- idle_timeout: seconds (or timedelta) of stdout silence before the run aborts with IdleTimeout. Default 600.0.
- idle_warning_interval: emit StreamEvent(type="idle_warning") every N seconds during idleness. None disables.
- name: informational tag used in worktree branch slugs and stream events.
- hooks: Hooks(host=..., sandbox=...) lifecycle bundle. Default Hooks().
- timeouts: Timeouts(...) per-step deadlines. Default Timeouts().
- on_event: callback invoked with every StreamEvent. Use to forward to UIs, logs, or queues.
- logging: Logging.file(path, on_agent_stream_event=...) to mirror events to a log file; the optional callback fires for agent-emitted text/tool/usage events only and swallows exceptions.
- signal: AbortSignal for cooperative cancellation. If omitted, run allocates its own (unused) signal.
- output: Output.object(...) / Output.string(...) to extract a typed payload from a <tag> block in stdout. Requires max_iterations=1 and that <tag> literally appear in the prompt. Failure raises StructuredOutputError.
- resume_session: Claude Code session id to resume; appends --resume <id> to the agent argv. Requires max_iterations=1.
- copy_to_worktree: list of host-relative file/directory paths to copy from cwd into the freshly-carved worktree before the sandbox boots (and before host.on_worktree_ready hooks fire, so hooks can use the copied files). Files preserve their relative path; directories copy recursively; existing destinations are overwritten. Absolute paths, .. traversal, and the head branch strategy raise InvalidOptions; missing sources raise CopyToWorktreeError. Useful for seeding .env files, fixtures, or local configs that the worktree shouldn't inherit from git checkout.
- throw_on_duplicate_worktree: when False and the named-strategy branch already has an on-disk worktree, that worktree is reused (and close() does not remove it). Default True (raise BranchExists on duplicate). Only meaningful for BranchStrategy.named(...). Useful for iterative re-runs against the same scenario branch without eden clean in between.
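The path rules for copy_to_worktree can be illustrated without eden installed. The following is a minimal sketch, not eden's implementation: validate_copy_entry and the local InvalidOptions class are hypothetical names, and the sketch only covers rejecting absolute paths and .. traversal before anything is copied.

```python
from pathlib import PurePosixPath


class InvalidOptions(ValueError):
    """Local stand-in for eden's InvalidOptions (assumption, not the real class)."""


def validate_copy_entry(entry: str) -> PurePosixPath:
    """Accept only host-relative paths that stay inside cwd (hypothetical helper)."""
    path = PurePosixPath(entry)
    if path.is_absolute():
        raise InvalidOptions(f"absolute path not allowed: {entry}")
    if ".." in path.parts:
        raise InvalidOptions(f"'..' traversal not allowed: {entry}")
    return path


for candidate in (".env", "fixtures/data", "/etc/passwd", "../secrets"):
    try:
        print("ok      ", validate_copy_entry(candidate))
    except InvalidOptions as exc:
        print("rejected", exc)
```

Validating every entry up front, before the first copy, matches the documented behaviour that bad options fail before side effects occur.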
Returns a RunResult.
eden.aio mirrors the three top-level entry points (run, create_sandbox, interactive) as async def functions. Each is a thin asyncio.to_thread wrapper around its sync counterpart — same arguments, same return type, no async-native primitives in the core. See ADR 0011.
import asyncio

import eden
from eden import aio
from eden.sandboxes.no_sandbox import provider as no_sandbox


async def main() -> None:
    # Single run.
    result = await aio.run(
        agent=eden.simulated_agent(output="hi\n<promise>COMPLETE</promise>\n"),
        sandbox=no_sandbox(),
        prompt="x",
        max_iterations=1,
    )
    # Concurrent runs.
    a, b = await asyncio.gather(
        aio.run(agent=..., sandbox=..., prompt="task A", branch_strategy=eden.BranchStrategy.named("eden/a")),
        aio.run(agent=..., sandbox=..., prompt="task B", branch_strategy=eden.BranchStrategy.named("eden/b")),
    )
    # create_sandbox.run() is sync; await it via asyncio.to_thread.
    s = await aio.create_sandbox(sandbox=no_sandbox())
    try:
        impl = await asyncio.to_thread(s.run, agent=..., prompt_file="implement.md", max_iterations=20)
    finally:
        s.close()


asyncio.run(main())

Concurrency is bounded by asyncio's default ThreadPoolExecutor (min(32, cpu+4) workers). Users running more concurrent tasks should size the pool with loop.set_default_executor(...). See ADR 0011 for why eden does not async-ify the core.
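Sizing the pool can be sketched with the standard library alone. In this example blocking_run is a hypothetical stand-in for a synchronous eden call, and the worker count of 64 is an arbitrary illustration; only asyncio.to_thread, loop.set_default_executor, and ThreadPoolExecutor are real APIs.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_run(task: str) -> str:
    """Stand-in for a synchronous, blocking call (hypothetical)."""
    time.sleep(0.05)
    return f"done:{task}"


async def main(n_tasks: int = 64) -> list:
    loop = asyncio.get_running_loop()
    # asyncio.to_thread submits work to the loop's default executor, so
    # replacing it raises the ceiling on simultaneous blocking calls.
    loop.set_default_executor(ThreadPoolExecutor(max_workers=n_tasks))
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_run, f"task-{i}") for i in range(n_tasks))
    )


results = asyncio.run(main())
print(len(results), results[0])
```

The executor must be installed before the first to_thread call on that loop; asyncio.run shuts the default executor down when the loop closes.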
Run an agent attached to the parent terminal's stdio. There is no iteration loop, no idle watchdog, no completion-signal matching — eden carves a worktree, optionally renders a prompt, and execs the agent. The function returns when the agent process exits.
def interactive(
    *,
    agent: Agent,
    sandbox: SandboxProvider | None = None,
    prompt: str | None = None,
    prompt_file: str | Path | None = None,
    prompt_args: Mapping[str, str] | None = None,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    branch_strategy: BranchStrategy | None = None,
    name: str | None = None,
    hooks: Hooks | None = None,
    copy_to_worktree: list[str] | None = None,
) -> InteractiveResult: ...

- sandbox defaults to no_sandbox(). docker(...) and podman(...) are also supported; eden runs the agent argv inside the container via <binary> exec -it. Isolated providers (Daytona, Vercel, the local isolated copy) raise InvalidOptions because they don't expose a TTY.
- prompt / prompt_file / prompt_args are optional. When supplied, the rendered text is passed to the agent's build_interactive_command(ctx) (or build_command(ctx) when no interactive override exists).
- branch_strategy defaults to BranchStrategy.head() when the provider supports it, since interactive sessions usually want writes to land in the host repo directly. Override to merge_to_head() or named() for an isolated session.
- hooks runs the same OnWorktreeReady / OnSandboxReady / OnClose lifecycle as run(); OnIterationStart / OnIterationEnd are not relevant.
- copy_to_worktree has the same semantics as on run(): host-relative paths copied into the worktree before on_worktree_ready hooks fire. Incompatible with BranchStrategy.head(), which is the default for interactive sessions; pass branch_strategy=BranchStrategy.merge_to_head() (or named(...)) to use it.
Returns an InteractiveResult.
@dataclass(frozen=True)
class InteractiveResult:
    branch: str
    exit_code: int
    worktree_path: Path
    cwd: Path

Lightweight: exit_code is the agent's exit status; branch is the worktree branch ("HEAD" for the head strategy); worktree_path is where the agent ran (commit / inspect from there). No commit list, no stdout; interactive sessions don't capture either.
Creates a sandbox + worktree once and returns a Sandbox whose .run(...) method can be called multiple times against the same branch and container. Use when one logical task requires multiple agent runs (implement → review, plan → execute, etc.) and you want them to share environment, branch, and any provider-side caches.
def create_sandbox(
    *,
    sandbox: SandboxProvider,
    branch: str | None = None,
    branch_strategy: BranchStrategy | None = None,
    cwd: Path | None = None,
    env: Mapping[str, str] | None = None,
    mounts: tuple[Mount, ...] | None = None,
    name: str | None = None,
    copy_to_worktree: list[str] | None = None,
) -> Sandbox: ...

copy_to_worktree (when supplied) seeds host-relative files into the worktree before the sandbox boots. It has the same semantics as on run(), and the copy happens once at create_sandbox() time (not on every subsequent sb.run()). Incompatible with BranchStrategy.head().

The returned Sandbox is a dataclass with .worktree, .handle, .sandbox_provider, .cwd, plus a .run(...) method (same shape as the top-level run() minus sandbox and branch_strategy, which the Sandbox already owns). It also doubles as a context manager: with create_sandbox(...) as s: closes the handle and worktree on exit.
Run an agent against an already-created sandbox. Same arguments as run() minus sandbox= (already bound) and branch_strategy= (would be ignored — the sandbox already owns a branch). All other options carry over: output=, resume_session=, logging=, on_event=, signal=, hooks=, timeouts=, etc.
Useful for sequential-reviewer / planner-executor patterns where multiple agents share one branch, and for resuming a captured Claude Code session in a fresh container without re-creating the worktree.
with eden.create_sandbox(sandbox=docker_provider(...), branch="eden/feature/x") as s:
    impl = s.run(agent=eden.claude_code("..."), prompt_file="implement.md", max_iterations=20)
    if impl.commits:
        s.run(agent=eden.claude_code("..."), prompt_file="review.md", max_iterations=1)

Carves a worktree without launching an agent; useful when you want to manage the iteration loop yourself.
def create_worktree(
    *,
    branch: str | None = None,
    branch_strategy: BranchStrategy | None = None,
    name: str | None = None,
) -> WorktreeHandle: ...

Provide either branch (named) or branch_strategy (any of the three strategies); supplying both raises ValueError. Defaults to BranchStrategy.merge_to_head(). Returns a WorktreeHandle with .branch, .worktree_path, and .close() (works as a context manager).
Frozen dataclass capping per-step durations.
@dataclass(frozen=True)
class Timeouts:
    hook_step: float = 60.0
    iteration_step: float | None = None
    copy_to_worktree: float = 60.0

- hook_step: seconds budget for any individual hook command. Exceeded → HookTimeout.
- iteration_step: seconds budget for one agent iteration. None defers to idle_timeout. Exceeded → StepTimeout.
- copy_to_worktree: seconds budget for the isolated provider's worktree clone. Exceeded → CopyToWorktreeError(timed_out=True). Set the provider's own copy_timeout to override per-call; pass None to disable the budget.
File sink for StreamEvents. Each call to run() opens the file in append mode and prepends a --- Run started: <UTC ISO ts> --- delimiter so a shared log file remains readable.
@dataclass(frozen=True)
class Logging:
    type: Literal["file"]
    path: Path
    level: Literal["debug", "info", "warn", "error"] = "info"
    on_agent_stream_event: Callable[[StreamEvent], None] | None = None

    @staticmethod
    def file(
        path: str | Path,
        level: ... = "info",
        on_agent_stream_event: Callable[[StreamEvent], None] | None = None,
    ) -> Logging: ...

Use Logging.file("run.log") to capture every event the orchestrator emits.
on_agent_stream_event (optional) is invoked for every agent-derived event (text, tool_call, usage) in addition to file output. Intended for forwarding the agent's stream to external observability. Idle warnings and orchestrator-internal text are NOT forwarded — use the top-level on_event argument to run() for those. Errors raised by the callback are swallowed so a broken forwarder cannot kill the run.
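The forwarding contract described above can be sketched as follows. This is an illustration under stated assumptions, not eden's code: Event is a local stand-in for StreamEvent, forward_agent_event is a hypothetical helper, and the sketch filters by event type only, so it does not model the distinction between agent text and orchestrator-internal text.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class Event:
    """Stand-in for eden's StreamEvent (only the fields used here)."""
    type: str
    text: Optional[str] = None


AGENT_EVENT_TYPES = {"text", "tool_call", "usage"}


def forward_agent_event(event: Event, callback: Callable[[Event], None]) -> bool:
    """Invoke callback for agent-derived events; return True if it ran cleanly."""
    if event.type not in AGENT_EVENT_TYPES:
        return False  # e.g. idle_warning is never forwarded
    try:
        callback(event)
    except Exception:
        return False  # a broken forwarder must not kill the run
    return True


def broken_sink(event: Event) -> None:
    raise RuntimeError("observability backend is down")


seen = []
forwarded = forward_agent_event(Event("text", "hello"), seen.append)
skipped = forward_agent_event(Event("idle_warning"), seen.append)
survived = forward_agent_event(Event("text", "boom"), broken_sink)
print(forwarded, skipped, survived, len(seen))
```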
Provider-side bind-mount declaration (used by sandbox providers, not by run directly).
@dataclass(frozen=True)
class Mount:
    host: Path
    sandbox: Path
    read_only: bool = False

Frozen dataclass with three named constructors describing how the worktree branch relates to base:
@dataclass(frozen=True)
class BranchStrategy:
    tag: Literal["head", "merge_to_head", "named"]
    branch: str | None = None
    base: str = "main"

    @staticmethod
    def head() -> BranchStrategy: ...

    @staticmethod
    def merge_to_head(base: str = "main") -> BranchStrategy: ...

    @staticmethod
    def named(branch: str, base: str = "main") -> BranchStrategy: ...

- head(): work directly on the current HEAD; no merge, no auto-named branch.
- merge_to_head(base): generated branch off base; merged back on success.
- named(branch, base): explicit branch off base; preserved as-is.
Returned by run(). Frozen dataclass.
@dataclass(frozen=True)
class RunResult:
    iterations: list[Iteration]
    completion_signal: str | None
    branch: str
    stdout: str
    commits: list[Commit]
    worktree_path: Path
    preserved_worktree_path: Path | None
    merged_to_target_branch: str | None
    cwd: Path
    prompt: str
    env: dict[str, str]
    log_file_path: Path | None
    session_id: str | None
    session_file_path: Path | None
    usage: Usage | None
    output: object | None = None

completion_signal is the matched signal that stopped the loop (or None if all iterations ran to completion). merged_to_target_branch is set when a merge_to_head strategy successfully merged. usage is the final iteration's token usage. output is the validated payload extracted by output=Output.object(...) / Output.string(...), or None when no output= is configured.
@dataclass(frozen=True)
class Iteration:
    index: int
    completion_signal: str | None
    session_id: str | None
    session_file_path: Path | None
    usage: Usage | None

One entry per executed iteration. session_id and session_file_path are populated when the agent reports captures_sessions=True.
@dataclass(frozen=True)
class Commit:
    sha: str

A captured commit on the worktree branch. Commits are listed in the order they appeared.
Token-accounting numbers from agents that report them (e.g. Claude Code).
@dataclass(frozen=True)
class Usage:
    input_tokens: int
    cache_creation_input_tokens: int
    cache_read_input_tokens: int
    output_tokens: int

Returned by IsolatedSandboxHandle.finalize(target); summarises what the cloud/isolated provider replayed onto the host.
@dataclass(frozen=True)
class FinalizeResult:
    applied: bool
    files_changed: tuple[Path, ...]
    patch_size_bytes: int

applied=False means at least one copy or unlink failed; the orchestrator logs failures and continues.
Helpers for declaring schema-validated payloads on run().
from eden import Output, run

# String tag: extracts trimmed contents of <answer>...</answer>
result = run(..., output=Output.string(tag="answer"), max_iterations=1, prompt="...<answer>...</answer>...")
print(result.output)  # str

# Object tag: JSON-parses contents (with code-fence unwrap) and runs schema()
def parse(raw: object) -> Plan:
    assert isinstance(raw, dict)
    return Plan(**raw)

result = run(..., output=Output.object(tag="plan", schema=parse), max_iterations=1, prompt="...<plan>...</plan>...")
plan = result.output  # whatever schema returned

Output.object(tag, schema) extracts the last <tag>...</tag> pair, strips an optional Markdown code fence (```json ... ```), json.loads it, and calls schema(parsed). schema is any Callable[[object], T] that returns a validated value or raises; it works with pydantic Model.model_validate, dataclass factories, msgspec, or hand-rolled validators.
Output.string(tag) extracts the contents and .strip()s them — no JSON, no validation.
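The extraction steps for Output.object can be approximated in plain Python. This is a sketch of the described behaviour, not eden's implementation: extract_object is a hypothetical name, it raises ValueError where eden raises StructuredOutputError, and the fence-matching regex is an assumption about what counts as a code fence.

```python
import json
import re
from typing import Callable, TypeVar

T = TypeVar("T")
FENCE = "`" * 3  # a Markdown code-fence marker, built here to keep this block readable


def extract_object(stdout: str, tag: str, schema: Callable[[object], T]) -> T:
    """Parse the last <tag>...</tag> block as JSON, then validate it with schema."""
    name = re.escape(tag)
    matches = re.findall(rf"<{name}>(.*?)</{name}>", stdout, re.DOTALL)
    if not matches:
        raise ValueError(f"no <{tag}> block found")
    raw = matches[-1].strip()
    # Unwrap an optional fence such as a json-labelled fenced block.
    fenced = re.fullmatch(rf"{FENCE}[\w-]*\n(.*?)\n?{FENCE}", raw, re.DOTALL)
    if fenced:
        raw = fenced.group(1)
    return schema(json.loads(raw))


def parse_steps(raw: object) -> int:
    """Hand-rolled validator: require a dict with an integer 'steps' key."""
    if not isinstance(raw, dict) or not isinstance(raw.get("steps"), int):
        raise ValueError("expected {'steps': int}")
    return raw["steps"]


agent_stdout = (
    'draft: <plan>{"steps": 1}</plan>\n'
    f'final: <plan>{FENCE}json\n{{"steps": 2}}\n{FENCE}</plan>\n'
)
steps = extract_object(agent_stdout, "plan", parse_steps)
print(steps)
```

Taking the last match rather than the first lets an agent revise its answer within one response.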
Validation at entry:
- max_iterations == 1 is required (raises InvalidOptions otherwise).
- <tag> must literally appear in the prompt source (raises InvalidOptions otherwise).
Failures during extraction raise StructuredOutputError with tag, raw_matched, branch, optional preserved_worktree_path, and — when the failing iteration was captured — session_id and session_file_path. The session fields let claude_code callers resume the same conversation with corrective feedback and re-emit corrected output, rather than restart from scratch:
from eden import Output, StructuredOutputError, claude_code, run

try:
    result = run(
        agent=claude_code(),
        sandbox=..., prompt="emit <result>{...}</result>",
        output=Output.object(tag="result", schema=my_schema),
    )
except StructuredOutputError as e:
    if e.session_id is None:
        raise
    run(
        agent=claude_code(),
        sandbox=..., output=Output.object(tag="result", schema=my_schema),
        resume_session=e.session_id,
        prompt=f"Your previous <result> was malformed: {e.raw_matched!r}. Re-emit it.",
    )

OutputDefinition is the type alias for the union of Output.object(...) and Output.string(...) return values. Use this in helper signatures that accept either shape.
The single discriminated union surfaced by on_event and the JSONL log.
@dataclass(frozen=True)
class StreamEvent:
    type: Literal["text", "idle_warning", "tool_call", "usage"]
    agent_name: str
    iteration: int
    timestamp: datetime
    text: str | None = None
    minutes_idle: int | None = None
    tool_name: str | None = None
    tool_input: dict[str, object] | None = None
    usage: Usage | None = None
    session_id: str | None = None

The four type kinds:

- "text": line of agent output. Carries text.
- "idle_warning": emitted on idle_warning_interval. Carries minutes_idle.
- "tool_call": agent invoked a tool. Carries tool_name and tool_input.
- "usage": token usage report. Carries usage (and optionally session_id).
__post_init__ enforces that the type-specific fields are non-None.
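One way such a check could look is sketched below. Event is a trimmed, local stand-in for StreamEvent, and the REQUIRED_FIELDS table is an assumption derived from the four kinds listed above; eden's real validation may differ in detail.

```python
from dataclasses import dataclass
from typing import Dict, Optional

# Which payload fields each event kind must carry (derived from the list above).
REQUIRED_FIELDS = {
    "text": ("text",),
    "idle_warning": ("minutes_idle",),
    "tool_call": ("tool_name", "tool_input"),
    "usage": ("usage",),
}


@dataclass(frozen=True)
class Event:
    """Trimmed stand-in for StreamEvent (hypothetical, for illustration)."""
    type: str
    text: Optional[str] = None
    minutes_idle: Optional[int] = None
    tool_name: Optional[str] = None
    tool_input: Optional[Dict[str, object]] = None
    usage: Optional[object] = None

    def __post_init__(self) -> None:
        if self.type not in REQUIRED_FIELDS:
            raise ValueError(f"unknown event type: {self.type!r}")
        missing = [f for f in REQUIRED_FIELDS[self.type] if getattr(self, f) is None]
        if missing:
            raise ValueError(f"{self.type!r} event is missing {missing}")


ok_event = Event(type="tool_call", tool_name="Bash", tool_input={"command": "ls"})
try:
    Event(type="idle_warning")  # minutes_idle not supplied
    rejected = False
except ValueError as exc:
    rejected = True
    print(exc)
```

Validating in __post_init__ keeps the single-class union honest: a consumer that switches on type can rely on the matching payload field being present.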
Structural contract every agent must satisfy. Runtime-checkable.
@runtime_checkable
class Agent(Protocol):
    @property
    def name(self) -> str: ...

    @property
    def model(self) -> str: ...

    def build_command(self, ctx: IterationContext) -> list[str]: ...

    def parse_stream(self, line: str) -> StreamEvent | None: ...

Agents may also expose captures_sessions: bool. The orchestrator reads it via getattr and post-processes session JSONL when True.
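Because the contract is structural, a class satisfies it without inheriting from anything. The sketch below re-declares a look-alike protocol locally (AgentLike, with ctx typed loosely as object) so it runs without eden; EchoAgent is a hypothetical agent used only to show the isinstance and getattr checks.

```python
from typing import List, Protocol, runtime_checkable


@runtime_checkable
class AgentLike(Protocol):
    """Local look-alike of the Agent protocol (assumption, for illustration)."""

    @property
    def name(self) -> str: ...

    @property
    def model(self) -> str: ...

    def build_command(self, ctx: object) -> List[str]: ...

    def parse_stream(self, line: str) -> object: ...


class EchoAgent:
    """Satisfies the protocol structurally; no base class needed."""
    name = "echo"
    model = "none"

    def build_command(self, ctx: object) -> List[str]:
        return ["echo", str(ctx)]

    def parse_stream(self, line: str) -> object:
        return None  # let the caller treat each line as plain text


agent = EchoAgent()
is_agent = isinstance(agent, AgentLike)
is_not_agent = isinstance(object(), AgentLike)
# Optional capability, read the same way the text describes: via getattr.
captures = getattr(agent, "captures_sessions", False)
print(is_agent, is_not_agent, captures)
```

Note that a runtime isinstance check on a protocol only verifies that the members exist, not that their signatures match.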
Passed into Agent.build_command(ctx).
@dataclass(frozen=True)
class IterationContext:
    iteration: int
    prompt: str
    sandbox_handle: SandboxHandle
    worktree_path: Path
    branch: str
    name: str | None

The factories below ship in-tree. Each returns an Agent. See agents.md for capability comparisons.
def simulated_agent(
    name: str = "simulated",
    model: str = "deterministic-1",
    *,
    output: str | list[str] | Callable[[IterationContext], str] = "<promise>COMPLETE</promise>\n",
    delay_per_line: float = 0.0,
    fail_with: Exception | None = None,
) -> Agent: ...

A deterministic agent that emits a pre-baked output. Use in tests, in examples, or when wiring up the orchestrator without an LLM.
def claude_code(
    model: str = "claude-opus-4-7",
    *,
    name: str = "claude-code",
    effort: Literal["low", "medium", "high"] | None = None,
    env: Mapping[str, str] | None = None,
    capture_sessions: bool = True,
    extra_args: tuple[str, ...] = (),
) -> Agent: ...

Wraps the Claude Code CLI; sets captures_sessions=True so the orchestrator preserves session JSONLs under .eden/sessions/. Pass extra_args for any CLI flag eden does not yet surface.
When capture_sessions=True, the agent ships a session_storage attribute of type ClaudeSessionStorage that the orchestrator delegates transcript capture to. Out-of-tree agents (codex, pi, opencode wrappers, etc.) can mirror this pattern to plug in their own transcript layout — see the SessionStorage Protocol below.
def codex(
    model: str = "gpt-5",
    *,
    env: Mapping[str, str] | None = None,
    extra_args: tuple[str, ...] = (),
) -> Agent: ...

Thin wrapper over cli_agent for the OpenAI Codex CLI binary. Default model="gpt-5" is illustrative.
def opencode(
    model: str = "claude-opus-4",
    *,
    env: Mapping[str, str] | None = None,
    extra_args: tuple[str, ...] = (),
) -> Agent: ...

Wrapper for sst/opencode. Default model="claude-opus-4" is illustrative; opencode supports many providers.
def pi(
    model: str = "pi-3.5",
    *,
    env: Mapping[str, str] | None = None,
    extra_args: tuple[str, ...] = (),
) -> Agent: ...

Wrapper for the pi CLI binary.
def cursor(
    model: str = "claude-sonnet-4-6",
    *,
    name: str = "cursor",
    env: Mapping[str, str] | None = None,
    force: bool = False,
    extra_args: tuple[str, ...] = (),
) -> Agent: ...

Wrapper for Cursor's CLI binary (named agent). Builds agent --print --output-format stream-json --model <model> [--force] [extra_args ...] <prompt>. Prompt is delivered positionally, with a 120 KB pre-flight guard (raises InvalidOptions(code="config.prompt_too_long") on overflow). force is Cursor's equivalent of Claude's dangerously_skip_permissions. captures_sessions is False. The parser handles cursor's tool_call event and delegates Claude-compatible assistant/result blocks to Claude's parser. See agents.md for details.
def copilot(
    model: str = "claude-sonnet-4",
    *,
    name: str = "copilot",
    effort: Literal["low", "medium", "high"] | None = None,
    env: Mapping[str, str] | None = None,
    allow_all_tools: bool = False,
    extra_args: tuple[str, ...] = (),
) -> Agent: ...

Wrapper for the copilot CLI binary (GitHub Copilot CLI). Builds copilot -p <prompt> --output-format json --model <model> [--allow-all-tools] [--effort <level>] [extra_args ...]. Prompt is delivered via -p (still argv), with the same 120 KB pre-flight guard. allow_all_tools is Copilot's equivalent of Claude's dangerously_skip_permissions. captures_sessions is False. The parser decodes assistant.message_delta → text, tool.execution_start → tool_call (normalises lowercase "bash" → "Bash"), result → session_id, error/agent_error → text. See agents.md for details.
Generic factory for any line-streaming CLI. The codex/opencode/pi wrappers are 5-line shims over this.
def cli_agent(
    *,
    name: str,
    model: str,
    binary: str,
    build_argv: Callable[[IterationContext], list[str]] | None = None,
    parse_stream: Callable[[str], StreamEvent | None] | None = None,
    captures_sessions: bool = False,
    env: Mapping[str, str] | None = None,
    extra_args: tuple[str, ...] = (),
) -> Agent: ...

- name: StreamEvent.agent_name.
- model: informational; threaded into argv if your build_argv references it.
- binary: executable resolved through $PATH at spawn.
- build_argv: override the default [binary, *extra_args, ctx.prompt].
- parse_stream: override the default None (orchestrator emits text events per line).
- captures_sessions: opt-in session post-processing.
- env: per-agent env additions (merged by the orchestrator).
- extra_args: appended between binary and prompt by the default build_argv.
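The default argv rule and the build_argv override can be sketched in a few lines. Ctx is a one-field stand-in for IterationContext, and resolve_argv and the mycli binary name are hypothetical; the sketch only mirrors the documented default of [binary, *extra_args, ctx.prompt].

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


@dataclass(frozen=True)
class Ctx:
    """One-field stand-in for IterationContext (assumption)."""
    prompt: str


def resolve_argv(
    binary: str,
    extra_args: Tuple[str, ...],
    ctx: Ctx,
    build_argv: Optional[Callable[[Ctx], List[str]]] = None,
) -> List[str]:
    """Use the caller's build_argv when given, else the documented default."""
    if build_argv is not None:
        return build_argv(ctx)
    return [binary, *extra_args, ctx.prompt]


ctx = Ctx(prompt="fix the failing test")
default_argv = resolve_argv("mycli", ("--json",), ctx)
custom_argv = resolve_argv(
    "mycli", (), ctx, build_argv=lambda c: ["mycli", "run", "--prompt", c.prompt]
)
print(default_argv)
print(custom_argv)
```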
@runtime_checkable
class SessionStorage(Protocol):
    def extra_mounts(self) -> tuple[Mount, ...]: ...

    def host_capture(
        self, *, handle, session_id, host_repo_path, branch, iteration
    ) -> Path | None: ...

    def sandbox_transfer(self, *, handle, host_session_file, session_id) -> None: ...

Per-agent transcript capture, ADR-0012 style. Eden's default Claude Code agent ships a ClaudeSessionStorage instance on its session_storage attribute (set when capture_sessions=True), which the orchestrator delegates to instead of doing the work in _run_loop. Out-of-tree agents (codex, pi, opencode wrappers) can ship their own SessionStorage implementation to plug in custom transcript layouts without forking the orchestrator. Legacy agents that only expose captures_sessions: bool still work; the orchestrator falls back to ClaudeSessionStorage for them.
@dataclass(frozen=True)
class ClaudeSessionStorage:
    home: Path | None = None

The default SessionStorage implementation, used by claude_code(). Mounts ~/.claude/projects into containerized sandboxes and locates Claude's per-iteration JSONL by the project-slug convention. home= overrides ~ for tests.
@dataclass(frozen=True)
class CodexSessionStorage:
    home: Path | None = None

The SessionStorage implementation used by codex(capture_sessions=True) (the default). Mounts ~/.codex/sessions into containerized sandboxes and walks codex's date-nested directory tree (<YYYY>/<MM>/<DD>/rollout-<timestamp>-<session_id>.jsonl) to locate the per-iteration transcript. home= overrides ~ for tests.
def transfer_session(
    *,
    source: Path,
    dest: Path,
    source_cwd: str,
    dest_cwd: str,
) -> Path: ...

Cross-host helper. Copies a captured session JSONL from source to dest, rewriting every absolute path that starts with source_cwd to start with dest_cwd. Use to migrate captured sessions between machines whose worktree paths differ (e.g. /Users/alice/repo → /home/build/repo) so the resumed agent sees its own filesystem layout. dest's parent is created. Raises SessionCaptureFailed on missing source or I/O error.
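The rewrite step can be approximated with a line-by-line copy. This is a rough sketch, not eden's transfer_session: rewrite_session is a hypothetical name, and it uses a plain substring replacement, which is cruder than matching only paths that start with source_cwd (it would also touch a sibling directory such as /Users/alice/repo2).

```python
import tempfile
from pathlib import Path


def rewrite_session(source: Path, dest: Path, source_cwd: str, dest_cwd: str) -> Path:
    """Copy a JSONL file, swapping the source_cwd prefix for dest_cwd on each line."""
    dest.parent.mkdir(parents=True, exist_ok=True)  # dest's parent is created
    with source.open(encoding="utf-8") as src, dest.open("w", encoding="utf-8") as out:
        for line in src:
            out.write(line.replace(source_cwd, dest_cwd))
    return dest


with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "captured.jsonl"
    source.write_text('{"cwd": "/Users/alice/repo/src"}\n', encoding="utf-8")
    dest = rewrite_session(
        source,
        Path(tmp) / "migrated" / "session.jsonl",
        source_cwd="/Users/alice/repo",
        dest_cwd="/home/build/repo",
    )
    rewritten_text = dest.read_text(encoding="utf-8")

print(rewritten_text, end="")
```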
Eden runs commands at five named phases — HookPhase enumerates them and Hooks bundles host-side and sandbox-side variants.
@dataclass(frozen=True)
class Hook:
    cmd: str
    cwd: Path | None = None
    env: Mapping[str, str] | None = None
    timeout: float | None = None

A single shell command to run. timeout=None defers to Timeouts.hook_step.
class HookPhase(Enum):
    OnWorktreeReady = "on_worktree_ready"
    OnSandboxReady = "on_sandbox_ready"
    OnIterationStart = "on_iteration_start"
    OnIterationEnd = "on_iteration_end"
    OnClose = "on_close"

Order: OnWorktreeReady (host) → OnSandboxReady (sandbox) → for each iteration OnIterationStart → agent → OnIterationEnd → on exit OnClose.
@dataclass(frozen=True)
class HostHooks:
    on_worktree_ready: tuple[Hook, ...] = ()
    on_iteration_start: tuple[Hook, ...] = ()
    on_iteration_end: tuple[Hook, ...] = ()
    on_close: tuple[Hook, ...] = ()

Host hooks run sequentially on the workstation. Note: on_sandbox_ready is sandbox-only.
@dataclass(frozen=True)
class SandboxHooks:
    on_sandbox_ready: tuple[Hook, ...] = ()
    on_iteration_start: tuple[Hook, ...] = ()
    on_iteration_end: tuple[Hook, ...] = ()
    on_close: tuple[Hook, ...] = ()

Sandbox hooks run inside the sandbox handle. They may execute in parallel where the provider supports it.
@dataclass(frozen=True)
class Hooks:
    host: HostHooks = field(default_factory=HostHooks)
    sandbox: SandboxHooks = field(default_factory=SandboxHooks)

Failure mapping: a hook that exits non-zero raises HookFailed; exceeding its timeout raises HookTimeout. Both are subclasses of HookError.
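The failure mapping can be sketched with subprocess. The exception classes below are local stand-ins that only mirror the documented hierarchy, run_hook and outcome are hypothetical helpers, and the sketch takes an argv list rather than a shell string (eden's Hook.cmd) so that it behaves the same across platforms.

```python
import subprocess
import sys
from typing import List


class HookError(Exception):
    """Local stand-in for eden's HookError base class."""


class HookFailed(HookError):
    """The hook exited with a non-zero status."""


class HookTimeout(HookError):
    """The hook ran past its time budget."""


def run_hook(argv: List[str], timeout: float) -> None:
    """Run one hook command and translate its outcome into an exception."""
    try:
        completed = subprocess.run(argv, timeout=timeout)
    except subprocess.TimeoutExpired as exc:
        raise HookTimeout(f"hook exceeded {timeout}s") from exc
    if completed.returncode != 0:
        raise HookFailed(f"hook exited with status {completed.returncode}")


def outcome(argv: List[str], timeout: float) -> str:
    """Report which branch of the mapping a command lands in."""
    try:
        run_hook(argv, timeout)
    except HookError as exc:  # one handler covers both subclasses
        return type(exc).__name__
    return "ok"


py = sys.executable
results = [
    outcome([py, "-c", "pass"], timeout=30.0),
    outcome([py, "-c", "import sys; sys.exit(3)"], timeout=30.0),
    outcome([py, "-c", "import time; time.sleep(5)"], timeout=0.2),
]
print(results)
```

Catching the shared base class is what makes the two-subclass design convenient: callers that do not care why a hook failed need only one except clause.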
Cooperative cancellation uses an AbortController / AbortSignal pair. Pass the signal to run(signal=...); call controller.abort() from another thread to stop.
@dataclass
class AbortController:
    signal: AbortSignal = field(default_factory=AbortSignal)

    def abort(self, *, reason: str = "abort-signal") -> None: ...

Writer side. abort() is idempotent: only the first call records a reason.
@dataclass
class AbortSignal:
    def is_aborted(self) -> bool: ...

    @property
    def reason(self) -> str | None: ...

    def raise_if_aborted(self) -> None: ...

    def wait(self, timeout: float | None = None) -> bool: ...

Reader side. Pollable via is_aborted(), blocking via wait(timeout), and assertable via raise_if_aborted() (raises Aborted).
class Aborted(EdenError):
    def __init__(self, *, reason: str = "abort-signal") -> None: ...

Raised by raise_if_aborted() and surfaced from run() when cancellation lands.
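A controller/signal pair with these semantics can be built on threading.Event. The sketch below is an assumption about one workable design, not eden's source: Signal, Controller, and the private _trigger method are hypothetical, and Aborted here derives from Exception rather than EdenError.

```python
import threading
from typing import Optional


class Aborted(Exception):
    """Local stand-in for eden's Aborted."""

    def __init__(self, *, reason: str = "abort-signal") -> None:
        super().__init__(reason)
        self.reason = reason


class Signal:
    """Reader side: poll, block, or assert."""

    def __init__(self) -> None:
        self._event = threading.Event()
        self._lock = threading.Lock()
        self._reason: Optional[str] = None

    def is_aborted(self) -> bool:
        return self._event.is_set()

    @property
    def reason(self) -> Optional[str]:
        return self._reason

    def raise_if_aborted(self) -> None:
        if self.is_aborted():
            raise Aborted(reason=self._reason or "abort-signal")

    def wait(self, timeout: Optional[float] = None) -> bool:
        return self._event.wait(timeout)

    def _trigger(self, reason: str) -> None:
        with self._lock:
            if not self._event.is_set():  # only the first call records a reason
                self._reason = reason
                self._event.set()


class Controller:
    """Writer side: owns the signal and flips it."""

    def __init__(self) -> None:
        self.signal = Signal()

    def abort(self, *, reason: str = "abort-signal") -> None:
        self.signal._trigger(reason)


controller = Controller()
# Abort from another thread shortly after starting.
threading.Timer(0.05, controller.abort, kwargs={"reason": "user-cancel"}).start()
woke = controller.signal.wait(timeout=2.0)
controller.abort(reason="second-call")  # idempotent: reason is unchanged
print(woke, controller.signal.reason)
```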
Eden re-exports the full provider surface from the top-level package so consumers can build cloud or out-of-tree providers without depending on eden.providers._protocols directly. See custom-providers.md for the full walk-through.
@runtime_checkable
class SandboxHandle(Protocol):
    worktree_path: Path

    def exec(self, cmd: str, *, on_line, cwd, env, timeout, stdin) -> ExecResult: ...

    def copy_file_in(self, host: Path, sandbox: Path) -> None: ...

    def copy_file_out(self, sandbox: Path, host: Path) -> None: ...

    def close(self) -> None: ...

The base handle every provider's create() must return. Runtime-checkable. exec(stdin=...) writes the supplied string to the command's stdin so callers can deliver large payloads without hitting the 128KB execve argv cap (REST providers wrap with printf <base64> | base64 -d | (cmd)).
BindMountSandboxHandle is a marker subprotocol of SandboxHandle with no extra methods. Used by docker, podman, no_sandbox and any custom provider that runs the agent against a host-mounted worktree.
@runtime_checkable
class IsolatedSandboxHandle(SandboxHandle, Protocol):
    def finalize(self, target: Path) -> FinalizeResult: ...

A SandboxHandle whose state replicates back to the host on close via finalize(target). The orchestrator detects the protocol via hasattr(handle, "finalize"). Bind-mount providers (docker, podman, no_sandbox) do not implement it.
@runtime_checkable
class SandboxProvider(Protocol):
    name: str
    kind: Literal["bind_mount", "isolated", "none"]

    def supports_strategy(self, strategy: BranchStrategy) -> bool: ...

    def create(self, opts: CreateOptions) -> SandboxHandle: ...

The factory contract. Wrap a create callable with make_bind_mount_provider or make_isolated_provider instead of implementing this class by hand unless you have a reason.
@dataclass(frozen=True)
class CreateOptions:
    branch: str
    worktree_path: Path
    host_repo_path: Path
    env: Mapping[str, str]
    mounts: tuple[Mount, ...]
    name_hint: str | None

The argument the orchestrator hands to your create() callable.
@dataclass(frozen=True)
class ExecResult:
    stdout: str
    stderr: str
    exit_code: int

    @property
    def ok(self) -> bool: ...

    def check(self) -> ExecResult: ...

Returned by handle.exec(...). check() raises ExecFailed if exit_code != 0.
from eden import make_bind_mount_provider

provider = make_bind_mount_provider(name="my-provider", create=my_create_fn)

Wraps a create: Callable[[CreateOptions], BindMountSandboxHandle] into a SandboxProvider with kind="bind_mount". Accepts an optional supported_strategies: frozenset[StrategyTag] to restrict the branch strategies your provider supports (default: all three).
from eden import make_isolated_provider

provider = make_isolated_provider(name="my-provider", create=my_create_fn)

Same idea, but the returned handle must expose finalize(target) -> FinalizeResult. Produces a provider with kind="isolated".
A swappable sink abstraction for orchestrator → user output. Eden re-exports the Protocol and three concrete sinks; pass any of them to higher-level CLI / interactive helpers that accept a display= argument. Ports sandcastle's tagged DisplayEntry ADT (src/Display.ts).
class Display(Protocol):
    def intro(self, title: str) -> None: ...

    def status(self, message: str, severity: Severity = "info") -> None: ...

    def text(self, message: str) -> None: ...

    def tool_call(self, name: str, formatted_args: str) -> None: ...

    def summary(self, title: str, rows: Mapping[str, str]) -> None: ...

    @contextmanager
    def spinner(self, message: str) -> Iterator[None]: ...

    @contextmanager
    def task_log(self, title: str) -> Iterator[Callable[[str], None]]: ...

Severity is one of "info" | "success" | "warn" | "error". The two context managers wrap long-running blocks: spinner for an indeterminate progress indicator; task_log for collecting per-step messages and emitting them on exit (the yielded callable pushes messages into the log).
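The task_log shape (a context manager that yields a callable) may be easier to see in a concrete sink. RecordingDisplay below is a hypothetical, partial sink that implements only spinner and task_log; it is a sketch of the pattern, not one of eden's shipped displays.

```python
from contextlib import contextmanager
from typing import Callable, Iterator, List, Tuple


class RecordingDisplay:
    """Partial sink: records spinner and task_log entries in memory."""

    def __init__(self) -> None:
        self.entries: List[Tuple[str, str, Tuple[str, ...]]] = []

    @contextmanager
    def spinner(self, message: str) -> Iterator[None]:
        try:
            yield
        finally:
            self.entries.append(("spinner", message, ()))

    @contextmanager
    def task_log(self, title: str) -> Iterator[Callable[[str], None]]:
        messages: List[str] = []
        try:
            yield messages.append  # the caller pushes per-step messages
        finally:
            # Emit everything collected once the block exits.
            self.entries.append(("task_log", title, tuple(messages)))


display = RecordingDisplay()
with display.spinner("booting sandbox"):
    pass
with display.task_log("setup") as log:
    log("installed dependencies")
    log("ran migrations")
print(display.entries)
```

Recording in the finally clause means an entry is emitted even when the wrapped block raises, which is usually what a log of long-running steps wants.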
DisplayEntry is a tagged union of IntroEntry | StatusEntry | SpinnerEntry | SummaryEntry | TaskLogEntry | TextEntry | ToolCallEntry. Each has a .tag literal and the relevant payload fields. Used by SilentDisplay to record everything for test assertions.
display = SilentDisplay()
# ... orchestrator runs ...
assert display.entries[-1].title == "Run complete"

SilentDisplay records every entry on .entries and prints nothing. It is the test sink.
display = FileDisplay(Path(".eden/logs/run.log"))

FileDisplay is an append-only file sink that writes a timestamped delimiter on construction. Spinners and task logs record their duration. Suitable for unattended / CI runs.
display = RichDisplay()  # uses default rich.console.Console()
RichDisplay provides live terminal output powered by the bundled rich dependency. It renders severities with color glyphs, spinners with rich.status.Status, and summaries as bold-key / dim-value blocks. Inject a custom Console via RichDisplay(console=Console(file=...)) to capture output in tests.
Every error eden raises descends from EdenError. Each concrete class accepts a cause keyword argument and carries code, message, and hint attributes for structured logging. EdenTimeoutError additionally subclasses the built-in TimeoutError for mixed-except ergonomics. See errors.md for the full taxonomy with code strings, raise sites, and recovery guidance.
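The dual inheritance described above can be sketched with stand-in classes. The names ending in Sketch are hypothetical and the code strings are invented for illustration; this is not eden's implementation, only a demonstration of why a handler written for the built-in TimeoutError also catches an eden timeout.

```python
class EdenErrorSketch(Exception):
    """Stand-in base class carrying code, message, and hint attributes."""

    code = "eden_error"  # hypothetical code string

    def __init__(self, message, *, hint=None, cause=None):
        super().__init__(message)
        self.message = message
        self.hint = hint
        # Assigning __cause__ has the same effect as `raise ... from cause`.
        self.__cause__ = cause


class EdenTimeoutErrorSketch(EdenErrorSketch, TimeoutError):
    """Caught by `except EdenErrorSketch` and by `except TimeoutError`."""

    code = "timeout"  # hypothetical code string


def classify(exc):
    """Show which handler a mixed except chain would pick."""
    try:
        raise exc
    except TimeoutError:
        return "timeout-handler"
    except EdenErrorSketch:
        return "eden-handler"
```

Listing the TimeoutError handler first means existing timeout handling keeps working, while other eden errors fall through to the broader clause.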
from eden import EdenError, format_error_message, run
try:
    run(agent=..., sandbox=..., prompt="...")
except EdenError as e:
    print(format_error_message(e))
format_error_message maps any EdenError (including the sandbox / worktree subclasses) to a single multi-line user-friendly string of the form:
<kind-prefix>: <message>
code: <code>
hint: <hint>
hint is preserved when the error already carries one (e.g. InvalidOptions(..., hint=...)). For tagged provider errors that don't carry a hint — ProviderUnavailable, ImageNotFound, ContainerStartFailed, ExecTimeout, etc. — the formatter synthesises a context-aware suggestion ("Is Docker running?", "Build the image first: docker build ...", "Increase Timeouts.iteration_step"). Use this in CLI surfaces so users get the same recovery message regardless of which error subclass surfaced.
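The hint-preservation rule can be sketched as follows. Everything here is a stand-in: SketchError, FALLBACK_HINTS, and format_error_sketch are hypothetical names, using the class name as the kind prefix is an assumption, and so is the pairing of each fallback hint with a particular class. Only the three-line shape and the hint texts come from the description above.

```python
# Hypothetical fallback hints keyed by error class name. The pairing of
# hint text to class is an assumption made for this sketch.
FALLBACK_HINTS = {
    "ProviderUnavailable": "Is Docker running?",
    "ExecTimeout": "Increase Timeouts.iteration_step",
}


class SketchError(Exception):
    """Stand-in error carrying message, code, and an optional hint."""

    def __init__(self, message, *, code, hint=None):
        super().__init__(message)
        self.message = message
        self.code = code
        self.hint = hint


class ProviderUnavailable(SketchError):
    """Stand-in for a tagged provider error that carries no hint."""


def format_error_sketch(err):
    """Build the three-line message, synthesising a hint if none is set."""
    kind = type(err).__name__  # assumed kind prefix
    hint = err.hint or FALLBACK_HINTS.get(kind)
    lines = [f"{kind}: {err.message}", f"code: {err.code}"]
    if hint:
        lines.append(f"hint: {hint}")
    return "\n".join(lines)
```

An error that already carries a hint keeps it; the fallback table is consulted only when the hint is missing.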
The error classes re-exported from eden:
- EdenError: base class for everything.
- AgentError: the agent subprocess exited non-zero without hitting the completion signal. Carries agent_name, exit_code, stderr, and parsed_error (extracted from stdout for Codex / Pi / OpenCode, which surface errors there rather than on stderr).
- ConfigError: bad arguments, env, or cwd; raised before any side-effect.
- CopyToWorktreeError: a worktree copy failed. Raised in two places: (1) the isolated provider's worktree clone failed or exceeded Timeouts.copy_to_worktree; (2) a copy_to_worktree= entry passed to run() / create_sandbox() / interactive() doesn't exist on disk, or the copy hit a permissions / disk-space error. Carries source, target, timeout, and timed_out (true on budget overrun, false on missing-source / permission / disk failure).
- CwdError: invalid cwd= (missing, not a directory, not in a git repo).
- EdenTimeoutError: base for time-budget exceedances; subclasses TimeoutError.
- EnvMergeError: conflicting env overrides between caller, agent, and provider.
- HookError: base for hook failures.
- HookFailed: a hook command exited non-zero.
- HookTimeout: a hook exceeded Timeouts.hook_step (or its own timeout).
- IdleTimeout: agent stdout was silent past idle_timeout.
- InvalidOptions: generic kwarg validation failure.
- PromptError: prompt / prompt_file / prompt_args resolution failed.
- RestAuthError: 401/403 from a cloud provider's REST API.
- RestError: base for any non-2xx REST response (or status=0 connection failure).
- RestNotFoundError: 404 from a cloud provider.
- RestRateLimited: 429 after retries were exhausted.
- SessionCaptureFailed: the orchestrator could not locate or read a session JSONL; soft failure surfaced as a warning event.
- SessionNotFound: raised at run start when resume_session=<id> references a JSONL that does not exist on the host filesystem. The orchestrator runs this precheck before spawning the agent so the failure surfaces host-side with the expected path, rather than buried in agent stderr. Carries session_id, agent_name, optional expected_path, and hint.
- StepTimeout: an iteration exceeded Timeouts.iteration_step.
- StructuredOutputError: output=Output.{object,string}(...) failed to extract or validate. Carries tag, raw_matched (the matched contents or None), branch, optional preserved_worktree_path, and, when the failing iteration was captured, session_id and session_file_path so claude_code callers can resume that conversation with corrective feedback via resume_session=. Raised on missing tag, invalid JSON, or schema validation failure.
Eden emits OpenTelemetry spans for the iteration loop, sandbox lifecycle, hooks, and REST requests. The runtime depends on opentelemetry-api>=1.20; without an installed SDK, OTel's no-op tracer makes every span a zero-cost noop. To collect traces in your application, install opentelemetry-sdk and configure a provider/exporter — eden picks up whatever provider is set globally.
Spans emitted:
| Span | Where | Key attributes |
|---|---|---|
| eden.run | one per eden.run() / eden.aio.run() call | agent.name, agent.model, sandbox.name, sandbox.kind, branch, max_iterations, caller_managed, iterations, completion_signal |
| eden.sandbox.create | wraps Sandbox.create + OnSandboxReady hooks | sandbox.name, sandbox.kind, branch |
| eden.agent.exec | one per agent invocation (per iteration) | agent.name, agent.model, iteration.index, branch |
| eden.hook | one per host or sandbox hook command | hook.location (host/sandbox), hook.phase, hook.command, hook.timeout_s |
| eden.rest.request | one per RestClient HTTP request | http.method, http.url, http.status_code, http.retry_count |
All spans record exceptions via Span.record_exception() and set status to ERROR on raise — failures show up in your trace UI without extra wiring.
Every span also emits two metrics derived from its name:
- <span>.count: counter, attribute outcome ∈ {ok, error}.
- <span>.duration_seconds: histogram, same outcome attribute.
So eden.run.count{outcome="error"} gives you the failure rate across runs, and eden.agent.exec.duration_seconds (P50/P95) tells you whether iterations are getting slower over time. Wire up an OTel MeterProvider alongside the TracerProvider to receive them.
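The naming scheme for the two derived metrics can be illustrated with a standard-library stand-in. This is not how eden records metrics (that goes through OpenTelemetry); measured, counts, durations, and failure_rate are hypothetical names used only to show how a count and a duration sample keyed by outcome fall out of one span name.

```python
import time
from collections import Counter, defaultdict
from contextlib import contextmanager

counts = Counter()             # (metric name, outcome) -> number of spans
durations = defaultdict(list)  # (metric name, outcome) -> seconds samples


@contextmanager
def measured(span_name):
    """Record <span>.count and <span>.duration_seconds for one block."""
    start = time.perf_counter()
    outcome = "ok"
    try:
        yield
    except Exception:
        outcome = "error"
        raise
    finally:
        elapsed = time.perf_counter() - start
        counts[(f"{span_name}.count", outcome)] += 1
        durations[(f"{span_name}.duration_seconds", outcome)].append(elapsed)


def failure_rate(span_name):
    """Fraction of recorded spans whose outcome was "error"."""
    ok = counts[(f"{span_name}.count", "ok")]
    err = counts[(f"{span_name}.count", "error")]
    total = ok + err
    return err / total if total else 0.0
```

With a real MeterProvider the same ratio would be computed in the metrics backend rather than in process.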
A minimal SDK setup for local debugging:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
import eden
eden.run(agent=..., sandbox=..., prompt="...")
See ADR 0012 for the design rationale and the list of instrumented sites.
import eden
print(eden.__version__)
eden.__version__ exposes the installed package version (read via importlib.metadata). Unit tests assert the value matches pyproject.toml.
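For readers unfamiliar with importlib.metadata, a minimal sketch of that kind of lookup follows. The helper name read_version and its fallback string are hypothetical, and whether eden falls back or raises when the distribution metadata is missing is not stated here.

```python
from importlib.metadata import PackageNotFoundError, version


def read_version(dist_name: str, default: str = "0+unknown") -> str:
    """Return the installed version of a distribution, or a fallback."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        # Raised when no installed distribution has this name, e.g. when
        # running from a source checkout that was never installed.
        return default
```

Calling `read_version("eden")` would return the installed version string when the package is installed under that distribution name, which is itself an assumption.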