Skip to content

Extract pipeline cleanup follow-ups (#1410)#1417

Open
JSv4 wants to merge 1 commit intomainfrom
claude/resolve-issue-1410-aIXoP
Open

Extract pipeline cleanup follow-ups (#1410)#1417
JSv4 wants to merge 1 commit intomainfrom
claude/resolve-issue-1410-aIXoP

Conversation

@JSv4
Copy link
Copy Markdown
Collaborator

@JSv4 JSv4 commented Apr 30, 2026

Summary

Resolves the seven items raised in #1410 — three medium-impact failure modes and four documentation / observability gaps surfaced during code review of the extract-pipeline work in #1381 / #1380 / #1399.

Closes #1410

Changes

Medium

  1. opencontractserver/tasks/embeddings_task.py:_batch_embed_text_annotations — Transient HTTP errors (requests.Timeout, requests.ConnectionError, EmbeddingServerError) now drop queued sub-batches and shut down the ThreadPoolExecutor with wait=False, cancel_futures=True before re-raising. Previously the exception propagated out of the with block, which calls shutdown(wait=True, cancel_futures=False) by default and blocked Celery autoretry by up to ~max_workers × the per-sub-batch round-trip latency. In-flight HTTP calls cannot be torn down from Python, but at least we no longer wait on them.
  2. opencontractserver/benchmarks/loader.py:force_celery_eager — Refuses to mutate the global Celery config unless settings.MODE == "TEST" or the new OC_BENCHMARK_CLI env var is set; also refuses when task_always_eager is already True (concurrent benchmark runs would race on the global save / restore). The run_benchmark management command now sets OC_BENCHMARK_CLI=1 automatically. Prevents the benchmark helper from silently routing every task in a live web / worker process through the in-process executor.
  3. opencontractserver/tasks/data_extract_tasks.py:_classify_none_result — Verified the implementation already uses isinstance(msg, ModelResponse) / isinstance(part, ToolCallPart) (no string-matching on .kind). Added PydanticAiSchemaCanaryTests so a future pydantic-ai version that renames the discriminators surfaces immediately rather than silently flipping every None extraction into the empty_history mis-classification mode.

Minor

  1. opencontractserver/llms/agents/pydantic_ai_agents.pyPydanticAICorpusAgent._build_structured_system_prompt now says "most legal corpora need multiple targeted queries" instead of "most legal documents …" — corpus-scoped wording for the corpus agent. The document-agent prompt is unchanged.
  2. config/settings/test.py — The DEFAULT_EMBEDDER env-var override is now gated behind an explicit BENCHMARK_MODE=1 env var. Without that opt-in, a stray DEFAULT_EMBEDDER value in the CI environment would silently push the regular test suite onto a real embedder and start making live network calls. The default TestEmbedder keeps regular pytest runs hermetic.
  3. opencontractserver/tasks/data_extract_tasks.py — When model_override is accepted in the unrestricted (operator-only) path because BENCHMARK_ALLOWED_MODEL_OVERRIDES is unset, an INFO-level log line now records the override + cell ID. Lets operators grep production logs to confirm the open mode is fired only by the benchmark tooling and never by an unexpected web/GraphQL caller.
  4. opencontractserver/pipeline/utils.py:get_default_reranker_instance — Docstring now warns that bulk_update / QuerySet.update / data-migration writes to PipelineSettings.default_reranker bypass the auto_now modified field used as part of the cache key, so callers who mutate the singleton via those paths must also call invalidate_reranker_cache() (or touch modified explicitly).

Tests

  • opencontractserver/tests/test_data_extract_failure_classification.py::PydanticAiSchemaCanaryTests — pin the pydantic-ai message-schema discriminators.
  • opencontractserver/tests/test_batch_embedding.py::TestBatchEmbedTextAnnotations::test_transient_error_does_not_block_on_in_flight_peers — asserts the executor fast-fail path completes well inside the 10 s peer-block window. A regression that waits on in-flight peers (the previous default-shutdown behaviour) would take ~10 s and fail this test.
  • opencontractserver/tests/test_benchmarks.py::ForceCeleryEagerSafetyGuardsTestCase — pins the three new force_celery_eager safety refusals.

Test plan

  • CI: pytest opencontractserver/tests/test_data_extract_failure_classification.py opencontractserver/tests/test_batch_embedding.py opencontractserver/tests/test_benchmarks.py passes.
  • CI: full backend suite passes (no regressions in tests that exercise force_celery_eager or the embeddings batch helper).
  • Pre-commit (black / isort / flake8 / pyupgrade) clean on the modified Python files.
  • Manual smoke: python manage.py run_benchmark --benchmark legalbench-rag … still works end-to-end and now logs the OC_BENCHMARK_CLI opt-in implicitly.

Generated by Claude Code

Hardens the extraction-related plumbing introduced by #1381 / #1380 / #1399
against three medium-impact failure modes and four documentation /
observability gaps surfaced during code review.

Medium:
* embeddings_task: transient HTTP errors now drop queued sub-batches and
  shutdown the executor with cancel_futures=True before re-raising, so
  Celery autoretry isn't blocked waiting on in-flight peer round trips.
* benchmarks/loader: force_celery_eager refuses to mutate the global
  Celery config unless MODE=TEST or OC_BENCHMARK_CLI=1; refuses when
  task_always_eager is already True. run_benchmark management command
  sets OC_BENCHMARK_CLI=1 automatically.
* config/settings/test: DEFAULT_EMBEDDER env override is now gated behind
  an explicit BENCHMARK_MODE=1 to prevent stray CI env from silently
  pushing the test suite onto a real embedder.

Minor:
* CorpusAgent prompt now says "most legal corpora" instead of
  "most legal documents" — corpus-scoped wording.
* doc_extract_query_task: log when model_override is accepted in the
  unrestricted (operator-only) path so operators can grep production
  logs.
* get_default_reranker_instance: docstring warns that bulk_update /
  data-migration writes to PipelineSettings.default_reranker bypass the
  auto_now `modified` field used as part of the cache key.

Tests:
* PydanticAiSchemaCanaryTests pin ModelResponse.kind == "response",
  ToolCallPart.part_kind == "tool-call", TextPart.part_kind == "text"
  so a future pydantic-ai version bump surfaces immediately.
* test_transient_error_does_not_block_on_in_flight_peers asserts the
  executor fast-fail path completes well inside the 10s peer block.
* ForceCeleryEagerSafetyGuardsTestCase pins the three new safety
  refusals.

Closes #1410
# ValueError indicates a caller contract violation (e.g., batch size
# exceeds embedder maximum). Re-raise rather than silently recording
# as an annotation failure so the programming error surfaces loudly.
transient_exc = None # programming error path
@claude
Copy link
Copy Markdown

claude Bot commented Apr 30, 2026

Code Review — PR #1417: Extract pipeline cleanup follow-ups

Overall this is a well-scoped and well-motivated cleanup PR. Each fix is clearly tied to the issue it resolves, tests are purposeful, and the CHANGELOG is properly updated. A few items worth discussing below.


ThreadPoolExecutor fast-fail path (embeddings_task.py)

The core change is sound: replacing the with executor: context manager with a manual try/finally + executor.shutdown(wait=False, cancel_futures=True) avoids blocking Celery autoretry on the default shutdown(wait=True) behavior.

Minor: redundant transient_exc = None in the ValueError branch

except ValueError:
    transient_exc = None  # programming error path  ← already None; no-op
    executor.shutdown(wait=False, cancel_futures=True)
    raise

transient_exc is initialised to None before the loop, so the reassignment is a no-op. The finally block will also call shutdown a second time (idempotently). Neither is a bug, but the two redundant lines add noise. Consider just removing the transient_exc = None line and letting the finally handle shutdown unconditionally.


Timing-based test (test_batch_embedding.py)

The test_transient_error_does_not_block_on_in_flight_peers test is clever — using a threading.Event that never releases to simulate a slow peer is a clean technique.

Minor: call_started event is set but never used

call_started = threading.Event()
...
call_started.set()   # set in the embedder
# never waited on or asserted against in the test body

This looks like leftover scaffolding. It could be removed without changing the test semantics. Alternatively, adding call_started.wait(timeout=2.0) before the assertLess timing check would make the test more deterministic by confirming the first sub-batch actually started before checking the elapsed time.

Minor: peer_call_count is tracked but never asserted

peer_call_count = [0]
...
with self._calls_lock:
    peer_call_count[0] += 1

The variable is set up and incremented but never asserted on. If the intent is to confirm that at least one peer started (so the "fast-fail" path actually had in-flight peers to abandon), adding self.assertGreater(peer_call_count[0], 0) after releasing the event would make that invariant explicit and useful.


force_celery_eager safety guards (benchmarks/loader.py)

The two guards are well-designed. One potential edge case worth documenting:

Concern: already-eager check may bite legitimate test-suite uses

If any test in the suite sets CELERY_TASK_ALWAYS_EAGER = True at the settings level (a common Django testing pattern), calling force_celery_eager() from within a test will hit the "already True" guard and raise, even though that's a valid test-suite scenario. The guard correctly prevents concurrent benchmark runs racing on the global config, but the error message could be improved to distinguish "already set by test settings" from "another benchmark invocation is active":

raise RuntimeError(
    "force_celery_eager() called while task_always_eager is already True. "
    "This usually means Celery is already in eager mode (e.g. set in test "
    "settings) or a concurrent benchmark invocation is active. Concurrent "
    "benchmark runs in the same process are not supported."
)

Minor: test_allows_when_oc_benchmark_cli_env_is_set does not guard against pre-existing eager mode

prev_eager = current_app.conf.task_always_eager
os.environ["OC_BENCHMARK_CLI"] = "1"
try:
    with override_settings(MODE="PROD"):
        with force_celery_eager():   # will raise if prev_eager is True
            ...

If the test suite happens to run with task_always_eager = True in the test settings, this test will unexpectedly fail. A guard like self.skipTest(...) or an assume would make it more robust:

if current_app.conf.task_always_eager:
    self.skipTest("Celery is already in eager mode; cannot test force_celery_eager guard")

config/settings/test.py — BENCHMARK_MODE guard

Clean and correct. The _OC_TEST_EMBEDDER_DEFAULT private-module constant and the explicit BENCHMARK_MODE opt-in both follow project conventions.


data_extract_tasks.py — INFO log for unrestricted model override

Good observability addition. The %r / %s format-string style is consistent with the rest of the codebase.

Nit: consider WARN level

An operator forgetting to set BENCHMARK_ALLOWED_MODEL_OVERRIDES in production is arguably a misconfiguration that warrants logger.warning(...) rather than logger.info(...). At INFO level this line might get lost in normal logs; at WARN it would stand out in production alerting.


pipeline/utils.py docstring addition

Accurate and useful. The auto_now gotcha with bulk_update/QuerySet.update bypassing the cache key is exactly the kind of non-obvious invariant that belongs in a docstring. No issues.


pydantic_ai_agents.py wording fix

Correct. "corpora" → "documents" was clearly copy-paste from the document agent.


PydanticAiSchemaCanaryTests

Good use of getattr(msg, "kind", None) so the test gracefully handles a rename (returns None → assertion fails) rather than raising AttributeError. The "canary" framing is appropriate.


Summary

Area Status
Core fix (executor fast-fail) ✅ Correct and well-tested
force_celery_eager guards ✅ Correct; minor edge cases documented above
BENCHMARK_MODE guard ✅ Clean
Logging addition ✅ Good; consider WARN level
Canary tests ✅ Good design
Timing test ✅ Should pass; two minor nits
CHANGELOG ✅ Properly updated

The redundant transient_exc = None and the unused call_started variable are the only items I'd ask to clean up before merge. The force_celery_eager already-eager / pre-existing test settings edge case is worth at least documenting in a comment even if not acted on now.

@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 30, 2026

Codecov Report

❌ Patch coverage is 25.00000% with 15 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
opencontractserver/benchmarks/loader.py 0.00% 8 Missing ⚠️
opencontractserver/tasks/embeddings_task.py 50.00% 5 Missing ⚠️
...er/benchmarks/management/commands/run_benchmark.py 0.00% 2 Missing ⚠️

📢 Thoughts on this report? Let us know!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Extract Pipeline Cleanup

2 participants