From 7affa975fa6ea04761da8c64740c4d4d3e2bc134 Mon Sep 17 00:00:00 2001 From: zz_y Date: Thu, 28 May 2026 14:43:35 -0600 Subject: [PATCH] perf(precompute): jemalloc global allocator + Arc group-key interning Two CPU optimizations for the precompute engine hot path, motivated by profiling the tumbling-window KLL sketch workload (perf -F499, 4000 groups). 1. jemalloc global allocator (tikv-jemallocator, default `jemalloc` feature). The hot path was dominated by malloc/free churn (43% of CPU) from per-window sketch buffers. jemalloc's per-thread arenas / size-class caching recycle those buffers, dropping allocator cost to ~7%. 2. Arc group-key interning. group_states is now nested HashMap, GroupState>>, so the per-sample/per-batch lookup borrows by &str with zero allocation; the key string is allocated once per group. Also removes a redundant per-batch hashmap lookup in process_group_samples. Measured (4000 groups, 2.4M window-closes, mean of 3 reps), relative to the pre-PR clone+system-malloc baseline: clone + system malloc 7.95s 301k closes/s 1.00x move (into_accumulator) 6.33s 380k closes/s 1.26x move + jemalloc 3.71s 646k closes/s 2.14x move + jemalloc + Arc 3.61s 665k closes/s 2.20x All 473 lib tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 21 +++ asap-query-engine/Cargo.toml | 10 +- asap-query-engine/src/lib.rs | 8 + .../src/precompute_engine/worker.rs | 152 ++++++++++-------- 4 files changed, 120 insertions(+), 71 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 60320a7d..1fbc7db3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3453,6 +3453,7 @@ dependencies = [ "structopt", "tempfile", "thiserror 1.0.69", + "tikv-jemallocator", "tokio", "tokio-stream", "tonic", @@ -4319,6 +4320,26 @@ dependencies = [ "ordered-float", ] +[[package]] +name = "tikv-jemalloc-sys" +version = "0.6.1+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd8aa5b2ab86a2cefa406d889139c162cbb230092f7d1d7cbc1716405d852a3b" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "tikv-jemallocator" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0359b4327f954e0567e69fb191cf1436617748813819c94b8cd4a431422d053a" +dependencies = [ + "libc", + "tikv-jemalloc-sys", +] + [[package]] name = "time" version = "0.3.47" diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index f14eb40c..c31618a4 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -61,6 +61,12 @@ csv = "1" elastic_dsl_utilities.workspace = true asap_sketchlib.workspace = true +# jemalloc global allocator — cuts malloc/free churn in the precompute engine +# hot path (per-sample/per-window sketch + key allocations). Optional so the +# crate still builds where jemalloc is unavailable; not built on MSVC. +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = { version = "0.6", optional = true } + [[bin]] name = "precompute_engine" path = "src/bin/precompute_engine.rs" @@ -87,7 +93,9 @@ harness = false [features] #default = ["lock_profiling", "extra_debugging"] -default = [] +default = ["jemalloc"] +# Use jemalloc as the global allocator (see lib.rs). On by default. +jemalloc = ["dep:tikv-jemallocator"] # Enable lock profiling instrumentation lock_profiling = [] # Enable extra debugging output diff --git a/asap-query-engine/src/lib.rs b/asap-query-engine/src/lib.rs index f6b8a4ed..64ca8196 100644 --- a/asap-query-engine/src/lib.rs +++ b/asap-query-engine/src/lib.rs @@ -1,3 +1,11 @@ +// Use jemalloc as the global allocator when the `jemalloc` feature is on +// (default). The precompute engine's hot path is dominated by malloc/free +// churn (per-sample/per-window sketch buffers and group-key allocations); +// jemalloc's per-thread arenas and size-class caching cut that overhead. +#[cfg(feature = "jemalloc")] +#[global_allocator] +static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; + pub mod data_model; pub mod drivers; pub mod engines; diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index e72e6041..02207994 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -50,7 +50,11 @@ pub struct Worker { receiver: mpsc::Receiver, output_sink: Arc, /// Map from (agg_id, group_key) to per-group state. - group_states: HashMap<(u64, String), GroupState>, + /// Per-group state, keyed by `agg_id` then by an interned `Arc` + /// group key. Nesting lets the per-sample hot path look up by `&str` + /// (no allocation); the group-key string is allocated once, on first + /// sight of the group, and shared via the `Arc`. + group_states: HashMap, GroupState>>, /// Aggregation configs, keyed by aggregation_id. agg_configs: HashMap>, /// Allowed lateness in ms. @@ -185,25 +189,18 @@ impl Worker { Vec::new(); for agg_id in &removed_ids { - // Drain all group states for this agg_id. - let removed_keys: Vec<_> = self - .group_states - .keys() - .filter(|(id, _)| id == agg_id) - .cloned() - .collect(); - - for key in removed_keys { - let Some(state) = self.group_states.remove(&key) else { - continue; - }; + // Drain all group states for this agg_id in one move. + let Some(inner) = self.group_states.remove(agg_id) else { + continue; + }; + + for (group_key_str, state) in inner { if state.previous_watermark_ms == i64::MIN { continue; // No samples received — nothing to emit. } // Force-close all open windows by advancing the watermark // to i64::MAX. No new samples will arrive for this group. - let (group_key_str, mut active_panes) = - (key.1.clone(), state.active_panes); + let mut active_panes = state.active_panes; let closed = state .window_manager .closed_windows(state.previous_watermark_ms, i64::MAX); @@ -240,7 +237,7 @@ impl Worker { } self.group_count - .store(self.group_states.len(), Ordering::Relaxed); + .store(self.total_groups(), Ordering::Relaxed); info!( "Worker {}: evicted {} removed agg_id(s) {:?}", self.id, @@ -264,10 +261,15 @@ impl Worker { info!( "Worker {} stopped, {} active groups", self.id, - self.group_states.len() + self.total_groups() ); } + /// Total number of live groups across all agg_ids. + fn total_groups(&self) -> usize { + self.group_states.values().map(|m| m.len()).sum() + } + /// Get or create the GroupState for a (agg_id, group_key) pair. /// Returns None if agg_id has no matching config. fn get_or_create_group_state( @@ -275,20 +277,28 @@ impl Worker { agg_id: u64, group_key: &str, ) -> Option<&mut GroupState> { - let key = (agg_id, group_key.to_string()); - if !self.group_states.contains_key(&key) { - let config = self.agg_configs.get(&agg_id)?; + // Fast path: group already exists — borrow-based lookup, no allocation. + let exists = self + .group_states + .get(&agg_id) + .is_some_and(|m| m.contains_key(group_key)); + if !exists { + // Creation path: requires a config, and allocates the interned key once. + let config = Arc::clone(self.agg_configs.get(&agg_id)?); let gs = GroupState { window_manager: WindowManager::new(config.window_size, config.slide_interval), - config: Arc::clone(config), + config, active_panes: BTreeMap::new(), previous_watermark_ms: i64::MIN, }; - self.group_states.insert(key.clone(), gs); + self.group_states + .entry(agg_id) + .or_default() + .insert(Arc::from(group_key), gs); self.group_count - .store(self.group_states.len(), Ordering::Relaxed); + .store(self.total_groups(), Ordering::Relaxed); } - self.group_states.get_mut(&key) + self.group_states.get_mut(&agg_id)?.get_mut(group_key) } /// Process a batch of samples for a specific (agg_id, group_key). @@ -305,17 +315,16 @@ impl Worker { let allowed_lateness_ms = self.allowed_lateness_ms; let late_data_policy = self.late_data_policy; - if self.get_or_create_group_state(agg_id, group_key).is_none() { - warn!( - "Worker {} skipping samples for unknown agg_id={}, group_key={}", - self.id, agg_id, group_key - ); - return Ok(()); - } - let state = self - .group_states - .get_mut(&(agg_id, group_key.to_string())) - .unwrap(); + let state = match self.get_or_create_group_state(agg_id, group_key) { + Some(state) => state, + None => { + warn!( + "Worker {} skipping samples for unknown agg_id={}, group_key={}", + self.id, agg_id, group_key + ); + return Ok(()); + } + }; // Find the max timestamp in this batch to advance the watermark let batch_max_ts = samples @@ -470,6 +479,7 @@ impl Worker { let worker_wm = self .group_states .values() + .flat_map(|m| m.values()) .map(|s| s.previous_watermark_ms) .filter(|&wm| wm != i64::MIN) .max() @@ -484,44 +494,46 @@ impl Worker { // Step 4: For each group, advance watermark and close due windows. let mut emit_batch: Vec<(PrecomputedOutput, Box)> = Vec::new(); - for ((agg_id, group_key), state) in &mut self.group_states { - if state.previous_watermark_ms == i64::MIN { - continue; // No samples received yet — no panes to close. - } + for (agg_id, inner) in &mut self.group_states { + for (group_key, state) in inner.iter_mut() { + if state.previous_watermark_ms == i64::MIN { + continue; // No samples received yet — no panes to close. + } - // Effective watermark: max(group's own, global) + 1ms for boundary. - let propagated_wm = if global_wm != i64::MIN { - state.previous_watermark_ms.max(global_wm) - } else { - state.previous_watermark_ms - }; - let effective_wm = propagated_wm.saturating_add(1); - - let closed = state - .window_manager - .closed_windows(state.previous_watermark_ms, effective_wm); - - for window_start in &closed { - let (_, window_end) = state.window_manager.window_bounds(*window_start); - let pane_starts = state.window_manager.panes_for_window(*window_start); - - if let Some(accumulator) = - merge_panes_for_window(&mut state.active_panes, &pane_starts) - { - let key = build_group_key_label_values(group_key); - let output = PrecomputedOutput::new( - *window_start as u64, - window_end as u64, - Some(key), - *agg_id, - ); - emit_batch.push((output, accumulator)); + // Effective watermark: max(group's own, global) + 1ms for boundary. + let propagated_wm = if global_wm != i64::MIN { + state.previous_watermark_ms.max(global_wm) + } else { + state.previous_watermark_ms + }; + let effective_wm = propagated_wm.saturating_add(1); + + let closed = state + .window_manager + .closed_windows(state.previous_watermark_ms, effective_wm); + + for window_start in &closed { + let (_, window_end) = state.window_manager.window_bounds(*window_start); + let pane_starts = state.window_manager.panes_for_window(*window_start); + + if let Some(accumulator) = + merge_panes_for_window(&mut state.active_panes, &pane_starts) + { + let key = build_group_key_label_values(group_key); + let output = PrecomputedOutput::new( + *window_start as u64, + window_end as u64, + Some(key), + *agg_id, + ); + emit_batch.push((output, accumulator)); + } } - } - // Update group watermark to reflect the advancement. - if effective_wm > state.previous_watermark_ms { - state.previous_watermark_ms = effective_wm; + // Update group watermark to reflect the advancement. + if effective_wm > state.previous_watermark_ms { + state.previous_watermark_ms = effective_wm; + } } }