From 83a5cb5b157bb52c657d44757ba84c350723487c Mon Sep 17 00:00:00 2001 From: Abir Abbas Date: Wed, 3 Jun 2026 15:52:51 -0400 Subject: [PATCH 1/5] build: add hax-sdk and pin agentfield for HITL gate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The human-in-the-loop review gate uses the hax-sdk form-builder + agentfield's app.pause() primitive. Add hax-sdk>=0.2.4 and pin agentfield>=0.1.84 (which has pause()/ApprovalResult). Mirror the change in the Dockerfile's explicit pip list since it installs the package with --no-deps, so that list — not pyproject — is what the runtime image actually resolves. Co-Authored-By: Claude Opus 4.8 (1M context) --- Dockerfile | 3 ++- pyproject.toml | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index c2b6ad8..a14d36c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,8 @@ COPY pyproject.toml README.md ./ COPY src/ src/ RUN pip install --no-cache-dir --prefix=/install \ - "agentfield" \ + "agentfield>=0.1.84" \ + "hax-sdk>=0.2.4" \ "pydantic>=2.0" \ "httpx>=0.27" \ "python-dotenv>=1.0" \ diff --git a/pyproject.toml b/pyproject.toml index d11d472..d9c5449 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,8 @@ license = "Apache-2.0" requires-python = ">=3.11" authors = [{ name = "AgentField", email = "hello@agentfield.dev" }] dependencies = [ - "agentfield", + "agentfield>=0.1.84", + "hax-sdk>=0.2.4", "pydantic>=2.0", "httpx>=0.27", "pyyaml>=6.0", From ecbe84efdc7887f9d48e9c6f42b800cd75db8196 Mon Sep 17 00:00:00 2001 From: Abir Abbas Date: Wed, 3 Jun 2026 15:53:01 -0400 Subject: [PATCH 2/5] feat(hitl): add human-in-the-loop review gate module Ports SWE-AF's plan-phase approval pattern for PR reviews: - client.py: hax client builder (HAX_API_KEY/HAX_SDK_URL on/off switch), control-plane webhook resolver, watchdog-safe create_request wrapper (asyncio.wait_for over a worker thread, per SWE-AF's pause-watchdog lesson), and the ApprovalResult value extractor. - review_gate.py: builds a hax form-builder request (PR-intent blurb + one checkbox per finding + action radio + instructions textarea), pauses the workflow via app.pause(), and parses the response into a ReviewDecision (post_selected / rerun / reject). Terminal outcomes (expired/error, or a failed create/pause) default to reject so an unreviewed review is never posted. Reuses hax's generic form-builder type, so no external hax frontend template is needed. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/pr_af/hitl/__init__.py | 38 +++++ src/pr_af/hitl/client.py | 146 +++++++++++++++++ src/pr_af/hitl/review_gate.py | 300 ++++++++++++++++++++++++++++++++++ 3 files changed, 484 insertions(+) create mode 100644 src/pr_af/hitl/__init__.py create mode 100644 src/pr_af/hitl/client.py create mode 100644 src/pr_af/hitl/review_gate.py diff --git a/src/pr_af/hitl/__init__.py b/src/pr_af/hitl/__init__.py new file mode 100644 index 0000000..30c71a9 --- /dev/null +++ b/src/pr_af/hitl/__init__.py @@ -0,0 +1,38 @@ +"""Human-in-the-loop review gate for PR-AF. + +Mirrors SWE-AF's plan-phase approval: when ``HAX_API_KEY`` is set, PR-AF pauses +before posting a review and routes the findings to a hax workspace member for +per-finding approval, re-review, or rejection. +""" + +from __future__ import annotations + +from .client import ( + approval_webhook_url, + build_hax_client_from_env, + create_hax_form_request_with_timeout, + extract_values_from_raw, +) +from .review_gate import ( + ACTION_POST, + ACTION_REJECT, + ACTION_RERUN, + ReviewDecision, + build_review_form, + parse_review_decision, + request_review_approval, +) + +__all__ = [ + "ACTION_POST", + "ACTION_REJECT", + "ACTION_RERUN", + "ReviewDecision", + "approval_webhook_url", + "build_hax_client_from_env", + "build_review_form", + "create_hax_form_request_with_timeout", + "extract_values_from_raw", + "parse_review_decision", + "request_review_approval", +] diff --git a/src/pr_af/hitl/client.py b/src/pr_af/hitl/client.py new file mode 100644 index 0000000..d989df1 --- /dev/null +++ b/src/pr_af/hitl/client.py @@ -0,0 +1,146 @@ +"""hax-sdk plumbing for the PR-AF human-in-the-loop review gate. + +Ported from SWE-AF's ``swe_af.hitl.ask_user`` — the hax client builder, the +control-plane webhook URL resolver, the watchdog-safe ``create_request`` +wrapper, and the helper that digs form values out of an ``ApprovalResult``. + +``hax`` is imported lazily so this module (and the orchestrator that imports it) +stays importable in environments without the SDK installed. +""" + +from __future__ import annotations + +import asyncio +import os +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from hax import HaxClient + + +# Same default as SWE-AF: hax service on :3000, REST under /api/v1. +_DEFAULT_HAX_BASE = "http://localhost:3000" + +# SWE-AF's lesson (test_pause_watchdog_e2e.py): the synchronous +# hax_client.create_request can wedge for tens of minutes and burn the +# reasoner's active-time budget, so every call is wrapped in a hard timeout. +HAX_CREATE_REQUEST_TIMEOUT_SECONDS = 120.0 + + +def build_hax_client_from_env() -> HaxClient | None: + """Construct a ``HaxClient`` from ``HAX_API_KEY`` / ``HAX_SDK_URL``. + + Returns ``None`` when ``HAX_API_KEY`` is unset or empty — callers treat that + as "HITL disabled" and post the review directly. This is the on/off switch. + """ + api_key = os.environ.get("HAX_API_KEY", "").strip() + if not api_key: + return None + from hax import HaxClient + + base = os.environ.get("HAX_SDK_URL", _DEFAULT_HAX_BASE).rstrip("/") + return HaxClient(api_key=api_key, base_url=f"{base}/api/v1") + + +def approval_webhook_url(app: Any) -> str | None: + """Resolve the control-plane webhook URL for ``app.pause`` callbacks. + + Mirrors the URL SWE-AF's plan-approval gate uses + (``{cp_base_url}/api/v1/webhooks/approval-response``). Returns ``None`` when + no control-plane URL can be resolved. + """ + cp_base = ( + getattr(app, "agentfield_server", None) + or os.environ.get("AGENTFIELD_SERVER") + or "" + ).rstrip("/") + if not cp_base: + return None + return f"{cp_base}/api/v1/webhooks/approval-response" + + +async def create_hax_form_request_with_timeout( + *, + app: Any, + hax_client: HaxClient, + form: Any, + title: str, + description: str | None, + expires_in_seconds: int, + user_id: str | None, + webhook_url: str | None, + metadata: dict[str, Any] | None, + timeout_seconds: float = HAX_CREATE_REQUEST_TIMEOUT_SECONDS, +) -> Any: + """Submit a hax form-builder request with a hard timeout. + + Runs the synchronous ``hax_client.create_request`` in a worker thread under + ``asyncio.wait_for`` so a wedged hax-sdk fails fast (``RuntimeError``) + instead of silently burning the reasoner's active-time budget. Returns the + ``CreatedRequest``; the caller passes ``.id`` / ``.url`` to ``app.pause``. + """ + app.note( + f"hitl: submitting hax form request ({title!r})", + tags=["hitl", "hax", "create_request"], + ) + + kwargs: dict[str, Any] = { + "type": "form-builder", + "payload": form.to_payload(), + "title": title, + "expires_in_seconds": expires_in_seconds, + } + if description is not None: + kwargs["description"] = description + if user_id is not None: + kwargs["user_id"] = user_id + if webhook_url is not None: + kwargs["webhook_url"] = webhook_url + if metadata is not None: + kwargs["metadata"] = metadata + + try: + created = await asyncio.wait_for( + asyncio.to_thread(hax_client.create_request, **kwargs), + timeout=timeout_seconds, + ) + except TimeoutError as exc: + app.note( + f"hitl: hax create_request timed out after {timeout_seconds}s", + tags=["hitl", "hax", "timeout"], + ) + raise RuntimeError( + f"hax-sdk create_request (form-builder) timed out after " + f"{timeout_seconds}s; hax-sdk is likely wedged." + ) from exc + except Exception as exc: + app.note( + f"hitl: hax create_request raised {type(exc).__name__}: {exc}", + tags=["hitl", "hax", "error"], + ) + raise + + app.note( + f"hitl: hax form request created (request_id={created.id})", + tags=["hitl", "hax", "submitted"], + ) + return created + + +def extract_values_from_raw(raw: Any) -> dict[str, Any]: + """Find the submitted form values inside an ``ApprovalResult.raw_response``. + + hax delivers values at ``raw['values']`` or ``raw['response']['values']`` + depending on the callback shape; check both. + """ + if not isinstance(raw, dict): + return {} + direct = raw.get("values") + if isinstance(direct, dict): + return dict(direct) + response_obj = raw.get("response") + if isinstance(response_obj, dict): + inner = response_obj.get("values") + if isinstance(inner, dict): + return dict(inner) + return {} diff --git a/src/pr_af/hitl/review_gate.py b/src/pr_af/hitl/review_gate.py new file mode 100644 index 0000000..b82e04b --- /dev/null +++ b/src/pr_af/hitl/review_gate.py @@ -0,0 +1,300 @@ +"""The PR-AF human-in-the-loop review gate. + +Builds a hax form that shows the reviewer a short blurb of the PR's intent plus +every individual finding, then pauses the workflow until a workspace member: + + * **post_selected** — post the checked subset of findings, or + * **rerun** — re-run the review with free-text instructions (e.g. "too + aggressive, tone it down"), or + * **reject** — post nothing. + +First responder wins: the single hax callback that resolves ``app.pause`` +decides the outcome. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any + +from .client import ( + create_hax_form_request_with_timeout, + extract_values_from_raw, +) + +if TYPE_CHECKING: + from hax import HaxClient + + from ..schemas.output import ScoredFinding + + +# Action chosen by the reviewer in the form's "action" radio. +ACTION_POST = "post_selected" +ACTION_RERUN = "rerun" +ACTION_REJECT = "reject" +_VALID_ACTIONS = {ACTION_POST, ACTION_RERUN, ACTION_REJECT} + +_SEVERITY_EMOJI = { + "critical": "🔴", + "important": "🟠", + "suggestion": "🔵", + "nitpick": "⚪", +} + + +@dataclass +class ReviewDecision: + """Parsed outcome of one HITL round.""" + + action: str # ACTION_POST | ACTION_RERUN | ACTION_REJECT + selected_finding_ids: set[str] = field(default_factory=set) + instructions: str = "" + # Underlying agentfield decision string ("approved", "expired", ...) for logs. + decision_raw: str = "" + + @property + def is_post(self) -> bool: + return self.action == ACTION_POST + + @property + def is_rerun(self) -> bool: + return self.action == ACTION_RERUN + + @property + def is_reject(self) -> bool: + return self.action == ACTION_REJECT + + +def _finding_option(finding: ScoredFinding) -> dict[str, str]: + """One checkbox option per finding: ``id`` is the value, label is human.""" + emoji = _SEVERITY_EMOJI.get(finding.severity, "•") + loc = finding.file_path or "(no file)" + if finding.line_start and finding.line_start > 0: + loc = f"{loc}:{finding.line_start}" + return { + "value": finding.id, + "label": f"{emoji} [{finding.severity}] {loc} — {finding.title}", + } + + +def _build_description( + pr_intent: str, + findings: list[ScoredFinding], + revision_iter: int, + revision_history: list[str], +) -> str: + """Markdown blurb shown above the form: PR intent + what's being asked.""" + counts: dict[str, int] = {} + for f in findings: + counts[f.severity] = counts.get(f.severity, 0) + 1 + count_str = ", ".join(f"{n} {sev}" for sev, n in counts.items()) or "no findings" + + lines = [] + if pr_intent: + lines.append("**PR intent:** " + pr_intent.strip()) + lines.append("") + lines.append( + f"PR-AF found **{len(findings)}** finding(s) ({count_str}). " + "Check the ones to post, or request a re-review with instructions." + ) + if revision_iter > 0: + lines.append("") + lines.append(f"_Revision round {revision_iter}._") + if revision_history: + lines.append("") + lines.append("Prior instructions:") + for idx, instr in enumerate(revision_history, start=1): + if instr: + lines.append(f"{idx}. {instr}") + return "\n".join(lines) + + +def build_review_form( + *, + pr_intent: str, + findings: list[ScoredFinding], + title: str, + revision_iter: int = 0, + revision_history: list[str] | None = None, +) -> Any: + """Translate the review into a ``hax.FormBuilder`` (imported lazily).""" + from hax import FormBuilder + + all_ids = [f.id for f in findings] + options = [_finding_option(f) for f in findings] + + form = ( + FormBuilder() + .title(title) + .description( + _build_description(pr_intent, findings, revision_iter, revision_history or []) + ) + .submit_label("Submit decision") + ) + + # checkbox_group requires at least one option; skip it when there are no + # findings (an empty review still lets the reviewer approve/reject). + if options: + form.checkbox_group( + "findings_to_post", + label="Findings to post", + description="Only the checked findings are posted to the PR.", + options=options, + default_value=all_ids, + ) + + form.radio_group( + "action", + label="Action", + options=[ + {"value": ACTION_POST, "label": "Post selected findings"}, + {"value": ACTION_RERUN, "label": "Re-review with instructions below"}, + {"value": ACTION_REJECT, "label": "Reject — post nothing"}, + ], + default_value=ACTION_POST, + ) + form.textarea( + "instructions", + label="Re-review instructions (optional)", + description="Used when action is 'Re-review'. E.g. 'too aggressive, tone it down'.", + required=False, + ) + return form + + +def _coerce_str(value: Any) -> str: + if isinstance(value, str): + return value.strip() + if isinstance(value, list) and value: + first = value[0] + return first.strip() if isinstance(first, str) else str(first) + return "" + + +def _coerce_id_list(value: Any) -> list[str]: + if isinstance(value, list): + return [v for v in value if isinstance(v, str)] + if isinstance(value, str) and value: + return [value] + return [] + + +def parse_review_decision(approval_result: Any, all_finding_ids: list[str]) -> ReviewDecision: + """Convert an agentfield ``ApprovalResult`` into a ``ReviewDecision``. + + Terminal control-plane outcomes (expired/error, or a hax-level reject with + no form values) map to a reject. Otherwise the reviewer's ``action`` radio + drives the outcome; an absent ``findings_to_post`` field defaults to posting + everything (the form pre-checks all findings). + """ + decision = (getattr(approval_result, "decision", "") or "").strip() + feedback = (getattr(approval_result, "feedback", "") or "").strip() + values = extract_values_from_raw(getattr(approval_result, "raw_response", None)) + + if decision in {"expired", "error"}: + return ReviewDecision(action=ACTION_REJECT, instructions=feedback, decision_raw=decision) + if decision == "rejected" and not values: + return ReviewDecision(action=ACTION_REJECT, instructions=feedback, decision_raw=decision) + + action = _coerce_str(values.get("action")) + if action not in _VALID_ACTIONS: + action = ACTION_REJECT if decision == "rejected" else ACTION_POST + instructions = _coerce_str(values.get("instructions")) or feedback + + if action == ACTION_RERUN: + return ReviewDecision(action=ACTION_RERUN, instructions=instructions, decision_raw=decision) + if action == ACTION_REJECT: + return ReviewDecision(action=ACTION_REJECT, instructions=instructions, decision_raw=decision) + + # post_selected: honor the checked subset; default to all when field absent. + if "findings_to_post" in values: + selected = set(_coerce_id_list(values.get("findings_to_post"))) + else: + selected = set(all_finding_ids) + return ReviewDecision( + action=ACTION_POST, + selected_finding_ids=selected, + instructions=instructions, + decision_raw=decision, + ) + + +async def request_review_approval( + *, + app: Any, + hax_client: HaxClient, + pr_intent: str, + findings: list[ScoredFinding], + pr_label: str, + webhook_url: str | None, + user_id: str | None, + expires_in_hours: int, + revision_iter: int = 0, + revision_history: list[str] | None = None, + metadata: dict[str, Any] | None = None, +) -> ReviewDecision: + """Build the form, create the hax request, pause, return the decision. + + Any failure to create the request or pause is surfaced as a reject so the + pipeline never posts an unreviewed review when the gate is enabled. + """ + title = "PR-AF Review Approval" + if revision_iter > 0: + title = f"{title} (revision {revision_iter})" + if pr_label: + title = f"{title} — {pr_label}" + + try: + form = build_review_form( + pr_intent=pr_intent, + findings=findings, + title=title, + revision_iter=revision_iter, + revision_history=revision_history, + ) + except Exception as exc: + app.note( + f"hitl: failed to build review form: {exc}", + tags=["hitl", "form", "error"], + ) + return ReviewDecision(action=ACTION_REJECT, instructions=f"form build failed: {exc}") + + try: + created = await create_hax_form_request_with_timeout( + app=app, + hax_client=hax_client, + form=form, + title=title, + description=None, + expires_in_seconds=expires_in_hours * 3600, + user_id=user_id, + webhook_url=webhook_url, + metadata=metadata, + ) + except Exception as exc: + app.note( + f"hitl: create_request failed, treating as reject: {exc}", + tags=["hitl", "hax", "error"], + ) + return ReviewDecision(action=ACTION_REJECT, instructions=f"create_request failed: {exc}") + + try: + approval_result = await app.pause( + approval_request_id=created.id, + approval_request_url=created.url, + expires_in_hours=expires_in_hours, + ) + except Exception as exc: + app.note( + f"hitl: pause failed, treating as reject: {exc}", + tags=["hitl", "pause", "error"], + ) + return ReviewDecision(action=ACTION_REJECT, instructions=f"pause failed: {exc}") + + decision = parse_review_decision(approval_result, [f.id for f in findings]) + app.note( + f"hitl: review decision={decision.action} " + f"(raw={decision.decision_raw}, selected={len(decision.selected_finding_ids)})", + tags=["hitl", "decision", decision.action], + ) + return decision From 52968f9e15a1a9e9316d53d3149a6bd4106f6598 Mon Sep 17 00:00:00 2001 From: Abir Abbas Date: Wed, 3 Jun 2026 15:53:01 -0400 Subject: [PATCH 3/5] feat(config): add HITLConfig to ReviewConfig Enables the review gate when HAX_API_KEY is set (same trigger as SWE-AF). Reads only the env vars SWE-AF already uses (HAX_API_KEY, HAX_SDK_URL, AGENTFIELD_SERVER, AGENTFIELD_APPROVAL_USER_ID); expiry (72h) and max re-review revisions (2) are plain config defaults matching SWE-AF's BuildConfig, not new PR-AF-specific env var names. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/pr_af/config.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src/pr_af/config.py b/src/pr_af/config.py index 4f87018..323b237 100644 --- a/src/pr_af/config.py +++ b/src/pr_af/config.py @@ -159,6 +159,35 @@ class DepthProfile(BaseModel): } +class HITLConfig(BaseModel): + """Human-in-the-loop review gate (mirrors SWE-AF's plan-phase approval). + + When enabled, PR-AF does not post its review directly. Instead it summarizes + the findings, sends a hax form request to a workspace member, and pauses + until they approve a subset, request a re-review with instructions, or + reject. Auto-enables when ``HAX_API_KEY`` is set — same trigger SWE-AF uses + (``build_hax_client_from_env`` returns ``None`` when it is unset, which the + orchestrator treats as "HITL off, post directly"). + """ + + # Mirrors the on/off switch in build_hax_client_from_env: HITL is active + # only when HAX_API_KEY is present. Kept here for observability/overrides. + enabled: bool = Field( + default_factory=lambda: bool(os.getenv("HAX_API_KEY", "").strip()) + ) + # Optional routing: which hax workspace user receives the request. + approval_user_id: str | None = Field( + default_factory=lambda: os.getenv("AGENTFIELD_APPROVAL_USER_ID") or None + ) + # How long the pause stays open before it expires (treated as a reject). + # Plain config default — matches SWE-AF's BuildConfig.approval_expires_in_hours + # (not env-driven, to avoid introducing PR-AF-specific env var names). + approval_expires_in_hours: int = 72 + # How many "re-review with instructions" rounds before giving up (no post). + # Matches SWE-AF's BuildConfig.max_plan_revision_iterations. + max_review_revisions: int = 2 + + class ReviewConfig(BaseModel): """Top-level configuration combining all sub-configs.""" @@ -166,6 +195,7 @@ class ReviewConfig(BaseModel): models: ModelConfig = Field(default_factory=ModelConfig) scoring: ScoringConfig = Field(default_factory=ScoringConfig) comments: CommentConfig = Field(default_factory=CommentConfig) + hitl: HITLConfig = Field(default_factory=HITLConfig) # File ignore patterns (glob) ignore_paths: list[str] = Field( From bebbd23f23adc81c7af02cf8b86fe8635c0dc075 Mon Sep 17 00:00:00 2001 From: Abir Abbas Date: Wed, 3 Jun 2026 15:53:17 -0400 Subject: [PATCH 4/5] feat(orchestrator): gate review posting behind the HITL approval loop Restructure run() so intake + anatomy run once, then wrap the finding-producing phases (meta-selectors -> synthesis) in a revision loop. When HITL is enabled (HAX_API_KEY set + a real PR), the gate runs after synthesis: - post_selected -> filter findings to the approved subset and post; - rerun -> re-run the review phases with the reviewer's instructions threaded in, capped at max_review_revisions; - reject / expire / error / revision-cap -> post nothing (public-repo safety). _generate_output gains a post_to_github flag so the no-post path still builds a full ReviewResult for observability. When HITL is off, behavior is unchanged. Reviewer feedback is threaded into dimension selection (the three meta lenses, via _build_meta_context) and the reviewer prompt (review_dimension), so a re-review actually respects guidance like "tone it down". Co-Authored-By: Claude Opus 4.8 (1M context) --- src/pr_af/orchestrator.py | 148 ++++++++++++++++++++++++++++--- src/pr_af/reasoners/harnesses.py | 36 +++++++- 2 files changed, 170 insertions(+), 14 deletions(-) diff --git a/src/pr_af/orchestrator.py b/src/pr_af/orchestrator.py index 5d38158..38b6035 100644 --- a/src/pr_af/orchestrator.py +++ b/src/pr_af/orchestrator.py @@ -22,6 +22,11 @@ from .diff_engine import parse_unified_diff from .evidence import EvidencePackage, extract_evidence_for_findings from .github.client import GitHubClient +from .hitl import ( + approval_webhook_url, + build_hax_client_from_env, + request_review_approval, +) from .reasoners.harnesses import ( adversary_phase, anatomy_phase, @@ -136,15 +141,94 @@ async def run(self) -> ReviewResult: self.anatomy_result = anatomy print(f"[PR-AF] Anatomy complete: {len(anatomy.files)} files, {len(anatomy.clusters)} clusters", flush=True) - print("[PR-AF] Phase 3: META-SELECTORS (3 parallel lenses)", flush=True) - plan = await self._run_meta_selectors(intake, anatomy, review_depth) + # Human-in-the-loop review gate (mirrors SWE-AF's plan-phase approval). + # Active only when HAX_API_KEY is set AND we have a real PR to post to; + # otherwise hax_client is None and we post directly, exactly as before. + hax_client = None + if self.input.pr_url and not self.input.dry_run: + hax_client = build_hax_client_from_env() + + max_revisions = self.config.hitl.max_review_revisions if hax_client else 0 + revision_history: list[str] = [] + reviewer_feedback = "" + + for revision_iter in range(max_revisions + 1): + plan, scored_findings = await self._run_review_phases( + intake, anatomy, review_depth, reviewer_feedback + ) + + if hax_client is None: + print("[PR-AF] Phase 8: OUTPUT (direct post)", flush=True) + return await self._finish(scored_findings, intake, anatomy, plan, post=True) + + decision = await request_review_approval( + app=self.app, + hax_client=hax_client, + pr_intent=intake.pr_summary, + findings=scored_findings, + pr_label=self._pr_label(), + webhook_url=approval_webhook_url(self.app), + user_id=self.config.hitl.approval_user_id, + expires_in_hours=self.config.hitl.approval_expires_in_hours, + revision_iter=revision_iter, + revision_history=revision_history, + metadata=self._hitl_metadata(), + ) + + if decision.is_post: + approved = [f for f in scored_findings if f.id in decision.selected_finding_ids] + print( + f"[PR-AF] HITL approved {len(approved)}/{len(scored_findings)} findings — posting", + flush=True, + ) + return await self._finish(approved, intake, anatomy, plan, post=True) + + if decision.is_rerun and revision_iter < max_revisions: + revision_history.append(decision.instructions) + reviewer_feedback = self._merge_feedback(revision_history) + print( + f"[PR-AF] HITL re-review requested (round {revision_iter + 1}/{max_revisions}): " + f"{decision.instructions!r}", + flush=True, + ) + continue + + # reject, expire/error, or a re-review past the revision cap → no post. + reason = "revision cap reached" if decision.is_rerun else decision.action + self.app.note( + f"hitl: not posting review ({reason}; raw={decision.decision_raw})", + tags=["hitl", "no-post", decision.action], + ) + print(f"[PR-AF] HITL: not posting review ({reason})", flush=True) + return await self._finish(scored_findings, intake, anatomy, plan, post=False) + # The loop always returns; this guards against a future edit slipping by. + raise RuntimeError("review loop exited without producing a result") + + async def _run_review_phases( + self, + intake: IntakeResult, + anatomy: AnatomyResult, + review_depth: str, + reviewer_feedback: str = "", + ) -> tuple[ReviewPlan, list[ScoredFinding]]: + """Run the finding-producing phases (meta-selectors → synthesis). + + Intake and anatomy are computed once by the caller; this is the part + re-run when a reviewer requests a re-review. ``reviewer_feedback`` is the + accumulated human guidance threaded into dimension selection and the + reviewer prompts so a re-run actually respects "tone it down" etc. + """ + print("[PR-AF] Phase 3: META-SELECTORS (3 parallel lenses)", flush=True) + plan = await self._run_meta_selectors(intake, anatomy, review_depth, reviewer_feedback) print(f"[PR-AF] Meta-selectors complete: {len(plan.dimensions)} dimensions", flush=True) print("[PR-AF] Phase 4+5: REVIEW (parallel) + LAYER", flush=True) findings_queue: asyncio.Queue[list[ReviewFinding] | None] = asyncio.Queue() - review_task = asyncio.create_task(self._run_parallel_review(plan, findings_queue)) + review_task = asyncio.create_task( + self._run_parallel_review(plan, findings_queue, reviewer_feedback=reviewer_feedback) + ) layer_task = asyncio.create_task(self._run_review_layer(plan, findings_queue, anatomy)) _, layer_result = await asyncio.gather(review_task, layer_task) @@ -163,18 +247,51 @@ async def run(self) -> ReviewResult: print("[PR-AF] Phase 7: SYNTHESIS", flush=True) scored_findings = self._synthesize(all_findings, adversary_results) print(f"[PR-AF] Synthesis complete: {len(scored_findings)} scored findings", flush=True) + return plan, scored_findings - print("[PR-AF] Phase 8: OUTPUT", flush=True) - result = await self._generate_output(scored_findings, intake, anatomy, plan) + async def _finish( + self, + scored_findings: list[ScoredFinding], + intake: IntakeResult, + anatomy: AnatomyResult, + plan: ReviewPlan, + *, + post: bool, + ) -> ReviewResult: + """Generate output (optionally posting) and clean up the context dir.""" + result = await self._generate_output(scored_findings, intake, anatomy, plan, post_to_github=post) print( - f"[PR-AF] Pipeline complete! {result.summary.total_findings} findings, cost=${result.summary.cost_usd}", + f"[PR-AF] Pipeline complete! {result.summary.total_findings} findings, " + f"cost=${result.summary.cost_usd}, posted={post}", flush=True, ) - self._cleanup_context_dir() - return result + def _pr_label(self) -> str: + if self.pr_data and self.pr_data.owner and self.pr_data.repo: + return f"{self.pr_data.owner}/{self.pr_data.repo}#{self.pr_data.number}" + return "" + + def _merge_feedback(self, revision_history: list[str]) -> str: + """Collapse accumulated reviewer instructions into one guidance string.""" + items = [instr.strip() for instr in revision_history if instr and instr.strip()] + if not items: + return "" + return " | ".join(items) + + def _hitl_metadata(self) -> dict[str, Any]: + execution_id = "" + ctx = getattr(self.app, "ctx", None) + if ctx is not None: + execution_id = getattr(ctx, "execution_id", "") or "" + return { + "prLabel": self._pr_label(), + "prUrl": self.input.pr_url or "", + "reviewId": self.review_id, + "executionId": execution_id, + } + async def _run_intake(self) -> IntakeResult: if self._budget_or_timeout_exhausted("intake"): raise BudgetExhaustedError("Budget exhausted before intake") @@ -257,7 +374,13 @@ async def _run_planning(self, intake: IntakeResult, anatomy: AnatomyResult, revi return plan - async def _run_meta_selectors(self, intake: IntakeResult, anatomy: AnatomyResult, review_depth: str) -> ReviewPlan: + async def _run_meta_selectors( + self, + intake: IntakeResult, + anatomy: AnatomyResult, + review_depth: str, + reviewer_feedback: str = "", + ) -> ReviewPlan: if self._budget_or_timeout_exhausted("meta_selectors"): raise BudgetExhaustedError("Budget exhausted before meta-selectors") @@ -276,6 +399,7 @@ async def run_lens(lens_name: str) -> MetaDimensionResult: depth=review_depth, repo_path=self.input.repo_path or "", diff_patches=self._build_file_patches(), + reviewer_feedback=reviewer_feedback, ) self.agent_invocations += 1 self._register_cost("meta_selectors", self._extract_cost(result_raw)) @@ -467,6 +591,7 @@ async def _run_parallel_review( plan: ReviewPlan, findings_queue: asyncio.Queue[list[ReviewFinding] | None], current_depth: int = 0, + reviewer_feedback: str = "", ) -> None: max_depth = self.config.budget.max_review_depth semaphore = asyncio.Semaphore(self.config.budget.max_concurrent_reviewers) @@ -490,6 +615,7 @@ async def run_dimension(dim: ReviewDimension, depth: int) -> None: intake_summary=self.intake_result.pr_summary if self.intake_result else "", diff_patches=dim_patches if dim_patches else None, all_dimension_names=[d.name for d in plan.dimensions if d.id != dim.id], + reviewer_feedback=reviewer_feedback, ) self.agent_invocations += 1 self._register_cost("review", self._extract_cost(result_raw)) @@ -789,6 +915,8 @@ async def _generate_output( intake: IntakeResult, anatomy: AnatomyResult, plan: ReviewPlan, + *, + post_to_github: bool = True, ) -> ReviewResult: if self.pr_data is None: raise RuntimeError("PR data not initialized") @@ -851,7 +979,7 @@ async def _generate_output( comments=comments, ) - if not self.input.dry_run and self.input.pr_url: + if post_to_github and not self.input.dry_run and self.input.pr_url: client = GitHubClient() try: await client.post_review( diff --git a/src/pr_af/reasoners/harnesses.py b/src/pr_af/reasoners/harnesses.py index dca5486..c388c81 100644 --- a/src/pr_af/reasoners/harnesses.py +++ b/src/pr_af/reasoners/harnesses.py @@ -475,7 +475,12 @@ async def planning_phase(intake: dict, anatomy: dict, depth: str = "standard", h # --------------------------------------------------------------------------- -def _build_meta_context(intake: dict, anatomy: dict, diff_patches: dict[str, str] | None = None) -> str: +def _build_meta_context( + intake: dict, + anatomy: dict, + diff_patches: dict[str, str] | None = None, + reviewer_feedback: str = "", +) -> str: """Build shared context string for all meta-selectors.""" import json as _json @@ -508,6 +513,13 @@ def _build_meta_context(intake: dict, anatomy: dict, diff_patches: dict[str, str if diff_patches: payload["diff_patches"] = dict(list(diff_patches.items())[:15]) + if reviewer_feedback: + # Human reviewer guidance from a prior HITL round. The reviewer saw the + # last set of findings and asked for changes (e.g. "too aggressive, tone + # it down", "drop nitpicks", "focus on X"). Let it shape which + # dimensions you generate this round. + payload["human_reviewer_guidance"] = reviewer_feedback + return _json.dumps(payload, default=str) @@ -518,13 +530,14 @@ async def meta_semantic( depth: str = "standard", repo_path: str = "", diff_patches: dict[str, str] | None = None, + reviewer_feedback: str = "", ) -> dict: """Semantic lens: What does this code DO differently? Focuses on logic, behavior, API contracts, concurrency, security, error handling. Asks: "If I run the old code and the new code side by side, where do they diverge?" """ - context = _build_meta_context(intake, anatomy, diff_patches) + context = _build_meta_context(intake, anatomy, diff_patches, reviewer_feedback) context_ref = f"{context}" if repo_path and len(context) > 8000: file_path = _write_context_file(context, "meta_semantic_context.json", repo_path) @@ -597,13 +610,14 @@ async def meta_mechanical( depth: str = "standard", repo_path: str = "", diff_patches: dict[str, str] | None = None, + reviewer_feedback: str = "", ) -> dict: """Mechanical lens: Does this code WORK correctly at the language level? Focuses on types, signatures, calling conventions, decorator effects, framework interactions. Asks: "Will this code compile/run without errors?" """ - context = _build_meta_context(intake, anatomy, diff_patches) + context = _build_meta_context(intake, anatomy, diff_patches, reviewer_feedback) context_ref = f"{context}" if repo_path and len(context) > 8000: file_path = _write_context_file(context, "meta_mechanical_context.json", repo_path) @@ -682,13 +696,14 @@ async def meta_systemic( depth: str = "standard", repo_path: str = "", diff_patches: dict[str, str] | None = None, + reviewer_feedback: str = "", ) -> dict: """Systemic lens: How does this code FIT the codebase? Focuses on patterns, complexity, readability, architectural coherence, test coverage. Asks: "Does this change make the codebase better or worse?" """ - context = _build_meta_context(intake, anatomy, diff_patches) + context = _build_meta_context(intake, anatomy, diff_patches, reviewer_feedback) context_ref = f"{context}" if repo_path and len(context) > 8000: file_path = _write_context_file(context, "meta_systemic_context.json", repo_path) @@ -773,11 +788,23 @@ async def review_dimension( intake_summary: str = "", diff_patches: dict[str, str] | None = None, all_dimension_names: list[str] | None = None, + reviewer_feedback: str = "", ) -> dict: ctx_files = context_files or [] risks = risk_surfaces or [] can_spawn = current_depth < max_depth + feedback_section = "" + if reviewer_feedback: + feedback_section = ( + "## Human Reviewer Guidance (IMPORTANT)\n\n" + "A human reviewer saw the previous round of findings and asked for a re-review " + f"with this guidance:\n\n> {reviewer_feedback}\n\n" + "Adjust your review accordingly — e.g. if asked to tone it down or drop nitpicks, " + "raise your bar and report only findings that clearly meet it; if asked to focus on " + "a specific area, prioritize that. Honor this guidance.\n\n" + ) + pr_context_section = "" if pr_narrative or risks: pr_context_section = ( @@ -837,6 +864,7 @@ async def review_dimension( f"{review_prompt}\n\n" f"**Target files** (read and analyze these): {', '.join(target_files)}\n" f"**Context files** (reference as needed): {', '.join(ctx_files) if ctx_files else 'none'}\n\n" + f"{feedback_section}" f"{pr_context_section}" f"{intake_section}" f"{dimensions_section}" From 38657d070ecca6b7d11ee03148f2538ea93661c4 Mon Sep 17 00:00:00 2001 From: Abir Abbas Date: Wed, 3 Jun 2026 15:53:17 -0400 Subject: [PATCH 5/5] test(hitl): cover the review gate and orchestrator revision loop Maps to the validation contract: hax client on/off switch, form shape (one checkbox per finding + action + textarea), decision parsing for every action and terminal outcome, watchdog fast-fail, and the end-to-end run() control flow (HITL off posts directly; approve-subset posts only selected; rerun re-runs with feedback then posts; reject/cap post nothing). Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/test_hitl_review_gate.py | 173 ++++++++++++++++++++++++++++++++ tests/test_orchestrator_hitl.py | 163 ++++++++++++++++++++++++++++++ 2 files changed, 336 insertions(+) create mode 100644 tests/test_hitl_review_gate.py create mode 100644 tests/test_orchestrator_hitl.py diff --git a/tests/test_hitl_review_gate.py b/tests/test_hitl_review_gate.py new file mode 100644 index 0000000..ad281e3 --- /dev/null +++ b/tests/test_hitl_review_gate.py @@ -0,0 +1,173 @@ +"""Unit tests for the PR-AF human-in-the-loop review gate. + +Each test maps to an item in the plan's validation contract: the on/off switch, +the form shape, the decision-parsing for every action/terminal outcome, and the +watchdog-safe create wrapper. +""" + +from __future__ import annotations + +import time +from types import SimpleNamespace + +import pytest + +from pr_af.hitl import ( + ACTION_POST, + ACTION_REJECT, + ACTION_RERUN, + build_hax_client_from_env, + build_review_form, + parse_review_decision, +) +from pr_af.hitl.client import create_hax_form_request_with_timeout +from pr_af.schemas.output import ScoredFinding + + +def _finding(fid: str, severity: str = "important", line: int = 10) -> ScoredFinding: + return ScoredFinding( + id=fid, + dimension_id="d1", + dimension_name="dim", + file_path="src/foo.py", + line_start=line, + line_end=line, + severity=severity, + title=f"title-{fid}", + body="body", + ) + + +class _FakeApp: + def note(self, *args, **kwargs): # noqa: D401 - test stub + pass + + +# --- on/off switch ------------------------------------------------------- + + +def test_build_hax_client_returns_none_without_api_key(monkeypatch): + monkeypatch.delenv("HAX_API_KEY", raising=False) + assert build_hax_client_from_env() is None + + +def test_build_hax_client_returns_client_with_api_key(monkeypatch): + monkeypatch.setenv("HAX_API_KEY", "test-key") + monkeypatch.setenv("HAX_SDK_URL", "http://hax.example") + client = build_hax_client_from_env() + assert client is not None + + +# --- form shape ---------------------------------------------------------- + + +def test_form_has_one_checkbox_option_per_finding_plus_action_and_textarea(): + findings = [_finding("f1"), _finding("f2", severity="nitpick"), _finding("f3")] + form = build_review_form(pr_intent="adds caching", findings=findings, title="t") + payload = form.to_payload() + fields = {f["id"]: f for f in payload["fields"]} + + assert set(fields) == {"findings_to_post", "action", "instructions"} + options = fields["findings_to_post"]["options"] + assert [o["value"] for o in options] == ["f1", "f2", "f3"] + # All findings pre-checked by default so "submit as-is" posts everything. + assert fields["findings_to_post"]["defaultValue"] == ["f1", "f2", "f3"] + assert {o["value"] for o in fields["action"]["options"]} == { + ACTION_POST, + ACTION_RERUN, + ACTION_REJECT, + } + + +def test_form_omits_checkbox_group_when_no_findings(): + form = build_review_form(pr_intent="docs only", findings=[], title="t") + field_ids = {f["id"] for f in form.to_payload()["fields"]} + assert "findings_to_post" not in field_ids + assert "action" in field_ids # reviewer can still approve/reject + + +# --- decision parsing ---------------------------------------------------- + + +def _approval(decision="approved", values=None, feedback=""): + return SimpleNamespace( + decision=decision, + feedback=feedback, + raw_response={"values": values} if values is not None else None, + ) + + +def test_post_selected_keeps_only_checked_subset(): + res = _approval(values={"action": ACTION_POST, "findings_to_post": ["f1", "f3"]}) + decision = parse_review_decision(res, ["f1", "f2", "f3"]) + assert decision.is_post + assert decision.selected_finding_ids == {"f1", "f3"} + + +def test_post_selected_defaults_to_all_when_field_absent(): + res = _approval(values={"action": ACTION_POST}) + decision = parse_review_decision(res, ["f1", "f2"]) + assert decision.is_post + assert decision.selected_finding_ids == {"f1", "f2"} + + +def test_rerun_carries_instructions(): + res = _approval(values={"action": ACTION_RERUN, "instructions": "tone it down"}) + decision = parse_review_decision(res, ["f1"]) + assert decision.is_rerun + assert decision.instructions == "tone it down" + + +def test_reject_action_maps_to_reject(): + res = _approval(values={"action": ACTION_REJECT}) + decision = parse_review_decision(res, ["f1"]) + assert decision.is_reject + + +@pytest.mark.parametrize("terminal", ["expired", "error"]) +def test_terminal_decisions_map_to_reject(terminal): + decision = parse_review_decision(_approval(decision=terminal), ["f1"]) + assert decision.is_reject + assert decision.decision_raw == terminal + + +def test_hax_level_reject_without_values_is_reject(): + decision = parse_review_decision(_approval(decision="rejected"), ["f1"]) + assert decision.is_reject + + +def test_values_nested_under_response_key_are_found(): + res = SimpleNamespace( + decision="approved", + feedback="", + raw_response={"response": {"values": {"action": ACTION_POST, "findings_to_post": ["f2"]}}}, + ) + decision = parse_review_decision(res, ["f1", "f2"]) + assert decision.selected_finding_ids == {"f2"} + + +# --- watchdog safety ----------------------------------------------------- + + +async def test_create_request_fails_fast_when_hax_wedges(): + """A hung sync create_request must raise quickly, not burn the budget.""" + + class _WedgedHax: + def create_request(self, **kwargs): + time.sleep(5) # simulate a wedged hax-sdk call + + start = time.monotonic() + with pytest.raises(RuntimeError, match="wedged"): + await create_hax_form_request_with_timeout( + app=_FakeApp(), + hax_client=_WedgedHax(), + form=SimpleNamespace(to_payload=lambda: {"fields": []}), + title="t", + description=None, + expires_in_seconds=3600, + user_id=None, + webhook_url=None, + metadata=None, + timeout_seconds=0.2, + ) + assert time.monotonic() - start < 3.0 # failed fast, well under the 5s sleep diff --git a/tests/test_orchestrator_hitl.py b/tests/test_orchestrator_hitl.py new file mode 100644 index 0000000..b6a60e8 --- /dev/null +++ b/tests/test_orchestrator_hitl.py @@ -0,0 +1,163 @@ +"""Orchestrator-level tests for the HITL revision loop. + +Maps to the validation contract items about end-to-end gate behavior: HITL off +posts directly; approve-subset posts only the selected findings; rerun re-runs +the review phases with feedback then posts; reject/expire/cap posts nothing. + +The review phases and GitHub post are stubbed — these tests exercise the control +flow in ``ReviewOrchestrator.run``, not the LLM pipeline. +""" + +from __future__ import annotations + +from types import SimpleNamespace + +import pytest + +import pr_af.orchestrator as orch_mod +from pr_af.hitl import ACTION_POST, ACTION_REJECT, ACTION_RERUN, ReviewDecision +from pr_af.orchestrator import ReviewOrchestrator +from pr_af.schemas.input import GitHubPRData, ReviewInput +from pr_af.schemas.output import ScoredFinding + + +def _finding(fid: str) -> ScoredFinding: + return ScoredFinding( + id=fid, + dimension_id="d1", + dimension_name="dim", + file_path="src/foo.py", + line_start=10, + line_end=10, + severity="important", + title=f"title-{fid}", + body="body", + ) + + +class _FakeApp: + def __init__(self): + self.notes = [] + self.ctx = SimpleNamespace(execution_id="exec_1") + + def note(self, msg, **kwargs): + self.notes.append(msg) + + +def _make_orchestrator(monkeypatch, *, findings, hitl_on, decisions): + """Build an orchestrator with all heavy phases stubbed. + + ``decisions`` is a list of ReviewDecision returned by successive + request_review_approval calls. + """ + inp = ReviewInput(pr_url="https://github.com/o/r/pull/1", dry_run=False) + orch = ReviewOrchestrator(app=_FakeApp(), input=inp) + + async def fake_intake(): + orch.pr_data = GitHubPRData(owner="o", repo="r", number=1, title="t", description="") + return SimpleNamespace(pr_summary="adds caching", pr_type="feature", complexity="low") + + async def fake_anatomy(intake): + return SimpleNamespace(files=[], clusters=[]) + + feedbacks: list[str] = [] + + async def fake_phases(intake, anatomy, depth, reviewer_feedback=""): + feedbacks.append(reviewer_feedback) + return SimpleNamespace(dimensions=[]), list(findings) + + captured: dict = {} + + async def fake_generate(scored, intake, anatomy, plan, *, post_to_github=True): + captured["findings"] = list(scored) + captured["post"] = post_to_github + return SimpleNamespace( + summary=SimpleNamespace(total_findings=len(scored), cost_usd=0.0) + ) + + monkeypatch.setattr(orch, "_run_intake", fake_intake) + monkeypatch.setattr(orch, "_run_anatomy", fake_anatomy) + monkeypatch.setattr(orch, "_resolve_depth", lambda intake: "standard") + monkeypatch.setattr(orch, "_run_review_phases", fake_phases) + monkeypatch.setattr(orch, "_generate_output", fake_generate) + monkeypatch.setattr(orch, "_cleanup_context_dir", lambda: None) + + monkeypatch.setattr( + orch_mod, "build_hax_client_from_env", lambda: object() if hitl_on else None + ) + monkeypatch.setattr(orch_mod, "approval_webhook_url", lambda app: None) + + call_count = {"n": 0} + + async def fake_request(**kwargs): + idx = call_count["n"] + call_count["n"] += 1 + return decisions[idx] + + monkeypatch.setattr(orch_mod, "request_review_approval", fake_request) + + return orch, captured, feedbacks, call_count + + +async def test_hitl_off_posts_directly(monkeypatch): + findings = [_finding("f1"), _finding("f2")] + orch, captured, feedbacks, calls = _make_orchestrator( + monkeypatch, findings=findings, hitl_on=False, decisions=[] + ) + await orch.run() + assert captured["post"] is True + assert {f.id for f in captured["findings"]} == {"f1", "f2"} + assert calls["n"] == 0 # gate never consulted + + +async def test_approve_subset_posts_only_selected(monkeypatch): + findings = [_finding("f1"), _finding("f2"), _finding("f3")] + decision = ReviewDecision(action=ACTION_POST, selected_finding_ids={"f1", "f3"}) + orch, captured, feedbacks, calls = _make_orchestrator( + monkeypatch, findings=findings, hitl_on=True, decisions=[decision] + ) + await orch.run() + assert captured["post"] is True + assert {f.id for f in captured["findings"]} == {"f1", "f3"} + + +async def test_rerun_then_post_threads_feedback(monkeypatch): + findings = [_finding("f1")] + decisions = [ + ReviewDecision(action=ACTION_RERUN, instructions="too aggressive, tone it down"), + ReviewDecision(action=ACTION_POST, selected_finding_ids={"f1"}), + ] + orch, captured, feedbacks, calls = _make_orchestrator( + monkeypatch, findings=findings, hitl_on=True, decisions=decisions + ) + await orch.run() + assert calls["n"] == 2 # asked twice + # First phase run had no feedback; the re-run carried the reviewer's words. + assert feedbacks[0] == "" + assert "tone it down" in feedbacks[1] + assert captured["post"] is True + + +async def test_reject_posts_nothing(monkeypatch): + findings = [_finding("f1")] + decision = ReviewDecision(action=ACTION_REJECT, decision_raw="rejected") + orch, captured, feedbacks, calls = _make_orchestrator( + monkeypatch, findings=findings, hitl_on=True, decisions=[decision] + ) + await orch.run() + assert captured["post"] is False + + +async def test_rerun_past_cap_posts_nothing(monkeypatch): + findings = [_finding("f1")] + orch, captured, feedbacks, calls = _make_orchestrator( + monkeypatch, + findings=findings, + hitl_on=True, + decisions=[ReviewDecision(action=ACTION_RERUN, instructions="again")] * 5, + ) + # Default max_review_revisions = 2 → 3 prompts (iters 0,1,2), then no post. + orch.config.hitl.max_review_revisions = 2 + await orch.run() + assert captured["post"] is False + assert calls["n"] == 3