From 4921f9f650b31489932db097385c90b5e41b861e Mon Sep 17 00:00:00 2001 From: Dana Burks Date: Wed, 13 May 2026 00:12:07 -0700 Subject: [PATCH 1/2] [bench] tests: add comprehensive test suite and CI workflow - 9 new test files covering all previously untested modules: ledger (chain, verify), pipeline (constitution, challenger, defender, oracle, runner), hook entry point, and API client - GitHub Actions CI workflow running on Python 3.11-3.13 - Total suite: 264 tests, 14 files, all passing Co-Authored-By: Claude Opus 4.6 --- .github/workflows/tests.yml | 23 +++ tests/test_api_extended.py | 238 +++++++++++++++++++++++++++++++ tests/test_chain.py | 218 ++++++++++++++++++++++++++++ tests/test_challenger.py | 189 ++++++++++++++++++++++++ tests/test_constitution.py | 159 +++++++++++++++++++++ tests/test_defender.py | 224 +++++++++++++++++++++++++++++ tests/test_hook.py | 174 ++++++++++++++++++++++ tests/test_oracle.py | 273 +++++++++++++++++++++++++++++++++++ tests/test_runner.py | 277 ++++++++++++++++++++++++++++++++++++ tests/test_verify.py | 182 +++++++++++++++++++++++ 10 files changed, 1957 insertions(+) create mode 100644 .github/workflows/tests.yml create mode 100644 tests/test_api_extended.py create mode 100644 tests/test_chain.py create mode 100644 tests/test_challenger.py create mode 100644 tests/test_constitution.py create mode 100644 tests/test_defender.py create mode 100644 tests/test_hook.py create mode 100644 tests/test_oracle.py create mode 100644 tests/test_runner.py create mode 100644 tests/test_verify.py diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..e9a92b7 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,23 @@ +name: tests + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.11", "3.12", "3.13"] + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: pip install -r requirements.txt + - name: Run tests + run: python -m unittest discover -s tests -v diff --git a/tests/test_api_extended.py b/tests/test_api_extended.py new file mode 100644 index 0000000..acdf3f9 --- /dev/null +++ b/tests/test_api_extended.py @@ -0,0 +1,238 @@ +"""Extended tests for utils.api — call_model, strip_code_fences, _try_parse_dict. + +Complements test_api.py (which covers _sanitize_error_detail). All provider +calls are mocked — no network traffic. + +Run: python -m unittest tests.test_api_extended -v +""" + +import os +import sys +import unittest +from typing import Any +from unittest.mock import MagicMock, patch + +from pathlib import Path + +_REPO_ROOT: Path = Path(__file__).resolve().parent.parent +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +from utils.api import ( # noqa: E402 + _ProviderError, + _anthropic_call, + _try_parse_dict, + call_model, + strip_code_fences, +) + + +class StripCodeFencesTests(unittest.TestCase): + def test_removes_json_fence(self) -> None: + text: str = '```json\n{"a": 1}\n```' + self.assertEqual(strip_code_fences(text), '{"a": 1}') + + def test_removes_plain_fence(self) -> None: + text: str = '```\n{"a": 1}\n```' + self.assertEqual(strip_code_fences(text), '{"a": 1}') + + def test_case_insensitive_language_tag(self) -> None: + text: str = '```JSON\n{"a": 1}\n```' + self.assertEqual(strip_code_fences(text), '{"a": 1}') + + def test_no_fence_returns_unchanged(self) -> None: + text: str = '{"a": 1}' + self.assertEqual(strip_code_fences(text), '{"a": 1}') + + def test_short_string_returns_unchanged(self) -> None: + self.assertEqual(strip_code_fences("hi"), "hi") + + def test_strips_surrounding_whitespace(self) -> None: + text: str = ' \n```json\n{"a": 1}\n```\n ' + self.assertEqual(strip_code_fences(text), '{"a": 1}') + + +class TryParseDictTests(unittest.TestCase): + def test_valid_json_object_returns_dict(self) -> None: + result: Any = _try_parse_dict('{"a": 1}') + self.assertEqual(result, {"a": 1}) + + def test_json_array_returns_none(self) -> None: + self.assertIsNone(_try_parse_dict("[1, 2, 3]")) + + def test_json_string_returns_none(self) -> None: + self.assertIsNone(_try_parse_dict('"hello"')) + + def test_invalid_json_returns_none(self) -> None: + self.assertIsNone(_try_parse_dict("{{{malformed")) + + def test_strips_code_fences_before_parsing(self) -> None: + text: str = '```json\n{"ok": true}\n```' + result: Any = _try_parse_dict(text) + self.assertEqual(result, {"ok": True}) + + def test_json_integer_returns_none(self) -> None: + self.assertIsNone(_try_parse_dict("42")) + + +class CallModelProviderDispatchTests(unittest.TestCase): + @patch("utils.api._anthropic_call") + def test_default_provider_is_anthropic(self, mock_call: MagicMock) -> None: + mock_call.return_value = ('{"status":"ok"}', 10, 20) + env = os.environ.copy() + env.pop("BENCH_PROVIDER", None) + with patch.dict("os.environ", env, clear=True): + call_model("model", "sys", "user") + mock_call.assert_called_once() + + @patch("utils.api._anthropic_call") + def test_explicit_anthropic_provider(self, mock_call: MagicMock) -> None: + mock_call.return_value = ('{"status":"ok"}', 10, 20) + with patch.dict("os.environ", {"BENCH_PROVIDER": "anthropic"}): + call_model("model", "sys", "user") + mock_call.assert_called_once() + + @patch("utils.api._openrouter_call") + def test_openrouter_provider(self, mock_call: MagicMock) -> None: + mock_call.return_value = ('{"status":"ok"}', 10, 20) + with patch.dict("os.environ", {"BENCH_PROVIDER": "openrouter"}): + call_model("model", "sys", "user") + mock_call.assert_called_once() + + def test_unknown_provider_returns_api_error(self) -> None: + with patch.dict("os.environ", {"BENCH_PROVIDER": "unknown"}): + result: dict = call_model("model", "sys", "user") + self.assertEqual(result["error"], "API_ERROR") + self.assertIn("unknown", result["detail"]) + + +class CallModelSuccessTests(unittest.TestCase): + @patch("utils.api._anthropic_call") + def test_successful_parse_returns_dict_with_tokens( + self, mock_call: MagicMock + ) -> None: + mock_call.return_value = ('{"status": "CLEAR"}', 10, 20) + with patch.dict("os.environ", {"BENCH_PROVIDER": "anthropic"}): + result: dict = call_model("model", "sys", "user") + self.assertEqual(result["status"], "CLEAR") + self.assertEqual(result["_tokens"], {"input": 10, "output": 20}) + + +class CallModelRetryTests(unittest.TestCase): + @patch("utils.api._anthropic_call") + def test_retry_on_parse_failure_succeeds( + self, mock_call: MagicMock + ) -> None: + mock_call.side_effect = [ + ("not json at all", 10, 20), + ('{"ok": true}', 15, 25), + ] + with patch.dict("os.environ", {"BENCH_PROVIDER": "anthropic"}): + result: dict = call_model("model", "sys", "user") + self.assertTrue(result["ok"]) + + @patch("utils.api._anthropic_call") + def test_tokens_accumulated_across_retry( + self, mock_call: MagicMock + ) -> None: + mock_call.side_effect = [ + ("not json", 10, 20), + ('{"ok": true}', 15, 25), + ] + with patch.dict("os.environ", {"BENCH_PROVIDER": "anthropic"}): + result: dict = call_model("model", "sys", "user") + self.assertEqual(result["_tokens"], {"input": 25, "output": 45}) + + @patch("utils.api._anthropic_call") + def test_both_parses_fail_returns_parse_failure( + self, mock_call: MagicMock + ) -> None: + mock_call.side_effect = [ + ("bad1", 10, 20), + ("bad2", 15, 25), + ] + with patch.dict("os.environ", {"BENCH_PROVIDER": "anthropic"}): + result: dict = call_model("model", "sys", "user") + self.assertEqual(result["error"], "PARSE_FAILURE") + self.assertEqual(result["raw_response"], "bad2") + self.assertEqual(result["_tokens"], {"input": 25, "output": 45}) + + +class CallModelApiErrorTests(unittest.TestCase): + @patch("utils.api._anthropic_call") + def test_provider_error_returns_api_error( + self, mock_call: MagicMock + ) -> None: + mock_call.side_effect = _ProviderError("connection failed") + with patch.dict("os.environ", {"BENCH_PROVIDER": "anthropic"}): + result: dict = call_model("model", "sys", "user") + self.assertEqual(result["error"], "API_ERROR") + + @patch("utils.api._anthropic_call") + def test_retry_provider_error_returns_api_error( + self, mock_call: MagicMock + ) -> None: + mock_call.side_effect = [ + ("not json", 10, 20), + _ProviderError("retry failed"), + ] + with patch.dict("os.environ", {"BENCH_PROVIDER": "anthropic"}): + result: dict = call_model("model", "sys", "user") + self.assertEqual(result["error"], "API_ERROR") + + @patch("utils.api._anthropic_call") + def test_error_detail_is_sanitized(self, mock_call: MagicMock) -> None: + mock_call.side_effect = _ProviderError( + "AuthenticationError: Invalid API key sk-ant-1234567890abcdef" + ) + with patch.dict("os.environ", {"BENCH_PROVIDER": "anthropic"}): + result: dict = call_model("model", "sys", "user") + self.assertNotIn("sk-ant-1234567890abcdef", result["detail"]) + self.assertIn("[REDACTED]", result["detail"]) + + +class AnthropicCallTests(unittest.TestCase): + @patch("utils.api.anthropic.Anthropic") + def test_successful_call_extracts_text_and_tokens( + self, mock_cls: MagicMock + ) -> None: + mock_response: MagicMock = MagicMock() + mock_response.content = [MagicMock(text='{"result": true}')] + mock_response.usage.input_tokens = 50 + mock_response.usage.output_tokens = 100 + mock_cls.return_value.messages.create.return_value = mock_response + + text, in_tok, out_tok = _anthropic_call( + "model", "system", [{"role": "user", "content": "hi"}], 4096 + ) + self.assertEqual(text, '{"result": true}') + self.assertEqual(in_tok, 50) + self.assertEqual(out_tok, 100) + + @patch("utils.api.anthropic.Anthropic") + def test_anthropic_error_raises_provider_error( + self, mock_cls: MagicMock + ) -> None: + import anthropic + + mock_cls.return_value.messages.create.side_effect = ( + anthropic.APIConnectionError(request=MagicMock()) + ) + with self.assertRaises(_ProviderError): + _anthropic_call( + "model", "system", [{"role": "user", "content": "hi"}], 4096 + ) + + @patch("utils.api.anthropic.Anthropic") + def test_type_error_raises_provider_error( + self, mock_cls: MagicMock + ) -> None: + mock_cls.side_effect = TypeError("bad config") + with self.assertRaises(_ProviderError): + _anthropic_call( + "model", "system", [{"role": "user", "content": "hi"}], 4096 + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_chain.py b/tests/test_chain.py new file mode 100644 index 0000000..4800d55 --- /dev/null +++ b/tests/test_chain.py @@ -0,0 +1,218 @@ +"""Tests for ledger.chain — hash computation, chain linking, append, truncation. + +Covers: compute_entry_hash determinism and field exclusion, load_ledger +error handling, _cap_stage_fields truncation, append_entry chain linking +and metadata sync, _atomic_write_json atomicity. + +Run: python -m unittest tests.test_chain -v +""" + +import json +import os +import shutil +import sys +import tempfile +import unittest +from datetime import datetime +from pathlib import Path +from typing import Any + +_REPO_ROOT: Path = Path(__file__).resolve().parent.parent +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +from ledger.chain import ( # noqa: E402 + _atomic_write_json, + _cap_stage_fields, + append_entry, + compute_entry_hash, + load_ledger, +) + + +class ComputeEntryHashTests(unittest.TestCase): + def test_deterministic_for_identical_entries(self) -> None: + entry: dict = {"a": 1, "b": "hello"} + self.assertEqual(compute_entry_hash(entry), compute_entry_hash(entry)) + + def test_excludes_entry_hash_field(self) -> None: + base: dict = {"a": 1, "b": 2} + with_hash: dict = {"a": 1, "b": 2, "entry_hash": "should_be_ignored"} + self.assertEqual(compute_entry_hash(base), compute_entry_hash(with_hash)) + + def test_different_entries_produce_different_hashes(self) -> None: + e1: dict = {"a": 1} + e2: dict = {"a": 2} + self.assertNotEqual(compute_entry_hash(e1), compute_entry_hash(e2)) + + def test_hash_is_64_char_hex_string(self) -> None: + result: str = compute_entry_hash({"x": "y"}) + self.assertRegex(result, r"^[0-9a-f]{64}$") + + def test_sort_keys_ensures_key_order_independence(self) -> None: + e1: dict = {"a": 1, "b": 2} + e2: dict = {"b": 2, "a": 1} + self.assertEqual(compute_entry_hash(e1), compute_entry_hash(e2)) + + def test_handles_non_json_native_values(self) -> None: + entry: dict = {"ts": datetime(2026, 1, 1)} + result: str = compute_entry_hash(entry) + self.assertRegex(result, r"^[0-9a-f]{64}$") + + +class LoadLedgerTests(unittest.TestCase): + def setUp(self) -> None: + self._tmp: str = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self._tmp) + + def _path(self, name: str = "ledger.json") -> str: + return os.path.join(self._tmp, name) + + def test_missing_file_returns_empty_list(self) -> None: + self.assertEqual(load_ledger(self._path("nonexistent.json")), []) + + def test_valid_json_array_loaded(self) -> None: + p: str = self._path() + data: list = [{"entry_hash": "abc", "x": 1}] + Path(p).write_text(json.dumps(data), encoding="utf-8") + self.assertEqual(load_ledger(p), data) + + def test_corrupt_json_returns_empty_list(self) -> None: + p: str = self._path() + Path(p).write_text("{{{bad", encoding="utf-8") + self.assertEqual(load_ledger(p), []) + + def test_non_array_json_returns_empty_list(self) -> None: + p: str = self._path() + Path(p).write_text('{"key": "val"}', encoding="utf-8") + self.assertEqual(load_ledger(p), []) + + +class CapStageFieldsTests(unittest.TestCase): + def test_non_dict_passes_through(self) -> None: + self.assertEqual(_cap_stage_fields("hello"), "hello") + + def test_short_fields_unchanged(self) -> None: + stage: dict = {"status": "CLEAR", "summary": "ok"} + self.assertEqual(_cap_stage_fields(stage), stage) + + def test_long_string_field_truncated(self) -> None: + stage: dict = {"big": "x" * 15_000} + result: dict = _cap_stage_fields(stage) + self.assertTrue(result["big"].endswith("[TRUNCATED]")) + self.assertLessEqual(len(result["big"]), 10_000 + 20) + + def test_nested_list_items_truncated(self) -> None: + stage: dict = {"findings": [{"evidence": "y" * 15_000}]} + result: dict = _cap_stage_fields(stage) + self.assertTrue(result["findings"][0]["evidence"].endswith("[TRUNCATED]")) + + def test_total_serialized_over_50k_collapses(self) -> None: + stage: dict = {f"f{i}": "z" * 9_999 for i in range(6)} + stage["status"] = "FINDINGS" + stage["verdict"] = "PASS" + result: dict = _cap_stage_fields(stage) + self.assertTrue(result.get("_capped")) + self.assertEqual(result["status"], "FINDINGS") + + +class AppendEntryTests(unittest.TestCase): + def setUp(self) -> None: + self._tmp: str = tempfile.mkdtemp() + self._ledger: str = os.path.join(self._tmp, "ledger.json") + self._meta: str = os.path.join(self._tmp, "ledger-meta.json") + self.addCleanup(shutil.rmtree, self._tmp) + + def _minimal_result(self) -> dict: + return { + "verdict": "PASS", + "reason": "test", + "constitution_hash": "abc123", + "change": {"file": "test.py", "tool": "Write", "diff_summary": {}}, + "challenger": {"status": "CLEAR"}, + "defender": {"status": "CONFIRM_CLEAR"}, + "oracle": {"verdict": "PASS"}, + } + + def test_first_entry_uses_genesis_marker(self) -> None: + entry: dict = append_entry(self._minimal_result(), path=self._ledger) + self.assertEqual(entry["previous_hash"], "GENESIS") + + def test_second_entry_links_to_first(self) -> None: + first: dict = append_entry(self._minimal_result(), path=self._ledger) + second: dict = append_entry(self._minimal_result(), path=self._ledger) + self.assertEqual(second["previous_hash"], first["entry_hash"]) + + def test_entry_hash_is_valid(self) -> None: + entry: dict = append_entry(self._minimal_result(), path=self._ledger) + recomputed: str = compute_entry_hash(entry) + self.assertEqual(entry["entry_hash"], recomputed) + + def test_entry_has_uuid_entry_id(self) -> None: + entry: dict = append_entry(self._minimal_result(), path=self._ledger) + uuid_pattern: str = r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$" + self.assertRegex(entry["entry_id"], uuid_pattern) + + def test_entry_has_utc_iso_timestamp(self) -> None: + entry: dict = append_entry(self._minimal_result(), path=self._ledger) + ts: str = entry["timestamp"] + parsed: datetime = datetime.fromisoformat(ts) + self.assertIn("+00:00", ts) + self.assertIsNotNone(parsed) + + def test_missing_change_fields_fallback(self) -> None: + result: dict = {"verdict": "PASS"} + entry: dict = append_entry(result, path=self._ledger) + self.assertEqual(entry["change"]["file"], "unknown") + self.assertEqual(entry["change"]["tool"], "unknown") + + def test_ledger_file_created_on_first_append(self) -> None: + self.assertFalse(os.path.exists(self._ledger)) + append_entry(self._minimal_result(), path=self._ledger) + self.assertTrue(os.path.exists(self._ledger)) + + def test_meta_file_created_on_first_append(self) -> None: + append_entry(self._minimal_result(), path=self._ledger) + self.assertTrue(os.path.exists(self._meta)) + + def test_meta_entry_count_incremented(self) -> None: + append_entry(self._minimal_result(), path=self._ledger) + append_entry(self._minimal_result(), path=self._ledger) + meta: dict = json.loads(Path(self._meta).read_text(encoding="utf-8")) + self.assertEqual(meta["entry_count"], 2) + + def test_meta_latest_hash_matches(self) -> None: + entry: dict = append_entry(self._minimal_result(), path=self._ledger) + meta: dict = json.loads(Path(self._meta).read_text(encoding="utf-8")) + self.assertEqual(meta["latest_hash"], entry["entry_hash"]) + + def test_stages_are_cap_truncated(self) -> None: + result: dict = self._minimal_result() + result["challenger"] = {"status": "FINDINGS", "big": "a" * 15_000} + entry: dict = append_entry(result, path=self._ledger) + self.assertTrue( + entry["challenger"]["big"].endswith("[TRUNCATED]") + ) + + +class AtomicWriteJsonTests(unittest.TestCase): + def setUp(self) -> None: + self._tmp: str = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self._tmp) + + def test_writes_valid_json(self) -> None: + target: Path = Path(self._tmp) / "out.json" + _atomic_write_json(target, {"key": "value"}) + result: Any = json.loads(target.read_text(encoding="utf-8")) + self.assertEqual(result, {"key": "value"}) + + def test_replaces_existing_file(self) -> None: + target: Path = Path(self._tmp) / "out.json" + _atomic_write_json(target, {"v": 1}) + _atomic_write_json(target, {"v": 2}) + result: Any = json.loads(target.read_text(encoding="utf-8")) + self.assertEqual(result["v"], 2) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_challenger.py b/tests/test_challenger.py new file mode 100644 index 0000000..ed62dc6 --- /dev/null +++ b/tests/test_challenger.py @@ -0,0 +1,189 @@ +"""Tests for pipeline.challenger — response validation, content building, run_challenger. + +All model calls are mocked. Covers: _validate_challenger_response schema +checks, _build_user_content assembly, and run_challenger end-to-end flow +including error wrapping. + +Run: python -m unittest tests.test_challenger -v +""" + +import sys +import unittest +from typing import Any +from unittest.mock import MagicMock, patch + +from pathlib import Path + +_REPO_ROOT: Path = Path(__file__).resolve().parent.parent +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +from pipeline.challenger import ( # noqa: E402 + _build_user_content, + _validate_challenger_response, + run_challenger, +) + + +def _valid_finding() -> dict: + return { + "constraint_id": "C-001", + "severity": "VIOLATION", + "location": "test.py:10", + "evidence": "empty except block", + "reasoning": "violates C-001", + } + + +def _valid_diff() -> dict: + return {"file_path": "test.py", "change_type": "edit"} + + +def _valid_constitution() -> dict: + return { + "constraints": [ + {"id": "C-001", "name": "No Silent Errors", "rule": "...", "severity": "veto"} + ] + } + + +class ValidateChallengerResponseTests(unittest.TestCase): + def test_clear_status_is_valid(self) -> None: + self.assertTrue( + _validate_challenger_response({"status": "CLEAR", "findings": []}) + ) + + def test_clear_without_findings_key_is_valid(self) -> None: + self.assertTrue(_validate_challenger_response({"status": "CLEAR"})) + + def test_findings_with_all_fields_is_valid(self) -> None: + resp: dict = {"status": "FINDINGS", "findings": [_valid_finding()]} + self.assertTrue(_validate_challenger_response(resp)) + + def test_invalid_status_rejected(self) -> None: + self.assertFalse( + _validate_challenger_response({"status": "INVALID", "findings": []}) + ) + + def test_missing_status_rejected(self) -> None: + self.assertFalse(_validate_challenger_response({"findings": []})) + + def test_findings_not_list_rejected(self) -> None: + self.assertFalse( + _validate_challenger_response({"status": "FINDINGS", "findings": "string"}) + ) + + def test_finding_missing_required_field_rejected(self) -> None: + finding: dict = _valid_finding() + del finding["constraint_id"] + self.assertFalse( + _validate_challenger_response({"status": "FINDINGS", "findings": [finding]}) + ) + + def test_finding_empty_string_field_rejected(self) -> None: + finding: dict = _valid_finding() + finding["evidence"] = "" + self.assertFalse( + _validate_challenger_response({"status": "FINDINGS", "findings": [finding]}) + ) + + def test_finding_invalid_severity_rejected(self) -> None: + finding: dict = _valid_finding() + finding["severity"] = "CRITICAL" + self.assertFalse( + _validate_challenger_response({"status": "FINDINGS", "findings": [finding]}) + ) + + def test_finding_non_dict_rejected(self) -> None: + self.assertFalse( + _validate_challenger_response( + {"status": "FINDINGS", "findings": ["not a dict"]} + ) + ) + + +class BuildUserContentTests(unittest.TestCase): + def test_contains_diff_and_constitution_sections(self) -> None: + content: str = _build_user_content(_valid_diff(), _valid_constitution(), "") + self.assertIn("PROPOSED CHANGE:", content) + self.assertIn("CONSTITUTION:", content) + + def test_file_context_appended_when_present(self) -> None: + content: str = _build_user_content( + _valid_diff(), _valid_constitution(), "def foo(): pass" + ) + self.assertIn("FILE CONTEXT:", content) + self.assertIn("def foo(): pass", content) + + def test_file_context_omitted_when_empty(self) -> None: + content: str = _build_user_content(_valid_diff(), _valid_constitution(), "") + self.assertNotIn("FILE CONTEXT:", content) + + +class RunChallengerTests(unittest.TestCase): + @patch("pipeline.challenger.call_model") + def test_valid_clear_response_passed_through( + self, mock_call: MagicMock + ) -> None: + mock_call.return_value = { + "status": "CLEAR", + "findings": [], + "_tokens": {"input": 10, "output": 20}, + } + result: dict = run_challenger(_valid_diff(), _valid_constitution(), "hash") + self.assertEqual(result["status"], "CLEAR") + + @patch("pipeline.challenger.call_model") + def test_valid_findings_response_passed_through( + self, mock_call: MagicMock + ) -> None: + mock_call.return_value = { + "status": "FINDINGS", + "findings": [_valid_finding()], + "_tokens": {"input": 10, "output": 20}, + } + result: dict = run_challenger(_valid_diff(), _valid_constitution(), "hash") + self.assertEqual(result["status"], "FINDINGS") + self.assertEqual(len(result["findings"]), 1) + + @patch("pipeline.challenger.call_model") + def test_api_error_returns_pipeline_error( + self, mock_call: MagicMock + ) -> None: + mock_call.return_value = { + "error": "API_ERROR", + "detail": "timeout", + "_tokens": {"input": 0, "output": 0}, + } + result: dict = run_challenger(_valid_diff(), _valid_constitution(), "hash") + self.assertEqual(result["status"], "PIPELINE_ERROR") + + @patch("pipeline.challenger.call_model") + def test_invalid_response_returns_pipeline_error_with_raw( + self, mock_call: MagicMock + ) -> None: + mock_call.return_value = { + "garbage": True, + "_tokens": {"input": 10, "output": 20}, + } + result: dict = run_challenger(_valid_diff(), _valid_constitution(), "hash") + self.assertEqual(result["status"], "PIPELINE_ERROR") + self.assertIn("raw_response", result) + + def test_input_validation_failure_returns_pipeline_error(self) -> None: + result: dict = run_challenger({}, _valid_constitution(), "hash") + self.assertEqual(result["status"], "PIPELINE_ERROR") + self.assertIn("INVALID_CHALLENGER_INPUT", result["error"]) + + @patch("pipeline.challenger.call_model") + def test_tokens_preserved_on_all_paths(self, mock_call: MagicMock) -> None: + mock_call.return_value = { + "status": "CLEAR", + "_tokens": {"input": 5, "output": 15}, + } + result: dict = run_challenger(_valid_diff(), _valid_constitution(), "hash") + self.assertIn("_tokens", result) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_constitution.py b/tests/test_constitution.py new file mode 100644 index 0000000..6e527b0 --- /dev/null +++ b/tests/test_constitution.py @@ -0,0 +1,159 @@ +"""Tests for pipeline.constitution — loading, hashing, schema validation. + +Covers: load_constitution_snapshot with valid/invalid files and all three +exception types, get_constraint_by_id lookup, hash determinism. + +Run: python -m unittest tests.test_constitution -v +""" + +import hashlib +import json +import os +import shutil +import sys +import tempfile +import unittest +from pathlib import Path + +_REPO_ROOT: Path = Path(__file__).resolve().parent.parent +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +from pipeline.constitution import ( # noqa: E402 + ConstitutionNotFoundError, + ConstitutionParseError, + ConstitutionSchemaError, + get_constraint_by_id, + load_constitution_snapshot, +) + + +def _valid_constitution() -> dict: + return { + "constitution": "bench-v1", + "version": 1, + "constraints": [ + { + "id": "C-001", + "name": "Test Constraint", + "rule": "No silent error swallowing", + "severity": "veto", + } + ], + } + + +class LoadConstitutionSnapshotTests(unittest.TestCase): + def setUp(self) -> None: + self._tmp: str = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self._tmp) + + def _write(self, content: str, name: str = "bench.json") -> str: + p: str = os.path.join(self._tmp, name) + Path(p).write_text(content, encoding="utf-8") + return p + + def test_valid_constitution_returns_data_and_hash(self) -> None: + raw: str = json.dumps(_valid_constitution()) + path: str = self._write(raw) + data, h = load_constitution_snapshot(path) + self.assertIsInstance(data, dict) + self.assertRegex(h, r"^[0-9a-f]{64}$") + self.assertEqual(data["constitution"], "bench-v1") + + def test_hash_matches_sha256_of_raw_bytes(self) -> None: + raw: str = json.dumps(_valid_constitution()) + path: str = self._write(raw) + _, h = load_constitution_snapshot(path) + expected: str = hashlib.sha256(raw.encode("utf-8")).hexdigest() + self.assertEqual(h, expected) + + def test_file_not_found_raises_not_found_error(self) -> None: + with self.assertRaises(ConstitutionNotFoundError): + load_constitution_snapshot(os.path.join(self._tmp, "nope.json")) + + def test_invalid_json_raises_parse_error(self) -> None: + path: str = self._write("{{{bad json") + with self.assertRaises(ConstitutionParseError): + load_constitution_snapshot(path) + + def test_non_dict_root_raises_schema_error(self) -> None: + path: str = self._write("[1, 2, 3]") + with self.assertRaises(ConstitutionSchemaError): + load_constitution_snapshot(path) + + def test_missing_top_level_field_raises_schema_error(self) -> None: + path: str = self._write(json.dumps({"constitution": "v1", "version": 1})) + with self.assertRaises(ConstitutionSchemaError): + load_constitution_snapshot(path) + + def test_constraints_not_list_raises_schema_error(self) -> None: + doc: dict = _valid_constitution() + doc["constraints"] = "not a list" + path: str = self._write(json.dumps(doc)) + with self.assertRaises(ConstitutionSchemaError): + load_constitution_snapshot(path) + + def test_constraint_not_dict_raises_schema_error(self) -> None: + doc: dict = _valid_constitution() + doc["constraints"] = ["not a dict"] + path: str = self._write(json.dumps(doc)) + with self.assertRaises(ConstitutionSchemaError): + load_constitution_snapshot(path) + + def test_constraint_missing_required_field_raises_schema_error(self) -> None: + doc: dict = _valid_constitution() + del doc["constraints"][0]["id"] + path: str = self._write(json.dumps(doc)) + with self.assertRaises(ConstitutionSchemaError): + load_constitution_snapshot(path) + + def test_constraint_empty_string_field_raises_schema_error(self) -> None: + doc: dict = _valid_constitution() + doc["constraints"][0]["id"] = "" + path: str = self._write(json.dumps(doc)) + with self.assertRaises(ConstitutionSchemaError): + load_constitution_snapshot(path) + + def test_constraint_non_string_field_raises_schema_error(self) -> None: + doc: dict = _valid_constitution() + doc["constraints"][0]["severity"] = 42 + path: str = self._write(json.dumps(doc)) + with self.assertRaises(ConstitutionSchemaError): + load_constitution_snapshot(path) + + def test_multiple_valid_constraints_accepted(self) -> None: + doc: dict = _valid_constitution() + doc["constraints"].append({ + "id": "C-002", + "name": "Second", + "rule": "Scope boundary", + "severity": "veto", + }) + path: str = self._write(json.dumps(doc)) + data, _ = load_constitution_snapshot(path) + self.assertEqual(len(data["constraints"]), 2) + + +class GetConstraintByIdTests(unittest.TestCase): + def test_finds_existing_constraint(self) -> None: + doc: dict = _valid_constitution() + result = get_constraint_by_id(doc, "C-001") + self.assertIsNotNone(result) + self.assertEqual(result["id"], "C-001") + + def test_returns_none_for_missing_id(self) -> None: + doc: dict = _valid_constitution() + self.assertIsNone(get_constraint_by_id(doc, "C-999")) + + def test_handles_missing_constraints_key(self) -> None: + self.assertIsNone(get_constraint_by_id({}, "C-001")) + + def test_handles_non_dict_entries_in_list(self) -> None: + doc: dict = {"constraints": ["not a dict", {"id": "C-001"}]} + result = get_constraint_by_id(doc, "C-001") + self.assertIsNotNone(result) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_defender.py b/tests/test_defender.py new file mode 100644 index 0000000..5f03650 --- /dev/null +++ b/tests/test_defender.py @@ -0,0 +1,224 @@ +"""Tests for pipeline.defender — response validation, content building, run_defender. + +All model calls are mocked. Covers: _validate_defender_response schema checks +including rebuttal field validation (bool finding_index, position enum), +_build_user_content assembly, and run_defender end-to-end flow. + +Run: python -m unittest tests.test_defender -v +""" + +import sys +import unittest +from typing import Any +from unittest.mock import MagicMock, patch + +from pathlib import Path + +_REPO_ROOT: Path = Path(__file__).resolve().parent.parent +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +from pipeline.defender import ( # noqa: E402 + _build_user_content, + _validate_defender_response, + run_defender, +) + + +def _valid_rebuttal() -> dict: + return { + "finding_index": 0, + "position": "REBUT", + "argument": "The error is logged on the next line", + "evidence": "see line 12", + } + + +def _valid_diff() -> dict: + return {"file_path": "test.py", "change_type": "edit"} + + +def _valid_constitution() -> dict: + return { + "constraints": [ + {"id": "C-001", "name": "No Silent Errors", "rule": "...", "severity": "veto"} + ] + } + + +def _valid_challenger() -> dict: + return {"status": "FINDINGS", "findings": []} + + +class ValidateDefenderResponseTests(unittest.TestCase): + def test_confirm_clear_with_summary_is_valid(self) -> None: + self.assertTrue( + _validate_defender_response( + {"status": "CONFIRM_CLEAR", "summary": "All clear."} + ) + ) + + def test_concede_all_with_summary_is_valid(self) -> None: + self.assertTrue( + _validate_defender_response( + {"status": "CONCEDE_ALL", "summary": "Conceded."} + ) + ) + + def test_rebuttal_with_valid_rebuttals_is_valid(self) -> None: + resp: dict = { + "status": "REBUTTAL", + "summary": "Rebutted one finding.", + "rebuttals": [_valid_rebuttal()], + } + self.assertTrue(_validate_defender_response(resp)) + + def test_invalid_status_rejected(self) -> None: + self.assertFalse( + _validate_defender_response({"status": "UNKNOWN", "summary": "x"}) + ) + + def test_missing_summary_rejected(self) -> None: + self.assertFalse( + _validate_defender_response({"status": "CONFIRM_CLEAR"}) + ) + + def test_empty_summary_rejected(self) -> None: + self.assertFalse( + _validate_defender_response({"status": "CONFIRM_CLEAR", "summary": ""}) + ) + + def test_rebuttal_without_rebuttals_list_rejected(self) -> None: + self.assertFalse( + _validate_defender_response({"status": "REBUTTAL", "summary": "x"}) + ) + + def test_rebuttal_non_dict_entry_rejected(self) -> None: + self.assertFalse( + _validate_defender_response( + {"status": "REBUTTAL", "summary": "x", "rebuttals": ["not a dict"]} + ) + ) + + def test_rebuttal_non_int_finding_index_rejected(self) -> None: + r: dict = _valid_rebuttal() + r["finding_index"] = "zero" + self.assertFalse( + _validate_defender_response( + {"status": "REBUTTAL", "summary": "x", "rebuttals": [r]} + ) + ) + + def test_rebuttal_bool_finding_index_rejected(self) -> None: + r: dict = _valid_rebuttal() + r["finding_index"] = True + self.assertFalse( + _validate_defender_response( + {"status": "REBUTTAL", "summary": "x", "rebuttals": [r]} + ) + ) + + def test_rebuttal_missing_argument_rejected(self) -> None: + r: dict = _valid_rebuttal() + del r["argument"] + self.assertFalse( + _validate_defender_response( + {"status": "REBUTTAL", "summary": "x", "rebuttals": [r]} + ) + ) + + def test_rebuttal_invalid_position_rejected(self) -> None: + r: dict = _valid_rebuttal() + r["position"] = "ARGUE" + self.assertFalse( + _validate_defender_response( + {"status": "REBUTTAL", "summary": "x", "rebuttals": [r]} + ) + ) + + +class BuildUserContentTests(unittest.TestCase): + def test_contains_all_sections(self) -> None: + content: str = _build_user_content( + _valid_diff(), _valid_constitution(), _valid_challenger(), "" + ) + self.assertIn("PROPOSED CHANGE:", content) + self.assertIn("CONSTITUTION:", content) + self.assertIn("CHALLENGER FINDINGS:", content) + + def test_file_context_appended_when_present(self) -> None: + content: str = _build_user_content( + _valid_diff(), _valid_constitution(), _valid_challenger(), "source code" + ) + self.assertIn("FILE CONTEXT:", content) + + def test_file_context_omitted_when_empty(self) -> None: + content: str = _build_user_content( + _valid_diff(), _valid_constitution(), _valid_challenger(), "" + ) + self.assertNotIn("FILE CONTEXT:", content) + + +class RunDefenderTests(unittest.TestCase): + @patch("pipeline.defender.call_model") + def test_valid_response_passed_through(self, mock_call: MagicMock) -> None: + mock_call.return_value = { + "status": "REBUTTAL", + "summary": "Rebutted.", + "rebuttals": [_valid_rebuttal()], + "_tokens": {"input": 10, "output": 20}, + } + result: dict = run_defender( + _valid_diff(), _valid_constitution(), "hash", _valid_challenger() + ) + self.assertEqual(result["status"], "REBUTTAL") + + @patch("pipeline.defender.call_model") + def test_api_error_returns_pipeline_error( + self, mock_call: MagicMock + ) -> None: + mock_call.return_value = { + "error": "API_ERROR", + "_tokens": {"input": 0, "output": 0}, + } + result: dict = run_defender( + _valid_diff(), _valid_constitution(), "hash", _valid_challenger() + ) + self.assertEqual(result["status"], "PIPELINE_ERROR") + + @patch("pipeline.defender.call_model") + def test_invalid_response_returns_pipeline_error( + self, mock_call: MagicMock + ) -> None: + mock_call.return_value = { + "garbage": True, + "_tokens": {"input": 10, "output": 20}, + } + result: dict = run_defender( + _valid_diff(), _valid_constitution(), "hash", _valid_challenger() + ) + self.assertEqual(result["status"], "PIPELINE_ERROR") + self.assertIn("raw_response", result) + + def test_input_validation_failure_returns_pipeline_error(self) -> None: + result: dict = run_defender( + _valid_diff(), _valid_constitution(), "hash", {} + ) + self.assertEqual(result["status"], "PIPELINE_ERROR") + self.assertIn("INVALID_DEFENDER_INPUT", result["error"]) + + @patch("pipeline.defender.call_model") + def test_tokens_preserved_on_all_paths(self, mock_call: MagicMock) -> None: + mock_call.return_value = { + "status": "CONFIRM_CLEAR", + "summary": "ok", + "_tokens": {"input": 5, "output": 15}, + } + result: dict = run_defender( + _valid_diff(), _valid_constitution(), "hash", _valid_challenger() + ) + self.assertIn("_tokens", result) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_hook.py b/tests/test_hook.py new file mode 100644 index 0000000..f195a4a --- /dev/null +++ b/tests/test_hook.py @@ -0,0 +1,174 @@ +"""Tests for hooks/pre-tool-use.py — response builders, verdict translation, main flow. + +The hook module uses a hyphen in its filename, so it is imported via +importlib (same pattern as test_input_validation.py). Pipeline execution +is mocked to prevent real API calls. + +Run: python -m unittest tests.test_hook -v +""" + +import importlib.util +import io +import json +import sys +import unittest +from pathlib import Path +from typing import Any +from unittest.mock import MagicMock, patch + +_REPO_ROOT: Path = Path(__file__).resolve().parent.parent +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +_HOOK_PATH: Path = _REPO_ROOT / "hooks" / "pre-tool-use.py" +_spec = importlib.util.spec_from_file_location("pre_tool_use", str(_HOOK_PATH)) +_hook_module = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(_hook_module) + +build_allow_response = _hook_module.build_allow_response +build_deny_response = _hook_module.build_deny_response +build_response_from_verdict = _hook_module.build_response_from_verdict +main = _hook_module.main + + +class BuildAllowResponseTests(unittest.TestCase): + def test_structure_matches_schema(self) -> None: + resp: dict = build_allow_response("test message") + hook_out: dict = resp["hookSpecificOutput"] + self.assertEqual(hook_out["hookEventName"], "PreToolUse") + self.assertEqual(hook_out["permissionDecision"], "allow") + + def test_message_in_additional_context(self) -> None: + resp: dict = build_allow_response("governance passed") + self.assertEqual( + resp["hookSpecificOutput"]["additionalContext"], "governance passed" + ) + + +class BuildDenyResponseTests(unittest.TestCase): + def test_structure_matches_schema(self) -> None: + resp: dict = build_deny_response("VETO C-001", "fix the error") + hook_out: dict = resp["hookSpecificOutput"] + self.assertEqual(hook_out["permissionDecision"], "deny") + + def test_reason_and_remediation_placed_correctly(self) -> None: + resp: dict = build_deny_response("VETO C-001", "fix the error") + hook_out: dict = resp["hookSpecificOutput"] + self.assertEqual(hook_out["permissionDecisionReason"], "VETO C-001") + self.assertEqual(hook_out["additionalContext"], "fix the error") + + +class BuildResponseFromVerdictTests(unittest.TestCase): + def test_pass_verdict_returns_allow(self) -> None: + resp: dict = build_response_from_verdict({"verdict": "PASS"}) + self.assertEqual( + resp["hookSpecificOutput"]["permissionDecision"], "allow" + ) + + def test_veto_verdict_returns_deny(self) -> None: + resp: dict = build_response_from_verdict({ + "verdict": "VETO", + "reason": "C-001 violated", + "remediation": "Add error handling", + }) + self.assertEqual( + resp["hookSpecificOutput"]["permissionDecision"], "deny" + ) + + def test_veto_uses_default_reason_when_missing(self) -> None: + resp: dict = build_response_from_verdict({"verdict": "VETO"}) + reason: str = resp["hookSpecificOutput"]["permissionDecisionReason"] + self.assertTrue(len(reason) > 0) + + def test_veto_uses_default_remediation_when_missing(self) -> None: + resp: dict = build_response_from_verdict({"verdict": "VETO"}) + ctx: str = resp["hookSpecificOutput"]["additionalContext"] + self.assertTrue(len(ctx) > 0) + + def test_missing_verdict_treated_as_pass(self) -> None: + resp: dict = build_response_from_verdict({}) + self.assertEqual( + resp["hookSpecificOutput"]["permissionDecision"], "allow" + ) + + +class MainFlowTests(unittest.TestCase): + def _run_main_with_stdin(self, stdin_content: str) -> tuple[int, str]: + """Run main() with mocked stdin/stdout, return (exit_code, stdout_text).""" + mock_stdin: io.StringIO = io.StringIO(stdin_content) + mock_stdout: io.StringIO = io.StringIO() + with patch.object(sys, "stdin", mock_stdin), \ + patch.object(sys, "stdout", mock_stdout): + exit_code: int = main() + return exit_code, mock_stdout.getvalue() + + @patch.object(_hook_module, "run_governance_pipeline") + def test_governed_tool_invokes_pipeline( + self, mock_pipeline: MagicMock + ) -> None: + mock_pipeline.return_value = {"verdict": "PASS"} + payload: str = json.dumps({ + "tool_name": "Write", + "tool_input": {"file_path": "test.py", "content": "hello"}, + }) + code, output = self._run_main_with_stdin(payload) + self.assertEqual(code, 0) + mock_pipeline.assert_called_once() + + def test_pipeline_import_failure_fails_open(self) -> None: + original = _hook_module.run_governance_pipeline + try: + _hook_module.run_governance_pipeline = None + payload: str = json.dumps({ + "tool_name": "Write", + "tool_input": {"file_path": "test.py", "content": "hello"}, + }) + code, output = self._run_main_with_stdin(payload) + self.assertEqual(code, 0) + resp: dict = json.loads(output) + self.assertEqual( + resp["hookSpecificOutput"]["permissionDecision"], "allow" + ) + finally: + _hook_module.run_governance_pipeline = original + + def test_invalid_json_stdin_fails_open(self) -> None: + code, output = self._run_main_with_stdin("{{{bad json") + self.assertEqual(code, 0) + resp: dict = json.loads(output) + self.assertEqual( + resp["hookSpecificOutput"]["permissionDecision"], "allow" + ) + + def test_non_dict_payload_fails_open(self) -> None: + code, output = self._run_main_with_stdin("[1, 2, 3]") + self.assertEqual(code, 0) + resp: dict = json.loads(output) + self.assertEqual( + resp["hookSpecificOutput"]["permissionDecision"], "allow" + ) + + @patch.object(_hook_module, "run_governance_pipeline") + def test_always_returns_zero(self, mock_pipeline: MagicMock) -> None: + mock_pipeline.return_value = {"verdict": "VETO", "reason": "x", "remediation": "y"} + payload: str = json.dumps({ + "tool_name": "Write", + "tool_input": {"file_path": "test.py", "content": "hello"}, + }) + code, _ = self._run_main_with_stdin(payload) + self.assertEqual(code, 0) + + @patch.object(_hook_module, "run_governance_pipeline") + def test_stdout_is_valid_json(self, mock_pipeline: MagicMock) -> None: + mock_pipeline.return_value = {"verdict": "PASS"} + payload: str = json.dumps({ + "tool_name": "Write", + "tool_input": {"file_path": "test.py", "content": "hello"}, + }) + _, output = self._run_main_with_stdin(payload) + parsed: dict = json.loads(output) + self.assertIn("hookSpecificOutput", parsed) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_oracle.py b/tests/test_oracle.py new file mode 100644 index 0000000..29de137 --- /dev/null +++ b/tests/test_oracle.py @@ -0,0 +1,273 @@ +"""Tests for pipeline.oracle — response validation, content building, run_oracle. + +All model calls are mocked. Covers: _validate_oracle_response including the +critical VETO-requires-remediation and PASS-requires-null-remediation +invariants, citation/advisory schema, confidence enum. + +Run: python -m unittest tests.test_oracle -v +""" + +import sys +import unittest +from typing import Any +from unittest.mock import MagicMock, patch + +from pathlib import Path + +_REPO_ROOT: Path = Path(__file__).resolve().parent.parent +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +from pipeline.oracle import ( # noqa: E402 + _build_user_content, + _validate_oracle_response, + run_oracle, +) + + +def _valid_pass() -> dict: + return { + "verdict": "PASS", + "reasoning": "Change satisfies all constraints.", + "confidence": "HIGH", + "constraint_citations": [ + { + "constraint_id": "C-001", + "disposition": "SATISFIED", + "note": "Error handling present.", + } + ], + "advisories": [], + "remediation": None, + } + + +def _valid_veto() -> dict: + return { + "verdict": "VETO", + "reasoning": "Silent error swallowing detected.", + "confidence": "HIGH", + "constraint_citations": [ + { + "constraint_id": "C-001", + "disposition": "VIOLATED", + "note": "Empty except block.", + } + ], + "advisories": [], + "remediation": "Add logging or re-raise in the except block.", + } + + +def _valid_diff() -> dict: + return {"file_path": "test.py", "change_type": "edit"} + + +def _valid_constitution() -> dict: + return { + "constraints": [ + {"id": "C-001", "name": "No Silent Errors", "rule": "...", "severity": "veto"} + ] + } + + +def _valid_challenger() -> dict: + return {"status": "FINDINGS"} + + +def _valid_defender() -> dict: + return {"status": "REBUTTAL"} + + +class ValidateOracleResponseTests(unittest.TestCase): + def test_valid_pass_response(self) -> None: + self.assertTrue(_validate_oracle_response(_valid_pass())) + + def test_valid_veto_response(self) -> None: + self.assertTrue(_validate_oracle_response(_valid_veto())) + + def test_invalid_verdict_rejected(self) -> None: + resp: dict = _valid_pass() + resp["verdict"] = "ALLOW" + self.assertFalse(_validate_oracle_response(resp)) + + def test_missing_verdict_rejected(self) -> None: + resp: dict = _valid_pass() + del resp["verdict"] + self.assertFalse(_validate_oracle_response(resp)) + + def test_missing_reasoning_rejected(self) -> None: + resp: dict = _valid_pass() + del resp["reasoning"] + self.assertFalse(_validate_oracle_response(resp)) + + def test_empty_reasoning_rejected(self) -> None: + resp: dict = _valid_pass() + resp["reasoning"] = "" + self.assertFalse(_validate_oracle_response(resp)) + + def test_invalid_confidence_rejected(self) -> None: + resp: dict = _valid_pass() + resp["confidence"] = "VERY_HIGH" + self.assertFalse(_validate_oracle_response(resp)) + + def test_citations_not_list_rejected(self) -> None: + resp: dict = _valid_pass() + resp["constraint_citations"] = "string" + self.assertFalse(_validate_oracle_response(resp)) + + def test_citation_missing_field_rejected(self) -> None: + resp: dict = _valid_pass() + del resp["constraint_citations"][0]["constraint_id"] + self.assertFalse(_validate_oracle_response(resp)) + + def test_citation_invalid_disposition_rejected(self) -> None: + resp: dict = _valid_pass() + resp["constraint_citations"][0]["disposition"] = "MAYBE" + self.assertFalse(_validate_oracle_response(resp)) + + def test_advisories_not_list_rejected(self) -> None: + resp: dict = _valid_pass() + resp["advisories"] = "string" + self.assertFalse(_validate_oracle_response(resp)) + + def test_advisory_empty_string_rejected(self) -> None: + resp: dict = _valid_pass() + resp["advisories"] = [""] + self.assertFalse(_validate_oracle_response(resp)) + + def test_veto_without_remediation_rejected(self) -> None: + resp: dict = _valid_veto() + resp["remediation"] = None + self.assertFalse(_validate_oracle_response(resp)) + + def test_veto_with_empty_remediation_rejected(self) -> None: + resp: dict = _valid_veto() + resp["remediation"] = "" + self.assertFalse(_validate_oracle_response(resp)) + + def test_pass_with_non_null_remediation_rejected(self) -> None: + resp: dict = _valid_pass() + resp["remediation"] = "some text" + self.assertFalse(_validate_oracle_response(resp)) + + def test_missing_remediation_key_rejected(self) -> None: + resp: dict = _valid_pass() + del resp["remediation"] + self.assertFalse(_validate_oracle_response(resp)) + + +class BuildUserContentTests(unittest.TestCase): + def test_contains_all_sections(self) -> None: + content: str = _build_user_content( + _valid_diff(), + _valid_constitution(), + _valid_challenger(), + _valid_defender(), + "", + ) + self.assertIn("PROPOSED CHANGE:", content) + self.assertIn("CONSTITUTION:", content) + self.assertIn("CHALLENGER FINDINGS:", content) + self.assertIn("DEFENDER REBUTTALS:", content) + + def test_file_context_appended_when_present(self) -> None: + content: str = _build_user_content( + _valid_diff(), + _valid_constitution(), + _valid_challenger(), + _valid_defender(), + "source code here", + ) + self.assertIn("FILE CONTEXT:", content) + + def test_file_context_omitted_when_empty(self) -> None: + content: str = _build_user_content( + _valid_diff(), + _valid_constitution(), + _valid_challenger(), + _valid_defender(), + "", + ) + self.assertNotIn("FILE CONTEXT:", content) + + +class RunOracleTests(unittest.TestCase): + @patch("pipeline.oracle.call_model") + def test_valid_pass_response_passed_through( + self, mock_call: MagicMock + ) -> None: + resp: dict = _valid_pass() + resp["_tokens"] = {"input": 10, "output": 20} + mock_call.return_value = resp + result: dict = run_oracle( + _valid_diff(), _valid_constitution(), "hash", + _valid_challenger(), _valid_defender(), + ) + self.assertEqual(result["verdict"], "PASS") + + @patch("pipeline.oracle.call_model") + def test_valid_veto_response_passed_through( + self, mock_call: MagicMock + ) -> None: + resp: dict = _valid_veto() + resp["_tokens"] = {"input": 10, "output": 20} + mock_call.return_value = resp + result: dict = run_oracle( + _valid_diff(), _valid_constitution(), "hash", + _valid_challenger(), _valid_defender(), + ) + self.assertEqual(result["verdict"], "VETO") + self.assertIsNotNone(result["remediation"]) + + @patch("pipeline.oracle.call_model") + def test_api_error_returns_pipeline_error( + self, mock_call: MagicMock + ) -> None: + mock_call.return_value = { + "error": "API_ERROR", + "_tokens": {"input": 0, "output": 0}, + } + result: dict = run_oracle( + _valid_diff(), _valid_constitution(), "hash", + _valid_challenger(), _valid_defender(), + ) + self.assertEqual(result["status"], "PIPELINE_ERROR") + + @patch("pipeline.oracle.call_model") + def test_invalid_response_returns_pipeline_error( + self, mock_call: MagicMock + ) -> None: + mock_call.return_value = { + "garbage": True, + "_tokens": {"input": 10, "output": 20}, + } + result: dict = run_oracle( + _valid_diff(), _valid_constitution(), "hash", + _valid_challenger(), _valid_defender(), + ) + self.assertEqual(result["status"], "PIPELINE_ERROR") + self.assertIn("raw_response", result) + + def test_input_validation_failure_returns_pipeline_error(self) -> None: + result: dict = run_oracle( + _valid_diff(), _valid_constitution(), "hash", + _valid_challenger(), {}, + ) + self.assertEqual(result["status"], "PIPELINE_ERROR") + self.assertIn("INVALID_ORACLE_INPUT", result["error"]) + + @patch("pipeline.oracle.call_model") + def test_tokens_preserved_on_all_paths(self, mock_call: MagicMock) -> None: + resp: dict = _valid_pass() + resp["_tokens"] = {"input": 5, "output": 15} + mock_call.return_value = resp + result: dict = run_oracle( + _valid_diff(), _valid_constitution(), "hash", + _valid_challenger(), _valid_defender(), + ) + self.assertIn("_tokens", result) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_runner.py b/tests/test_runner.py new file mode 100644 index 0000000..bae2808 --- /dev/null +++ b/tests/test_runner.py @@ -0,0 +1,277 @@ +"""Tests for pipeline.runner — orchestration, fail-open, CLEAR optimization, tokens. + +All pipeline stages, constitution loading, and ledger append are mocked. +Covers: happy paths (PASS/VETO), fail-open on every error source, +CLEAR-skips-defender optimization, token accumulation, and finalize behavior. + +Run: python -m unittest tests.test_runner -v +""" + +import sys +import unittest +from typing import Any +from unittest.mock import MagicMock, call, patch + +from pathlib import Path + +_REPO_ROOT: Path = Path(__file__).resolve().parent.parent +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +from pipeline.constitution import ConstitutionError # noqa: E402 +from pipeline.runner import run_governance_pipeline # noqa: E402 + + +_MOCK_CONSTITUTION: tuple[dict, str] = ( + {"constraints": [{"id": "C-001", "name": "Test", "rule": "...", "severity": "veto"}]}, + "abc123hash", +) + +_DIFF: dict = {"file_path": "test.py", "change_type": "edit"} +_TOOL_INPUT: dict = {"file_path": "test.py"} + + +def _clear_challenger() -> dict: + return {"status": "CLEAR", "findings": [], "_tokens": {"input": 10, "output": 20}} + + +def _findings_challenger() -> dict: + return {"status": "FINDINGS", "findings": [{"constraint_id": "C-001"}], "_tokens": {"input": 10, "output": 20}} + + +def _rebuttal_defender() -> dict: + return {"status": "REBUTTAL", "summary": "Rebutted.", "rebuttals": [], "_tokens": {"input": 30, "output": 40}} + + +def _pass_oracle() -> dict: + return { + "verdict": "PASS", + "reasoning": "All good.", + "remediation": None, + "status": "ok", + "_tokens": {"input": 50, "output": 60}, + } + + +def _veto_oracle() -> dict: + return { + "verdict": "VETO", + "reasoning": "Violation found.", + "remediation": "Fix the error handling.", + "status": "ok", + "_tokens": {"input": 50, "output": 60}, + } + + +def _pipeline_error_stage() -> dict: + return {"status": "PIPELINE_ERROR", "error": "something broke", "_tokens": {"input": 0, "output": 0}} + + +@patch("pipeline.runner.append_entry", return_value={}) +@patch("pipeline.runner.run_oracle") +@patch("pipeline.runner.run_defender") +@patch("pipeline.runner.run_challenger") +@patch("pipeline.runner.load_constitution_snapshot") +class HappyPathTests(unittest.TestCase): + def test_pass_with_clear_challenger( + self, mock_const: MagicMock, mock_chall: MagicMock, + mock_def: MagicMock, mock_oracle: MagicMock, mock_ledger: MagicMock, + ) -> None: + mock_const.return_value = _MOCK_CONSTITUTION + mock_chall.return_value = _clear_challenger() + mock_oracle.return_value = _pass_oracle() + result: dict = run_governance_pipeline("Write", _TOOL_INPUT, _DIFF) + self.assertEqual(result["verdict"], "PASS") + mock_def.assert_not_called() + + def test_pass_with_findings_challenger( + self, mock_const: MagicMock, mock_chall: MagicMock, + mock_def: MagicMock, mock_oracle: MagicMock, mock_ledger: MagicMock, + ) -> None: + mock_const.return_value = _MOCK_CONSTITUTION + mock_chall.return_value = _findings_challenger() + mock_def.return_value = _rebuttal_defender() + mock_oracle.return_value = _pass_oracle() + result: dict = run_governance_pipeline("Write", _TOOL_INPUT, _DIFF) + self.assertEqual(result["verdict"], "PASS") + mock_def.assert_called_once() + + def test_veto_verdict_propagated( + self, mock_const: MagicMock, mock_chall: MagicMock, + mock_def: MagicMock, mock_oracle: MagicMock, mock_ledger: MagicMock, + ) -> None: + mock_const.return_value = _MOCK_CONSTITUTION + mock_chall.return_value = _findings_challenger() + mock_def.return_value = _rebuttal_defender() + mock_oracle.return_value = _veto_oracle() + result: dict = run_governance_pipeline("Write", _TOOL_INPUT, _DIFF) + self.assertEqual(result["verdict"], "VETO") + self.assertIsNotNone(result["remediation"]) + + +@patch("pipeline.runner.append_entry", return_value={}) +@patch("pipeline.runner.run_oracle") +@patch("pipeline.runner.run_defender") +@patch("pipeline.runner.run_challenger") +@patch("pipeline.runner.load_constitution_snapshot") +class FailOpenTests(unittest.TestCase): + def test_constitution_load_failure_fails_open( + self, mock_const: MagicMock, mock_chall: MagicMock, + mock_def: MagicMock, mock_oracle: MagicMock, mock_ledger: MagicMock, + ) -> None: + mock_const.side_effect = ConstitutionError("file missing") + result: dict = run_governance_pipeline("Write", _TOOL_INPUT, _DIFF) + self.assertEqual(result["verdict"], "PASS") + self.assertTrue(result.get("pipeline_error")) + + def test_challenger_pipeline_error_fails_open( + self, mock_const: MagicMock, mock_chall: MagicMock, + mock_def: MagicMock, mock_oracle: MagicMock, mock_ledger: MagicMock, + ) -> None: + mock_const.return_value = _MOCK_CONSTITUTION + mock_chall.return_value = _pipeline_error_stage() + result: dict = run_governance_pipeline("Write", _TOOL_INPUT, _DIFF) + self.assertEqual(result["verdict"], "PASS") + self.assertTrue(result.get("pipeline_error")) + + def test_defender_pipeline_error_fails_open( + self, mock_const: MagicMock, mock_chall: MagicMock, + mock_def: MagicMock, mock_oracle: MagicMock, mock_ledger: MagicMock, + ) -> None: + mock_const.return_value = _MOCK_CONSTITUTION + mock_chall.return_value = _findings_challenger() + mock_def.return_value = _pipeline_error_stage() + result: dict = run_governance_pipeline("Write", _TOOL_INPUT, _DIFF) + self.assertEqual(result["verdict"], "PASS") + self.assertTrue(result.get("pipeline_error")) + + def test_oracle_pipeline_error_fails_open( + self, mock_const: MagicMock, mock_chall: MagicMock, + mock_def: MagicMock, mock_oracle: MagicMock, mock_ledger: MagicMock, + ) -> None: + mock_const.return_value = _MOCK_CONSTITUTION + mock_chall.return_value = _findings_challenger() + mock_def.return_value = _rebuttal_defender() + mock_oracle.return_value = _pipeline_error_stage() + result: dict = run_governance_pipeline("Write", _TOOL_INPUT, _DIFF) + self.assertEqual(result["verdict"], "PASS") + self.assertTrue(result.get("pipeline_error")) + + def test_ledger_failure_does_not_block_verdict( + self, mock_const: MagicMock, mock_chall: MagicMock, + mock_def: MagicMock, mock_oracle: MagicMock, mock_ledger: MagicMock, + ) -> None: + mock_const.return_value = _MOCK_CONSTITUTION + mock_chall.return_value = _clear_challenger() + mock_oracle.return_value = _pass_oracle() + mock_ledger.side_effect = Exception("disk full") + result: dict = run_governance_pipeline("Write", _TOOL_INPUT, _DIFF) + self.assertEqual(result["verdict"], "PASS") + + +@patch("pipeline.runner.append_entry", return_value={}) +@patch("pipeline.runner.run_oracle") +@patch("pipeline.runner.run_defender") +@patch("pipeline.runner.run_challenger") +@patch("pipeline.runner.load_constitution_snapshot") +class ClearOptimizationTests(unittest.TestCase): + def test_clear_challenger_skips_defender_call( + self, mock_const: MagicMock, mock_chall: MagicMock, + mock_def: MagicMock, mock_oracle: MagicMock, mock_ledger: MagicMock, + ) -> None: + mock_const.return_value = _MOCK_CONSTITUTION + mock_chall.return_value = _clear_challenger() + mock_oracle.return_value = _pass_oracle() + run_governance_pipeline("Write", _TOOL_INPUT, _DIFF) + mock_def.assert_not_called() + + def test_synthetic_defender_result_has_confirm_clear( + self, mock_const: MagicMock, mock_chall: MagicMock, + mock_def: MagicMock, mock_oracle: MagicMock, mock_ledger: MagicMock, + ) -> None: + mock_const.return_value = _MOCK_CONSTITUTION + mock_chall.return_value = _clear_challenger() + mock_oracle.return_value = _pass_oracle() + result: dict = run_governance_pipeline("Write", _TOOL_INPUT, _DIFF) + self.assertEqual(result["defender"]["status"], "CONFIRM_CLEAR") + + +@patch("pipeline.runner.append_entry", return_value={}) +@patch("pipeline.runner.run_oracle") +@patch("pipeline.runner.run_defender") +@patch("pipeline.runner.run_challenger") +@patch("pipeline.runner.load_constitution_snapshot") +class TokenAccumulationTests(unittest.TestCase): + def test_tokens_accumulated_across_all_stages( + self, mock_const: MagicMock, mock_chall: MagicMock, + mock_def: MagicMock, mock_oracle: MagicMock, mock_ledger: MagicMock, + ) -> None: + mock_const.return_value = _MOCK_CONSTITUTION + mock_chall.return_value = _findings_challenger() + mock_def.return_value = _rebuttal_defender() + mock_oracle.return_value = _pass_oracle() + result: dict = run_governance_pipeline("Write", _TOOL_INPUT, _DIFF) + self.assertEqual(result["_tokens"]["input"], 90) + self.assertEqual(result["_tokens"]["output"], 120) + + def test_malformed_tokens_treated_as_zero( + self, mock_const: MagicMock, mock_chall: MagicMock, + mock_def: MagicMock, mock_oracle: MagicMock, mock_ledger: MagicMock, + ) -> None: + mock_const.return_value = _MOCK_CONSTITUTION + chall: dict = _clear_challenger() + chall["_tokens"] = "bad" + mock_chall.return_value = chall + oracle: dict = _pass_oracle() + oracle["_tokens"] = {"input": 10, "output": 20} + mock_oracle.return_value = oracle + result: dict = run_governance_pipeline("Write", _TOOL_INPUT, _DIFF) + self.assertEqual(result["_tokens"]["input"], 10) + + def test_bool_tokens_ignored( + self, mock_const: MagicMock, mock_chall: MagicMock, + mock_def: MagicMock, mock_oracle: MagicMock, mock_ledger: MagicMock, + ) -> None: + mock_const.return_value = _MOCK_CONSTITUTION + chall: dict = _clear_challenger() + chall["_tokens"] = {"input": True, "output": False} + mock_chall.return_value = chall + oracle: dict = _pass_oracle() + oracle["_tokens"] = {"input": 10, "output": 20} + mock_oracle.return_value = oracle + result: dict = run_governance_pipeline("Write", _TOOL_INPUT, _DIFF) + self.assertEqual(result["_tokens"]["input"], 10) + + +@patch("pipeline.runner.append_entry", return_value={}) +@patch("pipeline.runner.run_oracle") +@patch("pipeline.runner.run_defender") +@patch("pipeline.runner.run_challenger") +@patch("pipeline.runner.load_constitution_snapshot") +class FinalizeTests(unittest.TestCase): + def test_change_context_attached_to_result( + self, mock_const: MagicMock, mock_chall: MagicMock, + mock_def: MagicMock, mock_oracle: MagicMock, mock_ledger: MagicMock, + ) -> None: + mock_const.return_value = _MOCK_CONSTITUTION + mock_chall.return_value = _clear_challenger() + mock_oracle.return_value = _pass_oracle() + result: dict = run_governance_pipeline("Write", _TOOL_INPUT, _DIFF) + self.assertIn("change", result) + self.assertEqual(result["change"]["tool"], "Write") + + def test_append_entry_called_with_result( + self, mock_const: MagicMock, mock_chall: MagicMock, + mock_def: MagicMock, mock_oracle: MagicMock, mock_ledger: MagicMock, + ) -> None: + mock_const.return_value = _MOCK_CONSTITUTION + mock_chall.return_value = _clear_challenger() + mock_oracle.return_value = _pass_oracle() + run_governance_pipeline("Write", _TOOL_INPUT, _DIFF) + mock_ledger.assert_called_once() + appended: dict = mock_ledger.call_args[0][0] + self.assertIn("verdict", appended) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_verify.py b/tests/test_verify.py new file mode 100644 index 0000000..11c365d --- /dev/null +++ b/tests/test_verify.py @@ -0,0 +1,182 @@ +"""Tests for ledger.verify — chain validation and tamper detection. + +Covers: verify_chain across all 6 failure types (READ_ERROR, PARSE_ERROR, +SCHEMA_ERROR, HASH_MISMATCH, INVALID_GENESIS, CHAIN_BREAK), plus valid +chains of varying lengths. + +Run: python -m unittest tests.test_verify -v +""" + +import json +import os +import shutil +import sys +import tempfile +import unittest +from pathlib import Path +from typing import Any + +_REPO_ROOT: Path = Path(__file__).resolve().parent.parent +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +from ledger.chain import compute_entry_hash # noqa: E402 +from ledger.verify import verify_chain # noqa: E402 + + +def _build_valid_chain(n: int) -> list[dict]: + """Build a correctly-linked chain of n entries starting from GENESIS.""" + entries: list[dict] = [] + for i in range(n): + entry: dict[str, Any] = { + "entry_id": f"id-{i}", + "timestamp": f"2026-01-01T00:00:{i:02d}+00:00", + "previous_hash": "GENESIS" if i == 0 else entries[i - 1]["entry_hash"], + "constitution_hash": "abc", + "change": {"file": "test.py", "tool": "Write"}, + } + entry["entry_hash"] = compute_entry_hash(entry) + entries.append(entry) + return entries + + +class VerifyChainValidTests(unittest.TestCase): + def setUp(self) -> None: + self._tmp: str = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self._tmp) + + def _path(self) -> str: + return os.path.join(self._tmp, "ledger.json") + + def _write(self, content: str) -> None: + Path(self._path()).write_text(content, encoding="utf-8") + + def test_missing_file_is_valid(self) -> None: + result: dict = verify_chain(os.path.join(self._tmp, "no.json")) + self.assertTrue(result["valid"]) + self.assertEqual(result["entries"], 0) + + def test_empty_file_is_valid(self) -> None: + self._write("") + result: dict = verify_chain(self._path()) + self.assertTrue(result["valid"]) + self.assertEqual(result["entries"], 0) + + def test_whitespace_only_file_is_valid(self) -> None: + self._write(" \n ") + result: dict = verify_chain(self._path()) + self.assertTrue(result["valid"]) + self.assertEqual(result["entries"], 0) + + def test_empty_array_is_valid(self) -> None: + self._write("[]") + result: dict = verify_chain(self._path()) + self.assertTrue(result["valid"]) + self.assertEqual(result["entries"], 0) + + def test_single_valid_entry(self) -> None: + chain: list[dict] = _build_valid_chain(1) + self._write(json.dumps(chain)) + result: dict = verify_chain(self._path()) + self.assertTrue(result["valid"]) + self.assertEqual(result["entries"], 1) + + def test_multi_entry_valid_chain(self) -> None: + chain: list[dict] = _build_valid_chain(5) + self._write(json.dumps(chain)) + result: dict = verify_chain(self._path()) + self.assertTrue(result["valid"]) + self.assertEqual(result["entries"], 5) + self.assertEqual(result["genesis_hash"], chain[0]["entry_hash"]) + self.assertEqual(result["latest_hash"], chain[-1]["entry_hash"]) + + def test_timestamps_in_result(self) -> None: + chain: list[dict] = _build_valid_chain(3) + self._write(json.dumps(chain)) + result: dict = verify_chain(self._path()) + self.assertEqual(result["first_entry"], chain[0]["timestamp"]) + self.assertEqual(result["last_entry"], chain[-1]["timestamp"]) + + +class VerifyChainFailureTests(unittest.TestCase): + def setUp(self) -> None: + self._tmp: str = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self._tmp) + + def _path(self) -> str: + return os.path.join(self._tmp, "ledger.json") + + def _write(self, content: str) -> None: + Path(self._path()).write_text(content, encoding="utf-8") + + def test_invalid_json_returns_parse_error(self) -> None: + self._write("{{{bad json") + result: dict = verify_chain(self._path()) + self.assertFalse(result["valid"]) + self.assertEqual(result["failure_type"], "PARSE_ERROR") + + def test_non_array_root_returns_parse_error(self) -> None: + self._write('{"key": "val"}') + result: dict = verify_chain(self._path()) + self.assertFalse(result["valid"]) + self.assertEqual(result["failure_type"], "PARSE_ERROR") + + def test_non_dict_entry_returns_schema_error(self) -> None: + self._write('["not a dict"]') + result: dict = verify_chain(self._path()) + self.assertFalse(result["valid"]) + self.assertEqual(result["failure_type"], "SCHEMA_ERROR") + self.assertEqual(result["failure_index"], 0) + + def test_missing_entry_hash_returns_schema_error(self) -> None: + self._write('[{"previous_hash": "GENESIS"}]') + result: dict = verify_chain(self._path()) + self.assertFalse(result["valid"]) + self.assertEqual(result["failure_type"], "SCHEMA_ERROR") + + def test_tampered_entry_returns_hash_mismatch(self) -> None: + chain: list[dict] = _build_valid_chain(3) + chain[1]["change"]["file"] = "TAMPERED.py" + self._write(json.dumps(chain)) + result: dict = verify_chain(self._path()) + self.assertFalse(result["valid"]) + self.assertEqual(result["failure_type"], "HASH_MISMATCH") + self.assertEqual(result["failure_index"], 1) + + def test_wrong_genesis_marker_returns_invalid_genesis(self) -> None: + chain: list[dict] = _build_valid_chain(1) + chain[0]["previous_hash"] = "NOT_GENESIS" + chain[0]["entry_hash"] = compute_entry_hash(chain[0]) + self._write(json.dumps(chain)) + result: dict = verify_chain(self._path()) + self.assertFalse(result["valid"]) + self.assertEqual(result["failure_type"], "INVALID_GENESIS") + + def test_broken_link_returns_chain_break(self) -> None: + chain: list[dict] = _build_valid_chain(3) + chain[2]["previous_hash"] = "wrong_hash" + chain[2]["entry_hash"] = compute_entry_hash(chain[2]) + self._write(json.dumps(chain)) + result: dict = verify_chain(self._path()) + self.assertFalse(result["valid"]) + self.assertEqual(result["failure_type"], "CHAIN_BREAK") + self.assertEqual(result["failure_index"], 2) + + def test_failure_includes_expected_and_found(self) -> None: + self._write("{{{") + result: dict = verify_chain(self._path()) + self.assertIn("expected", result) + self.assertIn("found", result) + + def test_stops_at_first_failure(self) -> None: + chain: list[dict] = _build_valid_chain(5) + chain[1]["change"]["file"] = "TAMPERED" + chain[3]["change"]["file"] = "ALSO_TAMPERED" + self._write(json.dumps(chain)) + result: dict = verify_chain(self._path()) + self.assertEqual(result["failure_index"], 1) + self.assertEqual(result["entries_checked"], 1) + + +if __name__ == "__main__": + unittest.main() From a6cc327cca3bb5373be4d0a1365e6074e3804219 Mon Sep 17 00:00:00 2001 From: Dana Burks Date: Wed, 13 May 2026 00:16:45 -0700 Subject: [PATCH 2/2] [bench] tests: skip Windows-only path test on Linux CI os.path.isabs() does not recognize Windows absolute paths on Linux, causing test_windows_absolute_path_blocked to fail in CI. Co-Authored-By: Claude Opus 4.6 --- tests/test_diff.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_diff.py b/tests/test_diff.py index 4a9d02f..749ae83 100644 --- a/tests/test_diff.py +++ b/tests/test_diff.py @@ -9,6 +9,7 @@ Run: python -m unittest tests.test_diff -v """ +import os import sys import unittest from pathlib import Path @@ -220,6 +221,7 @@ def test_absolute_path_blocked(self) -> None: result = _normalize_path("/etc/passwd") self.assertEqual(result, "[PATH_TRAVERSAL_BLOCKED]") + @unittest.skipUnless(os.name == "nt", "Windows absolute paths only detected on Windows") def test_windows_absolute_path_blocked(self) -> None: result = _normalize_path("C:\\Windows\\System32\\config") self.assertEqual(result, "[PATH_TRAVERSAL_BLOCKED]")