Skip to content

Commit 05d0f3c

Browse files
committed
feat: persist planning artifacts for runs
1 parent 721f908 commit 05d0f3c

2 files changed

Lines changed: 167 additions & 1 deletion

File tree

apps/orchestrator/src/cortexpilot_orch/api/main_pm_intake_helpers.py

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
from cortexpilot_orch.config import load_config
1818
from cortexpilot_orch.contract.compiler import build_role_binding_summary, sync_role_contract
1919
from cortexpilot_orch.observability.logger import log_event
20-
from cortexpilot_orch.planning.intake import IntakeService
20+
from cortexpilot_orch.planning.intake import IntakeService, _build_wave_plan, _build_worker_prompt_contracts
2121
from cortexpilot_orch.store.intake_store import IntakeStore
22+
from cortexpilot_orch.store.run_store import RunStore
2223

2324

2425
_TRUTHY_VALUES = {"1", "true", "yes", "y", "on"}
@@ -118,6 +119,55 @@ def _strip_intake_only_contract_fields(contract: dict[str, Any]) -> dict[str, An
118119
return sanitized
119120

120121

122+
def _safe_read_intake_store_payload(store: object, method_name: str, intake_id: str) -> dict[str, Any]:
123+
reader = getattr(store, method_name, None)
124+
if not callable(reader):
125+
return {}
126+
try:
127+
payload = reader(intake_id)
128+
except Exception:
129+
return {}
130+
return payload if isinstance(payload, dict) else {}
131+
132+
133+
def _persist_planning_artifacts_for_run(
134+
*,
135+
intake_id: str,
136+
run_id: str,
137+
runs_root: Path,
138+
) -> list[str]:
139+
intake_store = IntakeStore()
140+
intake_payload = _safe_read_intake_store_payload(intake_store, "read_intake", intake_id)
141+
response_payload = _safe_read_intake_store_payload(intake_store, "read_response", intake_id)
142+
plan_bundle = response_payload.get("plan_bundle") if isinstance(response_payload.get("plan_bundle"), dict) else None
143+
if not intake_payload or not isinstance(plan_bundle, dict):
144+
return []
145+
146+
run_store = RunStore(runs_root=runs_root)
147+
artifacts_to_write: list[tuple[str, Any]] = [
148+
("planning_wave_plan.json", _build_wave_plan(plan_bundle)),
149+
("planning_worker_prompt_contracts.json", _build_worker_prompt_contracts(plan_bundle, intake_payload)),
150+
]
151+
written: list[str] = []
152+
for filename, payload in artifacts_to_write:
153+
if payload in ({}, [], None):
154+
continue
155+
run_store.write_artifact(run_id, filename, json.dumps(payload, ensure_ascii=False, indent=2))
156+
written.append(filename)
157+
158+
if written:
159+
run_store.append_event(
160+
run_id,
161+
{
162+
"level": "INFO",
163+
"event": "PLANNING_ARTIFACTS_WRITTEN",
164+
"run_id": run_id,
165+
"meta": {"intake_id": intake_id, "artifacts": written},
166+
},
167+
)
168+
return written
169+
170+
121171
def configure_pm_session_aggregation(
122172
*,
123173
runs_root_fn: Callable[[], Path],
@@ -597,10 +647,16 @@ def _execute_in_background() -> None:
597647
)
598648

599649
IntakeStore().append_event(intake_id, {"event": "INTAKE_RUN", "run_id": run_id})
650+
planning_artifacts = _persist_planning_artifacts_for_run(
651+
intake_id=intake_id,
652+
run_id=run_id,
653+
runs_root=runs_root,
654+
)
600655
return {
601656
"ok": True,
602657
"run_id": run_id,
603658
"contract_path": str(contract_path),
604659
"strict_acceptance": bool(runtime_options.get("strict_acceptance", False)),
605660
"role_binding_summary": build_role_binding_summary(contract),
661+
"planning_artifacts": planning_artifacts,
606662
}

apps/orchestrator/tests/test_main_pm_intake_helpers_branches.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,116 @@ def execute_task(contract_path: Path, mock_mode: bool = False) -> str:
463463
assert observed_contract["runtime_options"]["strict_acceptance"] is True
464464

465465

466+
def test_run_intake_persists_planning_artifacts_into_run_bundle(monkeypatch, tmp_path: Path) -> None:
467+
runs_root = tmp_path / "runs"
468+
runtime_contract_root = tmp_path / ".runtime-cache" / "cortexpilot" / "contracts"
469+
intake_payload = {
470+
"objective": "Ship one planning artifact bridge",
471+
"constraints": ["truthful-public-surface"],
472+
"search_queries": ["command tower planning artifact"],
473+
}
474+
response_payload = {
475+
"plan_bundle": {
476+
"bundle_id": "bundle-1",
477+
"objective": "Ship one planning artifact bridge",
478+
"owner_agent": {"role": "PM", "agent_id": "pm-1"},
479+
"plans": [
480+
{
481+
"plan_id": "worker-1",
482+
"assigned_agent": {"role": "WORKER", "agent_id": "worker-1"},
483+
"spec": "Persist the planning artifact into the run bundle.",
484+
"allowed_paths": ["apps/orchestrator"],
485+
"acceptance_tests": [{"name": "pytest", "cmd": "python3 -m pytest -q", "must_pass": True}],
486+
"mcp_tool_set": ["codex"],
487+
"required_outputs": [{"name": "task_result.json", "type": "report"}],
488+
}
489+
],
490+
}
491+
}
492+
intake_events: list[tuple[str, dict[str, object]]] = []
493+
494+
class _Store:
495+
def append_event(self, intake_id: str, payload: dict[str, object]) -> None:
496+
intake_events.append((intake_id, payload))
497+
498+
def read_intake(self, intake_id: str) -> dict[str, object]:
499+
assert intake_id == "persist"
500+
return intake_payload
501+
502+
def read_response(self, intake_id: str) -> dict[str, object]:
503+
assert intake_id == "persist"
504+
return response_payload
505+
506+
monkeypatch.setattr(helpers, "IntakeStore", lambda: _Store())
507+
monkeypatch.setattr(
508+
helpers,
509+
"load_config",
510+
lambda: types.SimpleNamespace(
511+
repo_root=tmp_path,
512+
runs_root=runs_root,
513+
contract_root=tmp_path / "contracts",
514+
runtime_contract_root=runtime_contract_root,
515+
),
516+
)
517+
518+
class _BuildOK:
519+
def build_contract(self, intake_id: str) -> dict[str, object]:
520+
assert intake_id == "persist"
521+
return {
522+
"task_id": "task-persist",
523+
"owner_agent": {"role": "PM", "agent_id": "pm-1"},
524+
"assigned_agent": {"role": "WORKER", "agent_id": "worker-1"},
525+
"inputs": {"spec": "repro", "artifacts": []},
526+
"required_outputs": [{"name": "task_result.json", "type": "json", "acceptance": "ok"}],
527+
"allowed_paths": ["apps/orchestrator"],
528+
"forbidden_actions": [],
529+
"acceptance_tests": [{"name": "pytest", "cmd": "python3 -m pytest -q", "must_pass": True}],
530+
"tool_permissions": {
531+
"filesystem": "workspace-write",
532+
"shell": "on-request",
533+
"network": "deny",
534+
"mcp_tools": ["codex"],
535+
},
536+
"mcp_tool_set": ["codex"],
537+
"timeout_retry": {"timeout_sec": 60, "max_retries": 0, "retry_backoff_sec": 0},
538+
"rollback": {"strategy": "git_reset_hard", "baseline_ref": "HEAD"},
539+
"evidence_links": [],
540+
"log_refs": {"run_id": "", "paths": {}},
541+
}
542+
543+
class _Orchestrator:
544+
@staticmethod
545+
def execute_task(contract_path: Path, mock_mode: bool = False) -> str:
546+
del mock_mode
547+
payload = json.loads(contract_path.read_text(encoding="utf-8"))
548+
store = RunStore(runs_root=runs_root)
549+
run_id = store.create_run(str(payload.get("task_id") or "task"))
550+
store.write_manifest(run_id, {"run_id": run_id, "task_id": payload.get("task_id"), "status": "RUNNING", "repo": {}})
551+
return run_id
552+
553+
result = helpers.run_intake(
554+
"persist",
555+
{"mock": True},
556+
intake_service_cls=_BuildOK,
557+
orchestration_service=_Orchestrator(),
558+
error_detail_fn=lambda code: {"code": code},
559+
current_request_id_fn=lambda: "req-persist",
560+
)
561+
562+
run_id = result["run_id"]
563+
wave_plan = json.loads((runs_root / run_id / "artifacts" / "planning_wave_plan.json").read_text(encoding="utf-8"))
564+
worker_contracts = json.loads(
565+
(runs_root / run_id / "artifacts" / "planning_worker_prompt_contracts.json").read_text(encoding="utf-8")
566+
)
567+
568+
assert result["planning_artifacts"] == ["planning_wave_plan.json", "planning_worker_prompt_contracts.json"]
569+
assert wave_plan["wave_id"] == "bundle-1"
570+
assert wave_plan["objective"] == "Ship one planning artifact bridge"
571+
assert worker_contracts[0]["prompt_contract_id"] == "worker-1"
572+
assert worker_contracts[0]["continuation_policy"]["on_blocked"] == "spawn_independent_temporary_unblock_task"
573+
assert intake_events[-1] == ("persist", {"event": "INTAKE_RUN", "run_id": run_id})
574+
575+
466576
def test_build_role_binding_summary_marks_skills_and_mcp_registry_refs_as_registry_backed() -> None:
467577
summary = build_role_binding_summary(
468578
{

0 commit comments

Comments
 (0)