Skip to content

Commit e2258ca

Browse files
zzylolclaude
andcommitted
perf(precompute): jemalloc global allocator + Arc<str> group-key interning
Two CPU optimizations for the precompute engine hot path, motivated by profiling the tumbling-window KLL sketch workload (perf -F499, 4000 groups). 1. jemalloc global allocator (tikv-jemallocator, default `jemalloc` feature). The hot path was dominated by malloc/free churn (43% of CPU) from per-window sketch buffers. jemalloc's per-thread arenas / size-class caching recycle those buffers, dropping allocator cost to ~7%. 2. Arc<str> group-key interning. group_states is now nested HashMap<u64, HashMap<Arc<str>, GroupState>>, so the per-sample/per-batch lookup borrows by &str with zero allocation; the key string is allocated once per group. Also removes a redundant per-batch hashmap lookup in process_group_samples. Measured (4000 groups, 2.4M window-closes, mean of 3 reps), relative to the pre-PR clone+system-malloc baseline: clone + system malloc 7.95s 301k closes/s 1.00x move (into_accumulator) 6.33s 380k closes/s 1.26x move + jemalloc 3.71s 646k closes/s 2.14x move + jemalloc + Arc<str> 3.61s 665k closes/s 2.20x All 473 lib tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 1f2487e commit e2258ca

4 files changed

Lines changed: 86 additions & 37 deletions

File tree

Cargo.lock

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

asap-query-engine/Cargo.toml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ csv = "1"
6161
elastic_dsl_utilities.workspace = true
6262
asap_sketchlib.workspace = true
6363

64+
# jemalloc global allocator — cuts malloc/free churn in the precompute engine
65+
# hot path (per-sample/per-window sketch + key allocations). Optional so the
66+
# crate still builds where jemalloc is unavailable; not built on MSVC.
67+
[target.'cfg(not(target_env = "msvc"))'.dependencies]
68+
tikv-jemallocator = { version = "0.6", optional = true }
69+
6470
[[bin]]
6571
name = "precompute_engine"
6672
path = "src/bin/precompute_engine.rs"
@@ -87,7 +93,9 @@ harness = false
8793

8894
[features]
8995
#default = ["lock_profiling", "extra_debugging"]
90-
default = []
96+
default = ["jemalloc"]
97+
# Use jemalloc as the global allocator (see lib.rs). On by default.
98+
jemalloc = ["dep:tikv-jemallocator"]
9199
# Enable lock profiling instrumentation
92100
lock_profiling = []
93101
# Enable extra debugging output

asap-query-engine/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
// Use jemalloc as the global allocator when the `jemalloc` feature is on
2+
// (default). The precompute engine's hot path is dominated by malloc/free
3+
// churn (per-sample/per-window sketch buffers and group-key allocations);
4+
// jemalloc's per-thread arenas and size-class caching cut that overhead.
5+
#[cfg(feature = "jemalloc")]
6+
#[global_allocator]
7+
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
8+
19
pub mod data_model;
210
pub mod drivers;
311
pub mod engines;

asap-query-engine/src/precompute_engine/worker.rs

Lines changed: 48 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,11 @@ pub struct Worker {
5050
receiver: mpsc::Receiver<WorkerMessage>,
5151
output_sink: Arc<dyn OutputSink>,
5252
/// Map from (agg_id, group_key) to per-group state.
53-
group_states: HashMap<(u64, String), GroupState>,
53+
/// Per-group state, keyed by `agg_id` then by an interned `Arc<str>`
54+
/// group key. Nesting lets the per-sample hot path look up by `&str`
55+
/// (no allocation); the group-key string is allocated once, on first
56+
/// sight of the group, and shared via the `Arc`.
57+
group_states: HashMap<u64, HashMap<Arc<str>, GroupState>>,
5458
/// Aggregation configs, keyed by aggregation_id.
5559
agg_configs: HashMap<u64, Arc<AggregationConfig>>,
5660
/// Allowed lateness in ms.
@@ -185,25 +189,18 @@ impl Worker {
185189
Vec::new();
186190

187191
for agg_id in &removed_ids {
188-
// Drain all group states for this agg_id.
189-
let removed_keys: Vec<_> = self
190-
.group_states
191-
.keys()
192-
.filter(|(id, _)| id == agg_id)
193-
.cloned()
194-
.collect();
195-
196-
for key in removed_keys {
197-
let Some(state) = self.group_states.remove(&key) else {
198-
continue;
199-
};
192+
// Drain all group states for this agg_id in one move.
193+
let Some(inner) = self.group_states.remove(agg_id) else {
194+
continue;
195+
};
196+
197+
for (group_key_str, state) in inner {
200198
if state.previous_watermark_ms == i64::MIN {
201199
continue; // No samples received — nothing to emit.
202200
}
203201
// Force-close all open windows by advancing the watermark
204202
// to i64::MAX. No new samples will arrive for this group.
205-
let (group_key_str, mut active_panes) =
206-
(key.1.clone(), state.active_panes);
203+
let mut active_panes = state.active_panes;
207204
let closed = state
208205
.window_manager
209206
.closed_windows(state.previous_watermark_ms, i64::MAX);
@@ -240,7 +237,7 @@ impl Worker {
240237
}
241238

242239
self.group_count
243-
.store(self.group_states.len(), Ordering::Relaxed);
240+
.store(self.total_groups(), Ordering::Relaxed);
244241
info!(
245242
"Worker {}: evicted {} removed agg_id(s) {:?}",
246243
self.id,
@@ -264,31 +261,44 @@ impl Worker {
264261
info!(
265262
"Worker {} stopped, {} active groups",
266263
self.id,
267-
self.group_states.len()
264+
self.total_groups()
268265
);
269266
}
270267

268+
/// Total number of live groups across all agg_ids.
269+
fn total_groups(&self) -> usize {
270+
self.group_states.values().map(|m| m.len()).sum()
271+
}
272+
271273
/// Get or create the GroupState for a (agg_id, group_key) pair.
272274
/// Returns None if agg_id has no matching config.
273275
fn get_or_create_group_state(
274276
&mut self,
275277
agg_id: u64,
276278
group_key: &str,
277279
) -> Option<&mut GroupState> {
278-
let key = (agg_id, group_key.to_string());
279-
if !self.group_states.contains_key(&key) {
280-
let config = self.agg_configs.get(&agg_id)?;
280+
// Fast path: group already exists — borrow-based lookup, no allocation.
281+
let exists = self
282+
.group_states
283+
.get(&agg_id)
284+
.is_some_and(|m| m.contains_key(group_key));
285+
if !exists {
286+
// Creation path: requires a config, and allocates the interned key once.
287+
let config = Arc::clone(self.agg_configs.get(&agg_id)?);
281288
let gs = GroupState {
282289
window_manager: WindowManager::new(config.window_size, config.slide_interval),
283-
config: Arc::clone(config),
290+
config,
284291
active_panes: BTreeMap::new(),
285292
previous_watermark_ms: i64::MIN,
286293
};
287-
self.group_states.insert(key.clone(), gs);
294+
self.group_states
295+
.entry(agg_id)
296+
.or_default()
297+
.insert(Arc::from(group_key), gs);
288298
self.group_count
289-
.store(self.group_states.len(), Ordering::Relaxed);
299+
.store(self.total_groups(), Ordering::Relaxed);
290300
}
291-
self.group_states.get_mut(&key)
301+
self.group_states.get_mut(&agg_id)?.get_mut(group_key)
292302
}
293303

294304
/// Process a batch of samples for a specific (agg_id, group_key).
@@ -305,17 +315,16 @@ impl Worker {
305315
let allowed_lateness_ms = self.allowed_lateness_ms;
306316
let late_data_policy = self.late_data_policy;
307317

308-
if self.get_or_create_group_state(agg_id, group_key).is_none() {
309-
warn!(
310-
"Worker {} skipping samples for unknown agg_id={}, group_key={}",
311-
self.id, agg_id, group_key
312-
);
313-
return Ok(());
314-
}
315-
let state = self
316-
.group_states
317-
.get_mut(&(agg_id, group_key.to_string()))
318-
.unwrap();
318+
let state = match self.get_or_create_group_state(agg_id, group_key) {
319+
Some(state) => state,
320+
None => {
321+
warn!(
322+
"Worker {} skipping samples for unknown agg_id={}, group_key={}",
323+
self.id, agg_id, group_key
324+
);
325+
return Ok(());
326+
}
327+
};
319328

320329
// Find the max timestamp in this batch to advance the watermark
321330
let batch_max_ts = samples
@@ -470,6 +479,7 @@ impl Worker {
470479
let worker_wm = self
471480
.group_states
472481
.values()
482+
.flat_map(|m| m.values())
473483
.map(|s| s.previous_watermark_ms)
474484
.filter(|&wm| wm != i64::MIN)
475485
.max()
@@ -484,7 +494,8 @@ impl Worker {
484494
// Step 4: For each group, advance watermark and close due windows.
485495
let mut emit_batch: Vec<(PrecomputedOutput, Box<dyn AggregateCore>)> = Vec::new();
486496

487-
for ((agg_id, group_key), state) in &mut self.group_states {
497+
for (agg_id, inner) in &mut self.group_states {
498+
for (group_key, state) in inner.iter_mut() {
488499
if state.previous_watermark_ms == i64::MIN {
489500
continue; // No samples received yet — no panes to close.
490501
}
@@ -523,6 +534,7 @@ impl Worker {
523534
if effective_wm > state.previous_watermark_ms {
524535
state.previous_watermark_ms = effective_wm;
525536
}
537+
}
526538
}
527539

528540
if !emit_batch.is_empty() {

0 commit comments

Comments
 (0)