Skip to content

Commit 1ed4fa4

Browse files
zz_yclaude
andcommitted
refactor: restructure precompute engine from per-series to per-group accumulators
Match Arroyo's GROUP BY semantics: the ingest handler now extracts grouping label values from each series and groups samples by (agg_id, group_key) before routing to workers. The router hashes by group key so all series sharing the same grouping labels land on the same worker and feed a single shared accumulator. For the quickstart (189K series, 7 pattern groups), this reduces accumulator count from 189K to 7, store writes/sec from 189K to 7, and eliminates query-time fan-in merge. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent bea8f8e commit 1ed4fa4

5 files changed

Lines changed: 751 additions & 449 deletions

File tree

asap-query-engine/src/main.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use query_engine_rust::data_model::enums::{InputFormat, LockStrategy, StreamingE
1111
use query_engine_rust::drivers::AdapterConfig;
1212
use query_engine_rust::precompute_engine::config::LateDataPolicy;
1313
use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config};
14+
use query_engine_rust::precompute_engine::PrecomputeWorkerDiagnostics;
1415
use query_engine_rust::{
1516
HttpServer, HttpServerConfig, KafkaConsumer, KafkaConsumerConfig, OtlpReceiver,
1617
OtlpReceiverConfig, PrecomputeEngine, PrecomputeEngineConfig, Result, SimpleEngine,
@@ -323,16 +324,29 @@ async fn main() -> Result<()> {
323324
let output_sink = Arc::new(StoreOutputSink::new(store.clone()));
324325
let engine =
325326
PrecomputeEngine::new(precompute_config, streaming_config.clone(), output_sink);
327+
let worker_diagnostics = engine.diagnostics();
326328
info!(
327329
"Starting precompute engine on port {}",
328330
args.prometheus_remote_write_port
329331
);
332+
333+
// Spawn periodic memory diagnostics logger
334+
let diag_store = store.clone();
335+
tokio::spawn(async move {
336+
spawn_memory_diagnostics(diag_store, Some(worker_diagnostics)).await;
337+
});
338+
330339
Some(tokio::spawn(async move {
331340
if let Err(e) = engine.run().await {
332341
error!("Precompute engine error: {}", e);
333342
}
334343
}))
335344
} else {
345+
// Even without precompute, log store diagnostics
346+
let diag_store = store.clone();
347+
tokio::spawn(async move {
348+
spawn_memory_diagnostics(diag_store, None).await;
349+
});
336350
None
337351
};
338352

@@ -396,6 +410,59 @@ async fn main() -> Result<()> {
396410
Ok(())
397411
}
398412

413+
/// Periodic memory diagnostics logger — runs every 30 seconds.
414+
async fn spawn_memory_diagnostics(
415+
store: Arc<SimpleMapStore>,
416+
worker_diagnostics: Option<Arc<PrecomputeWorkerDiagnostics>>,
417+
) {
418+
use std::sync::atomic::Ordering;
419+
420+
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
421+
loop {
422+
interval.tick().await;
423+
424+
// 1. Store diagnostics
425+
let store_diag = store.diagnostic_info();
426+
info!(
427+
"[MEMORY_DIAG] Store: {} aggregation(s), {} total time_map entries, {:.2} KB total sketch bytes",
428+
store_diag.num_aggregations,
429+
store_diag.total_time_map_entries,
430+
store_diag.total_sketch_bytes as f64 / 1024.0,
431+
);
432+
for agg in &store_diag.per_aggregation {
433+
info!(
434+
"[MEMORY_DIAG] agg_id={}: time_map_len={}, read_counts_len={}, aggregate_objects={}, sketch_bytes={:.2} KB",
435+
agg.aggregation_id,
436+
agg.time_map_len,
437+
agg.read_counts_len,
438+
agg.num_aggregate_objects,
439+
agg.sketch_bytes as f64 / 1024.0,
440+
);
441+
}
442+
443+
// 2. Worker diagnostics (precompute engine only)
444+
if let Some(ref diag) = worker_diagnostics {
445+
let total_groups: usize = diag
446+
.worker_group_counts
447+
.iter()
448+
.map(|c| c.load(Ordering::Relaxed))
449+
.sum();
450+
info!(
451+
"[MEMORY_DIAG] PrecomputeEngine: {} total groups across {} workers",
452+
total_groups,
453+
diag.worker_group_counts.len(),
454+
);
455+
for (i, counter) in diag.worker_group_counts.iter().enumerate() {
456+
info!(
457+
"[MEMORY_DIAG] worker_{}: group_states_len={}",
458+
i,
459+
counter.load(Ordering::Relaxed),
460+
);
461+
}
462+
}
463+
}
464+
}
465+
399466
fn setup_logging(
400467
output_dir: &str,
401468
log_level: &str,

asap-query-engine/src/precompute_engine/engine.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,25 @@ use crate::precompute_engine::worker::{Worker, WorkerRuntimeConfig};
99
use axum::{routing::post, Router};
1010
use sketch_db_common::aggregation_config::AggregationConfig;
1111
use std::collections::HashMap;
12+
use std::sync::atomic::AtomicUsize;
1213
use std::sync::Arc;
1314
use tokio::net::TcpListener;
1415
use tokio::sync::mpsc;
1516
use tracing::{info, warn};
1617

18+
/// Shared diagnostic counters readable from outside the engine.
19+
pub struct PrecomputeWorkerDiagnostics {
20+
pub worker_group_counts: Vec<Arc<AtomicUsize>>,
21+
}
22+
1723
/// The top-level precompute engine orchestrator.
1824
///
1925
/// Creates worker threads, the series router, and the Axum ingest server.
2026
pub struct PrecomputeEngine {
2127
config: PrecomputeEngineConfig,
2228
streaming_config: Arc<StreamingConfig>,
2329
output_sink: Arc<dyn OutputSink>,
30+
diagnostics: Arc<PrecomputeWorkerDiagnostics>,
2431
}
2532

2633
impl PrecomputeEngine {
@@ -29,13 +36,25 @@ impl PrecomputeEngine {
2936
streaming_config: Arc<StreamingConfig>,
3037
output_sink: Arc<dyn OutputSink>,
3138
) -> Self {
39+
let worker_group_counts = (0..config.num_workers)
40+
.map(|_| Arc::new(AtomicUsize::new(0)))
41+
.collect();
42+
let diagnostics = Arc::new(PrecomputeWorkerDiagnostics {
43+
worker_group_counts,
44+
});
3245
Self {
3346
config,
3447
streaming_config,
3548
output_sink,
49+
diagnostics,
3650
}
3751
}
3852

53+
/// Get a handle to worker diagnostics, readable even after `run()` starts.
54+
pub fn diagnostics(&self) -> Arc<PrecomputeWorkerDiagnostics> {
55+
self.diagnostics.clone()
56+
}
57+
3958
/// Start the precompute engine. This spawns worker tasks and the HTTP
4059
/// ingest server, then blocks until shutdown.
4160
pub async fn run(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
@@ -63,6 +82,10 @@ impl PrecomputeEngine {
6382
.map(|(&id, cfg)| (id, Arc::new(cfg.clone())))
6483
.collect();
6584

85+
// Build a Vec<Arc<AggregationConfig>> for the ingest handler
86+
let agg_configs_vec: Vec<Arc<AggregationConfig>> =
87+
agg_configs.values().cloned().collect();
88+
6689
// Spawn workers
6790
let mut worker_handles = Vec::with_capacity(num_workers);
6891
for (id, rx) in receivers.into_iter().enumerate() {
@@ -78,6 +101,7 @@ impl PrecomputeEngine {
78101
raw_mode_aggregation_id: self.config.raw_mode_aggregation_id,
79102
late_data_policy: self.config.late_data_policy,
80103
},
104+
self.diagnostics.worker_group_counts[id].clone(),
81105
);
82106
let handle = tokio::spawn(async move {
83107
worker.run().await;
@@ -94,6 +118,8 @@ impl PrecomputeEngine {
94118
let ingest_state = Arc::new(IngestState {
95119
router,
96120
samples_ingested: std::sync::atomic::AtomicU64::new(0),
121+
agg_configs: agg_configs_vec,
122+
pass_raw_samples: self.config.pass_raw_samples,
97123
});
98124

99125
// Start flush timer

asap-query-engine/src/precompute_engine/ingest_handler.rs

Lines changed: 80 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use crate::drivers::ingest::prometheus_remote_write::decode_prometheus_remote_write;
22
use crate::drivers::ingest::victoriametrics_remote_write::decode_victoriametrics_remote_write;
3-
use crate::precompute_engine::series_router::SeriesRouter;
3+
use crate::precompute_engine::series_router::{SeriesRouter, WorkerMessage};
4+
use crate::precompute_engine::worker::{extract_metric_name, parse_labels_from_series_key};
45
use axum::{body::Bytes, extract::State, http::StatusCode};
6+
use sketch_db_common::aggregation_config::AggregationConfig;
57
use std::collections::HashMap;
68
use std::sync::Arc;
79
use std::time::Instant;
@@ -11,9 +13,28 @@ use tracing::warn;
1113
pub(crate) struct IngestState {
1214
pub(crate) router: SeriesRouter,
1315
pub(crate) samples_ingested: std::sync::atomic::AtomicU64,
16+
/// Aggregation configs for group-key extraction.
17+
pub(crate) agg_configs: Vec<Arc<AggregationConfig>>,
18+
/// When true, skip group-key extraction and pass raw samples through.
19+
pub(crate) pass_raw_samples: bool,
1420
}
1521

16-
/// Shared logic: group decoded samples by series key and route to workers.
22+
/// Extract the group key (grouping label values joined by semicolons)
23+
/// for a given series key and aggregation config.
24+
fn extract_group_key(series_key: &str, config: &AggregationConfig) -> String {
25+
let labels = parse_labels_from_series_key(series_key);
26+
let mut values = Vec::new();
27+
for label_name in &config.grouping_labels.labels {
28+
if let Some(val) = labels.get(label_name.as_str()) {
29+
values.push(*val);
30+
} else {
31+
values.push("");
32+
}
33+
}
34+
values.join(";")
35+
}
36+
37+
/// Shared logic: group decoded samples by (agg_id, group_key) and route to workers.
1738
async fn route_decoded_samples(
1839
state: &IngestState,
1940
samples: Vec<crate::drivers::ingest::prometheus_remote_write::DecodedSample>,
@@ -28,25 +49,71 @@ async fn route_decoded_samples(
2849
.samples_ingested
2950
.fetch_add(count, std::sync::atomic::Ordering::Relaxed);
3051

31-
// Group samples by series key for batch routing
32-
let mut by_series: HashMap<&str, Vec<(i64, f64)>> = HashMap::new();
52+
if state.pass_raw_samples {
53+
// Raw mode: group by series key and send as RawSamples
54+
let mut by_series: HashMap<&str, Vec<(i64, f64)>> = HashMap::new();
55+
for s in &samples {
56+
by_series
57+
.entry(&s.labels)
58+
.or_default()
59+
.push((s.timestamp_ms, s.value));
60+
}
61+
let messages: Vec<WorkerMessage> = by_series
62+
.into_iter()
63+
.map(|(k, v)| WorkerMessage::RawSamples {
64+
series_key: k.to_string(),
65+
samples: v,
66+
ingest_received_at,
67+
})
68+
.collect();
69+
70+
if let Err(e) = state
71+
.router
72+
.route_group_batch(messages, ingest_received_at)
73+
.await
74+
{
75+
warn!("Batch routing error: {}", e);
76+
return StatusCode::INTERNAL_SERVER_ERROR;
77+
}
78+
return StatusCode::NO_CONTENT;
79+
}
80+
81+
// Group-by mode: for each sample, find matching agg configs and group by
82+
// (agg_id, group_key). This is the equivalent of Arroyo's GROUP BY.
83+
//
84+
// Key: (agg_id, group_key) → Vec<(series_key, timestamp_ms, value)>
85+
let mut by_group: HashMap<(u64, String), Vec<(String, i64, f64)>> = HashMap::new();
86+
3387
for s in &samples {
34-
by_series
35-
.entry(&s.labels)
36-
.or_default()
37-
.push((s.timestamp_ms, s.value));
88+
let metric_name = extract_metric_name(&s.labels);
89+
for config in &state.agg_configs {
90+
if config.metric != metric_name
91+
&& config.spatial_filter_normalized != metric_name
92+
&& config.spatial_filter != metric_name
93+
{
94+
continue;
95+
}
96+
let group_key = extract_group_key(&s.labels, config);
97+
by_group
98+
.entry((config.aggregation_id, group_key))
99+
.or_default()
100+
.push((s.labels.clone(), s.timestamp_ms, s.value));
101+
}
38102
}
39103

40-
// Convert to owned keys for batch routing
41-
let by_series_owned: HashMap<String, Vec<(i64, f64)>> = by_series
104+
let messages: Vec<WorkerMessage> = by_group
42105
.into_iter()
43-
.map(|(k, v)| (k.to_string(), v))
106+
.map(|((agg_id, group_key), samples)| WorkerMessage::GroupSamples {
107+
agg_id,
108+
group_key,
109+
samples,
110+
ingest_received_at,
111+
})
44112
.collect();
45113

46-
// Route all series to workers concurrently
47114
if let Err(e) = state
48115
.router
49-
.route_batch(by_series_owned, ingest_received_at)
116+
.route_group_batch(messages, ingest_received_at)
50117
.await
51118
{
52119
warn!("Batch routing error: {}", e);

0 commit comments

Comments
 (0)