From ccc1b5eaeea033a9276fcaa17d897abe3db582b6 Mon Sep 17 00:00:00 2001 From: Matt Galligan Date: Fri, 5 Jun 2026 11:05:53 -0400 Subject: [PATCH 1/2] docs: plan lazy thread sync --- .agents/plans/lazy-thread-sync/GOAL.md | 22 +++ .agents/plans/lazy-thread-sync/PLAN.md | 189 ++++++++++++++++++++++++ .agents/plans/lazy-thread-sync/REFS.md | 104 +++++++++++++ .agents/plans/lazy-thread-sync/RETRO.md | 58 ++++++++ 4 files changed, 373 insertions(+) create mode 100644 .agents/plans/lazy-thread-sync/GOAL.md create mode 100644 .agents/plans/lazy-thread-sync/PLAN.md create mode 100644 .agents/plans/lazy-thread-sync/REFS.md create mode 100644 .agents/plans/lazy-thread-sync/RETRO.md diff --git a/.agents/plans/lazy-thread-sync/GOAL.md b/.agents/plans/lazy-thread-sync/GOAL.md new file mode 100644 index 0000000..06b541e --- /dev/null +++ b/.agents/plans/lazy-thread-sync/GOAL.md @@ -0,0 +1,22 @@ +# Lazy Thread Sync — goal prompt + +```text +/goal From `/Users/mg/Developer/outfitter/dispatch`, implement lazy sync for existing Codex threads so pickup is instant and honest. + +Read first: `AGENTS.md`, `.agents/plans/PLANNING.md`, `.agents/plans/lazy-thread-sync/PLAN.md`, `.agents/plans/lazy-thread-sync/REFS.md`, `.agents/plans/lazy-thread-sync/RETRO.md`, `docs/adrs/0005-lane-authority-capability-ladder.md`, `docs/adrs/0011-codex-session-registration-is-explicit.md`, and current attach/discover/sync-adjacent code/tests. Verify live branch/state before editing. + +Objective: ship metadata-only `dispatch lane attach ` plus explicit progressive `dispatch lane sync ` using compact App Server metadata and bounded Codex JSONL top+tail indexing. Update registry state, CLI/MCP/schema projections, docs, skills, and tests. Preserve observe-only attached-lane authority. + +Constraints: work on `feat/lazy-thread-sync` or create it from main if missing. Follow contract-first/no-drift patterns; add ops through the registry, not hand-written surface forks. 
Do not unlock attached-lane writes, copy whole transcripts by default, index all unattached threads by default, merge, publish, mutate releases, commit secrets, or disturb live Codex threads. If live Codex smoke is useful, use temp `DISPATCH_HOME`, read-only metadata/sync only, no sends/stops/renames/archives. + +Loop: define success before each slice; make small reversible changes; run focused tests first; update `RETRO.md`; do local review; fix P0/P1/P2; repeat until the feature is correct and you would ship it. Use bounded subagents for parser/schema/docs/review scouting, but verify their claims and keep synthesis, edits, commits, PRs, and tracker/source-control writes centralized. + +Verification: parser/index fixture tests; attach tests proving default attach does not call `thread/resume`; registry migration tests; CLI/schema/MCP parity tests for `lane sync`; docs/skills checks; safe local runtime smoke; final `just check`. + +Source control: commit coherent slices with conventional messages. Submit draft PR(s) only after local checks/review are clean. Do not mark ready, merge, or publish without explicit user approval. + +Stop if unsafe ambiguity, missing credentials, unauthorized external side effects, impossible verification, or the same blocker repeats 3 times. + +Done only when metadata-only attach, explicit sync, sync state surfaces, docs/skills/schema parity, tests, safe local smoke, green `just check`, and local review with no unresolved P0/P1/P2 are all complete; `RETRO.md` must record changed files, checks, review rounds, risks, forbidden-action audit, PR/branch state, and exact proof the completion condition is satisfied or blocked. 
+``` + diff --git a/.agents/plans/lazy-thread-sync/PLAN.md b/.agents/plans/lazy-thread-sync/PLAN.md new file mode 100644 index 0000000..4788640 --- /dev/null +++ b/.agents/plans/lazy-thread-sync/PLAN.md @@ -0,0 +1,189 @@ +# Lazy Thread Sync — implementation plan + +Goal packet for making existing Codex thread pickup instant without eager +`thread/resume` or transcript copying. Pasteable goal: [`GOAL.md`](./GOAL.md). +Execution ledger: [`RETRO.md`](./RETRO.md). References: [`REFS.md`](./REFS.md). + +## Objective + +Ship progressive sync for attached Codex lanes: + +- `dispatch lane attach ` registers existing Codex threads quickly + from compact metadata, without `thread/resume` by default. +- `dispatch lane sync ` builds or refreshes dispatch's local indexed view + with bounded top+tail parsing of Codex JSONL artifacts. +- `lane get`, `lane list`, and unmanaged discovery expose honest sync state. +- The daemon can keep attached lanes warm without whole-home indexing. +- CLI, MCP, schemas, docs, and first-party skills stay derived/aligned. + +## Current Evidence + +The local investigation found the right split of responsibilities: + +- `thread/list(useStateDbOnly:true)` is the official cheap discovery path. +- `thread/read(includeTurns:false)` verifies a thread id and returns compact + official metadata in a few milliseconds. +- `thread/resume` is too heavy and side-effectful for default attach: on the + sampled thread it returned about 87 KB, emitted notifications, loaded the + thread, and took 1-3 seconds. +- Local JSONL top+tail parsing is cheap enough for progressive sync, but must + cap bytes as well as lines because early records can be large. +- `Thread.path` is explicitly unstable in the current App Server schema, so + dispatch should treat it as a cached source pointer with file identity, not a + durable id. + +The packet summarizes the local notes in [`REFS.md`](./REFS.md) so execution +does not depend on chat history. 
+ +## Product Contract + +Expected operator shape: + +```bash +dispatch lane list --unmanaged --limit 20 +dispatch lane attach +dispatch lane attach --sync +dispatch lane sync +dispatch lane sync --full +dispatch lane get +dispatch lane list +dispatch schema "lane sync" +``` + +Semantics: + +- `attach` defaults to metadata-only and uses `thread/read(includeTurns:false)`. +- `attach --sync` performs a quick sync after registration. +- `sync` means progressive indexing, not hydration and not transcript copying. +- `sync --full` may backfill more aggressively if implemented, but should still + be bounded and explicit; do not block the main product slice on perfect full + archival indexing. +- Existing attached lanes remain observe-only. Do not unlock send/stop/archive + or history writes for attached desktop lanes. +- `lane tail` may keep using official `thread/read(includeTurns:true)` for + persisted turn summaries, but should not be described as a fast tail API. + +## Implementation Slices + +### Slice 1 — Codex source adapters + +- Add a focused module for Codex thread metadata and JSONL source parsing. +- Wrap official App Server metadata reads/lists instead of making handlers + inspect raw schema blobs. +- Add a JSONL top+tail parser with: + - line and byte caps; + - complete-line handling; + - partial-final-line tolerance; + - file identity capture: path, device, inode, size, mtime; + - sanitized metadata extraction, not raw transcript dumping. +- Add fixture tests for large first records, tail offsets, partial final lines, + missing/moved files, and path identity changes. + +### Slice 2 — Registry sync index + +- Bump the registry schema and add the smallest useful sync tables. +- Start with source/snapshot facts before adding item-level history. 
+- Candidate fields: + - thread id, source path, device, inode, size, mtime; + - sync state: `metadata`, `partial`, `complete`, `error`; + - last synced at, latest event at, latest turn id, error; + - display name, preview/excerpt policy, cwd, source, model/provider, session id; + - top cursor/tail cursor/backfill cursor. +- Add migration/open tests, model validation tests, and older-db/newer-db checks. + +### Slice 3 — Metadata-only attach and explicit sync op + +- Change `attach` to verify with `thread/read(includeTurns:false)`, register the + lane, store metadata sync state, and never call `thread/resume` by default. +- Add `sync` as a first-class op projected to CLI/MCP/schema. +- Route `dispatch lane sync` through the op registry; do not hand-write a + special surface. +- Make idempotency and error behavior explicit: + - missing thread -> clean App Server/not-found style error; + - missing source file -> registered lane can still exist with sync error; + - repeated sync -> updates snapshot/cursors. +- Update tests proving attach no longer resumes. + +### Slice 4 — Read surfaces and daemon warming + +- Include sync state in `lane get` and `lane list`. +- Keep `lane list --unmanaged` official and cheap, using `thread/list` state DB. +- Add bounded daemon background sync for attached lanes if it stays simple: + - quick sync after attach when configured; + - poll/watch attached source file size/mtime; + - append-only parsing from stored cursor; + - conservative defaults. +- Add config only where needed: + +```toml +[sync] +quick_on_attach = true +watch_attached = true +unattached = "off" # off | cwd | recent | all +tail_lines = 200 +tail_bytes = 262144 +top_bytes = 262144 +max_backfill_bytes_per_tick = 1048576 +``` + +Do not implement broad automatic unattached indexing unless the slice remains +small and private by default. + +### Slice 5 — Docs, skills, schemas, and local proving + +- Update README/docs/usage/skills/plugin docs for the new attach/sync semantics. 
+- Update ADRs or add one if a durable decision changed: + - likely: progressive sync/index cache and metadata-only attach. +- Ensure `dispatch schema "lane sync"` and MCP grouped tools expose the new op. +- Run focused tests first, then `just check`. +- Put the feature through local paces: + - isolated registry/`DISPATCH_HOME`; + - fixture JSONL sync; + - real App Server metadata read if available; + - optional real existing Codex thread attach/sync smoke in a temp dispatch + home, read-only, no sends/stops/renames/archives. + +## Review Loop + +Use local review between meaningful slices: + +- request/perform review with score out of 5; +- P0/P1/P2 block completion; +- P3 can be fixed if cheap or recorded in `RETRO.md`; +- update `RETRO.md` after each slice, check run, review round, and material + decision. + +Subagents are useful for: + +- fixture/parser audit; +- App Server schema/current-doc check; +- docs/skill drift review; +- final code quality review. + +Subagents must not commit, push, mutate PRs, merge, publish, or touch live user +agent state. + +## Source Control + +- Work from branch `feat/lazy-thread-sync`. +- Use Graphite if submitting PRs. +- Keep commits coherent. One commit is fine if the final diff stays reviewable; + split only when it genuinely helps review. +- PRs stay draft until local checks and local review are clean. +- Do not merge, publish, or alter release state without explicit user approval. 
+ +## Done + +Done only when: + +- metadata-only attach, `lane sync`, sync index, read surfaces, and docs/skills + are implemented or explicitly pared down with evidence; +- the code path no longer relies on eager `thread/resume` for default attach; +- schema/MCP/CLI parity tests pass; +- parser/index fixtures cover the failure modes above; +- local runtime smoke demonstrates attach/sync behavior safely; +- `just check` is green; +- local review has no unresolved P0/P1/P2; +- `RETRO.md` records final state, checks, review result, remaining risks, and + any deferred follow-up issues. + diff --git a/.agents/plans/lazy-thread-sync/REFS.md b/.agents/plans/lazy-thread-sync/REFS.md new file mode 100644 index 0000000..9dd8c5d --- /dev/null +++ b/.agents/plans/lazy-thread-sync/REFS.md @@ -0,0 +1,104 @@ +# Lazy Thread Sync — references + +## Local Investigation Summary + +The local investigation note is gitignored at +`.agents/notes/2026-06-05-codex-storage-investigation.md`. This file captures +the load-bearing findings so the goal packet does not depend on chat history or +ignored notes. + +Evidence gathered on 2026-06-05: + +- Codex CLI: `codex-cli 0.137.0-alpha.4`. +- dispatch CLI: `dispatch 0.2.1`. +- Sample thread id: `019e9598-9214-7ed1-ac40-52d6d675d3e7`. +- Sample JSONL artifact size: about 1.49 MB, 812 records. +- `~/.codex` symlinked to `/Users/mg/.config/codex`. + +Observed stores: + +- `session_index.jsonl`: display thread names and updated timestamps. +- `state_5.sqlite`: thread metadata, rollout path, cwd, title/preview, + model/provider, archive state, dynamic tools, spawn edges, agent jobs. +- `goals_1.sqlite`: native goals. +- `logs_2.sqlite`: logs/telemetry, not transcript structure. +- `sessions/YYYY/MM/DD/rollout-....jsonl`: append-oriented JSONL. +- `shell_snapshots/..sh`: shell snapshots. + +Important mismatch: + +- `state_5.sqlite.threads.title` and `preview` can be the raw opening prompt. 
+- `session_index.jsonl.thread_name` and App Server `Thread.name` are better + user-facing display-name sources. + +Schema findings from `codex app-server generate-json-schema --out `: + +- `thread/list` supports `useStateDbOnly`, pagination, cwd/search/source/model + filters, and returns rows under `result.data`. +- `thread/read` only accepts `threadId` and `includeTurns`. +- `thread/resume` does not currently expose a usable turn-page parameter. +- `Thread.path` is marked `[UNSTABLE]`; store path plus file identity as a + pointer, not the durable id. +- `Thread.turns` is populated only for resume/fork/rollback/read with + `includeTurns:true`. + +Timing on the sample thread: + +- `thread/list(useStateDbOnly:true, limit=10)`: about 10 ms, about 33.5 KB. +- `thread/list(useStateDbOnly:false, limit=10)`: about 133 ms. +- `thread/read(includeTurns:false)`: about 3 ms, about 4 KB. +- `thread/read(includeTurns:true)`: about 10-15 ms, about 86 KB. +- `thread/resume`: about 1.1-3.0 seconds, about 87 KB, loaded the thread and + emitted notifications. +- local first 8 JSONL records: about 0.46 ms, about 114 KB. +- local tail 256 KiB window: about 0.88 ms, 126 complete records. +- local full 1.49 MB parse: about 5 ms. + +JSONL shape on the sample: + +- top-level types: `response_item`, `event_msg`, `session_meta`, `turn_context`. +- useful payload kinds: `thread_goal_updated`, `function_call`, + `function_call_output`, `token_count`, `message`, `agent_message`, + `reasoning`, `custom_tool_call`, `custom_tool_call_output`, + `patch_apply_end`, `task_complete`, `user_message`. +- first `session_meta` starts at byte 0 and includes thread id, cwd, source, + thread source, model provider, cli version, dynamic tools, and git keys. +- `turn_context` includes model, effort, approval policy, sandbox policy, cwd. +- `task_complete` includes duration, turn id, completion timestamp, and final + message length. 
+ +## Repo References + +- `AGENTS.md` +- `.agents/plans/PLANNING.md` +- `docs/development/design.md` +- `docs/adrs/0005-lane-authority-capability-ladder.md` +- `docs/adrs/0011-codex-session-registration-is-explicit.md` +- `docs/adrs/0016-history-goals-and-bounded-watch.md` +- `docs/usage/README.md` +- `skills/dispatch/SKILL.md` +- `skills/dm/SKILL.md` +- `src/outfitter/dispatch/core/handlers.py` +- `src/outfitter/dispatch/core/models.py` +- `src/outfitter/dispatch/core/ops.py` +- `src/outfitter/dispatch/client/client.py` +- `src/outfitter/dispatch/client/models.py` +- `src/outfitter/dispatch/registry/store.py` +- `src/outfitter/dispatch/contracts/derive_cli.py` +- `src/outfitter/dispatch/contracts/derive_mcp.py` +- `tests/core/test_handlers.py` +- `tests/client/test_client.py` +- `tests/surfaces/test_parity.py` +- `tests/surfaces/test_mcp_routing.py` + +## Follow-Up Unknowns + +- Active-write behavior: verify JSONL append behavior while a thread is running. +- Rename propagation: verify display-name update order across App Server, + `session_index.jsonl`, state DB, and JSONL. +- Archived threads: verify `thread/list(archived:true)` behavior. +- Subagent source projection: state DB source can be JSON-shaped strings. +- Rotation/rewrite: detect file shrink, inode change, move, or compact. +- Privacy policy: decide whether excerpts are stored by default, capped, or + opt-in. + diff --git a/.agents/plans/lazy-thread-sync/RETRO.md b/.agents/plans/lazy-thread-sync/RETRO.md new file mode 100644 index 0000000..a798e78 --- /dev/null +++ b/.agents/plans/lazy-thread-sync/RETRO.md @@ -0,0 +1,58 @@ +# Lazy Thread Sync — execution ledger + +This file must be kept current by the goal executor. Do not claim completion +until the final state is recorded here. + +## Starting State + +- Branch: `feat/lazy-thread-sync` +- Base: `main` +- Objective: implement metadata-only attach and explicit progressive sync for + existing Codex threads. +- Current status: planned, not implemented. 
+ +## Execution Log + +- 2026-06-05: Packet created from local Codex storage investigation. + +## Decisions And Divergence + +- None yet. + +## Verification Log + +- None yet. + +## Local Review Log + +- None yet. + +## PR / Source-Control State + +- No PR yet. +- No push/merge/publish/release action yet. + +## Forbidden-Action Audit + +Record confirmation before completion: + +- No attached-lane write authority unlocked without explicit approval. +- No live Codex send/stop/rename/archive performed during smoke testing. +- No broad whole-home indexing enabled by default. +- No transcript bulk copy introduced by default. +- No merge/publish/release mutation without explicit approval. +- No secrets committed. + +## Remaining Risks / Follow-Ups + +- Active-write JSONL append behavior still needs proof. +- Rename propagation still needs proof. +- Archived-thread behavior still needs proof. +- Subagent source projection still needs proof. +- File rotation/rewrite behavior still needs proof. +- Excerpt privacy policy must be explicit if excerpts are stored. + +## Final State + +Incomplete. 
+ From a9514a56bc7d2e1737b2f434a7f1ef1496fa0035 Mon Sep 17 00:00:00 2001 From: Matt Galligan Date: Fri, 5 Jun 2026 11:27:15 -0400 Subject: [PATCH 2/2] feat: add lazy thread sync --- .agents/plans/lazy-thread-sync/RETRO.md | 93 ++++++- README.md | 3 +- .../0002-single-daemon-over-one-app-server.md | 4 +- .../0005-lane-authority-capability-ladder.md | 12 +- docs/adrs/0009-mcp-daemon-lifecycle.md | 4 +- .../0017-progressive-thread-sync-index.md | 97 +++++++ docs/adrs/README.md | 1 + docs/development/design.md | 16 +- docs/usage/README.md | 35 ++- justfile | 1 + pyproject.toml | 2 +- skills/dispatch/SKILL.md | 26 +- src/outfitter/dispatch/client/models.py | 7 +- .../dispatch/contracts/derive_cli.py | 2 + .../dispatch/contracts/derive_mcp.py | 1 + src/outfitter/dispatch/core/handlers.py | 184 +++++++++++- src/outfitter/dispatch/core/models.py | 31 ++- src/outfitter/dispatch/core/ops.py | 14 + src/outfitter/dispatch/core/sync.py | 263 ++++++++++++++++++ src/outfitter/dispatch/daemon/supervisor.py | 24 +- src/outfitter/dispatch/doctor.py | 9 +- src/outfitter/dispatch/registry/models.py | 30 ++ src/outfitter/dispatch/registry/store.py | 225 ++++++++++++++- tests/client/test_models.py | 20 ++ tests/core/test_examples.py | 1 + tests/core/test_handlers.py | 99 ++++++- tests/core/test_sync.py | 128 +++++++++ tests/daemon/test_supervisor.py | 22 +- tests/fakes.py | 3 + tests/registry/test_store.py | 96 +++++++ tests/surfaces/test_mcp_routing.py | 8 + tests/surfaces/test_parity.py | 3 + tests/test_doctor.py | 2 +- uv.lock | 2 +- 34 files changed, 1376 insertions(+), 92 deletions(-) create mode 100644 docs/adrs/0017-progressive-thread-sync-index.md create mode 100644 src/outfitter/dispatch/core/sync.py create mode 100644 tests/core/test_sync.py diff --git a/.agents/plans/lazy-thread-sync/RETRO.md b/.agents/plans/lazy-thread-sync/RETRO.md index a798e78..01f890b 100644 --- a/.agents/plans/lazy-thread-sync/RETRO.md +++ b/.agents/plans/lazy-thread-sync/RETRO.md @@ -9,39 +9,110 @@ 
until the final state is recorded here. - Base: `main` - Objective: implement metadata-only attach and explicit progressive sync for existing Codex threads. -- Current status: planned, not implemented. +- Current status: implemented locally; checks and smoke are green. ## Execution Log - 2026-06-05: Packet created from local Codex storage investigation. +- 2026-06-05: Implemented metadata-only attach using + `thread/read(includeTurns:false)`, added explicit `sync` op, and projected + `dispatch lane sync ` through CLI/MCP/schema. +- 2026-06-05: Added registry schema v2 tables `lane_sync_sources` and + `lane_snapshots`. +- 2026-06-05: Added bounded Codex JSONL top+tail sync scanner and full-scan mode. +- 2026-06-05: Updated daemon restart behavior so owned lanes resume but attached + lanes stay metadata-only. +- 2026-06-05: Updated README, usage docs, design doc, dispatch skill, ADR-0002, + ADR-0005, ADR-0009, and added ADR-0017. ## Decisions And Divergence -- None yet. +- Quick sync does not report an exact whole-file `line_count`; that would require + walking the whole JSONL file and would defeat the fast-path goal. Full sync + reports `line_count`. +- Supervisor restart no longer resumes attached lanes. It now resumes owned lanes + and performs metadata reads for attached lanes. ## Verification Log -- None yet. +- `uv run pytest tests/core/test_sync.py tests/registry/test_store.py tests/test_doctor.py -q` + -> 15 passed. +- `uv run pytest tests/core/test_sync.py tests/registry/test_store.py tests/core/test_handlers.py tests/client/test_models.py tests/surfaces/test_parity.py tests/surfaces/test_mcp_routing.py tests/test_doctor.py -q` + -> 74 passed. +- `uv run pytest tests/core/test_sync.py tests/registry/test_store.py tests/core/test_handlers.py tests/daemon/test_supervisor.py tests/client/test_models.py tests/surfaces/test_parity.py tests/surfaces/test_mcp_routing.py tests/test_doctor.py -q` + -> 76 passed. +- `uv run ruff check src tests` -> passed. 
+- `uv run mypy src tests` -> passed. +- `uv run pytest tests/registry/test_store.py tests/core/test_handlers.py -q` + -> 49 passed. +- `uv run pytest tests/core/test_sync.py tests/core/test_handlers.py tests/registry/test_store.py -q` + -> 52 passed. +- `just check` -> ruff passed, format check passed, mypy passed, pytest 164 + passed / 8 deselected, `uv build` succeeded, package contents check + succeeded. +- Seeded stale `dist/stale-review-loop.whl` and + `dist/stale-review-loop.tar.gz`, then ran `just check` -> ruff passed, format + check passed, mypy passed, pytest 167 passed / 8 deselected, stale artifacts + removed before build, `outfitter_dispatch-0.3.0` wheel/sdist built, package + contents check succeeded. +- `uv run dispatch schema "lane sync" | jq -r '.op, (.input.properties | keys | join(",")), .output.title'` + -> `sync`, `full,lane`, `LaneSyncResult`. +- `uv run dispatch lane sync --help` -> showed required `LANE`, `--full`, + `--json`. +- `uv run dispatch lane attach --help` -> showed required `THREAD`, `--sync`, + `--json`. +- Safe runtime smoke with `DISPATCH_HOME=/tmp/dispatch-lazy-sync.qfYNZH`: + `dispatch up`, `lane list --unmanaged --limit 3 --json`, `lane attach + --sync --json`, `lane get --json`, `lane list --json`, `lane sync + --json`, `doctor --no-app-server --json`, `dispatch down`. Result: temp + daemon started/stopped; unmanaged discovery saw persisted sessions; attach + registered one observe-only attached lane; sync state surfaced as `partial` + with source path, source size, latest event timestamp, latest turn id, and no + sync error; registry schema version was 2 with both new tables. +- Post-review safe runtime smoke with temp `DISPATCH_HOME`: `dispatch up`, + `lane list --unmanaged --limit 1 --json`, `lane attach --sync --json`, + `lane get --json`, `lane sync --json`, `dispatch down`. 
Result: + attach/sync succeeded against a real persisted Codex thread with a 2.6 MB + JSONL source; sync state surfaced as `partial` with no sync error. ## Local Review Log -- None yet. +- P2 found and fixed: quick scan initially walked the full file to count lines, + defeating the long-thread performance goal. Fixed by making partial sync line + count unknown and preserving exact counts for `--full`. +- P2 found and fixed: daemon supervisor still resumed attached lanes after + app-server restart. Fixed by restoring attached lanes with metadata reads only. +- P2/P3 challenge resolved: attach initially persisted lane, sync metadata, and + audit in separate registry calls. Fixed by adding one registry transaction for + lane registration, initial sync state, and attach audit, with rollback coverage. +- P3 found and fixed: malformed `thread/read` metadata could have surfaced as a + generic internal error. Fixed by projecting invalid metadata as an + `app_server` error with a regression test. +- P2 found and fixed in final review loop: JSONL scanning was synchronous disk + I/O inside async handlers. Fixed by running scanner work through + `asyncio.to_thread`. +- P2 found and fixed in final review loop: the top-window parser used line + iteration, so a single huge first JSONL line could exceed the byte cap. Fixed + by reading a fixed byte window and ignoring incomplete trailing records. +- P3 found and fixed in final review loop: stale local `dist/` artifacts could + make `just check` fail after a version bump. Fixed by cleaning wheel/sdist + artifacts before `uv build` in the check recipe. ## PR / Source-Control State -- No PR yet. -- No push/merge/publish/release action yet. +- Draft PR: https://github.com/outfitter-dev/dispatch/pull/31 +- Branch pushed through Graphite: `feat/lazy-thread-sync`. +- No merge/publish/release action yet. 
## Forbidden-Action Audit -Record confirmation before completion: - - No attached-lane write authority unlocked without explicit approval. - No live Codex send/stop/rename/archive performed during smoke testing. - No broad whole-home indexing enabled by default. - No transcript bulk copy introduced by default. - No merge/publish/release mutation without explicit approval. - No secrets committed. +- Temp `DISPATCH_HOME` created for smoke was stopped and removed. ## Remaining Risks / Follow-Ups @@ -50,9 +121,9 @@ Record confirmation before completion: - Archived-thread behavior still needs proof. - Subagent source projection still needs proof. - File rotation/rewrite behavior still needs proof. -- Excerpt privacy policy must be explicit if excerpts are stored. +- Excerpt privacy policy must be explicit if future sync stores transcript + excerpts. This slice stores preview and compact facts only. ## Final State -Incomplete. - +Complete locally and submitted as draft PR #31. diff --git a/README.md b/README.md index e98c35d..888151c 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,8 @@ uv run dispatch down Use owned lanes for writes. Existing desktop Codex threads can be attached, but v0 treats attached lanes as observe-only: mutating commands such as `send`, `stop`, `lane archive`, `goal set`, `goal clear`, `lane fork`, `lane rollback`, and `lane compact` are blocked by -ADR-0005. +ADR-0005. Attach is metadata-only by default; use `dispatch lane sync <lane>` when you want +dispatch to refresh its local indexed view of an attached thread. For the operator guide, CLI/MCP examples, triggers, and plugin setup, start at [`docs/usage/README.md`](docs/usage/README.md). 
diff --git a/docs/adrs/0002-single-daemon-over-one-app-server.md b/docs/adrs/0002-single-daemon-over-one-app-server.md index c289667..3651252 100644 --- a/docs/adrs/0002-single-daemon-over-one-app-server.md +++ b/docs/adrs/0002-single-daemon-over-one-app-server.md @@ -4,7 +4,7 @@ slug: single-daemon-over-one-app-server title: Single Daemon over One App Server status: accepted created: 2026-06-02 -updated: 2026-06-02 +updated: 2026-06-05 owners: ['[galligan](https://github.com/galligan)'] --- @@ -29,7 +29,7 @@ We drive the App Server binary directly (not the `openai-codex` SDK, which pins ### Tradeoffs -- A process to keep alive (launchd) and supervise (restart + re-resume on app-server crash). +- A process to keep alive (launchd) and supervise (restart + restore lane observation on app-server crash). - A second App Server alongside the desktop app shares `~/.codex` — cross-process safety on a shared thread is unverified (see ADR-0005 / Phase-1 spike). ## Alternatives considered diff --git a/docs/adrs/0005-lane-authority-capability-ladder.md b/docs/adrs/0005-lane-authority-capability-ladder.md index 8f2d8c4..d3879c1 100644 --- a/docs/adrs/0005-lane-authority-capability-ladder.md +++ b/docs/adrs/0005-lane-authority-capability-ladder.md @@ -4,7 +4,7 @@ slug: lane-authority-capability-ladder title: Lane Authority Capability Ladder status: accepted created: 2026-06-02 -updated: 2026-06-03 +updated: 2026-06-05 owners: ['[galligan](https://github.com/galligan)'] --- @@ -14,20 +14,20 @@ owners: ['[galligan](https://github.com/galligan)'] ## Context -dispatch drives both lanes it **owns** (created via `open`) and lanes it **attaches** to (existing desktop threads via `resume`). Owned lanes have no other writer. Attached lanes are also live in the desktop Codex app — a separate app-server process over the same `~/.codex` store. 
We verified that a second connection resuming a *persisted* thread receives live event fan-out, but we did **not** verify that two app-servers running turns on the same thread concurrently is safe. Critically, dispatch's planned advisory lock is **dispatch-local**: it cannot stop the desktop app, which knows nothing about it. +dispatch drives both lanes it **owns** (created via `open`) and lanes it **attaches** to (existing desktop threads registered from App Server metadata). Owned lanes have no other writer. Attached lanes are also live in the desktop Codex app — a separate app-server process over the same `~/.codex` store. We verified that a second connection resuming a *persisted* thread receives live event fan-out, but we did **not** verify that two app-servers running turns on the same thread concurrently is safe. Critically, dispatch's planned advisory lock is **dispatch-local**: it cannot stop the desktop app, which knows nothing about it. ## Decision Authority over a lane is a ladder, not a flag: - **Owned lanes** (dispatch created them): full read/write, always. -- **Attached lanes** (existing desktop threads): **observe-only by default** — resume, read history, subscribe to events; no `send`/`steer`/`brief`/`interrupt`. +- **Attached lanes** (existing desktop threads): **observe-only by default** — read metadata, sync a local index, read history; no `send`/`steer`/`brief`/`interrupt`. - **idle-only-write** and **full-write** on attached lanes are unlocked only when **(a)** the slice-0 cross-process spike shows it is safe, **and (b)** the user explicitly opts in (per-lane or global). ## Assumptions (must hold; verify before relying) 1. Owned lanes truly have no concurrent external writer. -2. Merely *observing* an attached lane (resume + read events) is safe alongside the desktop app — to be confirmed by the spike, not assumed. +2. 
Merely *observing* an attached lane (metadata reads, history reads, and explicit sync) is safe alongside the desktop app — to be confirmed by the spike, not assumed. 3. The spike can actually distinguish "safe" from "racy" for concurrent turns on a shared thread. ## Consequences @@ -44,7 +44,7 @@ Two `codex app-server` processes shared one isolated `CODEX_HOME` (modelling our - **Live fan-out does NOT cross processes:** while A ran a turn, B (resumed) received **zero** live events. Live event fan-out is intra-process only (one app-server process). The spike-04 "resume = live co-presence" finding holds only for multiple connections to the *same* server process — which is exactly dispatch's own topology (ADR-0002), not the desktop-vs-daemon case. - **Concurrent turns are uncoordinated:** A and B each ran a turn on the shared thread with no error returned, but there is no cross-process interlock (dispatch's advisory lock is dispatch-local and cannot gate the desktop app), so "no error" is not "safe." -**Decision:** keep attached lanes **observe-only** for v0. Observation is limited to resume + history read + periodic re-read (no live cross-process stream). The idle-only-write and full-write rungs stay locked; unlocking them needs a real cross-process interlock, which Codex does not expose today. This is the safe default the ladder already proposed — the spike confirms rather than relaxes it. +**Decision:** keep attached lanes **observe-only** for v0. Observation is limited to metadata reads, explicit sync, history read, and periodic re-read (no live cross-process stream). ADR-0017 makes default attach metadata-only instead of `thread/resume`-based. The idle-only-write and full-write rungs stay locked; unlocking them needs a real cross-process interlock, which Codex does not expose today. This is the safe default the ladder already proposed — the spike confirms rather than relaxes it. 
## Alternatives considered @@ -53,4 +53,4 @@ Two `codex app-server` processes shared one isolated `CODEX_HOME` (modelling our ## References -- ADR-0002 (Single Daemon over One App Server); `docs/research/app-server-verification.md` (resume fan-out, cross-process untested); `PLAN.md` Phase-1 slice-0 spike. +- ADR-0002 (Single Daemon over One App Server); ADR-0017 (Progressive Thread Sync Index); `docs/research/app-server-verification.md` (resume fan-out, cross-process untested); `PLAN.md` Phase-1 slice-0 spike. diff --git a/docs/adrs/0009-mcp-daemon-lifecycle.md b/docs/adrs/0009-mcp-daemon-lifecycle.md index b0ef051..d681a78 100644 --- a/docs/adrs/0009-mcp-daemon-lifecycle.md +++ b/docs/adrs/0009-mcp-daemon-lifecycle.md @@ -4,7 +4,7 @@ slug: mcp-daemon-lifecycle title: MCP Daemon Lifecycle — Auto-Start Detached Singleton status: accepted created: 2026-06-02 -updated: 2026-06-03 +updated: 2026-06-05 owners: ['[galligan](https://github.com/galligan)'] --- @@ -38,7 +38,7 @@ Phase 5 implemented: - `dispatch up` / `dispatch down` as a detached singleton lifecycle. - MCP startup that connects through the same control socket path and fails with bounded timeouts. - Stale pidfile safety: `down` only signals a pid after a live socket probe confirms a daemon is answering. -- Supervisor restart/re-resume of persisted lanes after app-server EOF. +- Supervisor restart that resumes owned lanes and keeps attached lanes metadata-only after app-server EOF. - A launchd plist generator; actual `launchctl` installation remains a deliberate user action. 
## Consequences diff --git a/docs/adrs/0017-progressive-thread-sync-index.md b/docs/adrs/0017-progressive-thread-sync-index.md new file mode 100644 index 0000000..48a48a7 --- /dev/null +++ b/docs/adrs/0017-progressive-thread-sync-index.md @@ -0,0 +1,97 @@ +--- +id: 0017 +slug: progressive-thread-sync-index +title: Progressive Thread Sync Index +status: accepted +created: 2026-06-05 +updated: 2026-06-05 +owners: ['[galligan](https://github.com/galligan)'] +--- + +# ADR-0017: Progressive Thread Sync Index + +## Context + +dispatch can discover existing Codex desktop threads through App Server +`thread/list(useStateDbOnly:true)` and can read compact thread metadata through +`thread/read(includeTurns:false)`. That is enough to make first pickup feel fast +and honest. + +The previous attach shape leaned on `thread/resume`, which is too heavy for a +default registration path. Resume can load persisted turns, can be slow on long +threads, and suggests a live co-presence model that ADR-0005 explicitly does not +grant for desktop-vs-dispatch app-server processes. + +At the same time, operators need more than a raw thread id. They need enough +local state to identify, list, and inspect attached lanes without copying every +transcript into dispatch up front. + +## Decision + +Attach is metadata-only by default: + +- `dispatch lane attach <thread-id>` verifies the id with + `thread/read(includeTurns:false)`. +- It registers an observe-only attached lane and stores metadata sync state. +- It does not call `thread/resume`, load turn history, or grant write authority. + +Progressive sync is explicit: + +- `dispatch lane attach <thread-id> --sync` runs a quick sync after registration. +- `dispatch lane sync <lane>` refreshes dispatch's local indexed view. +- `dispatch lane sync <lane> --full` scans the whole current source file and marks + that cache complete for the current file identity.
+ +The sync index is a compact SQLite cache: + +- `lane_sync_sources` records sync state, source path, file identity, size/mtime, + parsed offsets, line count when known, last sync time, and errors. +- `lane_snapshots` records display name, preview, cwd, source/model/session facts, + latest event timestamp, latest turn id, and whether the transcript view is + partial. +- Quick sync reads bounded top+tail JSONL records from Codex's local rollout path + when App Server exposes one. It captures early metadata plus recent state, then + can be backfilled later. +- Partial sync does not promise exact whole-file counts. Exact counts are a + full-scan property. + +`lane tail` remains the explicit history surface and continues to use official +App Server `thread/read(includeTurns:true)` persisted history. + +## Consequences + +### Positive + +- First attach is cheap and side-effect-minimal. +- Long-lived threads can be picked up immediately, then backfilled when useful. +- List/get surfaces can show sync state without needing a transcript read. +- Attached-lane authority remains aligned with ADR-0005. + +### Tradeoffs + +- The sync cache can be partial or stale; surfaces must expose sync state honestly. +- JSONL rollout paths and payload shapes are Codex implementation details. The + parser must be conservative and tolerate missing files, invalid lines, and + partial final writes. +- `--full` is intentionally more expensive and should remain opt-in. + +## Alternatives considered + +- **Eager `thread/resume` on attach** — rejected: slower, heavier, and implies a + co-presence model we do not have across desktop and dispatch app-server + processes. +- **Copy full transcripts into dispatch by default** — rejected: bad first-run + latency and unnecessary data duplication. +- **Only use App Server history reads, no local index** — rejected: keeps attach + simple but leaves list/get surfaces without cheap recency and identity facts. 
+- **Automatically sync every unattached Codex thread** — rejected for v0: it is + surprising, potentially expensive, and conflicts with ADR-0011's explicit + registration policy. + +## References + +- ADR-0005 (Lane Authority Capability Ladder) +- ADR-0011 (Codex Session Registration Is Explicit) +- ADR-0016 (History, Goals, and Bounded Watch) +- `docs/research/app-server-verification.md` +- `.agents/plans/lazy-thread-sync/PLAN.md` diff --git a/docs/adrs/README.md b/docs/adrs/README.md index e3dcdbe..820675f 100644 --- a/docs/adrs/README.md +++ b/docs/adrs/README.md @@ -25,3 +25,4 @@ Files are `NNNN-slug.md`. Copy [`template.md`](template.md) to start one. Keep t | [0014](0014-mesh-auth-discovery-and-durable-queues.md) | Mesh Auth, Discovery, and Durable Queues | Proposed | | [0015](0015-new-command-config-presets-and-name-prefixes.md) | New Command, Config Presets, and Name Prefixes | Proposed | | [0016](0016-history-goals-and-bounded-watch.md) | History, Goals, and Bounded Watch | Accepted | +| [0017](0017-progressive-thread-sync-index.md) | Progressive Thread Sync Index | Accepted | diff --git a/docs/development/design.md b/docs/development/design.md index f9d070c..80b41e2 100644 --- a/docs/development/design.md +++ b/docs/development/design.md @@ -75,8 +75,9 @@ Projections (pure functions over the registry, mirroring Trails' `derive* → cr - Daemon lifecycle: `up` / `down` (process) · `daemon status` · `daemon log` - Lane creation: `new [--preset ...] [--text ...] 
[--no-send]` - Lane reads/discovery: `lane get ` · `lane status ` · `lane list` · - `lane list --unmanaged` · `lane tail ` · `lane tail --follow` -- Lane management/history: `lane attach ` · `lane rename ` · + `lane list --unmanaged` · `lane sync ` · `lane tail ` · + `lane tail --follow` +- Lane management/history: `lane attach [--sync]` · `lane rename ` · `lane fork ` · `lane rollback ` · `lane compact ` · `lane archive ` - Sending: `send "…"` with `--mode send|steer|queue|interject|context` @@ -97,7 +98,8 @@ boundary rather than forced to be one tool per op. The noun for a managed thread | --- | --- | --- | | `open` | `thread/start` (then register) | `sandbox` is a STRING enum (`read-only`/`workspace-write`/`danger-full-access`); persists by default (`ephemeral:false`) → spawned lanes show in desktop app, matching the `→ @project:name` convention. | | `new` | `thread/start` + `thread/name/set` + optional `turn/start` | Applies `.dispatch/config.toml` defaults/presets, name prefixes, verified session/turn options, and optional initial payload. | -| `attach` | `thread/resume` (+ register) | Cross-process discovery and history resume work, but live fan-out does **not** cross app-server processes. Attached desktop lanes are observe-only in v0 (ADR-0005). Pre-persistence resume errors `no rollout found`. | +| `attach` | `thread/read(includeTurns:false)` (+ register) | Metadata-only by default: verifies the thread id, registers an observe-only attached lane, and stores sync state without loading turn history. `--sync` runs a quick local index refresh after registration. | +| `sync` (`lane sync`) | `thread/read(includeTurns:false)` + bounded local JSONL parsing | Refreshes dispatch's index/cache for a lane: source file identity, sync state, latest event timestamp, latest turn id, preview, and selected metadata. Does not copy transcripts wholesale or grant attached-lane write authority. 
| | `send` (`mode=send`) | `turn/start` | Delivers a message the lane processes + answers. The DM/`send_message_to_thread` equivalent. `sandboxPolicy` here is an OBJECT (`{type:"readOnly"}`) — different encoding than `thread/start.sandbox`. | | `send` (`mode=queue`) | registry queue + later `turn/start` | Persists local queued delivery and starts one queued turn when the lane becomes idle. | | `send` (`mode=steer`) | `turn/steer` | Requires `expectedTurnId` (the active turn id from `turn/started`). Adds input to an in-flight turn. | @@ -130,7 +132,7 @@ The scheduler is **our own** (asyncio): a time wheel for time triggers + the rea ## Lanes: owned write, attached observe-only -The daemon drives threads it spawns (`new`, backed by the lower-level `open` op) with full read/write. Existing desktop threads can be registered with `attach`, but they are **observe-only in v0**. The Phase-1 cross-process spike confirmed that a second app-server process can discover and resume persisted history, but live event fan-out does not cross processes and concurrent turns are uncoordinated. Dispatch's advisory lock is dispatch-local; it cannot gate the desktop app. ADR-0005 keeps attached-lane writes locked until there is a real cross-process interlock and an explicit user opt-in. +The daemon drives threads it spawns (`new`, backed by the lower-level `open` op) with full read/write. Existing desktop threads can be registered with `attach`, but they are **observe-only in v0**. The Phase-1 cross-process spike confirmed that a second app-server process can discover and read persisted history, but live event fan-out does not cross processes and concurrent turns are uncoordinated. Dispatch's advisory lock is dispatch-local; it cannot gate the desktop app. ADR-0005 keeps attached-lane writes locked until there is a real cross-process interlock and an explicit user opt-in. ## Approvals (v1 minimal) @@ -147,16 +149,18 @@ The client supports the full responder loop. 
v1 surfaces `waiting_on_approval` a ## Data model (registry, SQLite) - `lanes`: id, handle (`@name` / `→ @project:name`), role, cwd, source (`own`|`attached`), status, pinned, created_at, updated_at, last_event_at. +- `lane_sync_sources`: lane, sync state, source path/file identity, source size/mtime, parsed offsets, line count, last synced timestamp, error. +- `lane_snapshots`: lane, display name, preview, cwd, source/model/session facts, latest event timestamp, latest turn id, transcript-partial flag. - `triggers`: id, name, lane selector, when-spec (json), action-spec (json), guard-spec (json), enabled, last_fired_at. - `actions_log`: id, ts, lane, op, trigger_id?, request/decision, outcome — full audit of every send/action. ## Error handling / resilience -- app-server subprocess crash → daemon detects stdout EOF → restart → re-`resume` persisted lanes → restart the reactor. +- app-server subprocess crash → daemon detects stdout EOF → restart → restore owned-lane resumes and attached-lane metadata reads → restart the reactor. - Action on a busy lane → direct `send` starts a turn immediately; `send --queue` persists local queued delivery and starts one queued turn when the lane next becomes idle. -- Reconnect → rebuild via `resume` + `thread/read`; rely on persisted history, not replay. +- Reconnect → rebuild via `thread/read` + explicit sync; rely on persisted history, not replay. - Every action audited; per-lane advisory lock for cross-process safety. ## Testing diff --git a/docs/usage/README.md b/docs/usage/README.md index 572473a..6b87171 100644 --- a/docs/usage/README.md +++ b/docs/usage/README.md @@ -277,8 +277,8 @@ Treat it as a conversation-history operation, not a source-control undo. `lane list` lists the lanes dispatch already manages. `lane list --unmanaged` is the other half: it lists the persisted Codex sessions on this machine — desktop threads and prior -runs — that you could attach. 
It reads the Codex state DB directly (`thread/list`, state-db -only), so it is fast and read-only; it never resumes, writes, or registers anything. +runs — that you could attach. It uses App Server `thread/list` in state-db-only mode, so +it is fast and read-only; it never resumes, writes, or registers anything. ```bash uv run dispatch lane list --unmanaged --limit 20 @@ -289,6 +289,7 @@ Each row carries `id`, `name`, a shortened `preview`, `cwd`, `status`, `source`, ```bash uv run dispatch lane attach +uv run dispatch lane attach --sync ``` Keep the two straight: `lane list --unmanaged` shows attachable Codex sessions (not yet @@ -300,6 +301,7 @@ Attach registers an existing Codex thread by raw thread id: ```bash uv run dispatch lane attach +uv run dispatch lane sync ``` Attached lanes are observe-only in v0. Dispatch can register and inspect them, but it must @@ -307,12 +309,28 @@ not write to them because the desktop app uses a separate app-server process and cross-process write interlock. ADR-0005 is the authoritative decision: [`docs/adrs/0005-lane-authority-capability-ladder.md`](../adrs/0005-lane-authority-capability-ladder.md). -Attach is bounded: the underlying `thread/resume` must complete within a short timeout -(15s). If the app-server is wedged and resume stalls, attach fails with a clear -`app_server` error and registers no lane — it never leaves a half-attached entry behind. -Re-run `attach` once the app-server is healthy. Large persisted histories are expected: -dispatch sizes its App Server stdio reader for realistic `thread/resume` and -`includeTurns` responses instead of treating them as hangs. +Attach is compact by default: it verifies the thread with App Server +`thread/read(includeTurns:false)`, registers the lane, and stores metadata sync state. It +does not call `thread/resume` or load turn history. 
If the app-server is wedged and the +metadata read stalls, attach fails with a clear `app_server` error and registers no lane — +it never leaves a half-attached entry behind. + +Use `lane sync` to refresh dispatch's local indexed view of an attached lane. Sync reads the +official metadata and, when Codex exposes a local rollout path, parses bounded top+tail JSONL +records into a compact cache: source file identity, sync state, latest event timestamp, +latest turn id, and a preview. It does not copy the full transcript by default. + +```bash +uv run dispatch lane sync +uv run dispatch lane sync --full +uv run dispatch lane get +uv run dispatch lane list +``` + +`lane sync --full` scans the whole current source file and marks the cache complete for that +file identity. It is still an index refresh, not a write to the Codex thread. `lane tail` +continues to use official `thread/read(includeTurns:true)` persisted history when you want +App Server turn summaries. When referring to a Codex thread in docs or prompts, prefer a readable handle with a URI: @@ -386,6 +404,7 @@ derived from the contract registry: ```bash uv run dispatch lane list --json uv run dispatch schema send +uv run dispatch schema "lane sync" uv run dispatch schema "goal set" ``` diff --git a/justfile b/justfile index 9195e5d..1788c7d 100644 --- a/justfile +++ b/justfile @@ -6,6 +6,7 @@ check: uv run ruff format --check . uv run mypy src tests uv run pytest + rm -f dist/*.whl dist/*.tar.gz uv build uv run python scripts/check_package_contents.py diff --git a/pyproject.toml b/pyproject.toml index 8e9fbb7..e7816e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "outfitter-dispatch" -version = "0.2.1" +version = "0.3.0" description = "Local control plane for orchestrating Codex agent lanes over the Codex App Server." 
readme = "README.md" requires-python = ">=3.13" diff --git a/skills/dispatch/SKILL.md b/skills/dispatch/SKILL.md index e86a674..4e7dbe9 100644 --- a/skills/dispatch/SKILL.md +++ b/skills/dispatch/SKILL.md @@ -29,7 +29,7 @@ The current operator grammar is: - daemon reads: `daemon status`, `daemon log` - create/send: `new`, `send`, `stop` - lanes: `lane get`, `lane status`, `lane list`, `lane list --unmanaged`, - `lane attach`, `lane rename`, `lane tail`, `lane fork`, `lane rollback`, + `lane attach`, `lane sync`, `lane rename`, `lane tail`, `lane fork`, `lane rollback`, `lane compact`, `lane archive` - goals: `goal status`, `goal set`, `goal clear` - triggers: `trigger add`, `trigger list`, `trigger rm`, `trigger pause`, @@ -79,6 +79,7 @@ Attached lanes are existing desktop Codex threads registered by raw thread id: ```bash uv run dispatch lane attach +uv run dispatch lane attach --sync ``` Attached lanes are observe-only in v0. Do not try mutating commands such as @@ -87,17 +88,26 @@ Attached lanes are observe-only in v0. Do not try mutating commands such as writes locked because desktop Codex and dispatch run separate app-server processes and there is no cross-process write interlock. -Attach is bounded: if the app-server stalls, the underlying `thread/resume` -times out (~15s) and `attach` fails with a clear `app_server` error, registering -no lane. There is no half-attached state to clean up. Large persisted histories -are supported; attach should not fail only because a resumed thread has more than -64 KiB of turns. +Attach is compact by default: it verifies the thread with +`thread/read(includeTurns:false)`, registers metadata, and does not resume turn +history. Use `--sync` or `lane sync` when you want dispatch to refresh its local +indexed view. 
+ +```bash +uv run dispatch lane sync +uv run dispatch lane sync --full +``` + +Sync indexes source identity, sync state, latest event time, latest turn id, and +bounded top+tail JSONL facts when Codex exposes a rollout path. It does not copy +the full transcript by default and it does not grant write authority to attached +lanes. ## Discover Sessions `lane list` shows lanes dispatch already manages. `lane list --unmanaged` lists -persisted Codex sessions you could attach, read straight from the Codex state DB. -It is read-only and does not resume or register anything: +persisted Codex sessions you could attach through App Server `thread/list` in +state-db-only mode. It is read-only and does not resume or register anything: ```bash uv run dispatch lane list diff --git a/src/outfitter/dispatch/client/models.py b/src/outfitter/dispatch/client/models.py index 9ed7fee..c09ccc2 100644 --- a/src/outfitter/dispatch/client/models.py +++ b/src/outfitter/dispatch/client/models.py @@ -89,13 +89,18 @@ class ThreadInfo(WireModel): """Subset of the rich thread object the server returns (extra fields ignored).""" id: str + session_id: str | None = None forked_from_id: str | None = None ephemeral: bool | None = None status: ThreadStatus | None = None cwd: str | None = None name: str | None = None + path: str | None = None preview: str | None = None source: str | None = None + thread_source: str | None = None + model_provider: str | None = None + updated_at: int | None = None turns: list[dict[str, object]] = Field(default_factory=list) @@ -194,7 +199,7 @@ class ThreadGoal(WireModel): class ThreadResult(WireModel): - """Result envelope for ``thread/start`` and ``thread/resume``.""" + """Result envelope for thread methods that return one thread.""" thread: ThreadInfo diff --git a/src/outfitter/dispatch/contracts/derive_cli.py b/src/outfitter/dispatch/contracts/derive_cli.py index f6355c2..96a6530 100644 --- a/src/outfitter/dispatch/contracts/derive_cli.py +++ 
b/src/outfitter/dispatch/contracts/derive_cli.py @@ -37,6 +37,7 @@ class CliRoute: CliRoute(("lane", "get"), "show", ("lane",)), CliRoute(("lane", "status"), "show", ("lane",)), CliRoute(("lane", "attach"), "attach", ("thread",)), + CliRoute(("lane", "sync"), "sync", ("lane",)), CliRoute(("lane", "rename"), "lane-rename", ("old", "new")), CliRoute(("lane", "fork"), "fork", ("lane",)), CliRoute(("lane", "rollback"), "rollback", ("lane",)), @@ -350,6 +351,7 @@ def _schema_op_id(command: str) -> str: "lane-list": "roster", "lane-list-unmanaged": "discover", "lane-attach": "attach", + "lane-sync": "sync", "lane-rename": "lane-rename", "lane-fork": "fork", "lane-rollback": "rollback", diff --git a/src/outfitter/dispatch/contracts/derive_mcp.py b/src/outfitter/dispatch/contracts/derive_mcp.py index 54c85a3..643d36b 100644 --- a/src/outfitter/dispatch/contracts/derive_mcp.py +++ b/src/outfitter/dispatch/contracts/derive_mcp.py @@ -62,6 +62,7 @@ class _ToolGroup: ("open", "open"), ("new", "new"), ("attach", "attach"), + ("sync", "sync"), ("rename", "lane-rename"), ("send", "send"), ("stop", "stop"), diff --git a/src/outfitter/dispatch/core/handlers.py b/src/outfitter/dispatch/core/handlers.py index 3b629a4..cb79d8d 100644 --- a/src/outfitter/dispatch/core/handlers.py +++ b/src/outfitter/dispatch/core/handlers.py @@ -9,9 +9,17 @@ import asyncio +from pydantic import ValidationError as PydanticValidationError + from outfitter.dispatch.client.errors import AppServerError as ClientAppServerError from outfitter.dispatch.client.errors import ClientError -from outfitter.dispatch.client.models import SandboxPolicy, ThreadGoal, ThreadInfo, ThreadSandbox +from outfitter.dispatch.client.models import ( + SandboxPolicy, + ThreadGoal, + ThreadInfo, + ThreadResult, + ThreadSandbox, +) from outfitter.dispatch.contracts.context import Ctx from outfitter.dispatch.contracts.errors import ( AppServerError, @@ -21,7 +29,7 @@ ValidationError, project_error, ) -from 
outfitter.dispatch.registry.models import Lane +from outfitter.dispatch.registry.models import Lane, LaneSync, SyncState from . import queue from .models import ( @@ -40,8 +48,12 @@ GoalView, LaneDetail, LaneInput, + LaneListItem, LaneRef, LaneRenameInput, + LaneSyncInput, + LaneSyncResult, + LaneSyncView, LaneTextInput, LogInput, LogOutput, @@ -63,13 +75,13 @@ WatchOutput, ) from .new_config import NewSettings, resolve_new +from .sync import scan_codex_jsonl _READ_ONLY = SandboxPolicy(type="readOnly") -# Bound ``thread/resume`` during attach: a persisted resume is a quick state-db read, -# so a stuck one means the app-server is wedged. Fail with a clear error rather than -# hang — and never half-register (the registry write only follows a successful resume). -_RESUME_TIMEOUT_S = 15.0 +# Bound attach metadata reads: if the app-server is wedged, fail clearly and never +# half-register (the registry write only follows a successful metadata read). +_ATTACH_METADATA_TIMEOUT_S = 10.0 _PREVIEW_MAX = 80 @@ -78,6 +90,25 @@ def _ref(lane: Lane) -> LaneRef: return LaneRef(id=lane.id, handle=lane.handle, source=lane.source, status=lane.status) +def _sync_view(sync: LaneSync | None) -> LaneSyncView: + if sync is None: + return LaneSyncView() + return LaneSyncView( + state=sync.state, + last_synced_at=sync.last_synced_at, + source_path=sync.source_path, + source_size=sync.source_size, + latest_event_at=sync.latest_event_at, + latest_turn_id=sync.latest_turn_id, + transcript_partial=sync.transcript_partial, + error=sync.error, + ) + + +def _list_item(lane: Lane, sync: LaneSync | None) -> LaneListItem: + return LaneListItem(**_ref(lane).model_dump(), sync=_sync_view(sync)) + + def _handle(name: str) -> str: return name if name.startswith("@") else f"@{name}" @@ -207,24 +238,139 @@ def _turn_sandbox(sandbox: ThreadSandbox) -> SandboxPolicy: async def attach_lane(inp: AttachInput, ctx: Ctx) -> LaneRef: existing = await ctx.registry.find_lane(inp.thread) if existing is not None: + 
if inp.sync: + await _sync_lane(existing, ctx, full=False) return _ref(existing) # idempotent re-attach try: - thread = await asyncio.wait_for(ctx.client.thread_resume(inp.thread), _RESUME_TIMEOUT_S) + thread = await asyncio.wait_for( + _read_thread_metadata(ctx, inp.thread), _ATTACH_METADATA_TIMEOUT_S + ) except TimeoutError as exc: # The registry write below never ran, so no lane is half-registered. raise AppServerError( - f"attach timed out: thread/resume for {inp.thread!r} exceeded " - f"{_RESUME_TIMEOUT_S:.0f}s (no lane registered)" + f"attach timed out: thread/read metadata for {inp.thread!r} exceeded " + f"{_ATTACH_METADATA_TIMEOUT_S:.0f}s (no lane registered)" ) from exc handle = thread.name or f"@{inp.thread[:8]}" - lane = await ctx.registry.add_lane( - id=thread.id, handle=handle, source="attached", cwd=thread.cwd, status="idle" + sync = ( + await _sync_from_thread(thread.id, thread, full=False) + if inp.sync + else _metadata_sync(thread.id, thread, state="metadata") + ) + lane, _ = await ctx.registry.add_lane_with_sync( + id=thread.id, + handle=handle, + source="attached", + cwd=thread.cwd, + status="idle", + sync=sync, + audit_op="attach", + audit_detail=handle, ) - await ctx.registry.log_action("attach", lane=lane.id, detail=handle) ctx.log.info("lane.attach", lane=lane.id, handle=handle) return _ref(lane) +async def _read_thread_metadata(ctx: Ctx, thread_id: str) -> ThreadInfo: + result = await ctx.client.thread_read(thread_id, include_turns=False) + try: + return ThreadResult.model_validate(result).thread + except PydanticValidationError as exc: + raise AppServerError( + f"thread/read metadata for {thread_id!r} returned an invalid payload" + ) from exc + + +async def _sync_lane( + lane: Lane, ctx: Ctx, *, full: bool, metadata: ThreadInfo | None = None +) -> LaneSync: + thread = metadata or await _read_thread_metadata(ctx, lane.id) + return await ctx.registry.upsert_lane_sync(await _sync_from_thread(lane.id, thread, full=full)) + + +async def 
_sync_from_thread(lane_id: str, thread: ThreadInfo, *, full: bool) -> LaneSync: + if thread.path is None: + return _metadata_sync(lane_id, thread, state="metadata") + facts = await asyncio.to_thread(scan_codex_jsonl, thread.path, full=full) + state = facts.state + return _metadata_sync( + lane_id, + thread, + state=state, + source_path=facts.source.path if facts.source else thread.path, + source_device=facts.source.device if facts.source else None, + source_inode=facts.source.inode if facts.source else None, + source_size=facts.source.size if facts.source else None, + source_mtime_ns=facts.source.mtime_ns if facts.source else None, + line_count=facts.line_count, + first_offset=facts.first_offset, + tail_offset=facts.tail_offset, + error=facts.error, + cwd=facts.cwd, + source=facts.source_kind, + thread_source=facts.thread_source, + model_provider=facts.model_provider, + model=facts.model, + reasoning_effort=facts.reasoning_effort, + session_id=facts.session_id, + latest_event_at=facts.latest_event_at, + latest_turn_id=facts.latest_turn_id, + transcript_partial=state != "complete", + ) + + +def _metadata_sync( + lane_id: str, + thread: ThreadInfo, + *, + state: SyncState, + source_path: str | None = None, + source_device: int | None = None, + source_inode: int | None = None, + source_size: int | None = None, + source_mtime_ns: int | None = None, + line_count: int | None = None, + first_offset: int | None = None, + tail_offset: int | None = None, + error: str | None = None, + cwd: str | None = None, + source: str | None = None, + thread_source: str | None = None, + model_provider: str | None = None, + model: str | None = None, + reasoning_effort: str | None = None, + session_id: str | None = None, + latest_event_at: str | None = None, + latest_turn_id: str | None = None, + transcript_partial: bool = True, +) -> LaneSync: + return LaneSync( + lane=lane_id, + state=state, + source_path=source_path or thread.path, + source_device=source_device, + 
source_inode=source_inode, + source_size=source_size, + source_mtime_ns=source_mtime_ns, + line_count=line_count, + first_offset=first_offset, + tail_offset=tail_offset, + error=error, + display_name=thread.name, + preview=_short(thread.preview, limit=200), + cwd=cwd or thread.cwd, + source=source or thread.source, + thread_source=thread_source or thread.thread_source, + model_provider=model_provider or thread.model_provider, + model=model, + reasoning_effort=reasoning_effort, + session_id=session_id or thread.session_id, + latest_event_at=latest_event_at, + latest_turn_id=latest_turn_id, + transcript_partial=transcript_partial, + ) + + async def send(inp: LaneTextInput, ctx: Ctx) -> ActionAck: lane = await _resolve(ctx, inp.lane) _require_writable(lane) @@ -311,6 +457,7 @@ async def stop(inp: LaneInput, ctx: Ctx) -> ActionAck: async def show(inp: ShowInput, ctx: Ctx) -> LaneDetail: lane = await _resolve(ctx, inp.lane) + sync = await ctx.registry.get_lane_sync(lane.id) transcript: list[TranscriptItem] = [] if inp.include_transcript: result = await ctx.client.thread_read(lane.id, include_turns=True) @@ -322,10 +469,20 @@ async def show(inp: ShowInput, ctx: Ctx) -> LaneDetail: status=lane.status, cwd=lane.cwd, active_turn_id=lane.active_turn_id, + sync=_sync_view(sync), transcript=transcript, ) +async def sync_lane(inp: LaneSyncInput, ctx: Ctx) -> LaneSyncResult: + lane = await _resolve(ctx, inp.lane) + sync = await _sync_lane(lane, ctx, full=inp.full) + await ctx.registry.log_action( + "sync", lane=lane.id, detail=f"state={sync.state}; full={inp.full}" + ) + return LaneSyncResult(lane=lane.id, sync=_sync_view(sync)) + + async def rename_lane(inp: LaneRenameInput, ctx: Ctx) -> LaneRef: lane = await _resolve(ctx, inp.old) handle = _handle(inp.new) @@ -562,7 +719,8 @@ async def compact(inp: CompactInput, ctx: Ctx) -> ActionAck: async def roster(inp: RosterInput, ctx: Ctx) -> Roster: lanes = await ctx.registry.list_lanes(include_archived=inp.include_archived) - return 
Roster(lanes=[_ref(lane) for lane in lanes]) + syncs = await ctx.registry.get_lane_sync_many([lane.id for lane in lanes]) + return Roster(lanes=[_list_item(lane, syncs.get(lane.id)) for lane in lanes]) def _short(text: str | None, limit: int = _PREVIEW_MAX) -> str | None: diff --git a/src/outfitter/dispatch/core/models.py b/src/outfitter/dispatch/core/models.py index a5ef1bd..08a14a0 100644 --- a/src/outfitter/dispatch/core/models.py +++ b/src/outfitter/dispatch/core/models.py @@ -16,7 +16,7 @@ ThreadGoalStatus, ThreadSandbox, ) -from outfitter.dispatch.registry.models import LaneSource, LaneStatus +from outfitter.dispatch.registry.models import LaneSource, LaneStatus, SyncState # --- inputs ------------------------------------------------------------------- @@ -59,6 +59,7 @@ class NewInput(BaseModel): class AttachInput(BaseModel): thread: str = Field(description="App Server threadId of an existing (desktop) lane.") + sync: bool = Field(default=False, description="Run a quick sync after attaching.") class LaneTextInput(BaseModel): @@ -82,6 +83,11 @@ class LaneInput(BaseModel): lane: str = Field(description="Lane id or @handle.") +class LaneSyncInput(BaseModel): + lane: str = Field(description="Lane id or @handle.") + full: bool = Field(default=False, description="Scan the full source file instead of top+tail.") + + class LaneRenameInput(BaseModel): old: str = Field(description="Existing lane id or @handle.") new: str = Field(description="New lane handle; @ is optional.") @@ -176,6 +182,21 @@ class LaneRef(BaseModel): status: LaneStatus +class LaneSyncView(BaseModel): + state: SyncState = "unknown" + last_synced_at: str | None = None + source_path: str | None = None + source_size: int | None = None + latest_event_at: str | None = None + latest_turn_id: str | None = None + transcript_partial: bool = True + error: str | None = None + + +class LaneListItem(LaneRef): + sync: LaneSyncView = Field(default_factory=LaneSyncView) + + class NewLane(LaneRef): sent: bool @@ 
-183,6 +204,7 @@ class NewLane(LaneRef): class LaneDetail(LaneRef): cwd: str | None = None active_turn_id: str | None = None + sync: LaneSyncView = Field(default_factory=LaneSyncView) transcript: list[TranscriptItem] = Field(default_factory=list) @@ -233,8 +255,13 @@ class ActionAck(BaseModel): detail: str | None = None +class LaneSyncResult(BaseModel): + lane: str + sync: LaneSyncView + + class Roster(BaseModel): - lanes: list[LaneRef] + lanes: list[LaneListItem] class DiscoveredSession(BaseModel): diff --git a/src/outfitter/dispatch/core/ops.py b/src/outfitter/dispatch/core/ops.py index e91898e..03f1195 100644 --- a/src/outfitter/dispatch/core/ops.py +++ b/src/outfitter/dispatch/core/ops.py @@ -25,6 +25,8 @@ LaneInput, LaneRef, LaneRenameInput, + LaneSyncInput, + LaneSyncResult, LogInput, LogOutput, NewInput, @@ -174,6 +176,17 @@ examples=[Example("missing", input={"lane": "nope", "timeout": 0}, raises=NotFoundError)], ) +SYNC = define_op( + id="sync", + summary="Progressively sync an attached lane's local Codex thread index.", + input=LaneSyncInput, + output=LaneSyncResult, + intent="write", + idempotent=True, + handler=handlers.sync_lane, + examples=[Example("missing", input={"lane": "nope"}, raises=NotFoundError)], +) + ROSTER = define_op( id="roster", summary="List managed lanes.", @@ -374,6 +387,7 @@ LANE_RENAME, TRANSCRIPT, WATCH, + SYNC, ROSTER, DISCOVER, ARCHIVE, diff --git a/src/outfitter/dispatch/core/sync.py b/src/outfitter/dispatch/core/sync.py new file mode 100644 index 0000000..496cd77 --- /dev/null +++ b/src/outfitter/dispatch/core/sync.py @@ -0,0 +1,263 @@ +"""Codex thread source parsing for progressive lane sync. + +Sync indexes compact facts from Codex's persisted JSONL artifacts. It does not +copy transcripts wholesale; App Server history reads remain the semantic source +for transcript commands. 
+""" + +from __future__ import annotations + +import json +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Literal + +SyncScanState = Literal["partial", "complete", "error"] + + +@dataclass(frozen=True) +class SyncLimits: + top_bytes: int = 262_144 + tail_bytes: int = 262_144 + tail_lines: int = 200 + + +DEFAULT_SYNC_LIMITS = SyncLimits() + + +@dataclass(frozen=True) +class SourceIdentity: + path: str + device: int + inode: int + size: int + mtime_ns: int + + +@dataclass(frozen=True) +class JsonlSyncFacts: + state: SyncScanState + source: SourceIdentity | None + line_count: int | None = None + first_offset: int | None = None + tail_offset: int | None = None + latest_event_at: str | None = None + latest_turn_id: str | None = None + session_id: str | None = None + cwd: str | None = None + source_kind: str | None = None + thread_source: str | None = None + model_provider: str | None = None + model: str | None = None + reasoning_effort: str | None = None + error: str | None = None + stable_during_read: bool = True + + +def scan_codex_jsonl( + path: str, *, full: bool = False, limits: SyncLimits = DEFAULT_SYNC_LIMITS +) -> JsonlSyncFacts: + source_path = Path(path).expanduser() + records: list[dict[str, Any]] + line_count: int | None + first_offset: int | None + tail_offset: int | None + try: + before = source_path.stat() + except OSError as exc: + return JsonlSyncFacts(state="error", source=None, error=f"stat failed: {exc}") + + source = SourceIdentity( + path=str(source_path), + device=before.st_dev, + inode=before.st_ino, + size=before.st_size, + mtime_ns=before.st_mtime_ns, + ) + try: + if full: + records, line_count, first_offset, tail_offset = _read_all_records(source_path) + state: SyncScanState = "complete" + else: + records, line_count, first_offset, tail_offset = _read_quick_records( + source_path, limits + ) + state = "partial" + except OSError as exc: + return JsonlSyncFacts(state="error", source=source, error=f"read 
failed: {exc}") + + summary = _summarize_records(records) + error: str | None = None + try: + after = source_path.stat() + stable_during_read = ( + before.st_size == after.st_size and before.st_mtime_ns == after.st_mtime_ns + ) + except OSError as exc: + state = "error" + error = f"post-read stat failed: {exc}" + stable_during_read = False + return JsonlSyncFacts( + state=state, + source=source, + line_count=line_count, + first_offset=first_offset, + tail_offset=tail_offset, + latest_event_at=summary.latest_event_at, + latest_turn_id=summary.latest_turn_id, + session_id=summary.session_id, + cwd=summary.cwd, + source_kind=summary.source_kind, + thread_source=summary.thread_source, + model_provider=summary.model_provider, + model=summary.model, + reasoning_effort=summary.reasoning_effort, + error=error, + stable_during_read=stable_during_read, + ) + + +def _read_all_records(path: Path) -> tuple[list[dict[str, Any]], int, int | None, int | None]: + records: list[dict[str, Any]] = [] + first_offset: int | None = None + tail_offset: int | None = None + offset = 0 + line_count = 0 + with path.open("rb") as handle: + for raw in handle: + line_count += 1 + if first_offset is None: + first_offset = offset + parsed = _parse_line(raw) + if parsed is not None: + records.append(parsed) + tail_offset = offset + offset += len(raw) + return records, line_count, first_offset, tail_offset + + +def _read_quick_records( + path: Path, limits: SyncLimits +) -> tuple[list[dict[str, Any]], None, int | None, int | None]: + top_records, first_offset = _read_top_records(path, limits.top_bytes) + tail_records, tail_offset = _read_tail_records(path, limits.tail_bytes, limits.tail_lines) + seen: set[int] = set() + records: list[dict[str, Any]] = [] + for offset, record in (*top_records, *tail_records): + if offset in seen: + continue + seen.add(offset) + records.append(record) + return records, None, first_offset, tail_offset + + +def _read_top_records( + path: Path, max_bytes: int +) -> 
tuple[list[tuple[int, dict[str, Any]]], int | None]: + if max_bytes <= 0: + return [], None + with path.open("rb") as handle: + data = handle.read(max_bytes) + + lines = data.splitlines(keepends=True) + if lines and not lines[-1].endswith(b"\n"): + lines = lines[:-1] + + records: list[tuple[int, dict[str, Any]]] = [] + offset = 0 + first_offset: int | None = None + for raw in lines: + parsed = _parse_line(raw) + if parsed is not None: + if first_offset is None: + first_offset = offset + records.append((offset, parsed)) + offset += len(raw) + return records, first_offset + + +def _read_tail_records( + path: Path, max_bytes: int, max_lines: int +) -> tuple[list[tuple[int, dict[str, Any]]], int | None]: + if max_bytes <= 0 or max_lines <= 0: + return [], None + size = path.stat().st_size + read_size = min(size, max_bytes) + with path.open("rb") as handle: + handle.seek(size - read_size) + data = handle.read(read_size) + + lines = data.splitlines(keepends=True) + offset = size - len(data) + if read_size < size and lines: + offset += len(lines[0]) + lines = lines[1:] + if lines and not lines[-1].endswith(b"\n"): + lines = lines[:-1] + + selected = lines[-max_lines:] + offset += sum(len(line) for line in lines[: len(lines) - len(selected)]) + records: list[tuple[int, dict[str, Any]]] = [] + first_offset: int | None = None + for raw in selected: + parsed = _parse_line(raw) + if parsed is not None: + if first_offset is None: + first_offset = offset + records.append((offset, parsed)) + offset += len(raw) + return records, first_offset + + +def _parse_line(raw: bytes) -> dict[str, Any] | None: + try: + parsed = json.loads(raw) + except json.JSONDecodeError: + return None + return parsed if isinstance(parsed, dict) else None + + +@dataclass +class _Summary: + latest_event_at: str | None = None + latest_turn_id: str | None = None + session_id: str | None = None + cwd: str | None = None + source_kind: str | None = None + thread_source: str | None = None + model_provider: str | 
None = None + model: str | None = None + reasoning_effort: str | None = None + + +def _summarize_records(records: list[dict[str, Any]]) -> _Summary: + summary = _Summary() + for record in records: + timestamp = _string(record.get("timestamp")) + if timestamp is not None: + summary.latest_event_at = timestamp + payload = record.get("payload") + if not isinstance(payload, dict): + continue + if record.get("type") == "session_meta": + summary.session_id = _string(payload.get("id")) or summary.session_id + summary.cwd = _string(payload.get("cwd")) or summary.cwd + summary.source_kind = _string(payload.get("source")) or summary.source_kind + summary.thread_source = _string(payload.get("thread_source")) or summary.thread_source + summary.model_provider = ( + _string(payload.get("model_provider")) or summary.model_provider + ) + continue + if record.get("type") == "turn_context": + summary.cwd = _string(payload.get("cwd")) or summary.cwd + summary.model = _string(payload.get("model")) or summary.model + summary.reasoning_effort = _string(payload.get("effort")) or summary.reasoning_effort + continue + payload_type = payload.get("type") + if payload_type == "task_complete": + summary.latest_turn_id = _string(payload.get("turn_id")) or summary.latest_turn_id + return summary + + +def _string(value: object) -> str | None: + return value if isinstance(value, str) else None diff --git a/src/outfitter/dispatch/daemon/supervisor.py b/src/outfitter/dispatch/daemon/supervisor.py index 7405f86..fe8a809 100644 --- a/src/outfitter/dispatch/daemon/supervisor.py +++ b/src/outfitter/dispatch/daemon/supervisor.py @@ -1,5 +1,5 @@ """App-server supervision: detect a crash (stdout EOF) and recover — restart the -app-server, re-resume persisted lanes, and restart the reactor subscription. +app-server, restore lane observations, and restart the reactor subscription. 
The supervisor swaps ``ctx.client`` in place so the control server, scheduler, and handlers transparently use the new connection after a restart (recoverable daemon @@ -48,7 +48,7 @@ async def supervise(self, initial: SupervisedClient) -> None: while True: # not `while not self._stopped` — stop() flips it during the await below self._ctx.client = client self._client = client - await self._resume_lanes(client) + await self._restore_lanes(client) reactor_task = asyncio.create_task(self._run_reactor()) await client.wait_closed() # blocks until app-server dies or we stop reactor_task.cancel() @@ -75,15 +75,23 @@ async def _respawn(self) -> SupervisedClient | None: self._ctx.log.exception("app_server.spawn_failed", backoff=self._backoff) return None - async def _resume_lanes(self, client: SupervisedClient) -> None: - """Re-resume persisted lanes on the (re)connected app-server so the daemon - observes them again after a restart.""" + async def _restore_lanes(self, client: SupervisedClient) -> None: + """Restore persisted lane observation on the (re)connected app-server. + + Owned lanes are resumed so their app-server event stream is reattached. + Attached lanes stay metadata-only per ADR-0017; restarting the daemon must + not turn an observe-only registration into an implicit resume. 
+ """ for lane in await self._ctx.registry.list_lanes(): try: - await client.thread_resume(lane.id) - self._ctx.log.info("lane.resumed", lane=lane.id, source=lane.source) + if lane.source == "own": + await client.thread_resume(lane.id) + self._ctx.log.info("lane.resumed", lane=lane.id, source=lane.source) + else: + await client.thread_read(lane.id, include_turns=False) + self._ctx.log.info("lane.metadata_read", lane=lane.id, source=lane.source) except ClientError as exc: - self._ctx.log.warning("lane.resume_failed", lane=lane.id, error=str(exc)) + self._ctx.log.warning("lane.restore_failed", lane=lane.id, error=str(exc)) drained = await drain_idle_queues(self._ctx) if drained: self._ctx.log.info("queue.drained_on_resume", count=drained) diff --git a/src/outfitter/dispatch/doctor.py b/src/outfitter/dispatch/doctor.py index 0f21b68..3c4ac1a 100644 --- a/src/outfitter/dispatch/doctor.py +++ b/src/outfitter/dispatch/doctor.py @@ -290,7 +290,14 @@ def _registry_check() -> DoctorCheck: recovery="Move the damaged registry aside or set DISPATCH_DB to a fresh path.", data=data, ) - expected = {"lanes", "triggers", "actions_log", "queued_messages"} + expected = { + "lanes", + "triggers", + "actions_log", + "queued_messages", + "lane_sync_sources", + "lane_snapshots", + } data.update({"schema_version": version, "integrity": integrity, "tables": sorted(tables)}) if integrity != "ok": return DoctorCheck( diff --git a/src/outfitter/dispatch/registry/models.py b/src/outfitter/dispatch/registry/models.py index d8fb89d..2f19f0d 100644 --- a/src/outfitter/dispatch/registry/models.py +++ b/src/outfitter/dispatch/registry/models.py @@ -9,6 +9,7 @@ LaneSource = Literal["own", "attached"] LaneStatus = Literal["idle", "busy", "waiting_approval", "archived", "error", "unknown"] +SyncState = Literal["unknown", "metadata", "partial", "complete", "error"] QueuedMessageStatus = Literal["pending", "sending", "sent", "error"] @@ -52,6 +53,35 @@ class QueuedMessage(BaseModel): error: str | None 
= None +class LaneSync(BaseModel): + """Compact sync/index state for a managed lane.""" + + lane: str + state: SyncState + source_path: str | None = None + source_device: int | None = None + source_inode: int | None = None + source_size: int | None = None + source_mtime_ns: int | None = None + line_count: int | None = None + first_offset: int | None = None + tail_offset: int | None = None + last_synced_at: str | None = None + error: str | None = None + display_name: str | None = None + preview: str | None = None + cwd: str | None = None + source: str | None = None + thread_source: str | None = None + model_provider: str | None = None + model: str | None = None + reasoning_effort: str | None = None + session_id: str | None = None + latest_event_at: str | None = None + latest_turn_id: str | None = None + transcript_partial: bool = True + + # --- triggers ----------------------------------------------------------------- # A trigger binds when -> action -> lane (ADR-0003: our own scheduler/reactor). 
diff --git a/src/outfitter/dispatch/registry/store.py b/src/outfitter/dispatch/registry/store.py index 7aad218..3891017 100644 --- a/src/outfitter/dispatch/registry/store.py +++ b/src/outfitter/dispatch/registry/store.py @@ -22,13 +22,14 @@ Lane, LaneSource, LaneStatus, + LaneSync, QueuedMessage, Trigger, WhenAdapter, ) Clock = Callable[[], datetime] -SCHEMA_VERSION = 1 +SCHEMA_VERSION = 2 def _utcnow() -> datetime: @@ -78,6 +79,37 @@ def _utcnow() -> datetime: updated_at TEXT NOT NULL, error TEXT ); +CREATE TABLE IF NOT EXISTS lane_sync_sources ( + lane TEXT PRIMARY KEY, + state TEXT NOT NULL, + source_path TEXT, + source_device INTEGER, + source_inode INTEGER, + source_size INTEGER, + source_mtime_ns INTEGER, + line_count INTEGER, + first_offset INTEGER, + tail_offset INTEGER, + last_synced_at TEXT, + error TEXT, + FOREIGN KEY(lane) REFERENCES lanes(id) ON DELETE CASCADE +); +CREATE TABLE IF NOT EXISTS lane_snapshots ( + lane TEXT PRIMARY KEY, + display_name TEXT, + preview TEXT, + cwd TEXT, + source TEXT, + thread_source TEXT, + model_provider TEXT, + model TEXT, + reasoning_effort TEXT, + session_id TEXT, + latest_event_at TEXT, + latest_turn_id TEXT, + transcript_partial INTEGER NOT NULL DEFAULT 1, + FOREIGN KEY(lane) REFERENCES lanes(id) ON DELETE CASCADE +); """ @@ -103,7 +135,7 @@ async def open(cls, path: str | Path = ":memory:", now: Clock = _utcnow) -> Regi f"version {SCHEMA_VERSION}" ) await store._conn.executescript(_SCHEMA) - if user_version == 0: + if user_version < SCHEMA_VERSION: await store._conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION}") await store._conn.commit() return store @@ -137,6 +169,56 @@ async def add_lane( updated_at=now, last_event_at=None, ) + await self._insert_lane(lane) + await self._conn.commit() + return lane + + async def add_lane_with_sync( + self, + *, + id: str, + handle: str, + source: LaneSource, + sync: LaneSync, + role: str | None = None, + cwd: str | None = None, + status: LaneStatus = "unknown", + pinned: bool 
= False, + audit_op: str | None = None, + audit_detail: str | None = None, + ) -> tuple[Lane, LaneSync]: + if sync.lane != id: + raise ValueError(f"sync lane {sync.lane!r} does not match lane id {id!r}") + now = self._now() + lane = Lane( + id=id, + handle=handle, + role=role, + cwd=cwd, + source=source, + status=status, + pinned=pinned, + created_at=now, + updated_at=now, + last_event_at=None, + ) + synced_at = sync.last_synced_at or now.isoformat() + await self._conn.execute("BEGIN") + try: + await self._insert_lane(lane) + await self._upsert_lane_sync_rows(sync, synced_at) + if audit_op is not None: + await self._insert_action_log(audit_op, lane=lane.id, detail=audit_detail) + except Exception: + await self._conn.rollback() + raise + await self._conn.commit() + saved_sync = await self.get_lane_sync(lane.id) + if saved_sync is None: + raise RuntimeError("lane sync insert did not return a row") + return lane, saved_sync + + async def _insert_lane(self, lane: Lane) -> None: await self._conn.execute( "INSERT INTO lanes (id, handle, role, cwd, source, status, pinned, active_turn_id, " "created_at, updated_at, last_event_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", @@ -148,14 +230,12 @@ async def add_lane( lane.source, lane.status, int(lane.pinned), - None, + lane.active_turn_id, lane.created_at.isoformat(), lane.updated_at.isoformat(), - None, + lane.last_event_at.isoformat() if lane.last_event_at else None, ), ) - await self._conn.commit() - return lane async def find_lane(self, lane_id: str) -> Lane | None: async with self._conn.execute("SELECT * FROM lanes WHERE id = ?", (lane_id,)) as cur: @@ -284,6 +364,87 @@ async def reset_sending_messages(self) -> int: await self._conn.commit() return cur.rowcount + # --- lane sync ----------------------------------------------------------- + + async def upsert_lane_sync(self, sync: LaneSync) -> LaneSync: + now = sync.last_synced_at or self._now().isoformat() + await self._upsert_lane_sync_rows(sync, now) + await 
self._conn.commit() + got = await self.get_lane_sync(sync.lane) + if got is None: + raise RuntimeError("lane sync upsert did not return a row") + return got + + async def _upsert_lane_sync_rows(self, sync: LaneSync, last_synced_at: str) -> None: + await self._conn.execute( + "INSERT INTO lane_sync_sources (lane, state, source_path, source_device, " + "source_inode, source_size, source_mtime_ns, line_count, first_offset, " + "tail_offset, last_synced_at, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + "ON CONFLICT(lane) DO UPDATE SET state = excluded.state, " + "source_path = excluded.source_path, source_device = excluded.source_device, " + "source_inode = excluded.source_inode, source_size = excluded.source_size, " + "source_mtime_ns = excluded.source_mtime_ns, line_count = excluded.line_count, " + "first_offset = excluded.first_offset, tail_offset = excluded.tail_offset, " + "last_synced_at = excluded.last_synced_at, error = excluded.error", + ( + sync.lane, + sync.state, + sync.source_path, + sync.source_device, + sync.source_inode, + sync.source_size, + sync.source_mtime_ns, + sync.line_count, + sync.first_offset, + sync.tail_offset, + last_synced_at, + sync.error, + ), + ) + await self._conn.execute( + "INSERT INTO lane_snapshots (lane, display_name, preview, cwd, source, " + "thread_source, model_provider, model, reasoning_effort, session_id, " + "latest_event_at, latest_turn_id, transcript_partial) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
" + "ON CONFLICT(lane) DO UPDATE SET display_name = excluded.display_name, " + "preview = excluded.preview, cwd = excluded.cwd, source = excluded.source, " + "thread_source = excluded.thread_source, model_provider = excluded.model_provider, " + "model = excluded.model, reasoning_effort = excluded.reasoning_effort, " + "session_id = excluded.session_id, latest_event_at = excluded.latest_event_at, " + "latest_turn_id = excluded.latest_turn_id, " + "transcript_partial = excluded.transcript_partial", + ( + sync.lane, + sync.display_name, + sync.preview, + sync.cwd, + sync.source, + sync.thread_source, + sync.model_provider, + sync.model, + sync.reasoning_effort, + sync.session_id, + sync.latest_event_at, + sync.latest_turn_id, + int(sync.transcript_partial), + ), + ) + + async def get_lane_sync(self, lane_id: str) -> LaneSync | None: + async with self._conn.execute(_LANE_SYNC_SELECT + " WHERE src.lane = ?", (lane_id,)) as cur: + row = await cur.fetchone() + return _row_to_lane_sync(row) if row is not None else None + + async def get_lane_sync_many(self, lane_ids: list[str]) -> dict[str, LaneSync]: + if not lane_ids: + return {} + placeholders = ", ".join("?" 
for _ in lane_ids) + async with self._conn.execute( + _LANE_SYNC_SELECT + f" WHERE src.lane IN ({placeholders})", tuple(lane_ids) + ) as cur: + rows = await cur.fetchall() + return {sync.lane: sync for sync in (_row_to_lane_sync(row) for row in rows)} + # --- triggers ------------------------------------------------------------- async def add_trigger(self, trigger: Trigger) -> Trigger: @@ -349,13 +510,26 @@ async def log_action( trigger_id: str | None = None, detail: str | None = None, outcome: str = "ok", + ) -> None: + await self._insert_action_log( + op, lane=lane, trigger_id=trigger_id, detail=detail, outcome=outcome + ) + await self._conn.commit() + + async def _insert_action_log( + self, + op: str, + *, + lane: str | None = None, + trigger_id: str | None = None, + detail: str | None = None, + outcome: str = "ok", ) -> None: await self._conn.execute( "INSERT INTO actions_log (ts, op, lane, trigger_id, detail, outcome) " "VALUES (?, ?, ?, ?, ?, ?)", (self._now().isoformat(), op, lane, trigger_id, detail, outcome), ) - await self._conn.commit() async def recent_actions(self, limit: int = 50) -> list[ActionRecord]: async with self._conn.execute( @@ -370,10 +544,47 @@ def _row_dict(row: aiosqlite.Row) -> dict[str, object]: return dict(zip(row.keys(), tuple(row), strict=True)) +_LANE_SYNC_SELECT = """ +SELECT + src.lane AS lane, + src.state AS state, + src.source_path AS source_path, + src.source_device AS source_device, + src.source_inode AS source_inode, + src.source_size AS source_size, + src.source_mtime_ns AS source_mtime_ns, + src.line_count AS line_count, + src.first_offset AS first_offset, + src.tail_offset AS tail_offset, + src.last_synced_at AS last_synced_at, + src.error AS error, + snap.display_name AS display_name, + snap.preview AS preview, + snap.cwd AS cwd, + snap.source AS source, + snap.thread_source AS thread_source, + snap.model_provider AS model_provider, + snap.model AS model, + snap.reasoning_effort AS reasoning_effort, + snap.session_id AS 
session_id, + snap.latest_event_at AS latest_event_at, + snap.latest_turn_id AS latest_turn_id, + snap.transcript_partial AS transcript_partial +FROM lane_sync_sources src +LEFT JOIN lane_snapshots snap ON snap.lane = src.lane +""" + + def _row_to_lane(row: aiosqlite.Row) -> Lane: return Lane.model_validate(_row_dict(row)) +def _row_to_lane_sync(row: aiosqlite.Row) -> LaneSync: + data = _row_dict(row) + data["transcript_partial"] = bool(data["transcript_partial"]) + return LaneSync.model_validate(data) + + def _row_to_trigger(row: aiosqlite.Row) -> Trigger: data = _row_dict(row) last_fired = data["last_fired_at"] diff --git a/tests/client/test_models.py b/tests/client/test_models.py index bcd618e..63b8ca8 100644 --- a/tests/client/test_models.py +++ b/tests/client/test_models.py @@ -10,6 +10,7 @@ ThreadForkParams, ThreadGoal, ThreadGoalSetParams, + ThreadInfo, ThreadListResult, ThreadReadParams, ThreadRollbackParams, @@ -105,6 +106,25 @@ def test_thread_list_result_reads_data_key() -> None: assert result.next_cursor == "c1" +def test_thread_info_keeps_sync_metadata_fields() -> None: + thread = ThreadInfo.model_validate( + { + "id": "t1", + "sessionId": "t1", + "path": "/tmp/rollout.jsonl", + "modelProvider": "openai", + "threadSource": "user", + "updatedAt": 123, + } + ) + + assert thread.session_id == "t1" + assert thread.path == "/tmp/rollout.jsonl" + assert thread.model_provider == "openai" + assert thread.thread_source == "user" + assert thread.updated_at == 123 + + def test_thread_read_include_turns_alias() -> None: params = ThreadReadParams(thread_id="t1", include_turns=True) assert params.model_dump(by_alias=True, exclude_none=True) == { diff --git a/tests/core/test_examples.py b/tests/core/test_examples.py index fccc040..fd6f8bd 100644 --- a/tests/core/test_examples.py +++ b/tests/core/test_examples.py @@ -20,6 +20,7 @@ async def test_registry_has_the_v1_ops() -> None: "lane-rename", "transcript", "watch", + "sync", "roster", "discover", "archive", diff 
--git a/tests/core/test_handlers.py b/tests/core/test_handlers.py index 16ef631..83fc4d4 100644 --- a/tests/core/test_handlers.py +++ b/tests/core/test_handlers.py @@ -24,6 +24,7 @@ GoalSetInput, LaneInput, LaneRenameInput, + LaneSyncInput, LaneTextInput, LogInput, NewInput, @@ -584,21 +585,28 @@ async def test_attach_is_idempotent(store: Registry) -> None: second = await handlers.attach_lane(AttachInput(thread="T9"), ctx) assert first.id == second.id == "T9" assert len((await handlers.roster(RosterInput(), ctx)).lanes) == 1 + assert any(name == "thread_read" for name, _ in client.calls) + assert not any(name == "thread_resume" for name, _ in client.calls) + sync = await store.get_lane_sync("T9") + assert sync is not None + assert sync.state == "metadata" + actions = await store.recent_actions(limit=10) + assert [action.op for action in actions] == ["attach"] -class _HangingResumeClient(FakeLaneClient): - """A client whose ``thread/resume`` never returns — models a wedged app-server.""" +class _HangingReadClient(FakeLaneClient): + """A client whose metadata read never returns — models a wedged app-server.""" - async def thread_resume(self, thread_id: str) -> ThreadInfo: + async def thread_read(self, thread_id: str, include_turns: bool = False) -> dict[str, object]: await asyncio.sleep(3600) # cancelled by the handler's wait_for bound raise AssertionError("unreachable") # pragma: no cover -async def test_attach_resume_timeout_projects_cleanly_and_leaves_registry_empty( +async def test_attach_metadata_timeout_projects_cleanly_and_leaves_registry_empty( store: Registry, monkeypatch: pytest.MonkeyPatch ) -> None: - monkeypatch.setattr(handlers, "_RESUME_TIMEOUT_S", 0.05) - ctx = make_ctx(store, _HangingResumeClient()) + monkeypatch.setattr(handlers, "_ATTACH_METADATA_TIMEOUT_S", 0.05) + ctx = make_ctx(store, _HangingReadClient()) with pytest.raises(AppServerError) as excinfo: await handlers.attach_lane(AttachInput(thread="STUCK"), ctx) assert "timed out" in 
str(excinfo.value) @@ -606,6 +614,85 @@ async def test_attach_resume_timeout_projects_cleanly_and_leaves_registry_empty( assert (await handlers.roster(RosterInput(), ctx)).lanes == [] +async def test_attach_invalid_metadata_projects_cleanly_and_leaves_registry_empty( + store: Registry, +) -> None: + client = FakeLaneClient() + client.read_result = {"data": []} + ctx = make_ctx(store, client) + + with pytest.raises(AppServerError) as excinfo: + await handlers.attach_lane(AttachInput(thread="BAD"), ctx) + + assert "invalid payload" in str(excinfo.value) + assert (await handlers.roster(RosterInput(), ctx)).lanes == [] + + +async def test_attach_with_sync_indexes_jsonl_and_roster_reports_state( + store: Registry, tmp_path: Path +) -> None: + path = tmp_path / "rollout.jsonl" + path.write_text( + "\n".join( + [ + '{"type":"session_meta","timestamp":"2026-06-05T10:00:00.000Z",' + '"payload":{"id":"T9","cwd":"/work","source":"vscode",' + '"thread_source":"user","model_provider":"openai"}}', + '{"type":"turn_context","timestamp":"2026-06-05T10:00:01.000Z",' + '"payload":{"model":"gpt-5-codex","effort":"low"}}', + '{"type":"event_msg","timestamp":"2026-06-05T10:00:02.000Z",' + '"payload":{"type":"task_complete","turn_id":"turn-1"}}', + ] + ) + + "\n" + ) + client = FakeLaneClient() + client.read_result = { + "thread": { + "id": "T9", + "name": "Desktop", + "preview": "hello from desktop", + "cwd": "/work", + "source": "vscode", + "path": str(path), + "sessionId": "T9", + "modelProvider": "openai", + } + } + ctx = make_ctx(store, client) + + attached = await handlers.attach_lane(AttachInput(thread="T9", sync=True), ctx) + detail = await handlers.show(ShowInput(lane="T9"), ctx) + roster = await handlers.roster(RosterInput(), ctx) + + assert attached.handle == "Desktop" + assert detail.sync.state == "partial" + assert detail.sync.latest_turn_id == "turn-1" + assert detail.sync.source_size == path.stat().st_size + assert roster.lanes[0].sync.state == "partial" + assert 
roster.lanes[0].sync.latest_event_at == "2026-06-05T10:00:02.000Z" + assert sum(1 for name, _ in client.calls if name == "thread_read") == 1 + assert not any(name == "thread_resume" for name, _ in client.calls) + + +async def test_lane_sync_can_full_scan_existing_lane(store: Registry, tmp_path: Path) -> None: + path = tmp_path / "rollout.jsonl" + path.write_text( + '{"type":"session_meta","timestamp":"2026-06-05T10:00:00.000Z","payload":{"id":"T9"}}\n' + ) + client = FakeLaneClient() + client.read_result = {"thread": {"id": "T9", "path": str(path)}} + ctx = make_ctx(store, client) + await store.add_lane(id="T9", handle="@desktop", source="attached") + + out = await handlers.sync_lane(LaneSyncInput(lane="@desktop", full=True), ctx) + + assert out.lane == "T9" + assert out.sync.state == "complete" + assert out.sync.transcript_partial is False + assert any(name == "thread_read" for name, _ in client.calls) + + async def test_discover_lists_persisted_sessions_from_client(store: Registry) -> None: client = FakeLaneClient() client.list_result = [ diff --git a/tests/core/test_sync.py b/tests/core/test_sync.py new file mode 100644 index 0000000..2380f0b --- /dev/null +++ b/tests/core/test_sync.py @@ -0,0 +1,128 @@ +"""Codex JSONL sync parser tests.""" + +from __future__ import annotations + +import json +from pathlib import Path + +from outfitter.dispatch.core.sync import SyncLimits, scan_codex_jsonl + + +def _write_jsonl( + path: Path, records: list[dict[str, object]], *, partial: str | None = None +) -> None: + with path.open("w") as handle: + for record in records: + handle.write(json.dumps(record) + "\n") + if partial is not None: + handle.write(partial) + + +def test_quick_scan_reads_bounded_top_and_tail_without_partial_line(tmp_path: Path) -> None: + path = tmp_path / "rollout.jsonl" + large_context = "x" * 5000 + _write_jsonl( + path, + [ + { + "type": "session_meta", + "timestamp": "2026-06-05T10:00:00.000Z", + "payload": { + "id": "T1", + "cwd": "/work", + 
"source": "vscode", + "thread_source": "user", + "model_provider": "openai", + "base_instructions": large_context, + }, + }, + { + "type": "turn_context", + "timestamp": "2026-06-05T10:00:01.000Z", + "payload": {"cwd": "/work", "model": "gpt-5-codex", "effort": "low"}, + }, + { + "type": "event_msg", + "timestamp": "2026-06-05T10:00:02.000Z", + "payload": {"type": "token_count"}, + }, + { + "type": "event_msg", + "timestamp": "2026-06-05T10:00:03.000Z", + "payload": {"type": "task_complete", "turn_id": "turn-1"}, + }, + ], + partial='{"type": "event_msg", "timestamp"', + ) + + facts = scan_codex_jsonl( + str(path), + limits=SyncLimits(top_bytes=10_000, tail_bytes=512, tail_lines=2), + ) + + assert facts.state == "partial" + assert facts.source is not None + assert facts.source.size == path.stat().st_size + assert facts.line_count is None + assert facts.first_offset == 0 + assert facts.tail_offset is not None + assert facts.session_id == "T1" + assert facts.cwd == "/work" + assert facts.source_kind == "vscode" + assert facts.thread_source == "user" + assert facts.model_provider == "openai" + assert facts.model == "gpt-5-codex" + assert facts.reasoning_effort == "low" + assert facts.latest_turn_id == "turn-1" + assert facts.latest_event_at == "2026-06-05T10:00:03.000Z" + + +def test_quick_scan_honors_top_byte_limit_for_large_first_line(tmp_path: Path) -> None: + path = tmp_path / "rollout.jsonl" + _write_jsonl( + path, + [ + { + "type": "session_meta", + "timestamp": "2026-06-05T10:00:00.000Z", + "payload": {"id": "T1", "base_instructions": "x" * 20_000}, + }, + { + "type": "event_msg", + "timestamp": "2026-06-05T10:00:03.000Z", + "payload": {"type": "task_complete", "turn_id": "turn-1"}, + }, + ], + ) + + facts = scan_codex_jsonl( + str(path), + limits=SyncLimits(top_bytes=128, tail_bytes=512, tail_lines=1), + ) + + assert facts.state == "partial" + assert facts.first_offset is None + assert facts.session_id is None + assert facts.latest_turn_id == "turn-1" + + +def 
test_full_scan_marks_complete_and_reports_missing_file(tmp_path: Path) -> None: + path = tmp_path / "rollout.jsonl" + _write_jsonl( + path, + [ + { + "type": "session_meta", + "timestamp": "2026-06-05T10:00:00.000Z", + "payload": {"id": "T1"}, + } + ], + ) + + full = scan_codex_jsonl(str(path), full=True) + missing = scan_codex_jsonl(str(tmp_path / "missing.jsonl")) + + assert full.state == "complete" + assert full.line_count == 1 + assert missing.state == "error" + assert missing.error is not None diff --git a/tests/daemon/test_supervisor.py b/tests/daemon/test_supervisor.py index 65cc588..5837b8c 100644 --- a/tests/daemon/test_supervisor.py +++ b/tests/daemon/test_supervisor.py @@ -1,5 +1,4 @@ -"""Supervisor: restart the app-server on crash and re-resume lanes (recoverable -daemon — the v0 DoD).""" +"""Supervisor: restart the app-server on crash and restore lane observation.""" from __future__ import annotations @@ -27,7 +26,7 @@ async def store() -> AsyncIterator[Registry]: await s.close() -async def test_supervisor_restarts_and_reresumes_lanes_on_crash(store: Registry) -> None: +async def test_supervisor_restarts_and_restores_lanes_on_crash(store: Registry) -> None: await store.add_lane(id="D1", handle="@desktop", source="attached", status="idle") await store.add_lane(id="O1", handle="@own", source="own", status="idle") ctx = make_ctx(store) @@ -45,17 +44,26 @@ async def make_client() -> FakeSupervisedClient: task = asyncio.create_task(supervisor.supervise(first)) await asyncio.sleep(0.05) - # On first start, both persisted lanes are re-resumed and ctx.client is set. - assert sorted(clients[0].resumed) == ["D1", "O1"] + # Owned lanes are resumed for event observation; attached lanes stay + # metadata-only after restart (ADR-0017). 
+ assert clients[0].resumed == ["O1"] + assert any( + name == "thread_read" and kw["thread_id"] == "D1" and kw["include_turns"] is False + for name, kw in clients[0].calls + ) assert ctx.client is clients[0] # Simulate app-server crash (stdout EOF → wait_closed returns). clients[0].closed.set() await asyncio.sleep(0.05) - # Supervisor started a fresh client and re-resumed the lanes on it. + # Supervisor started a fresh client and restored lanes on it. assert len(clients) == 2 - assert sorted(clients[1].resumed) == ["D1", "O1"] + assert clients[1].resumed == ["O1"] + assert any( + name == "thread_read" and kw["thread_id"] == "D1" and kw["include_turns"] is False + for name, kw in clients[1].calls + ) assert ctx.client is clients[1] await supervisor.stop() diff --git a/tests/fakes.py b/tests/fakes.py index d657fc8..5c80cfb 100644 --- a/tests/fakes.py +++ b/tests/fakes.py @@ -128,6 +128,9 @@ async def thread_list( async def thread_read(self, thread_id: str, include_turns: bool = False) -> dict[str, object]: self._record("thread_read", thread_id=thread_id, include_turns=include_turns) + if not self.read_result: + thread = self.threads.get(thread_id, ThreadInfo(id=thread_id)) + return {"thread": thread.model_dump(by_alias=True, exclude_none=True)} return self.read_result async def thread_archive(self, thread_id: str) -> None: diff --git a/tests/registry/test_store.py b/tests/registry/test_store.py index 472c02d..be40888 100644 --- a/tests/registry/test_store.py +++ b/tests/registry/test_store.py @@ -9,6 +9,7 @@ import pytest_asyncio from outfitter.dispatch.contracts.errors import NotFoundError +from outfitter.dispatch.registry.models import LaneSync from outfitter.dispatch.registry.store import Registry @@ -65,6 +66,53 @@ async def test_log_action_and_recent(store: Registry) -> None: assert recent[1].detail == "hi" +async def test_add_lane_with_sync_commits_lane_sync_and_audit(store: Registry) -> None: + lane, sync = await store.add_lane_with_sync( + id="L1", + 
handle="@a", + source="attached", + cwd="/work", + status="idle", + sync=LaneSync(lane="L1", state="metadata", display_name="Desktop"), + audit_op="attach", + audit_detail="@a", + ) + + assert lane.id == "L1" + assert lane.status == "idle" + assert sync.lane == "L1" + assert sync.state == "metadata" + assert sync.display_name == "Desktop" + assert sync.last_synced_at == _clock().isoformat() + actions = await store.recent_actions(limit=10) + assert [(action.op, action.lane, action.detail) for action in actions] == [ + ("attach", "L1", "@a") + ] + + +async def test_add_lane_with_sync_rolls_back_if_sync_write_fails( + store: Registry, monkeypatch: pytest.MonkeyPatch +) -> None: + async def fail_sync_write(sync: LaneSync, last_synced_at: str) -> None: + raise RuntimeError(f"boom: {sync.lane} {last_synced_at}") + + monkeypatch.setattr(store, "_upsert_lane_sync_rows", fail_sync_write) + + with pytest.raises(RuntimeError, match="boom"): + await store.add_lane_with_sync( + id="L1", + handle="@a", + source="attached", + sync=LaneSync(lane="L1", state="metadata"), + audit_op="attach", + audit_detail="@a", + ) + + assert await store.find_lane("L1") is None + assert await store.get_lane_sync("L1") is None + assert await store.recent_actions(limit=10) == [] + + async def test_queued_messages_are_claimed_and_recovered(store: Registry) -> None: first = await store.enqueue_message(lane="L1", text="one") second = await store.enqueue_message(lane="L1", text="two") @@ -87,3 +135,51 @@ async def test_queued_messages_are_claimed_and_recovered(store: Registry) -> Non failed = await store.get_queued_message(second.id) assert failed.status == "error" assert failed.error == "app_server" + + +async def test_lane_sync_roundtrip_and_many_lookup(store: Registry) -> None: + await store.add_lane(id="L1", handle="@a", source="attached", cwd="/work") + await store.add_lane(id="L2", handle="@b", source="own") + + saved = await store.upsert_lane_sync( + LaneSync( + lane="L1", + state="partial", + 
source_path="/tmp/rollout.jsonl", + source_device=1, + source_inode=2, + source_size=3, + source_mtime_ns=4, + line_count=5, + first_offset=0, + tail_offset=128, + display_name="Desktop", + preview="hello", + cwd="/work", + source="vscode", + thread_source="user", + model_provider="openai", + model="gpt-5-codex", + reasoning_effort="low", + session_id="L1", + latest_event_at="2026-06-05T10:00:00.000Z", + latest_turn_id="turn-1", + ) + ) + + assert saved.last_synced_at == _clock().isoformat() + assert saved.display_name == "Desktop" + assert saved.source_size == 3 + + updated = await store.upsert_lane_sync( + saved.model_copy( + update={"state": "complete", "source_size": 10, "latest_turn_id": "turn-2"} + ) + ) + assert updated.state == "complete" + assert updated.source_size == 10 + assert updated.latest_turn_id == "turn-2" + + many = await store.get_lane_sync_many(["L1", "L2"]) + assert set(many) == {"L1"} + assert many["L1"].preview == "hello" diff --git a/tests/surfaces/test_mcp_routing.py b/tests/surfaces/test_mcp_routing.py index b1e47c0..08bc457 100644 --- a/tests/surfaces/test_mcp_routing.py +++ b/tests/surfaces/test_mcp_routing.py @@ -84,6 +84,14 @@ async def test_tool_calls_transcript_goal_and_compact(socket_path: Path) -> None assert compact.structuredContent is not None assert compact.structuredContent["accepted"] is True + synced = await handle_tool_call( + socket_path, + "dispatch_lane_write", + {"op": "sync", "lane": "lane-1"}, + ) + assert synced.structuredContent is not None + assert synced.structuredContent["sync"]["state"] == "metadata" + async def test_tool_call_error_projects_full_taxonomy_into_meta(socket_path: Path) -> None: result = await handle_tool_call( diff --git a/tests/surfaces/test_parity.py b/tests/surfaces/test_parity.py index 909e2a4..f1ad3a8 100644 --- a/tests/surfaces/test_parity.py +++ b/tests/surfaces/test_parity.py @@ -41,6 +41,7 @@ def _stub_invoke(op_id: str, params: dict[str, object]) -> dict[str, object]: "lane list": 
"roster", "lane list --unmanaged": "discover", "lane attach": "attach", + "lane sync": "sync", "lane rename": "lane-rename", "lane fork": "fork", "lane rollback": "rollback", @@ -118,12 +119,14 @@ def invoke(op_id: str, params: dict[str, object]) -> dict[str, object]: assert runner.invoke(app, ["send", "@docs", "hi", "--context"]).exit_code == 0 assert runner.invoke(app, ["stop", "@docs"]).exit_code == 0 assert runner.invoke(app, ["lane", "list", "--unmanaged"]).exit_code == 0 + assert runner.invoke(app, ["lane", "sync", "@docs"]).exit_code == 0 assert runner.invoke(app, ["goal", "status", "@docs"]).exit_code == 0 assert calls == [ ("send", {"lane": "@docs", "text": "hi", "mode": "context"}), ("stop", {"lane": "@docs"}), ("discover", {"limit": 50}), + ("sync", {"lane": "@docs", "full": False}), ("goal-get", {"lane": "@docs"}), ] diff --git a/tests/test_doctor.py b/tests/test_doctor.py index 599a810..74341c2 100644 --- a/tests/test_doctor.py +++ b/tests/test_doctor.py @@ -70,7 +70,7 @@ def test_doctor_warns_for_unversioned_registry_migration( registry = next(check for check in report.checks if check.name == "registry") assert registry.status == "warn" assert registry.summary == "registry schema is unversioned" - assert registry.detail == "missing tables: queued_messages" + assert registry.detail == "missing tables: lane_snapshots, lane_sync_sources, queued_messages" assert registry.recovery is not None assert "dispatch down" in registry.recovery diff --git a/uv.lock b/uv.lock index 2f36cdf..568d631 100644 --- a/uv.lock +++ b/uv.lock @@ -466,7 +466,7 @@ wheels = [ [[package]] name = "outfitter-dispatch" -version = "0.2.1" +version = "0.3.0" source = { editable = "." } dependencies = [ { name = "aiosqlite" },