diff --git a/CHANGELOG.md b/CHANGELOG.md index d352ed735..653c7a4e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- **Extract pipeline cleanup pass** (Issue #1410): hardens the extraction-related plumbing introduced by #1381 / #1380 / #1399 against three medium-impact failure modes and four documentation/observability gaps surfaced during code review. + - **`opencontractserver/tasks/embeddings_task.py:_batch_embed_text_annotations`** — Transient HTTP errors (`requests.Timeout`, `requests.ConnectionError`, `EmbeddingServerError`) now drop queued sub-batches and shut down the `ThreadPoolExecutor` with `wait=False, cancel_futures=True` before re-raising. Previously the exception propagated out of the `with` block, which calls `shutdown(wait=True, cancel_futures=False)` by default, delaying Celery autoretry by up to ~`max_workers` × the per-sub-batch round-trip latency. In-flight HTTP calls cannot be torn down from Python, but at least we no longer wait on them. + - **`opencontractserver/benchmarks/loader.py:force_celery_eager`** — Refuses to mutate the global Celery config unless `settings.MODE == "TEST"` *or* the new `OC_BENCHMARK_CLI` env var is set; outside test mode it also refuses when `task_always_eager` is already `True` (concurrent benchmark runs would race on the global save/restore). In test mode an already-`True` flag is the ambient state imposed by `CELERY_TASK_ALWAYS_EAGER=True`, so the helper is a no-op there. The `run_benchmark` management command now sets `OC_BENCHMARK_CLI=1` automatically. Prevents the benchmark helper from silently routing every task in a live web/worker process through the in-process executor. 
+ - **`opencontractserver/llms/agents/pydantic_ai_agents.py`** — `PydanticAICorpusAgent._build_structured_system_prompt` now says "most legal corpora need multiple targeted queries" instead of "most legal documents …" — corpora-scoped wording for the corpus agent. The document-agent prompt is unchanged. + - **`config/settings/test.py`** — The `DEFAULT_EMBEDDER` env-var override is now gated behind an explicit `BENCHMARK_MODE=1` env var. Without that opt-in, a stray `DEFAULT_EMBEDDER` value in the CI environment would silently push the regular test suite onto a real embedder and start making live network calls. The default `TestEmbedder` keeps regular `pytest` runs hermetic. + - **`opencontractserver/tasks/data_extract_tasks.py`** — When `model_override` is accepted in the unrestricted (operator-only) path because `BENCHMARK_ALLOWED_MODEL_OVERRIDES` is unset, a `WARNING`-level log line now records the override + cell ID. Lets operators `grep` production logs to confirm the open mode is triggered only by the benchmark tooling and never by an unexpected web/GraphQL caller. + - **`opencontractserver/pipeline/utils.py:get_default_reranker_instance`** — Docstring now warns that `bulk_update` / `QuerySet.update` / data-migration writes to `PipelineSettings.default_reranker` bypass the `auto_now` `modified` field used as part of the cache key, so callers who mutate the singleton via those paths must also call `invalidate_reranker_cache()` (or touch `modified` explicitly). 
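The executor fast-fail change described in the `embeddings_task.py` bullet above can be reduced to a standalone sketch. Everything here is illustrative — `run_with_fast_fail` and the `RuntimeError` stand-in for the transient error types are hypothetical names, not project code; the sketch only demonstrates why `shutdown(wait=False, cancel_futures=True)` unblocks the caller while the `with` block's default `shutdown(wait=True)` would not:

```python
import concurrent.futures as cf

def run_with_fast_fail(tasks, max_workers=4):
    """Run callables in a thread pool; on the first failure, drop queued
    work and re-raise without waiting on in-flight peers."""
    executor = cf.ThreadPoolExecutor(max_workers=max_workers)
    first_exc, results = None, []
    try:
        futures = [executor.submit(t) for t in tasks]
        for fut in cf.as_completed(futures):
            try:
                results.append(fut.result())
            except RuntimeError as exc:  # stand-in for Timeout / ConnectionError / ...
                first_exc = exc
                break  # stop draining; peers are abandoned below
    finally:
        # wait=False returns immediately; cancel_futures=True drops queued
        # (not-yet-started) futures. In-flight callables keep running in
        # their worker threads, but the caller is no longer blocked on them.
        executor.shutdown(wait=False, cancel_futures=True)
    if first_exc is not None:
        raise first_exc  # e.g. lets a Celery autoretry decorator fire promptly
    return results
```

Swapping the explicit `finally` shutdown for a plain `with ThreadPoolExecutor(...)` block re-introduces the old behaviour: `__exit__` calls `shutdown(wait=True)`, so the re-raise is deferred until every in-flight callable returns.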
+ ### Fixed - **Shared-protocol contract drift surfaced by PR #1400 follow-up review** (Issue #1408): @@ -17,6 +27,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Tests for issue #1410 fixes**: + - `opencontractserver/tests/test_data_extract_failure_classification.py::PydanticAiSchemaCanaryTests` — Pin the pydantic-ai message-schema discriminators (`ModelResponse.kind == "response"`, `ToolCallPart.part_kind == "tool-call"`, `TextPart.part_kind == "text"`) so a future minor version that renames them surfaces immediately rather than silently flipping every `None` extraction into the `empty_history` mis-classification mode. + - `opencontractserver/tests/test_batch_embedding.py::TestBatchEmbedTextAnnotations::test_transient_error_does_not_block_on_in_flight_peers` — Asserts the executor fast-fail path completes well inside the 10 s peer-block window. A regression that waits on in-flight peers (the previous default-shutdown behaviour) would take ~10 s and fail this test. + - `opencontractserver/tests/test_benchmarks.py::ForceCeleryEagerSafetyGuardsTestCase` — Pins the three new `force_celery_eager` safety refusals: non-test mode without the CLI env var, non-test mode *with* the CLI env var (allowed), and stacked invocations (`task_always_eager` already `True`). + + - **VCR.py wrapper for LLM calls in `doc_extract_query_task`** — `opencontractserver/utils/vcr_replay.py` exposes a `maybe_vcr_cassette()` context manager that, when `OC_LLM_VCR_MODE` and `OC_LLM_VCR_CASSETTE` are set on the celery worker, records or replays every HTTP call to LLM provider hosts (currently `api.openai.com` / `api.anthropic.com`). A custom request-body matcher strips volatile values (millisecond timestamps, Django document PKs, OpenAI tool-call IDs, UUIDs) so a cassette recorded against one DB replays cleanly against another. With the env vars unset the wrapper is a no-op — production behavior is unchanged. 
Pre-recorded cassette for the E2E extract spec lives at `opencontractserver/tests/fixtures/cassettes/e2e_extract_pdf_workflow/extract.yaml`. Replay was verified end-to-end against a deliberately-fake `OPENAI_API_KEY` to confirm no real network call is made. See `docs/development/e2e_vcr.md` for record / replay / debug instructions. - **`.github/workflows/frontend-e2e-extract.yml`** — CI workflow that runs the E2E extract spec against the full `local.yml` stack with `OC_LLM_VCR_MODE=replay` and a fake `OPENAI_API_KEY`. Triggered on `pull_request` (path-filtered to changes that can affect extract behaviour) in addition to `workflow_dispatch`. PDF parsing routes to the in-stack Docling microservice (`PDF_PARSER=docling`); the workflow forces `PipelineSettings.preferred_parsers["application/pdf"]` to Docling explicitly after `migrate` so a pre-existing DB singleton can't silently route through LlamaParse. No external service credentials required. - **`frontend/tests/e2e/extract-pdf-workflow.spec.ts`** — full-stack Playwright E2E spec for the extract pipeline: login → create corpus → upload two PDFs (`frontend/tests/fixtures/{usc-title-1,eton-agreement}.pdf`) → wait for parse + embedding → create extract with one column → run with a real OpenAI call → CSV export → assert non-empty cells. Adds new helpers to `frontend/tests/e2e/helpers.ts` (`uploadPdfViaUI`, `waitForDocumentReady`, `createExtractViaUI`, `openExtractByName`, `addColumnViaUI`, `addDocumentsToExtractViaUI`, `runExtractAndWaitForFinish`). Gated on `E2E_RUN_LLM_TESTS=true`; skipped in CI until LLM responses can be mocked over the wire. Runs on the live `local.yml` stack; required tweaks to disable Auth0 (`.envs/.local/.django USE_AUTH0=false`) and to widen the celeryworker `watchfiles --ignore-paths` (in `compose/local/django/celery/worker/start` and the `local.yml` command pointer) so editor / Playwright artifact writes don't hot-reload the worker mid-task. 
Also adds `data-testid="document-card"` (+ `data-processing` on the `/documents`-view variants) to `frontend/src/views/Documents.tsx` and `data-testid="document-card"` to `frontend/src/components/documents/ModernDocumentItem.tsx`, so tests can poll for the `backendLock` UI signal without depending on hover-only action menus. Cards are matched by `[data-testid="document-card"]` filtered with the visible title text — the standard Playwright pattern. diff --git a/config/settings/test.py b/config/settings/test.py index 103a5e5b0..d577d16a8 100644 --- a/config/settings/test.py +++ b/config/settings/test.py @@ -133,12 +133,21 @@ # # Intentionally env-overridable: benchmark runs via the test.yml compose # stack (see opencontractserver/benchmarks/) need to swap in a real embedder -# at runtime without editing settings. Standard CI never sets DEFAULT_EMBEDDER, -# so the default TestEmbedder keeps regular test runs hermetic. -DEFAULT_EMBEDDER = env( - "DEFAULT_EMBEDDER", - default="opencontractserver.pipeline.embedders.test_embedder.TestEmbedder", +# at runtime without editing settings. +# +# Footgun guard (issue #1410): the override only takes effect when the +# explicit ``BENCHMARK_MODE`` env var is also set. Without this guard, any +# stray ``DEFAULT_EMBEDDER`` value in CI's environment would silently push +# the regular test suite onto a real embedder and start making live +# network calls. Forcing operators to set ``BENCHMARK_MODE=1`` makes the +# escape hatch deliberate. 
+_OC_TEST_EMBEDDER_DEFAULT = ( + "opencontractserver.pipeline.embedders.test_embedder.TestEmbedder" ) +if env.bool("BENCHMARK_MODE", default=False): + DEFAULT_EMBEDDER = env("DEFAULT_EMBEDDER", default=_OC_TEST_EMBEDDER_DEFAULT) +else: + DEFAULT_EMBEDDER = _OC_TEST_EMBEDDER_DEFAULT # Auth0 settings for tests # ------------------------------------------------------------------------------ diff --git a/opencontractserver/benchmarks/loader.py b/opencontractserver/benchmarks/loader.py index ca984239b..7919a6a07 100644 --- a/opencontractserver/benchmarks/loader.py +++ b/opencontractserver/benchmarks/loader.py @@ -85,13 +85,61 @@ def force_celery_eager(): database, we temporarily force celery into eager mode. This is safe because the management command is a one-off CLI, not a web worker. + Safety guards (issue #1410): + * Refuses to run unless ``settings.MODE`` is ``TEST`` *or* the + ``OC_BENCHMARK_CLI`` environment variable is set. Calling this + context manager from a live web worker or a non-benchmark Celery + worker silently routes every task dispatched during the benchmark + window through the in-process executor, which would corrupt + production behaviour. + * Refuses to run when celery is already in eager mode *and* the + process is not the test suite — that almost always means another + invocation of this context manager is active on a sibling thread + (e.g. a concurrent benchmark CLI), and our naive save/restore + would leave the flag flipped to ``False`` when the inner block + exits. In ``settings.MODE == "TEST"`` the eager flag is the + ambient state imposed by ``CELERY_TASK_ALWAYS_EAGER``; the helper + is then a no-op (yields without mutating the config) so callers + like the benchmark integration tests work transparently. + Warning: This mutates the global Celery config for the current process. Do not call from a shared worker process or web request handler — only from one-off CLIs, notebooks, and test suites. 
""" + import os + + from django.conf import settings as django_settings + + is_test_mode = getattr(django_settings, "MODE", "").upper() == "TEST" + is_benchmark_cli = bool(os.environ.get("OC_BENCHMARK_CLI")) + if not (is_test_mode or is_benchmark_cli): + raise RuntimeError( + "force_celery_eager() refuses to mutate the global Celery config " + "outside test mode or a benchmark CLI invocation. Set " + "OC_BENCHMARK_CLI=1 in the environment if this really is the " + "benchmark management command." + ) + conf = current_app.conf prev_always_eager = conf.task_always_eager prev_eager_propagates = conf.task_eager_propagates + if prev_always_eager: + if is_test_mode: + # Test settings set ``CELERY_TASK_ALWAYS_EAGER = True`` globally, + # so eager is the ambient state — not a concurrent benchmark. + # Yield without mutating the config; the caller already has the + # in-process executor it asked for. + yield + return + # Outside test mode, an already-eager flag almost certainly means a + # sibling benchmark is mutating the global config; our naive + # save/restore would clobber its expected ``True`` on exit. Loud + # failure beats silent corruption. + raise RuntimeError( + "force_celery_eager() called while task_always_eager is already " + "True. Concurrent benchmark runs in the same process are not " + "supported because they would race on the global Celery config." 
+ ) conf.task_always_eager = True conf.task_eager_propagates = True try: diff --git a/opencontractserver/benchmarks/management/commands/run_benchmark.py b/opencontractserver/benchmarks/management/commands/run_benchmark.py index f44fd5213..d43168355 100644 --- a/opencontractserver/benchmarks/management/commands/run_benchmark.py +++ b/opencontractserver/benchmarks/management/commands/run_benchmark.py @@ -178,6 +178,14 @@ def add_arguments(self, parser: argparse.ArgumentParser) -> None: ) def handle(self, *args, **options) -> None: + # Mark the process as a benchmark CLI invocation so + # ``force_celery_eager`` will accept the global Celery-config + # mutation. Without this opt-in env var the helper refuses to + # run in non-test mode (issue #1410). + import os + + os.environ.setdefault("OC_BENCHMARK_CLI", "1") + username = options["user"] User = get_user_model() try: diff --git a/opencontractserver/llms/agents/pydantic_ai_agents.py b/opencontractserver/llms/agents/pydantic_ai_agents.py index e420f6600..1d0888a56 100644 --- a/opencontractserver/llms/agents/pydantic_ai_agents.py +++ b/opencontractserver/llms/agents/pydantic_ai_agents.py @@ -2610,7 +2610,7 @@ def _build_structured_system_prompt( "different angles (paraphrase the question, search for key " "terms, search for likely answer phrasings). A single failed " "search is NOT sufficient evidence that the information is " - "missing — most legal documents need multiple targeted " + "missing — most legal corpora need multiple targeted " "queries to surface a relevant span. 
This rule applies only " "to giving up; once you have a confident answer, rule #3 " "takes precedence and you commit immediately.\n" diff --git a/opencontractserver/pipeline/utils.py b/opencontractserver/pipeline/utils.py index a5deaf32f..ef7298e8e 100644 --- a/opencontractserver/pipeline/utils.py +++ b/opencontractserver/pipeline/utils.py @@ -619,6 +619,16 @@ def get_default_reranker_instance( in-memory will hit stale instances — set ``STRICT_RERANKER`` (which bypasses the cache fast-path) or call :func:`invalidate_reranker_cache` explicitly if you need a fresh instance from a fixture. + + Caveat (issue #1410): ``modified`` is an ``auto_now`` field that only + ticks on ``save()`` / ``Model.save()``-equivalent code paths. Any code + that bypasses ``save()`` — e.g., ``QuerySet.update``, ``bulk_update``, + or a data migration that writes ``default_reranker`` directly — will + *not* bump ``modified``, so workers can stay pinned on the stale cached + instance until their process restarts. If you must mutate the singleton + from a migration or bulk-update path, also call + :func:`invalidate_reranker_cache` (or touch ``modified`` explicitly) so + every worker picks up the new config on its next lookup. """ from django.conf import settings as django_settings diff --git a/opencontractserver/tasks/data_extract_tasks.py b/opencontractserver/tasks/data_extract_tasks.py index 54726ad19..4730fd71a 100644 --- a/opencontractserver/tasks/data_extract_tasks.py +++ b/opencontractserver/tasks/data_extract_tasks.py @@ -347,6 +347,18 @@ def sync_get_corpus_id(document): f"model_override {model_override!r} is not in " f"BENCHMARK_ALLOWED_MODEL_OVERRIDES" ) + elif allowed is None: + # Unrestricted (operator-only) path — emit a WARNING so the + # line stands out in production alerting. An operator who + # forgets to set ``BENCHMARK_ALLOWED_MODEL_OVERRIDES`` is + # arguably mis-configured; an unexpected web/GraphQL caller + # firing this path is a potential abuse signal. 
(issue #1410) + logger.warning( + "BENCHMARK_ALLOWED_MODEL_OVERRIDES unset; accepting " + "model_override=%r (operator-only path) for cell_id=%s", + model_override, + cell_id, + ) document = datacell.document column = datacell.column diff --git a/opencontractserver/tasks/embeddings_task.py b/opencontractserver/tasks/embeddings_task.py index 147bf3c4d..9e13dbdc8 100644 --- a/opencontractserver/tasks/embeddings_task.py +++ b/opencontractserver/tasks/embeddings_task.py @@ -503,11 +503,17 @@ def _batch_embed_text_annotations( # call, then return ``(chunk, vectors_or_exc)`` for the main # thread to drain. # - # On a transient exception in any future, we re-raise immediately - # so celery's autoretry can fire; in-flight peers are abandoned - # (the main thread exits the executor's ``with`` block which - # cancels still-queued futures and waits for in-flight ones to - # exit naturally). + # On a transient exception in any future we want celery's autoretry + # to fire as soon as possible. Letting the exception propagate out of + # the ``with`` block triggers ``ThreadPoolExecutor.__exit__`` which + # calls ``shutdown(wait=True, cancel_futures=False)`` by default — + # that blocks until every in-flight peer round-trip finishes, + # delaying the retry by up to ~max_workers× the sub-batch latency. + # Instead we capture the first transient exception, break out of the + # ``as_completed`` loop, drop queued futures with + # ``shutdown(wait=False, cancel_futures=True)``, then re-raise. Already + # in-flight HTTP calls cannot be torn down from Python, but at least + # we no longer wait for them. (issue #1410) max_workers = max(1, getattr(embedder, "embed_max_concurrent_sub_batches", 1)) log_prefix = ( f"embed_texts_batch (sub-batches={len(chunks)}, parallel={max_workers}, " @@ -520,7 +526,17 @@ def _embed_one(chunk): return chunk, embedder.embed_texts_batch(texts_only) # Map future -> chunk index for logging/sub-batch numbering. 
- with ThreadPoolExecutor(max_workers=max_workers) as executor: + # + # Transient-error handling: instead of raising directly inside the + # ``with`` block (which would call ``shutdown(wait=True)`` and block + # until in-flight peers complete), we capture the first transient + # exception, break out of the loop, and explicitly call + # ``shutdown(wait=False, cancel_futures=True)`` before re-raising + # outside the block. This unblocks Celery autoretry as fast as + # possible. (issue #1410) + transient_exc: Optional[BaseException] = None + executor = ThreadPoolExecutor(max_workers=max_workers) + try: future_to_idx = { executor.submit(_embed_one, chunk): idx for idx, chunk in enumerate(chunks) } @@ -532,15 +548,19 @@ def _embed_one(chunk): # ValueError indicates a caller contract violation (e.g., batch size # exceeds embedder maximum). Re-raise rather than silently recording # as an annotation failure so the programming error surfaces loudly. + executor.shutdown(wait=False, cancel_futures=True) raise except ( requests.exceptions.Timeout, requests.exceptions.ConnectionError, EmbeddingServerError, - ): + ) as e: # Transient HTTP errors: re-raise so the task-level Celery # autoretry_for=(Exception,) decorator can fire a retry. - raise + # Defer the raise so we can drop queued futures first + # without blocking on in-flight peers. + transient_exc = e + break except EmbeddingClientError as e: # Client errors (4xx): non-retriable, record as permanent # per-annotation failures. We explicitly swallow the exception @@ -615,6 +635,18 @@ def _embed_one(chunk): ) result["failed"] += 1 result["errors"].append(f"Annotation {annot.id}: store failed: {e}") + finally: + # On the transient-error fast path we want queued futures dropped + # and the executor shut down without waiting on in-flight peers. + # On the happy path ``shutdown(wait=False)`` is still safe — every + # future has already been drained by ``as_completed``. 
+ executor.shutdown(wait=False, cancel_futures=True) + + if transient_exc is not None: + # Re-raise the captured transient exception now that queued + # futures have been cancelled. Celery's task-level + # ``autoretry_for=(Exception,)`` decorator will fire. + raise transient_exc @shared_task( diff --git a/opencontractserver/tests/test_batch_embedding.py b/opencontractserver/tests/test_batch_embedding.py index 398f43150..ee7c4bf4c 100644 --- a/opencontractserver/tests/test_batch_embedding.py +++ b/opencontractserver/tests/test_batch_embedding.py @@ -412,6 +412,101 @@ def embed_texts_batch(self, texts, **kw): annots, ServerErrorEmbedder(), "test.ServerErrorEmbedder", 50, result ) + def test_transient_error_does_not_block_on_in_flight_peers(self): + """Transient HTTP errors short-circuit the executor (issue #1410). + + With ``max_workers > 1`` and a slow peer sub-batch, the previous + implementation would let the executor's default + ``shutdown(wait=True, cancel_futures=False)`` block until every + in-flight peer round-trip finished. The fix re-raises *outside* + the ``with`` block after explicitly calling + ``shutdown(wait=False, cancel_futures=True)``. + + The test fans out 4 sub-batches: index 0 raises ``Timeout`` + immediately; the remaining 3 each block on a ``threading.Event`` + that the test never sets. A correct implementation propagates + the ``Timeout`` quickly (well under the per-call hang); a + regression would hang until the wait timeout below kicks in. + """ + import threading + import time + + never_release = threading.Event() + call_started = threading.Event() + # Count peer sub-batches that actually started before being abandoned. 
+ peer_call_count = [0] + + class FastFailThenBlockEmbedder(DummyEmbedder384): + embed_max_concurrent_sub_batches = 4 + _calls = [0] + _calls_lock = threading.Lock() + + def embed_texts_batch(self, texts, **kw): + with self._calls_lock: + self._calls[0] += 1 + nth = self._calls[0] + if nth == 1: + # First in -> trigger the fast-fail path. + call_started.set() + raise requests.exceptions.Timeout("simulated timeout") + # Peer sub-batches: count the call so we can assert that + # at least one peer started before being abandoned, then + # block until the test releases (or we time out trying). + with self._calls_lock: + peer_call_count[0] += 1 + # Cap the block at a generous bound so a regression times + # out the test rather than hanging the whole suite. + never_release.wait(timeout=10.0) + return [[0.1] * 384] * len(texts) + + # 4 sub-batches with api_batch_size=1. + annots = [_make_mock_annotation(i, f"Text {i}") for i in range(4)] + result = self._make_result() + + embedder = FastFailThenBlockEmbedder() + + deadline_seconds = 5.0 + start = time.monotonic() + with self.assertRaises(requests.exceptions.Timeout): + _batch_embed_text_annotations( + annots, embedder, "test.FastFailThenBlockEmbedder", 1, result + ) + elapsed = time.monotonic() - start + + # Always release peers so the executor's worker threads exit cleanly. + never_release.set() + + # Confirm the first sub-batch actually fired before checking the + # timing — otherwise a regression that never schedules sub-batch 0 + # would silently pass the elapsed-time check. + self.assertTrue( + call_started.is_set(), + msg="first sub-batch never started; timing assertion would be vacuous", + ) + + # The fast-fail path must complete well inside the 10s peer + # block. A regression that waits on in-flight peers would take + # ~10s; we give a generous 5s headroom over scheduling jitter. 
+ self.assertLess( + elapsed, + deadline_seconds, + msg=( + f"Fast-fail path took {elapsed:.2f}s; expected < " + f"{deadline_seconds}s. Executor likely waited on in-flight peers." + ), + ) + + # At least one peer sub-batch entered ``embed_texts_batch`` before + # being abandoned — otherwise the executor cancelled every queued + # sub-batch and the test no longer exercises the in-flight path. + self.assertGreater( + peer_call_count[0], + 0, + msg="no peer sub-batches started; in-flight fast-fail path not exercised", + ) + def test_client_error_recorded_as_permanent_failure(self): """EmbeddingClientError from embed_texts_batch is caught and recorded. diff --git a/opencontractserver/tests/test_benchmarks.py b/opencontractserver/tests/test_benchmarks.py index 95df9256f..5545459a1 100644 --- a/opencontractserver/tests/test_benchmarks.py +++ b/opencontractserver/tests/test_benchmarks.py @@ -611,6 +611,100 @@ def _rmtree(path: str) -> None: shutil.rmtree(path, ignore_errors=True) +class ForceCeleryEagerSafetyGuardsTestCase(PyUnitTestCase): + """Tests for the safety guards on :func:`force_celery_eager` (issue #1410). + + The context manager mutates the global Celery config; calling it from + a live worker or a non-benchmark process would silently route every + task dispatched during the benchmark window through the in-process + executor. These tests pin the explicit refusals. 
+ """ + + def test_refuses_outside_test_mode_without_cli_env(self): + """Non-test, non-CLI processes must raise rather than mutate config.""" + import os + + from opencontractserver.benchmarks.loader import force_celery_eager + + prev_cli = os.environ.pop("OC_BENCHMARK_CLI", None) + try: + with override_settings(MODE="PROD"): + with self.assertRaises(RuntimeError) as ctx: + with force_celery_eager(): + pass + self.assertIn("benchmark", str(ctx.exception).lower()) + finally: + if prev_cli is not None: + os.environ["OC_BENCHMARK_CLI"] = prev_cli + + def test_allows_when_oc_benchmark_cli_env_is_set(self): + """An explicit ``OC_BENCHMARK_CLI`` env var unlocks the helper. + + The helper otherwise refuses outside test mode; the env var lets + the benchmark CLI invoke it from a non-test process. Note: the + Celery conf in this project is bound to Django settings via + ``config_from_object``, which makes ``conf.task_always_eager`` + effectively read-only — so we verify the unlock by asserting the + helper *does not raise*, not by inspecting the flag. + """ + import os + + from opencontractserver.benchmarks.loader import force_celery_eager + + prev_cli = os.environ.get("OC_BENCHMARK_CLI") + os.environ["OC_BENCHMARK_CLI"] = "1" + try: + with override_settings( + MODE="PROD", CELERY_TASK_ALWAYS_EAGER=False + ): + # Helper enters the non-eager CLI path and yields without + # raising. No assertion on the conf — see docstring. 
+ with force_celery_eager(): + pass + finally: + if prev_cli is None: + os.environ.pop("OC_BENCHMARK_CLI", None) + else: + os.environ["OC_BENCHMARK_CLI"] = prev_cli + + def test_refuses_when_already_eager_outside_test_mode(self): + """Concurrent (or stacked) CLI invocations are rejected loudly.""" + import os + + from opencontractserver.benchmarks.loader import force_celery_eager + + prev_cli = os.environ.get("OC_BENCHMARK_CLI") + os.environ["OC_BENCHMARK_CLI"] = "1" + try: + with override_settings( + MODE="PROD", CELERY_TASK_ALWAYS_EAGER=True + ): + with self.assertRaises(RuntimeError) as ctx: + with force_celery_eager(): + pass + self.assertIn("already", str(ctx.exception).lower()) + finally: + if prev_cli is None: + os.environ.pop("OC_BENCHMARK_CLI", None) + else: + os.environ["OC_BENCHMARK_CLI"] = prev_cli + + def test_test_mode_no_op_when_already_eager(self): + """Test mode treats an already-eager flag as the ambient state.""" + from celery import current_app + + from opencontractserver.benchmarks.loader import force_celery_eager + + # Default settings.MODE == "TEST" and CELERY_TASK_ALWAYS_EAGER == True; + # the helper should yield without mutating the global config and never + # raise — this is exactly the path + # ``BenchmarkRunnerIntegrationTestCase`` relies on. 
+ self.assertTrue(current_app.conf.task_always_eager) + with force_celery_eager(): + self.assertTrue(current_app.conf.task_always_eager) + self.assertTrue(current_app.conf.task_always_eager) + + class BenchmarkTaskDataclassTestCase(PyUnitTestCase): """Guard against accidental changes to the public BenchmarkTask shape.""" diff --git a/opencontractserver/tests/test_data_extract_failure_classification.py b/opencontractserver/tests/test_data_extract_failure_classification.py index a622af09d..81051a336 100644 --- a/opencontractserver/tests/test_data_extract_failure_classification.py +++ b/opencontractserver/tests/test_data_extract_failure_classification.py @@ -47,6 +47,43 @@ def _text(content: str) -> Any: return TextPart(content=content) +class PydanticAiSchemaCanaryTests(SimpleTestCase): + """Pin the pydantic-ai message-schema assumptions that the classifier relies on. + + ``_classify_none_result`` uses ``isinstance(msg, ModelResponse)`` and + ``isinstance(part, ToolCallPart)`` rather than string-matching the + private ``.kind`` / ``.part_kind`` discriminators (issue #1410). These + canary tests fail fast if a pydantic-ai version bump renames either + the class import path or the discriminator values, so the silent + ``empty_history`` mis-classification mode is impossible to slip past. + """ + + def test_model_response_is_importable_and_constructible(self) -> None: + from pydantic_ai.messages import ModelResponse, TextPart + + # Constructible with at least one part — the shape we depend on. + msg = ModelResponse(parts=[TextPart(content="ok")]) + self.assertIsInstance(msg, ModelResponse) + # The discriminator we used to depend on (string-match) still + # exists; if upstream renames it the assertion forces a deliberate + # update of the classifier rather than a silent regression. 
+ self.assertEqual(getattr(msg, "kind", None), "response") + + def test_tool_call_part_discriminator_unchanged(self) -> None: + from pydantic_ai.messages import ToolCallPart + + part = ToolCallPart( + tool_name="final_result", args={"value": None}, tool_call_id="canary" + ) + self.assertEqual(getattr(part, "part_kind", None), "tool-call") + + def test_text_part_discriminator_unchanged(self) -> None: + from pydantic_ai.messages import TextPart + + part = TextPart(content="hello") + self.assertEqual(getattr(part, "part_kind", None), "text") + + class ClassifyNoneResultTests(SimpleTestCase): """Unit tests for :func:`_classify_none_result`."""
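Stepping back from the diff: the guard logic that `force_celery_eager` now carries (explicit opt-in env var, test-mode no-op when eager is ambient, loud refusal on stacked invocations) boils down to a small save/restore context manager. The sketch below is a self-contained illustration against a dummy config object — `force_eager` and `_Conf` are hypothetical names, not the real helper in `opencontractserver/benchmarks/loader.py`:

```python
import contextlib
import os

class _Conf:
    """Stand-in for the process-global Celery config."""
    task_always_eager = False
    task_eager_propagates = False

conf = _Conf()

@contextlib.contextmanager
def force_eager(is_test_mode=False):
    # Guard 1: refuse without an explicit opt-in (test mode or the env var).
    if not (is_test_mode or os.environ.get("OC_BENCHMARK_CLI")):
        raise RuntimeError("refusing to mutate global config without an opt-in")
    if conf.task_always_eager:
        if is_test_mode:
            # Ambient eager state (e.g. CELERY_TASK_ALWAYS_EAGER=True):
            # yield without touching the config at all.
            yield
            return
        # Guard 2: already eager outside test mode -> probably a concurrent
        # invocation; a naive save/restore would clobber its state on exit.
        raise RuntimeError("task_always_eager already True; refusing to stack")
    prev = (conf.task_always_eager, conf.task_eager_propagates)
    conf.task_always_eager = conf.task_eager_propagates = True
    try:
        yield
    finally:
        conf.task_always_eager, conf.task_eager_propagates = prev
```

The save/restore is deliberately naive — a tuple snapshot restored in `finally` — which is exactly why the stacking guard has to fail loudly rather than let two invocations interleave and leave the flag flipped the wrong way.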