Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion asap-query-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ csv = "1"
elastic_dsl_utilities.workspace = true
asap_sketchlib.workspace = true

# jemalloc global allocator — cuts malloc/free churn in the precompute engine
# hot path (per-sample/per-window sketch + key allocations). Optional so the
# crate still builds where jemalloc is unavailable; not built on MSVC.
[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { version = "0.6", optional = true }

[[bin]]
name = "precompute_engine"
path = "src/bin/precompute_engine.rs"
Expand All @@ -87,7 +93,9 @@ harness = false

[features]
#default = ["lock_profiling", "extra_debugging"]
default = []
default = ["jemalloc"]
# Use jemalloc as the global allocator (see lib.rs). On by default.
jemalloc = ["dep:tikv-jemallocator"]
# Enable lock profiling instrumentation
lock_profiling = []
# Enable extra debugging output
Expand Down
8 changes: 8 additions & 0 deletions asap-query-engine/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
// Use jemalloc as the global allocator when the `jemalloc` feature is on
// (default). The precompute engine's hot path is dominated by malloc/free
// churn (per-sample/per-window sketch buffers and group-key allocations);
// jemalloc's per-thread arenas and size-class caching cut that overhead.
//
// `tikv-jemallocator` is declared in Cargo.toml only for
// `cfg(not(target_env = "msvc"))`, while the `jemalloc` feature itself is
// enabled by default on every target. The target check is therefore repeated
// here: without it, an MSVC build with default features would reference a
// crate that was never compiled and fail with an unresolved
// `tikv_jemallocator` path. On MSVC the system allocator is used instead.
#[cfg(all(feature = "jemalloc", not(target_env = "msvc")))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

pub mod data_model;
pub mod drivers;
pub mod engines;
Expand Down
152 changes: 82 additions & 70 deletions asap-query-engine/src/precompute_engine/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ pub struct Worker {
receiver: mpsc::Receiver<WorkerMessage>,
output_sink: Arc<dyn OutputSink>,
/// Map from (agg_id, group_key) to per-group state.
group_states: HashMap<(u64, String), GroupState>,
/// Per-group state, keyed by `agg_id` then by an interned `Arc<str>`
/// group key. Nesting lets the per-sample hot path look up by `&str`
/// (no allocation); the group-key string is allocated once, on first
/// sight of the group, and shared via the `Arc`.
group_states: HashMap<u64, HashMap<Arc<str>, GroupState>>,
/// Aggregation configs, keyed by aggregation_id.
agg_configs: HashMap<u64, Arc<AggregationConfig>>,
/// Allowed lateness in ms.
Expand Down Expand Up @@ -185,25 +189,18 @@ impl Worker {
Vec::new();

for agg_id in &removed_ids {
// Drain all group states for this agg_id.
let removed_keys: Vec<_> = self
.group_states
.keys()
.filter(|(id, _)| id == agg_id)
.cloned()
.collect();

for key in removed_keys {
let Some(state) = self.group_states.remove(&key) else {
continue;
};
// Drain all group states for this agg_id in one move.
let Some(inner) = self.group_states.remove(agg_id) else {
continue;
};

for (group_key_str, state) in inner {
if state.previous_watermark_ms == i64::MIN {
continue; // No samples received — nothing to emit.
}
// Force-close all open windows by advancing the watermark
// to i64::MAX. No new samples will arrive for this group.
let (group_key_str, mut active_panes) =
(key.1.clone(), state.active_panes);
let mut active_panes = state.active_panes;
let closed = state
.window_manager
.closed_windows(state.previous_watermark_ms, i64::MAX);
Expand Down Expand Up @@ -240,7 +237,7 @@ impl Worker {
}

self.group_count
.store(self.group_states.len(), Ordering::Relaxed);
.store(self.total_groups(), Ordering::Relaxed);
info!(
"Worker {}: evicted {} removed agg_id(s) {:?}",
self.id,
Expand All @@ -264,31 +261,44 @@ impl Worker {
info!(
"Worker {} stopped, {} active groups",
self.id,
self.group_states.len()
self.total_groups()
);
}

/// Total number of live groups across all agg_ids.
fn total_groups(&self) -> usize {
self.group_states.values().map(|m| m.len()).sum()
}

/// Get or create the GroupState for a (agg_id, group_key) pair.
/// Returns None if agg_id has no matching config.
fn get_or_create_group_state(
&mut self,
agg_id: u64,
group_key: &str,
) -> Option<&mut GroupState> {
let key = (agg_id, group_key.to_string());
if !self.group_states.contains_key(&key) {
let config = self.agg_configs.get(&agg_id)?;
// Fast path: group already exists — borrow-based lookup, no allocation.
let exists = self
.group_states
.get(&agg_id)
.is_some_and(|m| m.contains_key(group_key));
if !exists {
// Creation path: requires a config, and allocates the interned key once.
let config = Arc::clone(self.agg_configs.get(&agg_id)?);
let gs = GroupState {
window_manager: WindowManager::new(config.window_size, config.slide_interval),
config: Arc::clone(config),
config,
active_panes: BTreeMap::new(),
previous_watermark_ms: i64::MIN,
};
self.group_states.insert(key.clone(), gs);
self.group_states
.entry(agg_id)
.or_default()
.insert(Arc::from(group_key), gs);
self.group_count
.store(self.group_states.len(), Ordering::Relaxed);
.store(self.total_groups(), Ordering::Relaxed);
}
self.group_states.get_mut(&key)
self.group_states.get_mut(&agg_id)?.get_mut(group_key)
}

/// Process a batch of samples for a specific (agg_id, group_key).
Expand All @@ -305,17 +315,16 @@ impl Worker {
let allowed_lateness_ms = self.allowed_lateness_ms;
let late_data_policy = self.late_data_policy;

if self.get_or_create_group_state(agg_id, group_key).is_none() {
warn!(
"Worker {} skipping samples for unknown agg_id={}, group_key={}",
self.id, agg_id, group_key
);
return Ok(());
}
let state = self
.group_states
.get_mut(&(agg_id, group_key.to_string()))
.unwrap();
let state = match self.get_or_create_group_state(agg_id, group_key) {
Some(state) => state,
None => {
warn!(
"Worker {} skipping samples for unknown agg_id={}, group_key={}",
self.id, agg_id, group_key
);
return Ok(());
}
};

// Find the max timestamp in this batch to advance the watermark
let batch_max_ts = samples
Expand Down Expand Up @@ -470,6 +479,7 @@ impl Worker {
let worker_wm = self
.group_states
.values()
.flat_map(|m| m.values())
.map(|s| s.previous_watermark_ms)
.filter(|&wm| wm != i64::MIN)
.max()
Expand All @@ -484,44 +494,46 @@ impl Worker {
// Step 4: For each group, advance watermark and close due windows.
let mut emit_batch: Vec<(PrecomputedOutput, Box<dyn AggregateCore>)> = Vec::new();

for ((agg_id, group_key), state) in &mut self.group_states {
if state.previous_watermark_ms == i64::MIN {
continue; // No samples received yet — no panes to close.
}
for (agg_id, inner) in &mut self.group_states {
for (group_key, state) in inner.iter_mut() {
if state.previous_watermark_ms == i64::MIN {
continue; // No samples received yet — no panes to close.
}

// Effective watermark: max(group's own, global) + 1ms for boundary.
let propagated_wm = if global_wm != i64::MIN {
state.previous_watermark_ms.max(global_wm)
} else {
state.previous_watermark_ms
};
let effective_wm = propagated_wm.saturating_add(1);

let closed = state
.window_manager
.closed_windows(state.previous_watermark_ms, effective_wm);

for window_start in &closed {
let (_, window_end) = state.window_manager.window_bounds(*window_start);
let pane_starts = state.window_manager.panes_for_window(*window_start);

if let Some(accumulator) =
merge_panes_for_window(&mut state.active_panes, &pane_starts)
{
let key = build_group_key_label_values(group_key);
let output = PrecomputedOutput::new(
*window_start as u64,
window_end as u64,
Some(key),
*agg_id,
);
emit_batch.push((output, accumulator));
// Effective watermark: max(group's own, global) + 1ms for boundary.
let propagated_wm = if global_wm != i64::MIN {
state.previous_watermark_ms.max(global_wm)
} else {
state.previous_watermark_ms
};
let effective_wm = propagated_wm.saturating_add(1);

let closed = state
.window_manager
.closed_windows(state.previous_watermark_ms, effective_wm);

for window_start in &closed {
let (_, window_end) = state.window_manager.window_bounds(*window_start);
let pane_starts = state.window_manager.panes_for_window(*window_start);

if let Some(accumulator) =
merge_panes_for_window(&mut state.active_panes, &pane_starts)
{
let key = build_group_key_label_values(group_key);
let output = PrecomputedOutput::new(
*window_start as u64,
window_end as u64,
Some(key),
*agg_id,
);
emit_batch.push((output, accumulator));
}
}
}

// Update group watermark to reflect the advancement.
if effective_wm > state.previous_watermark_ms {
state.previous_watermark_ms = effective_wm;
// Update group watermark to reflect the advancement.
if effective_wm > state.previous_watermark_ms {
state.previous_watermark_ms = effective_wm;
}
}
}

Expand Down
Loading