perf: speed up pgwire INSERT path and flush throughput#45
Conversation
Five changes targeting the production symptom of TF being unable to keep up with consumer ingest rate. Validated with bench/concurrent_load.py and bench/latency_probe.py against a local MinIO. 1. buffered_write_layer.rs — insert path no longer awaits flush_all_now under memory pressure. The previous behaviour stalled pgwire/gRPC threads on S3 commits and held the global flush_lock so one slow tenant froze ingest for everyone. The safety net is the existing hard-limit reject in try_reserve_memory + pressure_notify waking the background flush task. 2. database.rs — insert_records_batch now returns the URIs of files newly added by the commit, derived from the post-write snapshot under the same write lock. Callers (main.rs, bootstrap.rs) drop their pre/ post list_file_uris calls. Eliminates two redundant update_state log scans per flushed bucket — at prod scale (~2,700 add_files) that's the ~200-300ms per-bucket tax that was dominating flush latency. 3. buffered_write_layer.rs — flush_completed_buckets coalesces per (project_id, table_name): one Delta commit per (project, table) per cycle instead of one per bucket. Each commit pays a fixed log-scan + JSON + S3 RTT, so collapsing N micro-commits into one turns N×O(commit) into 1×O(commit). Also eliminates read-write contention measured as p95 dropping from 122-151ms to 70-85ms on the 8-writer + 4-reader bench. 4. buffered_write_layer.rs — tantivy index build spawned off the flush critical path. The Delta commit no longer waits on tar.zst + S3 upload; F4's coalescing naturally bounds the per-cycle tantivy queue depth to num_tables. 5. plan_cache.rs — new handle_extended_query for DML that substitutes params, folds CAST(Literal, T) into Literal_T, then executes. The insert_coerce pass wraps every $N in CAST so pgwire infers placeholder types; after substitution those CAST(literal, T) exprs stayed in every ValuesExec cell and were re-evaluated per (row, col) — at 88 cols x 30+ rows this was ~9-10ms/row of pure pipeline overhead, measured. Folding them up front drops single-conn throughput from 100 r/s to 161 r/s (+61%) and 16-writer throughput from ~440 r/s to 607 r/s (+38%).
Code Review:
|
Extends the plan_cache hook to short-circuit `Dml(Insert) → [Projection →] Values(literals)` after `replace_params_with_values`. Builds the RecordBatch from the substituted literals (with column reorder + rename to match the projection), looks up the target ProjectRoutingTable, and calls a new `fast_insert_batch` that does just the per-batch fixups (`convert_variant_columns`, project-id routing) before `insert_records_batch`. Skips ValuesExec, DataSinkExec, the write_all stream loop, and the per-cell expression evaluation that was the residual ~5-6 ms/row cost after the CAST(Literal) fold. Handles two non-trivial projection shapes that the optimizer produces: - column reorder (Projection emits columns in a different order than Values) - constant-folded NULL defaults (`Alias(Literal, name)` for cols absent from the INSERT) Non-matching plans (subqueries in VALUES, UDFs, etc.) fall back to the existing fold→execute path. SELECTs still go to the vendored handler. Measured on bench/latency_probe.py + bench/concurrent_load.py with the 88-col `otel_logs_and_spans` schema (debug build, local MinIO): scenario master after PR45 after this PR 1 conn b=100 100 r/s 161 r/s 359 r/s (3.6x) 1 conn b=500 102 r/s 162 r/s 366 r/s 8 writers 430 r/s 364 r/s 1,197 r/s (2.8x) 16 writers ~440 r/s 607 r/s 1,218 r/s (2.8x) 32 writers 188 r/s 137 r/s 340 r/s (1.8x) Per-row latency dropped 9.8 ms → 2.8 ms at batch=100. Fast-path hit-rate in the bench was 100% (2,919/2,919 inserts), zero fallback skips for the all-placeholder 88-col INSERT shape monoscope sends.
Update: pgwire INSERT fast path landed (39aa993)Extended the plan_cache hook to bypass DataFusion's executor entirely for Cumulative results (debug build, local MinIO):
Fast path hit rate in the bench: 100% (2,919/2,919 inserts). All-placeholder 88-col INSERTs (exactly what monoscope sends) go through the bypass. 8 writers at 1,197 r/s in debug ≈ prod's 1,491 r/s requirement at debug, so release builds should clear it comfortably across realistic concurrency. Known follow-ups still outstanding (separate PRs):
|
Code Review — perf: speed up pgwire INSERT path and flush throughputThe motivation is clear and the benchmark numbers are compelling. The five changes are architecturally sound individually, but a few correctness and maintainability concerns need addressing before merge. Overview
Potential Bugs / Correctness1. WAL advance failure does not prevent drain ( if let Err(e) = self.wal.advance_by_counts(...) {
warn!("WAL advance_by_counts failed: {}", e);
}
for bucket_id in &source_bucket_ids {
self.mem_buffer.drain_bucket(...); // proceeds unconditionally
}If 2. fn absorb(&mut self, b: FlushableBucket) {
if self.project_id.is_empty() {
self.project_id = b.project_id.clone();
self.table_name = b.table_name.clone();
}
...
}If 3. Pre-existing test failures are still red The PR description acknowledges 4. Detached tantivy tokio::spawn(async move {
if let Err(e) = cb(pid.clone(), tname.clone(), batches, added_files).await {
...
}
});No 5. The local copy hard-codes the Code Quality / Maintainability6. Dead code should be removed, not left with /// Currently unused — kept for reference; the `try_fast_path_insert`
/// bypass is cleaner …
#[allow(dead_code)]
fn values_to_memtable(plan: LogicalPlan) -> …If it's not used, remove it. Git history is the reference. Leaving ~55 lines of dead code in a production hot path creates maintenance debt and confusion for readers. 7. This function is on the critical INSERT path and has many Performance Considerations8. F1 safety net — is The PR says the 120% hard-limit reject in 9. Previously these counted bucket flushes; now they count source bucket IDs within a coalesced commit. The metric value is preserved numerically, but its meaning has changed subtly: a coalesced group can now be one "flush" in internal tracing but N in these counters. If any dashboards or alerts threshold on these counters, verify the fan-out effect doesn't trigger false positives. Minor / Nits
SummaryThe design is solid and the performance wins are real. The blocking items before merge are:
Items 6–9 are recommended but not blocking. 🤖 Generated with Claude Code |
Addresses PR review feedback: - CRITICAL (#1): drain MemBuffer ONLY after WAL advance_by_counts succeeds. The previous flow committed to Delta, ignored WAL advance failures, and drained anyway — restart would not replay those rows (gone from MemBuffer) and the WAL cursor wouldn't reflect them, so rows could be lost. Now on advance failure we leave the buckets in MemBuffer for the next cycle to retry; Delta dedup_keys make the re-commit idempotent on the downstream side. - #2 fragile sentinel: CoalescedGroup used `project_id.is_empty()` as the first-bucket marker, which silently overwrote on every absorb if the project_id was ever "". Switched to `Option<(project_id, table)>` and `get_or_insert_with`. Also satisfies clippy `unwrap_or_default` via `.entry(..).or_default()`. - #4 unbounded tantivy spawning: added a 16-permit semaphore so a cycle where many tables flush together can't fan out beyond S3 / tantivy-heap limits. Permit is acquired inside the spawned task so the Delta critical path still returns immediately. Graceful-shutdown await of in-flight tantivy builds is a follow-up (documented in the struct field). - #5 dml_completion: explicit re-vendor marker so a future datafusion-postgres bump can't silently diverge from upstream (INSERT tag format / count column name / count Arrow type). - #6 dead code: deleted the unused values_to_memtable function and trimmed its now-unused imports. - #9 metrics semantics: counter still counts source bucket IDs (work units done) — documented the change in meaning so dashboards computing "commits per minute" from it don't silently overstate the real Delta commit rate. - nit: trace! log on fold_literal_casts cast misses so unexpected fall-throughs are diagnosable without polluting the hot path. Also fixes pre-existing clippy warnings blocking this PR's CI: - src/dml.rs:310 `identify_target_side` complex tuple — added a scoped `#[allow(clippy::type_complexity)]` with rationale. Plus `cargo fmt` over the touched files. Bench numbers unchanged (1-conn batch=100 ~380 r/s, matches the prior 359 r/s within noise; per-row 2.6 ms/row).
Review fixes (commit d0cfae4)Applied:
CI unblockers:
Deferred (documented):
|
Code Review: perf/insert-path-optimizationsOverview: This PR delivers five targeted optimizations to address Redpanda consumer lag by reducing per-row INSERT latency and flush throughput overhead. The headline numbers (61% single-connection improvement, 38% concurrent improvement) are impressive and the benchmarking methodology is solid. The changes are generally well-reasoned, but there are several issues worth addressing before shipping to production. High Priority1. Memory pressure now creates a cliff, not a slope
The old behavior gave true soft backpressure: inserts stalled (briefly) while the buffer flushed, which created organic flow control. The new behavior:
Between 80% and 120%, inserts still return fast (good), but the background flush task has to drain ~40% of the buffer before the next insert starts being rejected. Under a sustained write storm, the 120% cliff could be hit before the background flush completes a full cycle, causing a burst of rejections rather than gradual backpressure. Consider: does 2. WAL advance failure leaves buckets that already committed to Delta
If This needs explicit verification:
This is a data correctness concern, not just a performance risk. 3. Missing tests for the two largest changes The PR passes the existing test suite but adds no new tests for:
Medium Priority4.
The function is explicitly copied from a
5. Tantivy detached tasks are not awaited on shutdown
The comment acknowledges this. The risk is: after a clean shutdown, the last N tantivy uploads are silently dropped. If a downstream consumer relies on tantivy index completeness (e.g. for full-text search over recent data), there will be a gap after every restart/deploy. At minimum, storing the 6. Counter semantic change will break any "commits per minute" alert
The counters Low Priority / Style7. Semaphore capacity hardcoded to 16
This is a reasonable default but isn't configurable. If a deployment has 50+ distinct tables (multi-tenant), the semaphore becomes a throughput limiter. Consider wiring it through 8.
9.
URL = "host=127.0.0.1 port=12345 user=postgres password=postgres dbname=postgres"Fine for a local bench script, but if this ever gets copy-pasted into CI config it will fail or worse embed credentials. Reading from env vars ( What's Well Done
Summary: The performance wins are real and the approach is sound. Before merging, I'd prioritize: (1) verifying dedup_keys exist or adding them if not, (2) adding tests for |
Summary
Five changes targeting the production symptom of TF being unable to keep up with consumer ingest rate (Redpanda dropping events). Validated end-to-end against a local MinIO +
bench/concurrent_load.pyand a newbench/latency_probe.py.Headline numbers (debug build, single connection, 100-row INSERT):
Concurrent ingest (8/16 writers, 30-row batches):
Per-cycle flush throughput on the same bench: 8 buckets fully drained in 30s vs 1 bucket in master (matches the ~5-8× improvement predicted by the hot-path analysis).
What changed
Insert path no longer awaits S3 under memory pressure (
buffered_write_layer.rs). Previous behaviour stalled pgwire/gRPC threads on Delta commits and held the globalflush_lockso one slow tenant froze ingest for everyone. Existing hard-limit reject +pressure_notifyare the safety nets.Eliminate redundant Delta log scans per flush (
database.rs,main.rs,bootstrap.rs).insert_records_batchnow returns the URIs of files added by the commit, derived from the post-write snapshot under the same write lock. Callers drop their pre/postlist_file_uriscalls. At prod scale (~2,700 add_files) those redundantupdate_statescans were the ~200-300ms per-bucket tax dominating flush latency.Coalesce per-(project, table) commits (
buffered_write_layer.rs).flush_completed_bucketsgroups buckets by(project_id, table_name)and writes one Delta commit per (project, table) per cycle. NewCoalescedGroupaccumulator merges batches, sums per-shard WAL counts, and takes max per-shard WAL positions. Each commit pays a fixed log-scan + JSON + S3 RTT, so N micro-commits → 1 commit turns N×O(commit) into 1×O(commit). Also dropped read p95 from 122-151ms to 70-85ms.Spawn tantivy build off the flush critical path (
buffered_write_layer.rs). Delta commits no longer wait on tar.zst + S3 upload. F4's coalescing naturally bounds the per-cycle tantivy queue depth tonum_tables.Fold CAST(Literal) in the plan-cache hook for DML (
plan_cache.rs). Theinsert_coercepass wraps every$Nplaceholder inCAST($N AS T)so pgwire infers placeholder types correctly. Afterreplace_params_with_valuesthose casts becomeCAST(literal AS T)inside everyValuesExeccell, re-evaluated per (row, column) — ~9-10ms/row of pure overhead at the 88-col schema, measured. The newhandle_extended_querysubstitutes params, foldsCAST(Literal, T)toLiteral_Tvia a small tree walker, and then executes viasession_context.execute_logical_plan. SELECTs fall through to the vendored path.Test plan
cargo test --lib— 140 passed, 2 pre-existing failures on master (test_flushable_buckets_carry_all_batches,test_batch_queue_under_load), verified viagit stash. No new failures.bench/latency_probe.py— single-conn per-row latency at batch sizes 1, 10, 30, 100, 500.bench/concurrent_load.py --writers {8,16,32}— multi-tenant concurrent ingest matching the prod symptom of many projects writing to the unifiedotel_logs_and_spanstable.Known follow-ups (not in this PR)
PhysicalOptimizerRulerecognizingValuesExec(all_literals)and replacing with a precomputedMemoryExecwould attack the remaining ~6ms/row. Larger change, not done here.