16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

+### Changed
+
+- **Extract pipeline cleanup pass** (Issue #1410): hardens the extraction-related plumbing introduced by #1381 / #1380 / #1399 against three medium-impact failure modes and four documentation/observability gaps surfaced during code review.
+  - **`opencontractserver/tasks/embeddings_task.py:_batch_embed_text_annotations`** — On a transient HTTP error (`requests.Timeout`, `requests.ConnectionError`, `EmbeddingServerError`) the helper now drops queued sub-batches and shuts down the `ThreadPoolExecutor` with `wait=False, cancel_futures=True` before re-raising. Previously the exception propagated out of the `with` block, whose exit calls `shutdown(wait=True)` without cancelling queued futures, which delayed Celery autoretry by up to ~`max_workers` × the per-sub-batch round-trip latency. In-flight HTTP calls cannot be torn down from Python, but the helper no longer waits on them.
+  - **`opencontractserver/benchmarks/loader.py:force_celery_eager`** — Refuses to mutate the global Celery config unless `settings.MODE == "TEST"` *or* the new `OC_BENCHMARK_CLI` env var is set. Outside test mode it also refuses when `task_always_eager` is already `True`, because concurrent benchmark runs would race on the global save/restore. In test mode an already-`True` flag is the ambient state imposed by `CELERY_TASK_ALWAYS_EAGER=True`, so the helper is a no-op there. The `run_benchmark` management command now sets `OC_BENCHMARK_CLI=1` automatically. This prevents the benchmark helper from silently routing every task in a live web/worker process through the in-process executor.
+  - **`opencontractserver/llms/agents/pydantic_ai_agents.py`** — `PydanticAICorpusAgent._build_structured_system_prompt` now says "most legal corpora need multiple targeted queries" instead of "most legal documents …" — corpora-scoped wording for the corpus agent. The document-agent prompt is unchanged.
+  - **`config/settings/test.py`** — The `DEFAULT_EMBEDDER` env-var override is now gated behind an explicit `BENCHMARK_MODE=1` env var. Without that opt-in, a stray `DEFAULT_EMBEDDER` value in the CI environment would silently push the regular test suite onto a real embedder and start making live network calls. The default `TestEmbedder` keeps regular `pytest` runs hermetic.
+  - **`opencontractserver/tasks/data_extract_tasks.py`** — When `model_override` is accepted in the unrestricted (operator-only) path because `BENCHMARK_ALLOWED_MODEL_OVERRIDES` is unset, a `WARNING`-level log line now records the override and cell ID. This lets operators `grep` production logs to confirm that the open mode is triggered only by the benchmark tooling and never by an unexpected web/GraphQL caller.
+  - **`opencontractserver/pipeline/utils.py:get_default_reranker_instance`** — Docstring now warns that `bulk_update` / `QuerySet.update` / data-migration writes to `PipelineSettings.default_reranker` bypass the `auto_now` `modified` field used as part of the cache key, so callers who mutate the singleton via those paths must also call `invalidate_reranker_cache()` (or touch `modified` explicitly).
+
### Fixed

- **Shared-protocol contract drift surfaced by PR #1400 follow-up review** (Issue #1408):
@@ -17,6 +27,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

+- **Tests for issue #1410 fixes**:
+  - `opencontractserver/tests/test_data_extract_failure_classification.py::PydanticAiSchemaCanaryTests` — Pins the pydantic-ai message-schema discriminators (`ModelResponse.kind == "response"`, `ToolCallPart.part_kind == "tool-call"`, `TextPart.part_kind == "text"`) so a future minor version that renames them surfaces immediately rather than silently flipping every `None` extraction into the `empty_history` mis-classification mode.
+  - `opencontractserver/tests/test_batch_embedding.py::TestBatchEmbedTextAnnotations::test_transient_error_does_not_block_on_in_flight_peers` — Asserts that the executor fast-fail path completes well inside the 10 s peer-block window. A regression that waits on in-flight peers (the previous default-shutdown behaviour) would take ~10 s and fail this test.
+  - `opencontractserver/tests/test_benchmarks.py::ForceCeleryEagerSafetyGuardsTestCase` — Pins the three `force_celery_eager` safety-guard cases: non-test mode without the CLI env var (refused), non-test mode *with* the CLI env var (allowed), and stacked invocations with `task_always_eager` already `True` (refused).
+
+
- **VCR.py wrapper for LLM calls in `doc_extract_query_task`** — `opencontractserver/utils/vcr_replay.py` exposes a `maybe_vcr_cassette()` context manager that, when `OC_LLM_VCR_MODE` and `OC_LLM_VCR_CASSETTE` are set on the celery worker, records or replays every HTTP call to LLM provider hosts (currently `api.openai.com` / `api.anthropic.com`). A custom request-body matcher strips volatile values (millisecond timestamps, Django document PKs, OpenAI tool-call IDs, UUIDs) so a cassette recorded against one DB replays cleanly against another. With the env vars unset the wrapper is a no-op — production behavior is unchanged. Pre-recorded cassette for the E2E extract spec lives at `opencontractserver/tests/fixtures/cassettes/e2e_extract_pdf_workflow/extract.yaml`. Replay was verified end-to-end against a deliberately-fake `OPENAI_API_KEY` to confirm no real network call is made. See `docs/development/e2e_vcr.md` for record / replay / debug instructions.
- **`.github/workflows/frontend-e2e-extract.yml`** — CI workflow that runs the E2E extract spec against the full `local.yml` stack with `OC_LLM_VCR_MODE=replay` and a fake `OPENAI_API_KEY`. Triggered on `pull_request` (path-filtered to changes that can affect extract behaviour) in addition to `workflow_dispatch`. PDF parsing routes to the in-stack Docling microservice (`PDF_PARSER=docling`); the workflow forces `PipelineSettings.preferred_parsers["application/pdf"]` to Docling explicitly after `migrate` so a pre-existing DB singleton can't silently route through LlamaParse. No external service credentials required.
- **`frontend/tests/e2e/extract-pdf-workflow.spec.ts`** — full-stack Playwright E2E spec for the extract pipeline: login → create corpus → upload two PDFs (`frontend/tests/fixtures/{usc-title-1,eton-agreement}.pdf`) → wait for parse + embedding → create extract with one column → run with a real OpenAI call → CSV export → assert non-empty cells. Adds new helpers to `frontend/tests/e2e/helpers.ts` (`uploadPdfViaUI`, `waitForDocumentReady`, `createExtractViaUI`, `openExtractByName`, `addColumnViaUI`, `addDocumentsToExtractViaUI`, `runExtractAndWaitForFinish`). Gated on `E2E_RUN_LLM_TESTS=true`; skipped in CI until LLM responses can be mocked over the wire. Runs on the live `local.yml` stack; required tweaks to disable Auth0 (`.envs/.local/.django USE_AUTH0=false`) and to widen the celeryworker `watchfiles --ignore-paths` (in `compose/local/django/celery/worker/start` and the `local.yml` command pointer) so editor / Playwright artifact writes don't hot-reload the worker mid-task. Also adds `data-testid="document-card"` (+ `data-processing` on the `/documents`-view variants) to `frontend/src/views/Documents.tsx` and `data-testid="document-card"` to `frontend/src/components/documents/ModernDocumentItem.tsx`, so tests can poll for the `backendLock` UI signal without depending on hover-only action menus. Cards are matched by `[data-testid="document-card"]` filtered with the visible title text — the standard Playwright pattern.
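
A request-body matcher of the kind the VCR entry describes could normalize volatile values before comparing recorded and incoming bodies. The following is only a rough sketch of that idea, not the project's matcher: `normalize_body`, `bodies_match`, and all three regexes are hypothetical, and Django document PKs are deliberately left out because their position in the payload is not known from this changelog.

```python
import re

# Hypothetical patterns for values that differ between record and replay runs.
_UUID_RE = re.compile(
    r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
    re.IGNORECASE,
)
_TOOL_CALL_ID_RE = re.compile(r"call_[A-Za-z0-9]+")  # assumed tool-call ID shape
_MS_TIMESTAMP_RE = re.compile(r"\b\d{13}\b")  # 13-digit millisecond epoch


def normalize_body(body: str) -> str:
    """Replace volatile substrings with fixed placeholders."""
    body = _UUID_RE.sub("<uuid>", body)
    body = _TOOL_CALL_ID_RE.sub("<tool-call-id>", body)
    body = _MS_TIMESTAMP_RE.sub("<ts>", body)
    return body


def bodies_match(recorded: str, incoming: str) -> bool:
    """Two bodies match when they are equal after normalization."""
    return normalize_body(recorded) == normalize_body(incoming)
```

Comparing normalized strings keeps the matcher independent of the JSON structure, at the cost of possibly masking a genuine difference that happens to look like an ID.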
19 changes: 14 additions & 5 deletions config/settings/test.py
@@ -133,12 +133,21 @@
 #
 # Intentionally env-overridable: benchmark runs via the test.yml compose
 # stack (see opencontractserver/benchmarks/) need to swap in a real embedder
-# at runtime without editing settings. Standard CI never sets DEFAULT_EMBEDDER,
-# so the default TestEmbedder keeps regular test runs hermetic.
-DEFAULT_EMBEDDER = env(
-    "DEFAULT_EMBEDDER",
-    default="opencontractserver.pipeline.embedders.test_embedder.TestEmbedder",
+# at runtime without editing settings.
+#
+# Footgun guard (issue #1410): the override only takes effect when the
+# explicit ``BENCHMARK_MODE`` env var is also set. Without this guard, any
+# stray ``DEFAULT_EMBEDDER`` value in CI's environment would silently push
+# the regular test suite onto a real embedder and start making live
+# network calls. Forcing operators to set ``BENCHMARK_MODE=1`` makes the
+# escape hatch deliberate.
+_OC_TEST_EMBEDDER_DEFAULT = (
+    "opencontractserver.pipeline.embedders.test_embedder.TestEmbedder"
 )
+if env.bool("BENCHMARK_MODE", default=False):
+    DEFAULT_EMBEDDER = env("DEFAULT_EMBEDDER", default=_OC_TEST_EMBEDDER_DEFAULT)
+else:
+    DEFAULT_EMBEDDER = _OC_TEST_EMBEDDER_DEFAULT

 # Auth0 settings for tests
 # ------------------------------------------------------------------------------
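
The gating logic in this hunk can be tried without Django. The sketch below assumes a plain mapping instead of the project's `env` object; `resolve_default_embedder` and the set of truthy strings are illustrative stand-ins, not the parsing that `env.bool` performs.

```python
import os

TEST_EMBEDDER = "opencontractserver.pipeline.embedders.test_embedder.TestEmbedder"

# Assumed truthy spellings; the real ``env.bool`` may accept a different set.
_TRUTHY = {"1", "true", "yes", "on"}


def resolve_default_embedder(environ=os.environ) -> str:
    """Honour DEFAULT_EMBEDDER only when BENCHMARK_MODE is explicitly enabled."""
    benchmark_mode = environ.get("BENCHMARK_MODE", "").lower() in _TRUTHY
    if benchmark_mode:
        return environ.get("DEFAULT_EMBEDDER", TEST_EMBEDDER)
    # A stray DEFAULT_EMBEDDER is ignored outside benchmark mode.
    return TEST_EMBEDDER
```

Passing the environment as an argument makes each of the three cases (stray override, deliberate override, benchmark mode without an override) easy to check in isolation.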
48 changes: 48 additions & 0 deletions opencontractserver/benchmarks/loader.py
@@ -85,13 +85,61 @@ def force_celery_eager():
     database, we temporarily force celery into eager mode. This is safe
     because the management command is a one-off CLI, not a web worker.

+    Safety guards (issue #1410):
+    * Refuses to run unless ``settings.MODE`` is ``TEST`` *or* the
+      ``OC_BENCHMARK_CLI`` environment variable is set. Calling this
+      context manager from a live web worker or a non-benchmark Celery
+      worker silently routes every task dispatched during the benchmark
+      window through the in-process executor, which would corrupt
+      production behaviour.
+    * Refuses to run when celery is already in eager mode *and* the
+      process is not the test suite — that almost always means another
+      invocation of this context manager is active on a sibling thread
+      (e.g. a concurrent benchmark CLI), and our naive save/restore
+      would leave the flag flipped to ``False`` when the inner block
+      exits. In ``settings.MODE == "TEST"`` the eager flag is the
+      ambient state imposed by ``CELERY_TASK_ALWAYS_EAGER``; the helper
+      is then a no-op (yields without mutating the config) so callers
+      like the benchmark integration tests work transparently.
+
     Warning: This mutates the global Celery config for the current process.
     Do not call from a shared worker process or web request handler — only
     from one-off CLIs, notebooks, and test suites.
     """
+    import os
+
+    from django.conf import settings as django_settings
+
+    is_test_mode = getattr(django_settings, "MODE", "").upper() == "TEST"
+    is_benchmark_cli = bool(os.environ.get("OC_BENCHMARK_CLI"))
+    if not (is_test_mode or is_benchmark_cli):
+        raise RuntimeError(
+            "force_celery_eager() refuses to mutate the global Celery config "
+            "outside test mode or a benchmark CLI invocation. Set "
+            "OC_BENCHMARK_CLI=1 in the environment if this really is the "
+            "benchmark management command."
+        )
+
     conf = current_app.conf
     prev_always_eager = conf.task_always_eager
     prev_eager_propagates = conf.task_eager_propagates
+    if prev_always_eager:
+        if is_test_mode:
+            # Test settings set ``CELERY_TASK_ALWAYS_EAGER = True`` globally,
+            # so eager is the ambient state — not a concurrent benchmark.
+            # Yield without mutating the config; the caller already has the
+            # in-process executor it asked for.
+            yield
+            return
+        # Outside test mode, an already-eager flag almost certainly means a
+        # sibling benchmark is mutating the global config; our naive
+        # save/restore would clobber its expected ``True`` on exit. Loud
+        # failure beats silent corruption.
+        raise RuntimeError(
+            "force_celery_eager() called while task_always_eager is already "
+            "True. Concurrent benchmark runs in the same process are not "
+            "supported because they would race on the global Celery config."
+        )
     conf.task_always_eager = True
     conf.task_eager_propagates = True
     try:
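
The guard ordering in this hunk can be exercised without Celery or Django. The sketch below is a simplified stand-in under stated assumptions: `conf` is a hypothetical `SimpleNamespace` replacing `current_app.conf`, `force_eager` takes the mode as an argument instead of reading `settings.MODE`, and `task_eager_propagates` is omitted.

```python
import os
from contextlib import contextmanager
from types import SimpleNamespace

# Hypothetical stand-in for ``current_app.conf``.
conf = SimpleNamespace(task_always_eager=False)


@contextmanager
def force_eager(mode: str):
    """Simplified illustration of the two safety guards."""
    is_test_mode = mode.upper() == "TEST"
    is_benchmark_cli = bool(os.environ.get("OC_BENCHMARK_CLI"))
    if not (is_test_mode or is_benchmark_cli):
        raise RuntimeError("refusing to mutate the global config")

    prev = conf.task_always_eager
    if prev:
        if is_test_mode:
            yield  # eager is the ambient state: behave as a no-op
            return
        raise RuntimeError("already eager: concurrent runs are not supported")

    conf.task_always_eager = True
    try:
        yield
    finally:
        conf.task_always_eager = prev  # restore the saved value
```

Both refusals happen before the first `yield`, so a rejected call raises from the `with` statement itself and never touches `conf`.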
Original file line number Diff line number Diff line change
@@ -178,6 +178,14 @@ def add_arguments(self, parser: argparse.ArgumentParser) -> None:
         )

     def handle(self, *args, **options) -> None:
+        # Mark the process as a benchmark CLI invocation so
+        # ``force_celery_eager`` will accept the global Celery-config
+        # mutation. Without this opt-in env var the helper refuses to
+        # run in non-test mode (issue #1410).
+        import os
+
+        os.environ.setdefault("OC_BENCHMARK_CLI", "1")
+
         username = options["user"]
         User = get_user_model()
         try:
2 changes: 1 addition & 1 deletion opencontractserver/llms/agents/pydantic_ai_agents.py
@@ -2610,7 +2610,7 @@ def _build_structured_system_prompt(
 "different angles (paraphrase the question, search for key "
 "terms, search for likely answer phrasings). A single failed "
 "search is NOT sufficient evidence that the information is "
-"missing — most legal documents need multiple targeted "
+"missing — most legal corpora need multiple targeted "
 "queries to surface a relevant span. This rule applies only "
 "to giving up; once you have a confident answer, rule #3 "
 "takes precedence and you commit immediately.\n"
10 changes: 10 additions & 0 deletions opencontractserver/pipeline/utils.py
@@ -619,6 +619,16 @@ def get_default_reranker_instance(
     in-memory will hit stale instances — set ``STRICT_RERANKER`` (which
     bypasses the cache fast-path) or call :func:`invalidate_reranker_cache`
     explicitly if you need a fresh instance from a fixture.
+
+    Caveat (issue #1410): ``modified`` is an ``auto_now`` field that only
+    ticks on ``save()`` / ``Model.save()``-equivalent code paths. Any code
+    that bypasses ``save()`` — e.g., ``QuerySet.update``, ``bulk_update``,
+    or a data migration that writes ``default_reranker`` directly — will
+    *not* bump ``modified``, so workers can stay pinned on the stale cached
+    instance until their process restarts. If you must mutate the singleton
+    from a migration or bulk-update path, also call
+    :func:`invalidate_reranker_cache` (or touch ``modified`` explicitly) so
+    every worker picks up the new config on its next lookup.
     """
     from django.conf import settings as django_settings
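
The stale-cache hazard described in this docstring can be reproduced with a toy model. The sketch below involves no Django: `Settings`, `get_reranker`, and `invalidate_cache` are hypothetical stand-ins, and a counter replaces the `auto_now` timestamp. Writing the attribute directly plays the role of `QuerySet.update`, which skips `save()`.

```python
from dataclasses import dataclass, field
from itertools import count

_clock = count(1)  # stand-in for a timestamp source


@dataclass
class Settings:
    default_reranker: str
    modified: int = field(default_factory=lambda: next(_clock))

    def save(self) -> None:
        # Mimics ``auto_now``: the stamp only ticks when save() runs.
        self.modified = next(_clock)


_cache: dict = {}


def get_reranker(settings: Settings) -> str:
    """Return a cached instance keyed on the ``modified`` stamp."""
    key = settings.modified
    if key not in _cache:
        _cache[key] = f"instance-of-{settings.default_reranker}"
    return _cache[key]


def invalidate_cache() -> None:
    _cache.clear()


s = Settings("A")
first = get_reranker(s)
s.default_reranker = "B"        # bypasses save(): ``modified`` is unchanged
stale = get_reranker(s)         # still the instance built for "A"
invalidate_cache()
fresh = get_reranker(s)         # rebuilt from the current value
```

Because the key never changed, only the explicit invalidation (or a later `save()`) lets the lookup see the new value.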

12 changes: 12 additions & 0 deletions opencontractserver/tasks/data_extract_tasks.py
@@ -347,6 +347,18 @@ def sync_get_corpus_id(document):
             f"model_override {model_override!r} is not in "
             f"BENCHMARK_ALLOWED_MODEL_OVERRIDES"
         )
+    elif allowed is None:
+        # Unrestricted (operator-only) path — emit a WARNING so the
+        # line stands out in production alerting. An operator who
+        # forgets to set ``BENCHMARK_ALLOWED_MODEL_OVERRIDES`` is
+        # arguably mis-configured; an unexpected web/GraphQL caller
+        # firing this path is a potential abuse signal. (issue #1410)
+        logger.warning(
+            "BENCHMARK_ALLOWED_MODEL_OVERRIDES unset; accepting "
+            "model_override=%r (operator-only path) for cell_id=%s",
+            model_override,
+            cell_id,
+        )

     document = datacell.document
     column = datacell.column
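
The allowlist branch in this hunk can be sketched as a small standalone function. This is an approximation only: `check_model_override` and the logger name are hypothetical, and the lines of the real task that precede this hunk are not shown in the diff, so the exact condition on `allowed` is an assumption.

```python
import logging
from typing import Optional, Set

logger = logging.getLogger("extract_sketch")  # hypothetical logger name


def check_model_override(
    model_override: str, allowed: Optional[Set[str]], cell_id: int
) -> str:
    """Reject overrides outside the allowlist; warn when no allowlist is set."""
    if allowed is not None and model_override not in allowed:
        raise ValueError(
            f"model_override {model_override!r} is not in the allowlist"
        )
    if allowed is None:
        # Unrestricted path: accept, but leave a greppable WARNING behind.
        logger.warning(
            "allowlist unset; accepting model_override=%r for cell_id=%s",
            model_override,
            cell_id,
        )
    return model_override
```

Passing `model_override` and `cell_id` as logging arguments rather than pre-formatting the string keeps the message template constant, which makes the line easier to match in log searches.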
48 changes: 40 additions & 8 deletions opencontractserver/tasks/embeddings_task.py
@@ -503,11 +503,17 @@ def _batch_embed_text_annotations(
     # call, then return ``(chunk, vectors_or_exc)`` for the main
     # thread to drain.
     #
-    # On a transient exception in any future, we re-raise immediately
-    # so celery's autoretry can fire; in-flight peers are abandoned
-    # (the main thread exits the executor's ``with`` block which
-    # cancels still-queued futures and waits for in-flight ones to
-    # exit naturally).
+    # On a transient exception in any future we want celery's autoretry
+    # to fire as soon as possible. Letting the exception propagate out of
+    # the ``with`` block triggers ``ThreadPoolExecutor.__exit__`` which
+    # calls ``shutdown(wait=True, cancel_futures=False)`` by default —
+    # that blocks until every in-flight peer round-trip finishes,
+    # delaying the retry by up to ~max_workers× the sub-batch latency.
+    # Instead we capture the first transient exception, break out of the
+    # ``as_completed`` loop, drop queued futures with
+    # ``shutdown(wait=False, cancel_futures=True)``, then re-raise. Already
+    # in-flight HTTP calls cannot be torn down from Python, but at least
+    # we no longer wait for them. (issue #1410)
     max_workers = max(1, getattr(embedder, "embed_max_concurrent_sub_batches", 1))
     log_prefix = (
         f"embed_texts_batch (sub-batches={len(chunks)}, parallel={max_workers}, "
@@ -520,7 +526,17 @@ def _embed_one(chunk):
         return chunk, embedder.embed_texts_batch(texts_only)

     # Map future -> chunk index for logging/sub-batch numbering.
-    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+    #
+    # Transient-error handling: instead of raising directly inside the
+    # ``with`` block (which would call ``shutdown(wait=True)`` and block
+    # until in-flight peers complete), we capture the first transient
+    # exception, break out of the loop, and explicitly call
+    # ``shutdown(wait=False, cancel_futures=True)`` before re-raising
+    # outside the block. This unblocks Celery autoretry as fast as
+    # possible. (issue #1410)
+    transient_exc: Optional[BaseException] = None
+    executor = ThreadPoolExecutor(max_workers=max_workers)
+    try:
         future_to_idx = {
             executor.submit(_embed_one, chunk): idx for idx, chunk in enumerate(chunks)
         }
@@ -532,15 +548,19 @@ def _embed_one(chunk):
                 # ValueError indicates a caller contract violation (e.g., batch size
                 # exceeds embedder maximum). Re-raise rather than silently recording
                 # as an annotation failure so the programming error surfaces loudly.
+                executor.shutdown(wait=False, cancel_futures=True)
                 raise
             except (
                 requests.exceptions.Timeout,
                 requests.exceptions.ConnectionError,
                 EmbeddingServerError,
-            ):
+            ) as e:
                 # Transient HTTP errors: re-raise so the task-level Celery
                 # autoretry_for=(Exception,) decorator can fire a retry.
-                raise
+                # Defer the raise so we can drop queued futures first
+                # without blocking on in-flight peers.
+                transient_exc = e
+                break
             except EmbeddingClientError as e:
                 # Client errors (4xx): non-retriable, record as permanent
                 # per-annotation failures. We explicitly swallow the exception
@@ -615,6 +635,18 @@ def _embed_one(chunk):
                     )
                     result["failed"] += 1
                     result["errors"].append(f"Annotation {annot.id}: store failed: {e}")
+    finally:
+        # On the transient-error fast path we want queued futures dropped
+        # and the executor shut down without waiting on in-flight peers.
+        # On the happy path ``shutdown(wait=False)`` is still safe — every
+        # future has already been drained by ``as_completed``.
+        executor.shutdown(wait=False, cancel_futures=True)
+
+    if transient_exc is not None:
+        # Re-raise the captured transient exception now that queued
+        # futures have been cancelled. Celery's task-level
+        # ``autoretry_for=(Exception,)`` decorator will fire.
+        raise transient_exc


@shared_task(
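
The capture, break, shutdown, re-raise pattern from the hunks above can be observed in isolation. The sketch below is a toy reproduction under stated assumptions: `work` and `run_fast_fail` are hypothetical, the built-in `ConnectionError` stands in for the `requests` exceptions, and `cancel_futures` requires Python 3.9 or later.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(i: int) -> int:
    if i == 0:
        raise ConnectionError("transient")  # fails almost immediately
    time.sleep(1.0)  # slow in-flight peer
    return i


def run_fast_fail(n_tasks: int = 6, max_workers: int = 2) -> None:
    transient_exc = None
    executor = ThreadPoolExecutor(max_workers=max_workers)
    try:
        futures = [executor.submit(work, i) for i in range(n_tasks)]
        for fut in as_completed(futures):
            try:
                fut.result()
            except ConnectionError as exc:
                # Defer the raise so queued futures can be dropped first.
                transient_exc = exc
                break
    finally:
        # Cancel queued work and return without joining running threads.
        executor.shutdown(wait=False, cancel_futures=True)
    if transient_exc is not None:
        raise transient_exc
```

The function returns control to the caller without waiting for the sleeping peers, which is the property the new regression test is described as pinning. Threads that are already running still finish in the background, and the interpreter joins them at exit.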