Skip to content

perf: speed up pgwire INSERT path and flush throughput#45

Merged
tonyalaribe merged 3 commits into
masterfrom
perf/insert-path-optimizations
Jun 9, 2026
Merged

perf: speed up pgwire INSERT path and flush throughput#45
tonyalaribe merged 3 commits into
masterfrom
perf/insert-path-optimizations

Conversation

@tonyalaribe

Copy link
Copy Markdown
Contributor

Summary

Five changes targeting the production symptom of TF being unable to keep up with consumer ingest rate (Redpanda dropping events). Validated end-to-end against a local MinIO + bench/concurrent_load.py and a new bench/latency_probe.py.

Headline numbers (debug build, single connection, 100-row INSERT):

per-row r/s
master 9.7 ms 100
this PR 6.2 ms 161 (+61%)

Concurrent ingest (8/16 writers, 30-row batches):

8 writers 16 writers
master 430 r/s ~440 r/s
this PR 364 r/s 607 r/s (+38%)

Per-cycle flush throughput on the same bench: 8 buckets fully drained in 30s vs 1 bucket in master (matches the ~5-8× improvement predicted by the hot-path analysis).

What changed

  1. Insert path no longer awaits S3 under memory pressure (buffered_write_layer.rs). Previous behaviour stalled pgwire/gRPC threads on Delta commits and held the global flush_lock so one slow tenant froze ingest for everyone. Existing hard-limit reject + pressure_notify are the safety nets.

  2. Eliminate redundant Delta log scans per flush (database.rs, main.rs, bootstrap.rs). insert_records_batch now returns the URIs of files added by the commit, derived from the post-write snapshot under the same write lock. Callers drop their pre/post list_file_uris calls. At prod scale (~2,700 add_files) those redundant update_state scans were the ~200-300ms per-bucket tax dominating flush latency.

  3. Coalesce per-(project, table) commits (buffered_write_layer.rs). flush_completed_buckets groups buckets by (project_id, table_name) and writes one Delta commit per (project, table) per cycle. New CoalescedGroup accumulator merges batches, sums per-shard WAL counts, and takes max per-shard WAL positions. Each commit pays a fixed log-scan + JSON + S3 RTT, so N micro-commits → 1 commit turns N×O(commit) into 1×O(commit). Also dropped read p95 from 122-151ms to 70-85ms.

  4. Spawn tantivy build off the flush critical path (buffered_write_layer.rs). Delta commits no longer wait on tar.zst + S3 upload. F4's coalescing naturally bounds the per-cycle tantivy queue depth to num_tables.

  5. Fold CAST(Literal) in the plan-cache hook for DML (plan_cache.rs). The insert_coerce pass wraps every $N placeholder in CAST($N AS T) so pgwire infers placeholder types correctly. After replace_params_with_values those casts become CAST(literal AS T) inside every ValuesExec cell, re-evaluated per (row, column) — ~9-10ms/row of pure overhead at the 88-col schema, measured. The new handle_extended_query substitutes params, folds CAST(Literal, T) to Literal_T via a small tree walker, and then executes via session_context.execute_logical_plan. SELECTs fall through to the vendored path.

Test plan

  • cargo test --lib — 140 passed, 2 pre-existing failures on master (test_flushable_buckets_carry_all_batches, test_batch_queue_under_load), verified via git stash. No new failures.
  • bench/latency_probe.py — single-conn per-row latency at batch sizes 1, 10, 30, 100, 500.
  • bench/concurrent_load.py --writers {8,16,32} — multi-tenant concurrent ingest matching the prod symptom of many projects writing to the unified otel_logs_and_spans table.
  • Soak test against real S3 / R2 before deploy.
  • Confirm prod consumer lag drains after deploy.

Known follow-ups (not in this PR)

  • 32-writer throughput drops to 137 r/s (vs 16-writer 607 r/s). Contention bottleneck somewhere in the shared session/execute path, not the per-row CAST cost this PR addresses. Needs separate investigation.
  • Option 3 (physical optimizer rule) — a PhysicalOptimizerRule recognizing ValuesExec(all_literals) and replacing with a precomputed MemoryExec would attack the remaining ~6ms/row. Larger change, not done here.
  • F3 (raise flush_parallelism + per-table semaphore) — out of scope for this PR; the bench's single unified-table workload is already saturated by F4.

Five changes targeting the production symptom of TF being unable to
keep up with consumer ingest rate. Validated with bench/concurrent_load.py
and bench/latency_probe.py against a local MinIO.

1. buffered_write_layer.rs — insert path no longer awaits flush_all_now
   under memory pressure. The previous behaviour stalled pgwire/gRPC
   threads on S3 commits and held the global flush_lock so one slow
   tenant froze ingest for everyone. The safety net is the existing
   hard-limit reject in try_reserve_memory + pressure_notify waking the
   background flush task.

2. database.rs — insert_records_batch now returns the URIs of files
   newly added by the commit, derived from the post-write snapshot under
   the same write lock. Callers (main.rs, bootstrap.rs) drop their pre/
   post list_file_uris calls. Eliminates two redundant update_state log
   scans per flushed bucket — at prod scale (~2,700 add_files) that's
   the ~200-300ms per-bucket tax that was dominating flush latency.

3. buffered_write_layer.rs — flush_completed_buckets coalesces per
   (project_id, table_name): one Delta commit per (project, table) per
   cycle instead of one per bucket. Each commit pays a fixed
   log-scan + JSON + S3 RTT, so collapsing N micro-commits into one
   turns N×O(commit) into 1×O(commit). Also eliminates read-write
   contention measured as p95 dropping from 122-151ms to 70-85ms on the
   8-writer + 4-reader bench.

4. buffered_write_layer.rs — tantivy index build spawned off the flush
   critical path. The Delta commit no longer waits on tar.zst + S3
   upload; F4's coalescing naturally bounds the per-cycle tantivy queue
   depth to num_tables.

5. plan_cache.rs — new handle_extended_query for DML that substitutes
   params, folds CAST(Literal, T) into Literal_T, then executes. The
   insert_coerce pass wraps every $N in CAST so pgwire infers
   placeholder types; after substitution those CAST(literal, T) exprs
   stayed in every ValuesExec cell and were re-evaluated per (row, col)
   — at 88 cols x 30+ rows this was ~9-10ms/row of pure pipeline
   overhead, measured. Folding them up front drops single-conn
   throughput from 100 r/s to 161 r/s (+61%) and 16-writer throughput
   from ~440 r/s to 607 r/s (+38%).
@claude

claude Bot commented Jun 9, 2026

Copy link
Copy Markdown

Code Review: perf/insert-path-optimizations

Summary: Well-motivated performance PR targeting real production latency symptoms. The benchmark numbers and methodology are solid. The changes are generally correct and well-reasoned, but there are a few issues worth discussing before merging.


Critical / Risk

1. Unbounded fire-and-forget tantivy tasks (buffered_write_layer.rs:~954)

The tantivy sidecar is now spawned as a detached tokio::spawn with no bound on how many tasks can be in-flight:

tokio::spawn(async move {
    if let Err(e) = cb(pid.clone(), tname.clone(), batches, added_files).await { ... }
});

The PR description says this is "naturally bounded by F4 (one tantivy build per (project, table) per cycle)," which is true for the coalescing side. But if a single flush cycle overlaps with the previous cycle's tantivy uploads still running (e.g. slow S3, large files), you get unbounded queue growth. Under sustained ingest the tantivy queue depth could silently grow until OOM. Consider a bounded semaphore (e.g. Arc<Semaphore>) held across spawned tasks, or a tokio::sync::Semaphore to cap in-flight builds.

2. Memory-pressure path: notify_one() can be a no-op if the background task is active

Notify::notify_one() only stores at most one permit. If the background flush task is already awake and processing, the notify is consumed before the task sleeps again and may be missed. Under pathological ingest (many bursts, slow S3), this could mean pressure builds without the flush task being re-triggered until its normal timer fires. Probably fine in practice, but worth a comment acknowledging the edge case, or using notify_waiters() instead.


Moderate

3. dml_completion mirrors private upstream code and is fragile (plan_cache.rs:~60)

async fn dml_completion(df: datafusion::dataframe::DataFrame) -> PgWireResult<Option<Response>> {
    let tag = match df.logical_plan() {
        LogicalPlan::Dml(d) => match d.op {
            WriteOp::Insert(_) => Tag::new("INSERT").with_oid(0),
            WriteOp::Update => Tag::new("UPDATE"),
            WriteOp::Delete => Tag::new("DELETE"),
            _ => return Ok(None),  // ← hits on Replace, InsertOrReplace, etc.
        },

The _ => return Ok(None) arm is reachable if WriteOp has other variants (e.g. Replace, InsertOrReplace). This then hits the error arm in handle_extended_query:

Ok(None) => Err(PgWireError::ApiError(
    "internal error: DML plan returned non-DML completion".to_string().into(),
)),

...which would surface as a cryptic internal error for a valid client operation. Since this function is a private mirror of vendored code, it's also fragile to upstream datafusion_postgres version bumps. Recommend adding a comment with the upstream reference, and returning the session context's default handling path (None) rather than an error for unrecognised ops.

4. Row count extraction in dml_completion assumes column name "count" (plan_cache.rs:~74)

.and_then(|b| b.column_by_name("count"))

This is tightly coupled to DataFusion's internal DML output format. If DataFusion renames the column (it has in past minor versions), this silently returns 0 rows rather than erroring. A comment citing the DataFusion version that produced this schema, or a fallback that sums all UInt64 columns, would make this more robust.


Minor

5. Variable shadowing in result-processing loop (buffered_write_layer.rs:~860)

for (combined, result) in flush_results {
    let CombinedBucket { combined, source_bucket_ids } = combined;  // shadows outer `combined`

The inner combined shadows the outer binding. This compiles fine but is confusing. Consider naming the outer binding cb or combined_bucket:

for (combined_bucket, result) in flush_results {
    let CombinedBucket { combined, source_bucket_ids } = combined_bucket;

6. absorb silently accepts mismatched project/table

fn absorb(&mut self, b: crate::mem_buffer::FlushableBucket) {
    if self.project_id.is_empty() {
        self.project_id = b.project_id.clone();
        self.table_name = b.table_name.clone();
    }
    // no assertion that b.project_id == self.project_id

Since absorb is only called from the grouping map keyed on (project_id, table_name), this can never be violated today. But a debug_assert! here would catch future refactors cheaply:

debug_assert_eq!(self.project_id, b.project_id);
debug_assert_eq!(self.table_name, b.table_name);

7. timedelta imported but unused in bench/latency_probe.py

from datetime import datetime, timezone, timedelta

timedelta is never referenced directly in this file (the subtraction of two datetime objects produces a timedelta implicitly). Remove the unused import.


Positive Notes

  • The fold_literal_casts transformation is correct and well-scoped: it only runs on DML plans, folds only CAST(Literal, T) leaves, and safely falls through on cast errors.
  • merge_wal_positions uses a proper tuple comparison (block_id, offset) for WAL position ordering — this is correct.
  • pre_uris snapshot is taken inside the retry loop (after update_state), so each retry computes the diff correctly. Good.
  • Dropping flush_all_now() from the insert hot path is the right call — synchronous S3 waits in the insert critical section were clearly the wrong design.
  • The PR description includes explicit benchmark methodology and a pre-existing test failure list — both are excellent practices.

Test Coverage Gap

CoalescedGroup::absorb / merge_wal_positions and fold_literal_casts are both non-trivial logic with subtle edge cases (shard-count summation overflow? unequal-length WAL position vectors? nested CAST? lossy casts?) but have no unit tests. Given the 2 pre-existing test failures (test_flushable_buckets_carry_all_batches, test_batch_queue_under_load), the coalescing path is not covered by a passing test. At minimum, add unit tests for merge_wal_positions and fold_literal_casts before landing.


Bottom line: The approach is sound and the perf wins are real. The unbounded tantivy task queue (#1) is the only thing I'd call a blocker for prod — it's a latent OOM vector. The dml_completion fragility (#3) is worth addressing before the next DataFusion upgrade bites you. Everything else can be done as follow-ups.

Extends the plan_cache hook to short-circuit `Dml(Insert) → [Projection →]
Values(literals)` after `replace_params_with_values`. Builds the
RecordBatch from the substituted literals (with column reorder + rename
to match the projection), looks up the target ProjectRoutingTable, and
calls a new `fast_insert_batch` that does just the per-batch fixups
(`convert_variant_columns`, project-id routing) before
`insert_records_batch`. Skips ValuesExec, DataSinkExec, the
write_all stream loop, and the per-cell expression evaluation that was
the residual ~5-6 ms/row cost after the CAST(Literal) fold.

Handles two non-trivial projection shapes that the optimizer produces:
- column reorder (Projection emits columns in a different order than Values)
- constant-folded NULL defaults (`Alias(Literal, name)` for cols absent
  from the INSERT)

Non-matching plans (subqueries in VALUES, UDFs, etc.) fall back to the
existing fold→execute path. SELECTs still go to the vendored handler.

Measured on bench/latency_probe.py + bench/concurrent_load.py with the
88-col `otel_logs_and_spans` schema (debug build, local MinIO):

  scenario          master      after PR45     after this PR
  1 conn b=100      100 r/s     161 r/s        359 r/s   (3.6x)
  1 conn b=500      102 r/s     162 r/s        366 r/s
  8 writers         430 r/s     364 r/s        1,197 r/s (2.8x)
  16 writers        ~440 r/s    607 r/s        1,218 r/s (2.8x)
  32 writers        188 r/s     137 r/s        340 r/s   (1.8x)

Per-row latency dropped 9.8 ms → 2.8 ms at batch=100.

Fast-path hit-rate in the bench was 100% (2,919/2,919 inserts), zero
fallback skips for the all-placeholder 88-col INSERT shape monoscope
sends.
@tonyalaribe

Copy link
Copy Markdown
Contributor Author

Update: pgwire INSERT fast path landed (39aa993)

Extended the plan_cache hook to bypass DataFusion's executor entirely for Dml(Insert) → [Projection →] Values(literals) — builds the RecordBatch from the substituted literals and calls ProjectRoutingTable.fast_insert_batch directly. Falls back to the executor for any non-matching plan shape; SELECTs unchanged.

Cumulative results (debug build, local MinIO):

master after this PR
1 conn batch=100 100 r/s, 9.7 ms/row 359 r/s, 2.8 ms/row (3.6×)
1 conn batch=500 102 r/s 366 r/s
8 writers 430 r/s 1,197 r/s (2.8×)
16 writers ~440 r/s 1,218 r/s (2.8×)
32 writers 188 r/s 340 r/s (1.8×)

Fast path hit rate in the bench: 100% (2,919/2,919 inserts). All-placeholder 88-col INSERTs (exactly what monoscope sends) go through the bypass.

8 writers at 1,197 r/s in debug ≈ prod's 1,491 r/s requirement at debug, so release builds should clear it comfortably across realistic concurrency.

Known follow-ups still outstanding (separate PRs):

  • 32-writer drop (340 r/s vs 16-writer 1,218 r/s) is a contention bottleneck, not per-row CAST. Needs separate investigation.
  • Bench client (Python psycopg) likely caps somewhere — release build + Haskell client should track higher.

@claude

claude Bot commented Jun 9, 2026

Copy link
Copy Markdown

Code Review — perf: speed up pgwire INSERT path and flush throughput

The motivation is clear and the benchmark numbers are compelling. The five changes are architecturally sound individually, but a few correctness and maintainability concerns need addressing before merge.


Overview

Change Assessment
F1 — remove sync flush under memory pressure ✅ Correct direction; one safety-net question below
F2 — insert_records_batch returns added URIs ✅ Clean, eliminates duplicate log scans
F3 — coalesce per-(project, table) commits ✅ Good; fragile sentinel in absorb
F4 — detach tantivy build ⚠️ Unbounded task spawning
F5 — fold_literal_casts + fast-path INSERT ⚠️ Complex and undertested; dead code left in

Potential Bugs / Correctness

1. WAL advance failure does not prevent drain (buffered_write_layer.rs ~line 304–310)

if let Err(e) = self.wal.advance_by_counts(...) {
    warn!("WAL advance_by_counts failed: {}", e);
}
for bucket_id in &source_bucket_ids {
    self.mem_buffer.drain_bucket(...);  // proceeds unconditionally
}

If advance_by_counts fails, the data was already committed to Delta but the WAL cursor did not advance. The bucket is still drained from mem_buffer. On the next restart, those records won't be replayed (they're gone from the buffer), but the WAL position doesn't reflect that they were processed. Depending on how walrus drives re-delivery, this could cause missed records or incorrect offset tracking. The safe pattern is: skip drain and log an error when WAL advance fails, accepting a duplicate commit on restart (Delta's optimistic concurrency handles idempotent writes).

2. CoalescedGroup::absorb uses is_empty() as a first-element sentinel (buffered_write_layer.rs ~line 172–175)

fn absorb(&mut self, b: FlushableBucket) {
    if self.project_id.is_empty() {
        self.project_id = b.project_id.clone();
        self.table_name  = b.table_name.clone();
    }
    ...
}

If project_id can ever be "" (e.g., default project path), subsequent buckets in the same group will silently overwrite project_id/table_name on every call. Use an Option<String> or a bool initialized flag instead.

3. Pre-existing test failures are still red

The PR description acknowledges test_flushable_buckets_carry_all_batches and test_batch_queue_under_load fail on master. These test exactly the subsystems this PR rewrites. Whether or not they pre-dated this branch, shipping with known-failing tests for the hot path is risky — they should either be fixed here or explicitly documented as follow-ups with a tracking issue.

4. Detached tantivy tokio::spawn has no lifecycle management (buffered_write_layer.rs ~line 360–369)

tokio::spawn(async move {
    if let Err(e) = cb(pid.clone(), tname.clone(), batches, added_files).await {
        ...
    }
});

No JoinHandle is retained. On graceful shutdown, in-flight tantivy uploads are abandoned without draining. If the callback uploads to S3, partial uploads could leave corrupt index shards. At high flush rates (e.g., 607 r/s with many tables), unbounded concurrent uploads could saturate S3 connections or memory before the semaphore (if any) inside the callback can throttle them. At minimum, store handles in a JoinSet that gets awaited during shutdown.

5. dml_completion mirrors a private vendored function (plan_cache.rs ~line 830–847)

The local copy hard-codes the INSERT 0 N tag format, counts, and the column name "count". If datafusion_postgres changes its dml_completion internals (different column name, tag format, count semantics), this copy silently diverges with no compile error. Comment this explicitly and add it to the diff of any upstream version bumps.


Code Quality / Maintainability

6. Dead code should be removed, not left with #[allow(dead_code)]

/// Currently unused — kept for reference; the `try_fast_path_insert`
/// bypass is cleaner …
#[allow(dead_code)]
fn values_to_memtable(plan: LogicalPlan) -> …

If it's not used, remove it. Git history is the reference. Leaving ~55 lines of dead code in a production hot path creates maintenance debt and confusion for readers.

7. try_fast_path_insert is complex (150+ lines) with no unit tests

This function is on the critical INSERT path and has many return Ok(None) escape hatches. Its correctness is only validated by the bench scripts, not by deterministic unit tests. Consider adding at minimum one test that verifies the fast path is taken for a parameterised INSERT and one that verifies it falls back correctly for unsupported plan shapes (e.g., computed column defaults, sub-selects).


Performance Considerations

8. F1 safety net — is try_reserve_memory wired up?

The PR says the 120% hard-limit reject in try_reserve_memory is now the only synchronous backpressure in the insert path. The diff shows the old flush_all_now call removed and pressure_notify.notify_one() added, but does not show where try_reserve_memory rejects at 120%. If that check is elsewhere (not in this diff), it should be called out explicitly. If it was also part of the flush-blocking code that was removed, there is no hard backpressure limit remaining.

9. flush_completed_total / flush_failed_total semantics changed

Previously these counted bucket flushes; now they count source bucket IDs within a coalesced commit. The metric value is preserved numerically, but its meaning has changed subtly: a coalesced group can now be one "flush" in internal tracing but N in these counters. If any dashboards or alerts threshold on these counters, verify the fan-out effect doesn't trigger false positives.


Minor / Nits

  • bench/latency_probe.py L18 and bench/run-tf-bench.sh L73–74: Hardcoded credentials (password=postgres, minioadmin) in committed files. Even for bench-only use, prefer env var references with a .env.example.
  • fold_literal_casts silently swallows cast errors: Err(_) => Ok(Transformed::no(e)) — the intent is "leave it for the executor", which is fine, but a trace! log would help diagnose unexpected fold misses during development.
  • grpc_handlers.rs change is minimal and correct; the ? propagation of the new Result<Vec<String>> is clean.

Summary

The design is solid and the performance wins are real. The blocking items before merge are:

  1. Fix the WAL advance → drain ordering (correctness: potential offset drift under sustained WAL failures).
  2. Add lifecycle management for detached tantivy spawns (correctness: data loss on shutdown + unbounded concurrency).
  3. Confirm try_reserve_memory hard limit is still active in the F1 path (safety).
  4. Remove values_to_memtable dead code.
  5. Fix or track the two pre-existing test failures.

Items 6–9 are recommended but not blocking.

🤖 Generated with Claude Code

Addresses PR review feedback:

- CRITICAL (#1): drain MemBuffer ONLY after WAL advance_by_counts
  succeeds. The previous flow committed to Delta, ignored WAL advance
  failures, and drained anyway — restart would not replay those rows
  (gone from MemBuffer) and the WAL cursor wouldn't reflect them, so
  rows could be lost. Now on advance failure we leave the buckets in
  MemBuffer for the next cycle to retry; Delta dedup_keys make the
  re-commit idempotent on the downstream side.

- #2 fragile sentinel: CoalescedGroup used `project_id.is_empty()` as
  the first-bucket marker, which silently overwrote on every absorb if
  the project_id was ever "". Switched to `Option<(project_id, table)>`
  and `get_or_insert_with`. Also satisfies clippy `unwrap_or_default`
  via `.entry(..).or_default()`.

- #4 unbounded tantivy spawning: added a 16-permit semaphore so a
  cycle where many tables flush together can't fan out beyond S3 /
  tantivy-heap limits. Permit is acquired inside the spawned task so
  the Delta critical path still returns immediately. Graceful-shutdown
  await of in-flight tantivy builds is a follow-up (documented in the
  struct field).

- #5 dml_completion: explicit re-vendor marker so a future
  datafusion-postgres bump can't silently diverge from upstream
  (INSERT tag format / count column name / count Arrow type).

- #6 dead code: deleted the unused values_to_memtable function and
  trimmed its now-unused imports.

- #9 metrics semantics: counter still counts source bucket IDs (work
  units done) — documented the change in meaning so dashboards
  computing "commits per minute" from it don't silently overstate the
  real Delta commit rate.

- nit: trace! log on fold_literal_casts cast misses so unexpected
  fall-throughs are diagnosable without polluting the hot path.

Also fixes pre-existing clippy warnings blocking this PR's CI:
- src/dml.rs:310 `identify_target_side` complex tuple — added a scoped
  `#[allow(clippy::type_complexity)]` with rationale.

Plus `cargo fmt` over the touched files.

Bench numbers unchanged (1-conn batch=100 ~380 r/s, matches the
prior 359 r/s within noise; per-row 2.6 ms/row).
@tonyalaribe

Copy link
Copy Markdown
Contributor Author

Review fixes (commit d0cfae4)

Applied:

  • Issues #1 (critical): flush_completed_buckets now drains MemBuffer only after advance_by_counts succeeds. Previously a successful Delta commit followed by a failed WAL advance would drain regardless — restart wouldn't replay (gone from buffer) and the WAL cursor wouldn't reflect them, risking data loss. New flow: on advance failure, leave the buckets in MemBuffer for the next cycle to retry; Delta's dedup_keys make the re-commit idempotent downstream.
  • X #2: CoalescedGroup uses Option<(project_id, table_name)> instead of an is_empty() sentinel, and the entry-or-default lookup uses .or_default() (also satisfies clippy's unwrap_or_default).
  • feat : object store  #4: detached tantivy spawns now acquire from a 16-permit semaphore inside the task — bounded fan-out, Delta critical path still returns immediately. Graceful-shutdown await of in-flight builds is documented as a follow-up on the struct field (sidecar is best-effort, index rebuildable from Delta).
  • feat: tantivy full text search #5: added an explicit RE-VENDOR-DML-COMPLETION marker on the local mirror so future datafusion-postgres bumps don't silently diverge.
  • feat: Add query caching, input validation, and health endpoints #6: deleted the unused values_to_memtable function and trimmed dead imports.
  • Add Claude Code GitHub Workflow #9: documented the counter semantic change (counts source bucket IDs, not coalesced commits).
  • nit: trace! log on fold_literal_casts cast-fold misses.

CI unblockers:

  • Pre-existing src/dml.rs:310 clippy type_complexity warning (not in this PR's scope) got a scoped #[allow] with rationale to clear -D warnings.
  • cargo fmt over touched files.

Deferred (documented):

  • Otel #3 (pre-existing test failures): test_flushable_buckets_carry_all_batches and test_batch_queue_under_load both fail on master — confirmed via git stash. Out of scope for a perf PR; should be a separate fix.
  • Better tests and introduce multi table support. so tables are added as yaml files #7 (fast-path unit tests): worth doing but ~200 LOC of plumbing for a hook that's currently validated by bench/concurrent_load.py + bench/latency_probe.py (100% fast-path hit rate, 2,919/2,919). Filed as a follow-up.
  • Update and delete sql query support #8 (try_reserve_memory hard-limit): the reject is unchanged from master and lives in buffered_write_layer.rs::try_reserve_memory at new_total > hard_limit — F1 only removed the inline flush_all_now. The PR's first commit message now calls this out.
  • Hardcoded bench credentials: kept; the files are bench-only and the credentials match what make minio-start uses (minioadmin / postgres). Happy to move to .env.example if you prefer.

@claude

claude Bot commented Jun 9, 2026

Copy link
Copy Markdown

Code Review: perf/insert-path-optimizations

Overview: This PR delivers five targeted optimizations to address Redpanda consumer lag by reducing per-row INSERT latency and flush throughput overhead. The headline numbers (61% single-connection improvement, 38% concurrent improvement) are impressive and the benchmarking methodology is solid. The changes are generally well-reasoned, but there are several issues worth addressing before shipping to production.


High Priority

1. Memory pressure now creates a cliff, not a slope

buffered_write_layer.rs:454-274

The old behavior gave true soft backpressure: inserts stalled (briefly) while the buffer flushed, which created organic flow control. The new behavior:

  • Accepts inserts continuously between 0% and 120%
  • At the 80% pressure threshold: logs a warning + pings the background task
  • At 120%: hard reject

Between 80% and 120%, inserts still return fast (good), but the background flush task has to drain ~40% of the buffer before the next insert starts being rejected. Under a sustained write storm, the 120% cliff could be hit before the background flush completes a full cycle, causing a burst of rejections rather than gradual backpressure.

Consider: does pressure_notify.notify_one() actually wake the flush task faster than its normal interval? If the flush interval is e.g. 60s (as set in run-tf-bench.sh), the notify may do nothing useful until the task's select! fires. Is there a fast-path for immediate wakeup vs. the scheduled interval?

2. WAL advance failure leaves buckets that already committed to Delta

buffered_write_layer.rs:350-375

If advance_by_counts fails after a successful Delta commit, the buckets remain in MemBuffer and will be re-committed on the next flush cycle. The PR comment states "Delta's optimistic concurrency + dedup_keys handle the duplicate commit (last-write-wins on the per-table key set)."

This needs explicit verification:

  • Does this table actually have dedup_keys configured? This is a Delta Lake feature that requires opt-in at table creation time.
  • If not configured, re-committed rows will appear twice in query results — silent data duplication.

This is a data correctness concern, not just a performance risk.

3. Missing tests for the two largest changes

The PR passes the existing test suite but adds no new tests for:

  • fold_literal_casts — easy to unit test with synthetic LogicalPlans containing CAST(Literal, T) at various nesting depths
  • try_fast_path_insert — the fast-path / fallback branching logic has many conditions; a table-driven test covering Projection→Values, Values directly, non-literal cells (should fall through), and type mismatch would catch regressions
  • CoalescedGroup::absorb with heterogeneous shard widths (e.g. one bucket has 3 shards, another has 5)

Medium Priority

4. dml_completion is a manual re-vendor with no automated drift detection

plan_cache.rs:843-860

The function is explicitly copied from a pub(super) function in the vendored crate. The re-vendor checklist is good, but it relies entirely on manual diligence. Two suggestions:

  • Add a #[cfg(test)] test that exercises the "INSERT 0 N" wire format so any divergence at least surfaces as a test failure
  • Add a comment to Cargo.toml or a VENDOR_NOTES.md noting this file has a manual copy that must be checked on version bumps

5. Tantivy detached tasks are not awaited on shutdown

buffered_write_layer.rs:418-430

The comment acknowledges this. The risk is: after a clean shutdown, the last N tantivy uploads are silently dropped. If a downstream consumer relies on tantivy index completeness (e.g. for full-text search over recent data), there will be a gap after every restart/deploy.

At minimum, storing the JoinHandles in a Vec<JoinHandle<()>> and join_all-ing them in a shutdown() method would eliminate this. Even if the handles are stored only and never cancelled, a 5-second tokio::time::timeout on the join would be safer than fire-and-forget.

6. Counter semantic change will break any "commits per minute" alert

buffered_write_layer.rs:323-327

The counters flush_completed_total / flush_failed_total now count source buckets, not Delta commits. The PR documents this, but it should be tracked as a follow-up to update any dashboards/alerts that derive "commit rate" from these metrics. A new flush_commit_total counter (incremented once per coalesced group, not per bucket) would let both meanings coexist without confusion.


Low Priority / Style

7. Semaphore capacity hardcoded to 16

buffered_write_layer.rs:238

This is a reasonable default but isn't configurable. If a deployment has 50+ distinct tables (multi-tenant), the semaphore becomes a throughput limiter. Consider wiring it through AppConfig with a default of 16.

8. fast_insert_batch silently drops added-file URIs

database.rs:529-532

insert_records_batch with skip_queue=false hits the buffered layer path and returns Vec::new() anyway, so this is not a bug today. But it means fast_insert_batch diverges from the pattern established in bootstrap.rs/main.rs (use the returned URIs for cache warming). Adding a comment like // buffered-layer path: URIs returned after the flush cycle, not at insert time would prevent future confusion.

9. bench/latency_probe.py hardcodes credentials

bench/latency_probe.py:18

URL = "host=127.0.0.1 port=12345 user=postgres password=postgres dbname=postgres"

Fine for a local bench script, but if this ever gets copy-pasted into CI config it will fail or worse embed credentials. Reading from env vars (os.getenv("TF_PG_URL", "...")) would be a safer default.


What's Well Done

  • The coalescing logic in CoalescedGroup is clean and the sentinel Option<(String, String)> comment correctly identifies a real footgun with the empty-string project_id
  • The critical ordering comment in the WAL advance block is exactly the right thing to document
  • The fallback behavior in try_fast_path_insert (Ok(None) → fall through) means the fast path never causes silent correctness regressions; it only affects performance
  • Removing the pre/post list_file_uris diff and instead capturing the pre-write snapshot under the same write lock (database.rs:486-497) is clean and correct

Summary: The performance wins are real and the approach is sound. Before merging, I'd prioritize: (1) verifying dedup_keys exist or adding them if not, (2) adding tests for fold_literal_casts and try_fast_path_insert, and (3) confirming pressure_notify wakes the flush task promptly rather than waiting for the next scheduled interval. The tantivy shutdown and counter issues are lower risk but worth tracking as immediate follow-ups.

@tonyalaribe tonyalaribe merged commit ef13450 into master Jun 9, 2026
7 of 8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant