Skip to content

Commit e5b6abc

Browse files
zzylolclaude
andcommitted
Add pane-based incremental sliding window computation
Route each sample to exactly 1 pane (sub-window of size slide_interval) instead of W overlapping window accumulators. When a window closes, merge its constituent panes via AggregateCore::merge_with(). This reduces per-sample accumulator updates from W to 1 for sliding windows. - Add snapshot_accumulator() to AccumulatorUpdater trait (9 implementations) - Add pane_start_for(), panes_for_window(), slide_interval_ms() to WindowManager - Replace active_windows HashMap with active_panes BTreeMap in worker - Rewrite sample routing and window close logic with pane merging - Extract PrecomputeEngine to engine.rs, ingest handlers to ingest_handler.rs - Update design doc with pane-based architecture Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent e903783 commit e5b6abc

7 files changed

Lines changed: 636 additions & 293 deletions

File tree

asap-query-engine/src/precompute_engine/accumulator_factory.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ pub trait AccumulatorUpdater: Send {
2020
/// Extract the final accumulator as a boxed `AggregateCore`.
2121
fn take_accumulator(&mut self) -> Box<dyn AggregateCore>;
2222

23+
/// Non-destructive read of the current accumulator state (clone without reset).
24+
/// Used by pane-based sliding windows to read shared panes.
25+
fn snapshot_accumulator(&self) -> Box<dyn AggregateCore>;
26+
2327
/// Reset internal state for reuse (avoids re-allocation).
2428
fn reset(&mut self);
2529

@@ -67,6 +71,10 @@ impl AccumulatorUpdater for SumAccumulatorUpdater {
6771
result
6872
}
6973

74+
fn snapshot_accumulator(&self) -> Box<dyn AggregateCore> {
75+
Box::new(self.acc.clone())
76+
}
77+
7078
fn reset(&mut self) {
7179
self.acc = SumAccumulator::new();
7280
}
@@ -113,6 +121,10 @@ impl AccumulatorUpdater for MinMaxAccumulatorUpdater {
113121
result
114122
}
115123

124+
fn snapshot_accumulator(&self) -> Box<dyn AggregateCore> {
125+
Box::new(self.acc.clone())
126+
}
127+
116128
fn reset(&mut self) {
117129
self.acc = MinMaxAccumulator::new(self.sub_type.clone());
118130
}
@@ -175,6 +187,18 @@ impl AccumulatorUpdater for IncreaseAccumulatorUpdater {
175187
result
176188
}
177189

190+
fn snapshot_accumulator(&self) -> Box<dyn AggregateCore> {
191+
match &self.acc {
192+
Some(acc) => Box::new(acc.clone()),
193+
None => Box::new(IncreaseAccumulator::new(
194+
Measurement::new(0.0),
195+
0,
196+
Measurement::new(0.0),
197+
0,
198+
)),
199+
}
200+
}
201+
178202
fn reset(&mut self) {
179203
self.acc = None;
180204
}
@@ -221,6 +245,10 @@ impl AccumulatorUpdater for KllAccumulatorUpdater {
221245
result
222246
}
223247

248+
fn snapshot_accumulator(&self) -> Box<dyn AggregateCore> {
249+
Box::new(self.acc.clone())
250+
}
251+
224252
fn reset(&mut self) {
225253
self.acc = DatasketchesKLLAccumulator::new(self.k);
226254
}
@@ -272,6 +300,10 @@ impl AccumulatorUpdater for MultipleSumUpdater {
272300
result
273301
}
274302

303+
fn snapshot_accumulator(&self) -> Box<dyn AggregateCore> {
304+
Box::new(self.acc.clone())
305+
}
306+
275307
fn reset(&mut self) {
276308
self.acc = MultipleSumAccumulator::new();
277309
}
@@ -319,6 +351,10 @@ impl AccumulatorUpdater for MultipleMinMaxUpdater {
319351
result
320352
}
321353

354+
fn snapshot_accumulator(&self) -> Box<dyn AggregateCore> {
355+
Box::new(self.acc.clone())
356+
}
357+
322358
fn reset(&mut self) {
323359
self.acc = MultipleMinMaxAccumulator::new(self.sub_type.clone());
324360
}
@@ -384,6 +420,10 @@ impl AccumulatorUpdater for MultipleIncreaseUpdater {
384420
result
385421
}
386422

423+
fn snapshot_accumulator(&self) -> Box<dyn AggregateCore> {
424+
Box::new(self.acc.clone())
425+
}
426+
387427
fn reset(&mut self) {
388428
self.acc = MultipleIncreaseAccumulator::new();
389429
}
@@ -435,6 +475,10 @@ impl AccumulatorUpdater for CmsAccumulatorUpdater {
435475
result
436476
}
437477

478+
fn snapshot_accumulator(&self) -> Box<dyn AggregateCore> {
479+
Box::new(self.acc.clone())
480+
}
481+
438482
fn reset(&mut self) {
439483
self.acc = CountMinSketchAccumulator::new(self.row_num, self.col_num);
440484
}
@@ -486,6 +530,10 @@ impl AccumulatorUpdater for HydraKllAccumulatorUpdater {
486530
result
487531
}
488532

533+
fn snapshot_accumulator(&self) -> Box<dyn AggregateCore> {
534+
Box::new(self.acc.clone())
535+
}
536+
489537
fn reset(&mut self) {
490538
self.acc = HydraKllSketchAccumulator::new(self.row_num, self.col_num, self.k);
491539
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
use crate::data_model::StreamingConfig;
2+
use crate::precompute_engine::config::PrecomputeEngineConfig;
3+
use crate::precompute_engine::ingest_handler::{
4+
handle_prometheus_ingest, handle_victoriametrics_ingest, IngestState,
5+
};
6+
use crate::precompute_engine::output_sink::OutputSink;
7+
use crate::precompute_engine::series_router::{SeriesRouter, WorkerMessage};
8+
use crate::precompute_engine::worker::Worker;
9+
use axum::{routing::post, Router};
10+
use std::collections::HashMap;
11+
use std::sync::Arc;
12+
use tokio::net::TcpListener;
13+
use tokio::sync::mpsc;
14+
use tracing::{info, warn};
15+
16+
/// The top-level precompute engine orchestrator.
17+
///
18+
/// Creates worker threads, the series router, and the Axum ingest server.
19+
pub struct PrecomputeEngine {
20+
config: PrecomputeEngineConfig,
21+
streaming_config: Arc<StreamingConfig>,
22+
output_sink: Arc<dyn OutputSink>,
23+
}
24+
25+
impl PrecomputeEngine {
26+
pub fn new(
27+
config: PrecomputeEngineConfig,
28+
streaming_config: Arc<StreamingConfig>,
29+
output_sink: Arc<dyn OutputSink>,
30+
) -> Self {
31+
Self {
32+
config,
33+
streaming_config,
34+
output_sink,
35+
}
36+
}
37+
38+
/// Start the precompute engine. This spawns worker tasks and the HTTP
39+
/// ingest server, then blocks until shutdown.
40+
pub async fn run(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
41+
let num_workers = self.config.num_workers;
42+
let channel_size = self.config.channel_buffer_size;
43+
44+
// Build MPSC channels for each worker
45+
let mut senders = Vec::with_capacity(num_workers);
46+
let mut receivers = Vec::with_capacity(num_workers);
47+
for _ in 0..num_workers {
48+
let (tx, rx) = mpsc::channel::<WorkerMessage>(channel_size);
49+
senders.push(tx);
50+
receivers.push(rx);
51+
}
52+
53+
// Build the router
54+
let router = SeriesRouter::new(senders);
55+
56+
// Build aggregation config map from streaming config
57+
let agg_configs: HashMap<u64, _> =
58+
self.streaming_config.get_all_aggregation_configs().clone();
59+
60+
// Spawn workers
61+
let mut worker_handles = Vec::with_capacity(num_workers);
62+
for (id, rx) in receivers.into_iter().enumerate() {
63+
let worker = Worker::new(
64+
id,
65+
rx,
66+
self.output_sink.clone(),
67+
agg_configs.clone(),
68+
self.config.max_buffer_per_series,
69+
self.config.allowed_lateness_ms,
70+
self.config.pass_raw_samples,
71+
self.config.raw_mode_aggregation_id,
72+
self.config.late_data_policy,
73+
);
74+
let handle = tokio::spawn(async move {
75+
worker.run().await;
76+
});
77+
worker_handles.push(handle);
78+
}
79+
80+
info!(
81+
"PrecomputeEngine started with {} workers on port {}",
82+
num_workers, self.config.ingest_port
83+
);
84+
85+
// Build the ingest state
86+
let ingest_state = Arc::new(IngestState {
87+
router,
88+
samples_ingested: std::sync::atomic::AtomicU64::new(0),
89+
});
90+
91+
// Start flush timer
92+
let flush_state = ingest_state.clone();
93+
let flush_interval_ms = self.config.flush_interval_ms;
94+
tokio::spawn(async move {
95+
let mut interval =
96+
tokio::time::interval(tokio::time::Duration::from_millis(flush_interval_ms));
97+
loop {
98+
interval.tick().await;
99+
if let Err(e) = flush_state.router.broadcast_flush().await {
100+
warn!("Flush broadcast error: {}", e);
101+
break;
102+
}
103+
}
104+
});
105+
106+
// Start the Axum HTTP server for ingest (Prometheus + VictoriaMetrics)
107+
let app = Router::new()
108+
.route("/api/v1/write", post(handle_prometheus_ingest))
109+
.route("/api/v1/import", post(handle_victoriametrics_ingest))
110+
.with_state(ingest_state);
111+
112+
let addr = format!("0.0.0.0:{}", self.config.ingest_port);
113+
info!("Ingest server listening on {}", addr);
114+
115+
let listener = TcpListener::bind(&addr).await?;
116+
axum::serve(listener, app).await?;
117+
118+
// Wait for workers to finish (this only happens on shutdown)
119+
for handle in worker_handles {
120+
let _ = handle.await;
121+
}
122+
123+
Ok(())
124+
}
125+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
use crate::drivers::ingest::prometheus_remote_write::decode_prometheus_remote_write;
2+
use crate::drivers::ingest::victoriametrics_remote_write::decode_victoriametrics_remote_write;
3+
use crate::precompute_engine::series_router::SeriesRouter;
4+
use axum::{body::Bytes, extract::State, http::StatusCode};
5+
use std::collections::HashMap;
6+
use std::sync::Arc;
7+
use std::time::Instant;
8+
use tracing::warn;
9+
10+
/// Shared state for the ingest HTTP handler.
11+
pub(crate) struct IngestState {
12+
pub(crate) router: SeriesRouter,
13+
pub(crate) samples_ingested: std::sync::atomic::AtomicU64,
14+
}
15+
16+
/// Shared logic: group decoded samples by series key and route to workers.
17+
async fn route_decoded_samples(
18+
state: &IngestState,
19+
samples: Vec<crate::drivers::ingest::prometheus_remote_write::DecodedSample>,
20+
ingest_received_at: Instant,
21+
) -> StatusCode {
22+
if samples.is_empty() {
23+
return StatusCode::NO_CONTENT;
24+
}
25+
26+
let count = samples.len() as u64;
27+
state
28+
.samples_ingested
29+
.fetch_add(count, std::sync::atomic::Ordering::Relaxed);
30+
31+
// Group samples by series key for batch routing
32+
let mut by_series: HashMap<&str, Vec<(i64, f64)>> = HashMap::new();
33+
for s in &samples {
34+
by_series
35+
.entry(&s.labels)
36+
.or_default()
37+
.push((s.timestamp_ms, s.value));
38+
}
39+
40+
// Convert to owned keys for batch routing
41+
let by_series_owned: HashMap<String, Vec<(i64, f64)>> = by_series
42+
.into_iter()
43+
.map(|(k, v)| (k.to_string(), v))
44+
.collect();
45+
46+
// Route all series to workers concurrently
47+
if let Err(e) = state
48+
.router
49+
.route_batch(by_series_owned, ingest_received_at)
50+
.await
51+
{
52+
warn!("Batch routing error: {}", e);
53+
return StatusCode::INTERNAL_SERVER_ERROR;
54+
}
55+
56+
StatusCode::NO_CONTENT
57+
}
58+
59+
/// Axum handler for Prometheus remote write (Snappy + Protobuf).
60+
pub(crate) async fn handle_prometheus_ingest(
61+
State(state): State<Arc<IngestState>>,
62+
body: Bytes,
63+
) -> StatusCode {
64+
let ingest_received_at = Instant::now();
65+
let samples = match decode_prometheus_remote_write(&body) {
66+
Ok(s) => s,
67+
Err(e) => {
68+
warn!("Failed to decode Prometheus remote write: {}", e);
69+
return StatusCode::BAD_REQUEST;
70+
}
71+
};
72+
route_decoded_samples(&state, samples, ingest_received_at).await
73+
}
74+
75+
/// Axum handler for VictoriaMetrics remote write (Zstd + Protobuf).
76+
pub(crate) async fn handle_victoriametrics_ingest(
77+
State(state): State<Arc<IngestState>>,
78+
body: Bytes,
79+
) -> StatusCode {
80+
let ingest_received_at = Instant::now();
81+
let samples = match decode_victoriametrics_remote_write(&body) {
82+
Ok(s) => s,
83+
Err(e) => {
84+
warn!("Failed to decode VictoriaMetrics remote write: {}", e);
85+
return StatusCode::BAD_REQUEST;
86+
}
87+
};
88+
route_decoded_samples(&state, samples, ingest_received_at).await
89+
}

0 commit comments

Comments
 (0)