Skip to content

Commit aec8a18

Browse files
zzylolclaude
andcommitted
fix: address PR review issues - panic paths, no-op flush, missing service
- Replace panic on invalid aggregationId with proper anyhow error - Return Option from get_or_create_group_state to handle unknown agg_id - Fix flush_all no-op by advancing watermark by 1ms to close pending windows - Add missing fake-exporter-spiky service in docker-compose-precompute.yml Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent efeee04 commit aec8a18

3 files changed

Lines changed: 59 additions & 23 deletions

File tree

asap-common/dependencies/rs/sketch_db_common/src/streaming_config.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use anyhow::Result;
2-
use core::panic;
32
use serde::{Deserialize, Serialize};
43
use serde_yaml::Value;
54
use std::collections::HashMap;
@@ -83,7 +82,12 @@ impl StreamingConfig {
8382
if let Some(aggregations) = data.get("aggregations").and_then(|v| v.as_sequence()) {
8483
for aggregation_data in aggregations {
8584
if let Some(aggregation_id) = aggregation_data.get("aggregationId") {
86-
let aggregation_id_u64 = aggregation_id.as_u64().or_else(|| panic!()).unwrap();
85+
let aggregation_id_u64 = aggregation_id.as_u64().ok_or_else(|| {
86+
anyhow::anyhow!(
87+
"aggregationId must be a valid u64, got: {:?}",
88+
aggregation_id
89+
)
90+
})?;
8791
let num_aggregates_to_retain = retention_map.get(&aggregation_id_u64);
8892
let read_count_threshold = read_count_threshold_map.get(&aggregation_id_u64);
8993
let config = AggregationConfig::from_yaml_data(

asap-query-engine/src/precompute_engine/worker.rs

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -176,22 +176,26 @@ impl Worker {
176176
}
177177

178178
/// Get or create the GroupState for a (agg_id, group_key) pair.
179-
fn get_or_create_group_state(&mut self, agg_id: u64, group_key: &str) -> &mut GroupState {
179+
/// Returns None if agg_id has no matching config.
180+
fn get_or_create_group_state(
181+
&mut self,
182+
agg_id: u64,
183+
group_key: &str,
184+
) -> Option<&mut GroupState> {
180185
let key = (agg_id, group_key.to_string());
181186
if !self.group_states.contains_key(&key) {
182-
if let Some(config) = self.agg_configs.get(&agg_id) {
183-
let gs = GroupState {
184-
window_manager: WindowManager::new(config.window_size, config.slide_interval),
185-
config: Arc::clone(config),
186-
active_panes: BTreeMap::new(),
187-
previous_watermark_ms: i64::MIN,
188-
};
189-
self.group_states.insert(key.clone(), gs);
190-
self.group_count
191-
.store(self.group_states.len(), Ordering::Relaxed);
192-
}
187+
let config = self.agg_configs.get(&agg_id)?;
188+
let gs = GroupState {
189+
window_manager: WindowManager::new(config.window_size, config.slide_interval),
190+
config: Arc::clone(config),
191+
active_panes: BTreeMap::new(),
192+
previous_watermark_ms: i64::MIN,
193+
};
194+
self.group_states.insert(key.clone(), gs);
195+
self.group_count
196+
.store(self.group_states.len(), Ordering::Relaxed);
193197
}
194-
self.group_states.get_mut(&key).unwrap()
198+
self.group_states.get_mut(&key)
195199
}
196200

197201
/// Process a batch of samples for a specific (agg_id, group_key).
@@ -208,7 +212,13 @@ impl Worker {
208212
let allowed_lateness_ms = self.allowed_lateness_ms;
209213
let late_data_policy = self.late_data_policy;
210214

211-
self.get_or_create_group_state(agg_id, group_key);
215+
if self.get_or_create_group_state(agg_id, group_key).is_none() {
216+
warn!(
217+
"Worker {} skipping samples for unknown agg_id={}, group_key={}",
218+
self.id, agg_id, group_key
219+
);
220+
return Ok(());
221+
}
212222
let state = self.group_states.get_mut(&(agg_id, group_key.to_string())).unwrap();
213223

214224
// Find the max timestamp in this batch to advance the watermark
@@ -358,15 +368,15 @@ impl Worker {
358368
let mut emit_batch: Vec<(PrecomputedOutput, Box<dyn AggregateCore>)> = Vec::new();
359369

360370
for ((agg_id, group_key), state) in &mut self.group_states {
361-
let current_wm = state.previous_watermark_ms;
362-
// Use a slightly earlier "previous" to trigger re-checking
363-
// In practice flush just re-runs closed_windows with the same watermark
364-
// which returns empty — the real purpose is to catch windows that
365-
// were missed because watermark advanced within process_group_samples.
366-
// The flush timer is a safety net, not the primary close mechanism.
371+
if state.previous_watermark_ms == i64::MIN {
372+
continue; // No samples received yet for this group
373+
}
374+
// Advance watermark by 1ms beyond current to force-close any windows
375+
// whose end exactly equals the current watermark.
376+
let flush_wm = state.previous_watermark_ms.saturating_add(1);
367377
let closed = state
368378
.window_manager
369-
.closed_windows(state.previous_watermark_ms, current_wm);
379+
.closed_windows(state.previous_watermark_ms, flush_wm);
370380

371381
for window_start in &closed {
372382
let (_, window_end) = state.window_manager.window_bounds(*window_start);

asap-quickstart/docker-compose-precompute.yml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,28 @@ services:
277277
- "--add-pattern-label"
278278
restart: no
279279

280+
# Spiky pattern - tests sudden spikes/drops
281+
fake-exporter-spiky:
282+
image: ghcr.io/projectasap/asap-fake-exporter:v0.2.0
283+
container_name: asap-fake-exporter-spiky
284+
hostname: fake-exporter-spiky
285+
networks:
286+
- asap-network
287+
expose:
288+
- "50006"
289+
command:
290+
- "--port=50006"
291+
- "--valuescale=1000"
292+
- "--dataset=spiky"
293+
- "--num-labels=3"
294+
- "--num-values-per-label=30,30,30"
295+
- "--metric-type=gauge"
296+
- "--metric-name=sensor_reading"
297+
- "--label-names=region,service,host"
298+
- "--label-value-prefixes=region,svc,host"
299+
- "--add-pattern-label"
300+
restart: no
301+
280302
# Exponential growth - tests non-linear patterns
281303
fake-exporter-exp-up:
282304
image: ghcr.io/projectasap/asap-fake-exporter:v0.2.0

0 commit comments

Comments
 (0)