Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
from cortexpilot_orch.config import load_config
from cortexpilot_orch.contract.compiler import build_role_binding_summary, sync_role_contract
from cortexpilot_orch.observability.logger import log_event
from cortexpilot_orch.planning.intake import IntakeService, _build_wave_plan, _build_worker_prompt_contracts
from cortexpilot_orch.planning.intake import (
IntakeService,
_build_unblock_tasks_from_worker_contracts,
_build_wave_plan,
_build_worker_prompt_contracts,
)
from cortexpilot_orch.store.intake_store import IntakeStore
from cortexpilot_orch.store.run_store import RunStore

Expand Down Expand Up @@ -169,9 +174,11 @@ def _persist_planning_artifacts_for_run(

run_store = RunStore(runs_root=runs_root)
run_dir = run_store.run_dir(run_id)
worker_prompt_contracts = _build_worker_prompt_contracts(plan_bundle, intake_payload)
artifacts_to_write: list[tuple[str, Any]] = [
("planning_wave_plan.json", _build_wave_plan(plan_bundle)),
("planning_worker_prompt_contracts.json", _build_worker_prompt_contracts(plan_bundle, intake_payload)),
("planning_worker_prompt_contracts.json", worker_prompt_contracts),
("planning_unblock_tasks.json", _build_unblock_tasks_from_worker_contracts(worker_prompt_contracts)),
]
written: list[str] = []
artifact_refs: list[dict[str, Any]] = []
Expand Down
61 changes: 59 additions & 2 deletions apps/orchestrator/src/cortexpilot_orch/planning/intake.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,56 @@ def _build_worker_prompt_contracts(plan_bundle: dict[str, Any], payload: dict[st
return contracts


def _build_unblock_tasks_from_worker_contracts(worker_contracts: list[dict[str, Any]]) -> list[dict[str, Any]]:
tasks: list[dict[str, Any]] = []
for contract in worker_contracts:
if not isinstance(contract, dict):
continue
continuation_policy = contract.get("continuation_policy") if isinstance(contract.get("continuation_policy"), dict) else {}
on_blocked = str(continuation_policy.get("on_blocked") or "").strip()
if on_blocked != "spawn_independent_temporary_unblock_task":
continue
assigned_agent = contract.get("assigned_agent") if isinstance(contract.get("assigned_agent"), dict) else {}
blocked_when = contract.get("blocked_when") if isinstance(contract.get("blocked_when"), list) else []
reason = next(
(
str(item).strip()
for item in blocked_when
if str(item).strip() and (
"external blocker" in str(item).lower()
or "unblock task" in str(item).lower()
)
),
"an external blocker requires an L0-managed unblock task",
)
prompt_contract_id = str(contract.get("prompt_contract_id") or "").strip() or "worker-preview-1"
tasks.append(
{
"version": "v1",
"unblock_task_id": f"unblock-{prompt_contract_id}",
"source_prompt_contract_id": prompt_contract_id,
"objective": f"Unblock the scoped worker assignment for {contract.get('objective') or 'the current wave'}",
"scope_hint": str(contract.get("scope") or "").strip() or "No scope summary provided.",
"assigned_agent": {
"role": str(assigned_agent.get("role") or "WORKER").strip() or "WORKER",
"agent_id": str(assigned_agent.get("agent_id") or "agent-1").strip() or "agent-1",
},
"owner": "L0",
"mode": "independent_temporary_task",
"status": "proposed",
"trigger": on_blocked,
"reason": reason,
"verification_requirements": [
str(item).strip()
for item in contract.get("verification_requirements", [])
if str(item).strip()
]
or ["repo_hygiene"],
}
)
return tasks


def _apply_intake_contract_overrides(
contract: dict[str, Any],
intake_payload: dict[str, Any],
Expand Down Expand Up @@ -1127,6 +1177,10 @@ def preview(self, payload: dict[str, Any]) -> dict[str, Any]:
assigned_role = str(assigned_agent.get("role") or "WORKER").strip() or "WORKER"
predicted_reports = _predicted_reports_for_task_template(task_template)
predicted_artifacts = _predicted_artifacts_for_payload(normalized_payload)
worker_prompt_contracts = _build_worker_prompt_contracts(plan_bundle, normalized_payload)
unblock_tasks = _build_unblock_tasks_from_worker_contracts(worker_prompt_contracts)
if unblock_tasks and "planning_unblock_tasks.json" not in predicted_artifacts:
predicted_artifacts.append("planning_unblock_tasks.json")
warnings: list[str] = []
if requires_human_approval:
warnings.append("Current policies suggest the run may require manual approval before execution can continue.")
Expand Down Expand Up @@ -1171,13 +1225,16 @@ def preview(self, payload: dict[str, Any]) -> dict[str, Any]:
"plan_bundle": plan_bundle,
"task_chain": task_chain,
"wave_plan": _build_wave_plan(plan_bundle),
"worker_prompt_contracts": _build_worker_prompt_contracts(plan_bundle, normalized_payload),
"worker_prompt_contracts": worker_prompt_contracts,
"unblock_tasks": unblock_tasks,
"role_contract_summary": contract_preview.get("role_contract") if isinstance(contract_preview.get("role_contract"), dict) else {},
"contract_preview": contract_preview,
}
self._validator.validate_report(response["wave_plan"], "wave_plan.v1.json")
for contract in response["worker_prompt_contracts"]:
for contract in worker_prompt_contracts:
self._validator.validate_report(contract, "worker_prompt_contract.v1.json")
for unblock_task in unblock_tasks:
self._validator.validate_report(unblock_task, "unblock_task.v1.json")
self._validator.validate_report(response, "execution_plan_report.v1.json")
return response

Expand Down
4 changes: 4 additions & 0 deletions apps/orchestrator/tests/test_intake_preview_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ def test_intake_preview_builds_execution_plan_report(monkeypatch) -> None:
assert report["wave_plan"]["worker_count"] == 1
assert report["worker_prompt_contracts"][0]["prompt_contract_id"] == "plan-preview-1"
assert report["worker_prompt_contracts"][0]["continuation_policy"]["on_blocked"] == "spawn_independent_temporary_unblock_task"
assert report["unblock_tasks"][0]["source_prompt_contract_id"] == "plan-preview-1"
assert report["unblock_tasks"][0]["trigger"] == "spawn_independent_temporary_unblock_task"
assert "planning_unblock_tasks.json" in report["predicted_artifacts"]


def test_intake_preview_marks_manual_approval_when_env_requires_it(monkeypatch) -> None:
Expand Down Expand Up @@ -178,3 +181,4 @@ def test_intake_preview_marks_manual_approval_when_env_requires_it(monkeypatch)
assert report["warnings"]
assert report["wave_plan"]["completion_policy_ref"].endswith("#/wave_completion_policy")
assert report["worker_prompt_contracts"][0]["verification_requirements"] == ["repo_hygiene"]
assert report["unblock_tasks"][0]["owner"] == "L0"
Original file line number Diff line number Diff line change
Expand Up @@ -564,16 +564,26 @@ def execute_task(contract_path: Path, mock_mode: bool = False) -> str:
worker_contracts = json.loads(
(runs_root / run_id / "artifacts" / "planning_worker_prompt_contracts.json").read_text(encoding="utf-8")
)
unblock_tasks = json.loads(
(runs_root / run_id / "artifacts" / "planning_unblock_tasks.json").read_text(encoding="utf-8")
)
manifest = json.loads((runs_root / run_id / "manifest.json").read_text(encoding="utf-8"))

assert result["planning_artifacts"] == ["planning_wave_plan.json", "planning_worker_prompt_contracts.json"]
assert result["planning_artifacts"] == [
"planning_wave_plan.json",
"planning_worker_prompt_contracts.json",
"planning_unblock_tasks.json",
]
assert wave_plan["wave_id"] == "bundle-1"
assert wave_plan["objective"] == "Ship one planning artifact bridge"
assert worker_contracts[0]["prompt_contract_id"] == "worker-1"
assert worker_contracts[0]["continuation_policy"]["on_blocked"] == "spawn_independent_temporary_unblock_task"
assert unblock_tasks[0]["source_prompt_contract_id"] == "worker-1"
assert unblock_tasks[0]["owner"] == "L0"
artifact_names = [item["name"] for item in manifest["artifacts"]]
assert "planning_wave_plan" in artifact_names
assert "planning_worker_prompt_contracts" in artifact_names
assert "planning_unblock_tasks" in artifact_names
assert intake_events[-1] == ("persist", {"event": "INTAKE_RUN", "run_id": run_id})


Expand Down
16 changes: 16 additions & 0 deletions apps/orchestrator/tests/test_schema_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,22 @@ def test_new_operator_report_and_task_pack_schemas_pass() -> None:
}
assert validator.validate_report(worker_prompt_contract, "worker_prompt_contract.v1.json")["prompt_contract_id"] == "worker-prompt-1"

unblock_task = {
"version": "v1",
"unblock_task_id": "unblock-worker-prompt-1",
"source_prompt_contract_id": "worker-prompt-1",
"objective": "Unblock the scoped worker assignment for Ship the preview artifact.",
"scope_hint": "Implement the preview artifact inside apps/orchestrator/src.",
"assigned_agent": {"role": "WORKER", "agent_id": "agent-1"},
"owner": "L0",
"mode": "independent_temporary_task",
"status": "proposed",
"trigger": "spawn_independent_temporary_unblock_task",
"reason": "an external blocker requires an L0-managed unblock task",
"verification_requirements": ["repo_hygiene"],
}
assert validator.validate_report(unblock_task, "unblock_task.v1.json")["unblock_task_id"] == "unblock-worker-prompt-1"

context_pack = {
"version": "v1",
"pack_id": "ctx-pack-1",
Expand Down
3 changes: 3 additions & 0 deletions docs/architecture/runtime-topology.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ flowchart LR
alongside `plan_bundle`, `task_chain`, and `contract_preview`, so operator
planning surfaces can speak in canon planner language without changing
execution authority.
- The same planning preview may now derive `unblock_tasks`, and run bundles may
persist `planning_unblock_tasks.json` when worker continuation policy says
blocked work should spawn an independent temporary unblock task.
- Queue truth currently lives in `.runtime-cache/cortexpilot/queue.jsonl`; API
and workflow surfaces read that queue state and derive `eligible` /
`sla_state` instead of storing a second scheduler database.
Expand Down
11 changes: 11 additions & 0 deletions docs/specs/00_SPEC.md
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,17 @@
- `harness_request` represents a proposed capability change; applying that
change still depends on policy and approval boundaries.

### 6.6 Unblock Task Contract

- `schemas/unblock_task.v1.json` defines the first-class object shape for an
L0-managed independent temporary unblock assignment.
- `unblock_task` is derived from worker continuation policy when
`on_blocked = spawn_independent_temporary_unblock_task`.
- Intake preview may surface `unblock_tasks`, and run-local planning artifacts
may persist `planning_unblock_tasks.json` as an advisory planning artifact.
- `unblock_task` does not replace `task_contract` as execution authority; it is
a read-only control-plane object for unblock coordination.

---

## 7. State Machine
Expand Down
1 change: 1 addition & 0 deletions packages/frontend-shared/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ export type ExecutionPlanReport = {
task_chain?: JsonValue;
wave_plan?: JsonValue;
worker_prompt_contracts?: JsonValue[];
unblock_tasks?: JsonValue[];
contract_preview: RunContract;
};

Expand Down
3 changes: 2 additions & 1 deletion schemas/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
## 2026-04-12
- Added `control_plane_runtime_policy.v1.json` to formalize L0 command-tower runtime rules.
- Added `wave_plan.v1.json` and `worker_prompt_contract.v1.json` for planner preview artifacts.
- Added `unblock_task.v1.json` to formalize L0-managed independent temporary unblock assignments.
- Added `context_pack.v1.json` and `harness_request.v1.json` to reserve first-class schema homes for explicit handoff and harness-evolution contracts.
- Extended `execution_plan_report.v1.json` with `wave_plan` and `worker_prompt_contracts`.
- Extended `execution_plan_report.v1.json` with `wave_plan`, `worker_prompt_contracts`, and `unblock_tasks`.

## 2026-02-04
- Added `schema_registry.json` with SHA256 and size metadata for all v1 schemas.
Expand Down
1 change: 1 addition & 0 deletions schemas/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Machine-readable schemas for contracts, events, and policy validation.
- `control_plane_runtime_policy.v1.json` — machine-readable command-tower runtime constitution for L0/L1/L2, wake policy, completion governance, and harness boundaries.
- `wave_plan.v1.json` — wave-level orchestration preview artifact derived from intake planning.
- `worker_prompt_contract.v1.json` — worker-scoped planner artifact for scope, reading list, continuation, and verification rules.
- `unblock_task.v1.json` — L0-managed independent temporary unblock assignment derived from worker continuation policy.
- `context_pack.v1.json` — explicit fallback handoff contract for context-pressure and role-switch situations.
- `harness_request.v1.json` — capability-evolution request contract for session-local/project-local/global harness changes.
- `approval_pack.v1.json` / `incident_pack.v1.json` / `run_compare_report.v1.json` — derived operator-readable decision packs for approval, failure triage, and replay compare surfaces.
Expand Down
6 changes: 6 additions & 0 deletions schemas/execution_plan_report.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@
"type": "object"
}
},
"unblock_tasks": {
"type": "array",
"items": {
"type": "object"
}
},
"role_contract_summary": {
"type": "object",
"additionalProperties": false,
Expand Down
6 changes: 6 additions & 0 deletions schemas/execution_plan_report.v1.json
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@
"type": "object"
}
},
"unblock_tasks": {
"type": "array",
"items": {
"type": "object"
}
},
"role_contract_summary": {
"type": "object",
"additionalProperties": false,
Expand Down
16 changes: 8 additions & 8 deletions schemas/schema_registry.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@
"bytes": 1842
},
"execution_plan_report.schema.json": {
"sha256": "211a6d5310adea65ddedb4bc61de4542879772f33b92d40335ad1cbb748c6afb",
"bytes": 7414
"sha256": "9175fefdde202bbc0a7234f8f5f8eab468b5f5583ceaf778cb6ab7d8311c2003",
"bytes": 7517
},
"execution_plan_report.v1.json": {
"sha256": "0c9d6ed72669bcf56fa59a5c2be832f71dd3615e854e0e6a4f6ff72e26d3de8f",
"bytes": 7410
"sha256": "7c0c07d36c4ba6f81f1962243514c984537f6398123f9f18395e7501823094be",
"bytes": 7513
},
"flight_plan_copilot_brief.v1.json": {
"sha256": "555cdae186c13823ccbb716552ec023020eddea4babf3f0f07abfca8a149fab3",
Expand Down Expand Up @@ -193,10 +193,6 @@
"sha256": "771100fbbad00e09e3ee8b4fec888e07e36c6f18ac9e5ae02a9bed6d9f1d219a",
"bytes": 581
},
"schema_registry.json": {
"sha256": "ba5590a6953e3b9d61108b89ac28a0297d8b86ca83f02cc3ccb51ba956d545d6",
"bytes": 8535
},
"search_requests.v1.json": {
"sha256": "e74f48f215fdf9035e284ee4f7a0f931b4a88f320c90a93ab36dcb96d17fd684",
"bytes": 2270
Expand Down Expand Up @@ -292,6 +288,10 @@
"worker_prompt_contract.v1.json": {
"sha256": "6b8a4331f0903f265e565b5a634238a5525972360d6c00501891b7e109b02bec",
"bytes": 4056
},
"unblock_task.v1.json": {
"sha256": "b5e225b38aebaf823fbc7bcbb38a515f9666734093500df5a28d48218afd35ef",
"bytes": 1891
}
}
}
Loading
Loading