Commit d8e8aa7
Tunable per-embedder batch caps + concurrent sub-batches + transient hardening
Per-embedder tunables on BaseEmbedder (with global defaults retained):

* api_batch_size — sub-batch sizing for embed_texts_batch
* embed_max_concurrent_sub_batches — in-flight sub-batches per task

Subclass overrides:

* OpenAIEmbedder: api_batch_size=256, concurrency=4, max_retries=8
* MicroserviceEmbedder: api_batch_size=cap, concurrency=2 (gunicorn workers)

calculate_embeddings_for_annotation_batch reads embedder.api_batch_size when
carving sub-batches, then submits them to a ThreadPoolExecutor sized by
embedder.embed_max_concurrent_sub_batches. DB writes (add_embedding) stay in
the calling thread to dodge Django's per-thread connection bookkeeping; the
futures only own HTTP work.

Hardening:

* OpenAI: 429 / APITimeoutError / APIConnectionError now re-raise so celery's
  autoretry_for=(Exception,) takes over with proper backoff (the SDK absorbs
  the first OPENAI_CLIENT_MAX_RETRIES with Retry-After honouring; only after
  that budget exhausts do we surface to celery). AuthenticationError and
  BadRequestError still return None — those are permanent and shouldn't burn
  celery retry budget.
* Microservice: process-wide requests.Session singleton with HTTPAdapter
  mounting a urllib3 Retry config (3 attempts, exp backoff,
  status_forcelist=(429, 502, 503, 504), allowed_methods={POST},
  raise_on_status=False, pool_maxsize=16). Connection reuse + transient
  absorption with no behaviour change on 4xx (still EmbeddingClientError).

Tests:

* TestOpenAIEmbedderBatch — 11 tests covering empty list / single HTTP call /
  dimensions handling / count-mismatch defensive guard / re-raise vs
  return-None for each error class / 30K-char truncation.
* TestMicroserviceEmbedderHardening — 6 tests pinning the Session singleton,
  retry config (covers 429+5xx but not 4xx), POST allowance, pool sizing, and
  concurrency=2 / api_batch_size=cap defaults.
* TestBatchEmbedTextAnnotations — concurrent sub-batches fire in parallel,
  verified via threading.Barrier (a serial impl would deadlock and the test
  would fail), plus a serial-when-concurrency=1 sanity test.

Local timings on LegalBench-RAG cuad subset (29 docs, ~2200 annotations):

* OLD (per-annotation serial): 53m 5s (ingest 50m 26s)
* + bulk_create + batch task: 11m 11s (ingest 7m 59s)
* + bigger batch + parallel: 10m 47s (ingest 7m 25s)
* + Session + retry (microservice): 9m 24s (ingest 7m 59s)

probe_char_recall identical to 4 decimals across all variants — the speedup
preserves retrieval quality.

docs/deployment/performance_tuning.md captures the run history, phase split,
what changed and why, every operator-facing knob (constants, microservice env
vars, celery worker concurrency), open work (top suspect: per-annotation
guardian writes are dead — visible_to_user doesn't consult them), and seven
non-obvious lessons learned.
1 parent 60a0551 commit d8e8aa7

7 files changed

Lines changed: 935 additions & 128 deletions

docs/deployment/performance_tuning.md

Lines changed: 326 additions & 0 deletions
@@ -0,0 +1,326 @@
# Performance Tuning — Document Ingestion

This document captures what we've measured and changed about the OpenContracts
ingestion pipeline, what's left, and the operator-facing knobs available for
tuning a deployment. It exists because "ingestion is slow" is the most common
production complaint and the codebase has accumulated several layered
optimisations that interact in non-obvious ways.

## Where time goes during ingestion

Measured against the **LegalBench-RAG `cuad` subset** (29 documents,
~2,200 paragraph annotations) under two embedder configs. Baseline numbers
are paper-faithful retrieval-only benchmark runs; the harness uses the
production `corpus.import_content` → parser → `import_annotations` chain,
so timings are representative of what a production worker will see when
ingesting the same volume of work.

### Run history

| Variant | Embedder | Total wall-clock | Ingest only | Notes |
|---|---|---:|---:|---|
| Pre-batch baseline | OpenAI 3-large | 53m 5s | 50m 26s | One celery task per annotation; one HTTP round-trip per task |
| `bulk_create` + batch task w/ `embedder_path` | OpenAI 3-large | 11m 11s | 7m 59s | Single batch task per doc, sub-batches of 50 |
| `+ api_batch_size=256, parallel sub-batches=4, max_retries=8, transient re-raise` | OpenAI 3-large | 10m 47s | 7m 25s | Halves HTTP-call count for typical paragraph docs |
| `+ Session pooling, urllib3 retry, parallel sub-batches=2` | Microservice (MiniLM) | 9m 24s | 7m 59s | Matches gunicorn worker count; connection reuse |
### Phase split (microservice run, 9m 24s total)

```
parser + import_annotations + permissions + structural set + file save   ~6:30  (~80% of ingest)
embedder HTTP calls                                                       ~1:30  (~18% of ingest)
retrieval probe + report write                                            ~1:24  (~14% of total; outside ingest)
```

**The embedding endpoint is no longer the bottleneck** for either
provider once batched + concurrent. The remaining ~80% of the ingest
phase is per-doc fixed overhead in the Django ORM layer.
## What we've changed (and why)

All changes are at module scope — no PipelineSettings, no per-corpus
config, no migrations. Operators get the tuned defaults automatically.

### Embedder-level changes

#### `BaseEmbedder` gained two tunables

```python
class BaseEmbedder(...):
    api_batch_size: int = 50  # Default fallback, override per subclass
    embed_max_concurrent_sub_batches: int = 1  # Serial by default
```

Per-embedder overrides:

| Embedder | `api_batch_size` | `embed_max_concurrent_sub_batches` | Rationale |
|---|---:|---:|---|
| `OpenAIEmbedder` | 256 | 4 | OpenAI accepts 2048 inputs / ~8M tokens per call; ~96K tokens at 256×typical paragraph fits comfortably. 4 in-flight sub-batches stay inside Tier-1 RPM (3000/min). |
| `MicroserviceEmbedder` | 100 (= service cap) | 2 | Gunicorn `--workers 2` in the reference deployment; matching saturates without queueing. |
`calculate_embeddings_for_annotation_batch` now reads
`embedder.api_batch_size` (with `EMBEDDING_API_BATCH_SIZE` as a fallback
for embedders without an override) when carving sub-batches, and uses a
`ThreadPoolExecutor(max_workers=embed_max_concurrent_sub_batches)` for
the HTTP work. **DB writes (`add_embedding`) stay in the calling thread**
to dodge Django's per-thread connection bookkeeping.
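A minimal sketch of that fan-out shape — not the production task verbatim;
the only contract assumed is that `embed_texts_batch` returns one vector
per input text:

```python
from concurrent.futures import ThreadPoolExecutor

def embed_in_sub_batches(embedder, texts: list[str]) -> list[list[float]]:
    """Carve `texts` by the embedder's cap and fan HTTP calls out in threads."""
    size = embedder.api_batch_size
    sub_batches = [texts[i : i + size] for i in range(0, len(texts), size)]
    vectors: list[list[float]] = []
    with ThreadPoolExecutor(
        max_workers=embedder.embed_max_concurrent_sub_batches
    ) as pool:
        # map() preserves sub-batch order; futures own only the HTTP round-trip.
        for batch_vectors in pool.map(embedder.embed_texts_batch, sub_batches):
            vectors.extend(batch_vectors)
    # Caller performs the add_embedding() DB writes here, in its own thread.
    return vectors
```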
#### OpenAI client hardening

```python
openai.OpenAI(
    api_key=...,
    base_url=...,
    max_retries=8,  # Was: SDK default of 2
)
```
The OpenAI SDK retries 429/5xx with exponential backoff and honours the
`Retry-After` header on 429. Raising `max_retries` from 2 → 8 covers
~minute-long rate-limit windows without the celery layer ever needing to
see the failure. After the SDK budget exhausts, our code now **re-raises**
`RateLimitError` / `APITimeoutError` / `APIConnectionError` so celery's
`autoretry_for=(Exception,)` decorator fires (with its 60s countdown).
The previous `return None` swallowed the failure silently.

`AuthenticationError` and `BadRequestError` still return `None` — those
are permanent (wrong key / malformed input), and retrying them burns budget.
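The transient-vs-permanent split looks roughly like this — a sketch using
openai-python v1 exception names, with the surrounding task and return
contract simplified:

```python
import openai

def embed_or_none(client: openai.OpenAI, texts: list[str]):
    try:
        resp = client.embeddings.create(
            model="text-embedding-3-large", input=texts
        )
    except (openai.RateLimitError, openai.APITimeoutError, openai.APIConnectionError):
        # Transient: the SDK already spent its max_retries budget —
        # re-raise so celery's autoretry_for=(Exception,) takes over.
        raise
    except (openai.AuthenticationError, openai.BadRequestError):
        # Permanent: wrong key / malformed input. Retrying burns budget.
        return None
    return [item.embedding for item in resp.data]
```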
#### Microservice client hardening

```python
# sent_transformer_microservice.py
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
session.mount("http://", HTTPAdapter(
    max_retries=Retry(
        total=3, backoff_factor=1.0,
        status_forcelist=(429, 502, 503, 504),
        allowed_methods={"POST"},  # urllib3 default excludes POST
        raise_on_status=False,  # Let our code see the final status
    ),
    pool_connections=16, pool_maxsize=16,
))
```

A process-wide singleton session gives us:

1. **Connection pooling.** Every `requests.post` used to open a fresh TCP
   handshake. Multiplied by thousands of sub-batches, that's measurable
   overhead, and on Cloud Run it's *significant* (TLS too).
2. **urllib3-level retry on transients.** Tighter loop than celery's
   60s-countdown autoretry. A 502 blip lasting <1s gets absorbed by
   urllib3 in the same request; the embedder code never sees it.
3. **Pool size headroom.** Sized for the highest embedder concurrency
   (currently 4 for OpenAI) plus slop. No "Connection pool is full"
   warnings under concurrent ingest.

4xx (other than 429) is deliberately *not* in `status_forcelist`. Those
are permanent failures that should surface as `EmbeddingClientError`
immediately, not waste retry budget.
### Annotation-creation changes

#### `import_annotations` uses `bulk_create` + dispatched batch task

```python
# Old: per-row .create() fires post_save → per-annotation embedding task
for ann_data in annotations_data:
    Annotation.objects.create(...)  # → signal → 1 HTTP round-trip per ann

# New: collect, bulk_create (skips signals), dispatch ONE batch task
instances = [Annotation(...) for ann_data in annotations_data]
Annotation.objects.bulk_create(instances)
# Plus: explicit dispatch with embedder_path so the batch path engages
calculate_embeddings_for_annotation_batch.delay(
    annotation_ids=[a.pk for a in instances],
    corpus_id=corpus_id,
    embedder_path=get_default_embedder_path(),  # Critical
)
```

The `embedder_path=` kwarg is load-bearing.
`calculate_embeddings_for_annotation_batch` only takes the
`embed_texts_batch` fast path **when an explicit path is supplied**;
without one it falls through to per-annotation
`_apply_dual_embedding_strategy` — i.e. the slow path we were trying to
remove. We learned this the hard way after the first speedup commit
silently regressed to the per-annotation HTTP loop.

Dual embedding (default + corpus-preferred when different) is mirrored
at dispatch time: one batch task per embedder path, sub-batched by
`EMBEDDING_BATCH_SIZE` to match the production
`corpus_tasks.ensure_embeddings_for_corpus` pattern.
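A sketch of that per-path dispatch shape — the `corpus.preferred_embedder_path`
field name is an assumption, not the actual model field:

```python
# One batch task per distinct embedder path, each carved by EMBEDDING_BATCH_SIZE.
paths = {get_default_embedder_path()}
if corpus.preferred_embedder_path:  # field name assumed for illustration
    paths.add(corpus.preferred_embedder_path)

for path in paths:
    for i in range(0, len(annotation_ids), EMBEDDING_BATCH_SIZE):
        calculate_embeddings_for_annotation_batch.delay(
            annotation_ids=annotation_ids[i : i + EMBEDDING_BATCH_SIZE],
            corpus_id=corpus_id,
            embedder_path=path,  # explicit path keeps the fast branch engaged
        )
```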
## Operator-facing knobs

### Constants you might tune at deployment scope

| Constant | Default | When to raise | When to lower |
|---|---:|---|---|
| `EMBEDDING_BATCH_SIZE` | 100 | Big-doc corpora (>256 annotations/doc); unlocks parallel sub-batch path | Memory-pressured workers |
| `EMBEDDING_API_BATCH_SIZE` (fallback) | 50 | Only affects embedders without an explicit override | |
| `MICROSERVICE_EMBEDDER_MAX_BATCH_SIZE` | 100 | Set in tandem with the service-side `MAX_TEXTS_PER_BATCH` env var; raising one without the other causes 400s | |
| `OpenAIEmbedder.api_batch_size` | 256 | Tier 4+ (10M+ TPM); could go to 512 | Tight rate limits; halve to 128 |
| `OpenAIEmbedder.embed_max_concurrent_sub_batches` | 4 | Tier 4+ (10K+ RPM); 8 is safe | Tier 1 if seeing 429s in steady state |
| `OpenAIEmbedder.OPENAI_CLIENT_MAX_RETRIES` | 8 | Hostile rate-limit environments | Fail-fast deployments |
| `MicroserviceEmbedder.embed_max_concurrent_sub_batches` | 2 | Match `gunicorn --workers N` | Single-worker deployments |
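Because these are plain class attributes, they can be retuned at deployment
scope without touching library code — for example from a startup hook such as
`AppConfig.ready()`. A sketch; the import path below is an assumption, so point
it at wherever `OpenAIEmbedder` lives in your tree:

```python
# Hypothetical deployment-scope override, run once at process startup.
from opencontractserver.pipeline.embedders import OpenAIEmbedder  # path assumed

OpenAIEmbedder.api_batch_size = 128                  # tighter rate limits
OpenAIEmbedder.embed_max_concurrent_sub_batches = 2  # steady-state 429s observed
```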
### Service-side env vars (microservice deployment)

| Env var | Default | Notes |
|---|---:|---|
| `MAX_TEXTS_PER_BATCH` | 100 | Server-side cap; set together with `MICROSERVICE_EMBEDDER_MAX_BATCH_SIZE` |
| Gunicorn `--workers` | 2 | Each worker is single-threaded; bump for higher concurrent throughput |
| `MAX_BATCH_SIZE` (internal `encode()` chunk) | 8 | Internal sentence-transformer batching; affects per-call latency |

### Celery worker concurrency (deployment)

The `--concurrency` flag on celery workers is a major lever for
multi-document throughput. Default deployments often sit at 4, but a
sentence-transformer microservice with 2 workers can comfortably feed
8+ celery worker concurrency for ingestion (each celery worker fires
one batched embedding HTTP call per doc; workers fan out across the
gunicorn fleet).

```bash
# Example: bump celery-worker concurrency for ingest-heavy deployments
celery -A config.celery_app worker --concurrency=8 -Q ingest
```
## What's still slow (open work)

The 80% per-doc overhead breaks down roughly as:

| Suspected cost | Per-doc estimate | Status |
|---|---:|---|
| Per-annotation `set_permissions_for_obj_to_user` | ~5-15s | **Likely dead work** — `AnnotationQuerySet.visible_to_user` doesn't consult `AnnotationUserObjectPermission` |
| Label create/lookup (`load_or_create_labels`) | ~50-100ms | Cached after first doc; small |
| Doc-level embedding (single HTTP call) | ~100-500ms | Could piggyback on annotation batch task |
| Thumbnail generation | ~150ms | Separate celery task; non-blocking on batch |
| Badge auto-check | ~50ms × N tasks | Could move to corpus-level |
| File saves (`txt_extract_file`) | ~50ms | Disk-bound; minimal upside |

**Top suspect: per-annotation permission writes are dead work.**
`AnnotationQuerySet.visible_to_user` doesn't query `AnnotationUserObjectPermission`
— annotation visibility derives from document + corpus + structural +
creator + analysis/extract privacy. The per-annotation `assign_perm` /
`remove_perm` calls in `import_annotations` populate guardian rows that
are never read. Removing them is potentially the single biggest
production-side win remaining (~5-15s/doc on annotation-heavy ingests),
behaviour-preserving, and orthogonal to the embedder work above.

This is corroborated by `docs/permissioning/consolidated_permissioning_guide.md`,
which explicitly states "Annotations & Relationships: NO individual
permissions - inherited from document + corpus."
## Lessons learned

These are non-obvious things that bit us during the speedup work.
Capturing them here so future maintainers don't have to rediscover them.

### `bulk_create` skips post_save signals

Django's `Manager.bulk_create()` does NOT fire `pre_save` / `post_save`
signals by default. This was the *load-bearing* property that let us
collapse N per-annotation embedding tasks into one batch dispatch. Any
new signal handler attached to `Annotation.post_save` for ingest-time
work needs to be reflected explicitly in `import_annotations` (e.g.
the badges-tick handler, if it ever needs to apply per-annotation).
### `calculate_embeddings_for_annotation_batch` requires `embedder_path`

The task body has two branches: an `embedder_path`-supplied fast path
that uses `embed_texts_batch`, and a fallback path that loops
per-annotation through `_apply_dual_embedding_strategy`. Dispatching
without `embedder_path=` looks correct from the call site but silently
takes the slow branch. Always pass it.
### Default `urllib3.Retry` won't retry POST

`urllib3.util.retry.Retry()` defaults to retrying only idempotent HTTP
methods (GET, HEAD, PUT, DELETE, OPTIONS, TRACE) — POST is excluded.
Embedding endpoints are POST. You must set
`allowed_methods=frozenset(["POST"])` (or a superset) or no retries fire.
The default behaviour was masked in our case because the celery
autoretry layer compensates — we just ate the 60s countdown unnecessarily.
### Django connections are per-thread

When fanning HTTP work across a `ThreadPoolExecutor`, each thread gets
its own Django DB connection lazily. Doing ORM writes from worker
threads is correct but requires explicit `connections.close_all()` at
thread exit, otherwise idle connections accumulate against
`max_connections`. We sidestepped this by keeping DB writes in the
calling thread and giving worker threads only the HTTP work.
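If a future change does move ORM writes into the pool, the shape to copy is
roughly the following — a sketch, not current code; the `add_embedding`
signature shown is illustrative:

```python
from django.db import connections
from opencontractserver.annotations.models import Annotation

def write_embedding(annotation_id: int, vector: list[float]) -> None:
    try:
        # ORM write: this worker thread lazily opens its own DB connection here.
        Annotation.objects.get(pk=annotation_id).add_embedding(vector)  # illustrative
    finally:
        # Without this, each pool thread leaves an idle connection behind,
        # counting against Postgres max_connections until process exit.
        connections.close_all()
```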
### `max_retries` on `openai.OpenAI()` honours `Retry-After`

The Anthropic and OpenAI SDKs both honour the server's `Retry-After`
header on 429 responses out of the box. Raising the SDK-level
`max_retries` is the correct lever for handling rate-limit blips, *not*
adding a custom retry loop in the embedder. Re-raising `RateLimitError`
*after* the SDK budget exhausts lets celery take over with its own
backoff for the genuinely long rate-limit windows.
### `force_celery_eager()` serializes docs in the benchmark

The benchmark loader uses `force_celery_eager()` so every doc's celery
chain runs synchronously in the calling thread before the next doc
starts. **Production doesn't have this property** — each doc upload is
its own celery task and they fan out across worker concurrency. Don't
optimise the benchmark loader's serial loop and quote the resulting
speedup as a production-side gain; the production gain is whatever
celery worker concurrency already gave you.
### Module-level `requests.Session` pays for itself

For any client that fires many small POSTs to the same host (embedding,
reranking, parsing microservices), a process-wide `requests.Session`
with a sized `HTTPAdapter` cuts handshake overhead measurably. The
pattern in `sent_transformer_microservice._get_session()` is a
ready-to-copy template — pair it with a `urllib3.Retry` config and
you've got connection reuse + transient-failure absorption for free.
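A sketch of what such a lazy module-level singleton looks like — the real
helper lives in `sent_transformer_microservice._get_session()`, which may
differ in detail:

```python
import threading
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

_session: requests.Session | None = None
_lock = threading.Lock()

def _get_session() -> requests.Session:
    global _session
    if _session is None:
        with _lock:
            if _session is None:  # double-checked: threads race only on first call
                s = requests.Session()
                adapter = HTTPAdapter(
                    max_retries=Retry(
                        total=3, backoff_factor=1.0,
                        status_forcelist=(429, 502, 503, 504),
                        allowed_methods={"POST"},
                        raise_on_status=False,
                    ),
                    pool_connections=16, pool_maxsize=16,
                )
                s.mount("http://", adapter)
                s.mount("https://", adapter)
                _session = s
    return _session
```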
## How to validate a tuning change locally

1. Wipe the DB:
   ```bash
   docker compose -f local.yml run --rm django python manage.py shell -c "
   from opencontractserver.documents.models import Document, DocumentPath
   from opencontractserver.annotations.models import Annotation, StructuralAnnotationSet
   from opencontractserver.corpuses.models import Corpus
   from opencontractserver.extracts.models import Extract, Fieldset, Datacell, Column
   for m in [Datacell, Extract, Column, Fieldset, Annotation, DocumentPath,
             Document, StructuralAnnotationSet, Corpus]:
       m.objects.all().delete()
   "
   ```

2. Stage a pipeline config (see `docs/benchmarks/legalbench_rag_results.md`
   for Config A/B/C templates).
3. Run the cuad subset of LegalBench-RAG retrieval-only:
   ```bash
   docker compose -f local.yml run --rm \
     -v /path/to/legalbenchrag_data:/data/legalbenchrag:ro \
     -v /tmp/runs:/data/runs \
     django python manage.py run_benchmark \
       --benchmark legalbench-rag --path /data/legalbenchrag \
       --user admin --top-k 32 --retrieval-only --corpus-wide \
       --subsets cuad --run-dir /data/runs/cuad_$(date +%s)
   ```

4. Read the `Ingesting N documents` and `Creating N columns` log timestamps to
   isolate the ingest phase from retrieval/eval. The wall-clock time
   between those two log lines is the ingestion bottleneck under test
   (see the snippet after this list).
5. Compare against the run-history table above. Speedups should
   reproduce within ~10% on the same hardware.
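One way to pull those two phase markers out of a captured run log — a sketch;
the log path is hypothetical and you subtract the printed timestamps yourself:

```bash
# Capture the benchmark output, then grep out the two phase markers.
docker compose -f local.yml run --rm django python manage.py run_benchmark ... \
  2>&1 | tee /tmp/cuad_run.log
grep -E "Ingesting [0-9]+ documents|Creating [0-9]+ columns" /tmp/cuad_run.log
```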
## Cross-references

- LegalBench-RAG benchmark methodology and results:
  [`docs/benchmarks/legalbench_rag_results.md`](../benchmarks/legalbench_rag_results.md)
- Permission system (why annotation-level permissions are dead work):
  [`docs/permissioning/consolidated_permissioning_guide.md`](../permissioning/consolidated_permissioning_guide.md)
- Pipeline component architecture:
  [`docs/pipelines/pipeline_overview.md`](../pipelines/pipeline_overview.md)

opencontractserver/constants/document_processing.py

Lines changed: 9 additions & 5 deletions
@@ -26,11 +26,15 @@
 # Controls how many annotations are processed per Celery task to prevent queue flooding
 EMBEDDING_BATCH_SIZE = 100
 
-# Maximum number of texts to send in a single embedder API batch request.
-# This is the sub-batch size used *within* a Celery task when calling
-# embedder.embed_texts_batch(). Kept separate from EMBEDDING_BATCH_SIZE
-# (task-level grouping) because API limits may differ from task sizing.
-# Must be <= MICROSERVICE_EMBEDDER_MAX_BATCH_SIZE (currently 100).
+# Default sub-batch size used *within* a Celery task when calling
+# ``embedder.embed_texts_batch()``. Kept separate from
+# ``EMBEDDING_BATCH_SIZE`` (task-level grouping) because API limits may
+# differ from task sizing. This is a global *fallback* — concrete
+# embedders should override ``BaseEmbedder.api_batch_size`` with a value
+# appropriate for their provider (OpenAI accepts up to 2048 inputs;
+# the local microservice caps at MICROSERVICE_EMBEDDER_MAX_BATCH_SIZE).
+# Must be <= MICROSERVICE_EMBEDDER_MAX_BATCH_SIZE for the microservice
+# embedder; the system check in documents/checks.py validates this.
 EMBEDDING_API_BATCH_SIZE = 50
 
 # Maximum number of texts accepted by MicroserviceEmbedder.embed_texts_batch().

opencontractserver/pipeline/base/embedder.py

Lines changed: 26 additions & 0 deletions
@@ -45,6 +45,32 @@ class BaseEmbedder(PipelineComponentBase, ABC):
     # Override in subclasses to add multimodal support
     supported_modalities: set[ContentModality] = {ContentModality.TEXT}
 
+    # ------------------------------------------------------------------ #
+    # Batch-embedding tunables (override per subclass)
+    # ------------------------------------------------------------------ #
+    #
+    # ``api_batch_size``: maximum number of inputs sent in one
+    # ``embed_texts_batch`` call to the underlying provider. The
+    # ``calculate_embeddings_for_annotation_batch`` task reads this when
+    # carving annotations into sub-batches. Defaults to the global
+    # ``EMBEDDING_API_BATCH_SIZE`` constant; subclasses pick a value
+    # appropriate for the provider's published batch limit and the
+    # subclass's typical input-token budget.
+    #
+    # ``embed_max_concurrent_sub_batches``: how many sub-batches the
+    # embedding task is allowed to fly in parallel against the provider.
+    # 1 means strictly sequential (the historical behaviour). Subclasses
+    # that talk to a high-throughput hosted API can raise this to push
+    # ingest wall-clock down. The task uses a thread pool, so concurrent
+    # sub-batches share the same Python process — keep this modest
+    # (4-8 is usually enough; higher just pressures provider RPM).
+    #
+    # Both are class attributes (not instance settings) so they don't
+    # need to round-trip through ``PipelineSettings`` for what's a static
+    # provider characteristic. Override at class definition time.
+    api_batch_size: int = 50
+    embed_max_concurrent_sub_batches: int = 1
+
     # Convenience properties derived from supported_modalities
     @property
     def is_multimodal(self) -> bool:
