diff --git a/asap-query-engine/src/precompute_engine/accumulator_factory.rs b/asap-query-engine/src/precompute_engine/accumulator_factory.rs index 68a0b89..d43d22c 100644 --- a/asap-query-engine/src/precompute_engine/accumulator_factory.rs +++ b/asap-query-engine/src/precompute_engine/accumulator_factory.rs @@ -734,19 +734,26 @@ fn hydra_kll_params(config: &AggregationConfig) -> (usize, usize, u16) { /// Extract `(row_num, col_num, heap_size)` for CountMinSketchWithHeap configs. /// /// Accepts the planner/Arroyo-canonical `depth`/`width`/`heapsize` names first, -/// then falls back to the `row_num`/`col_num`/`heap_size` aliases. Defaults -/// mirror the planner sketch defaults (depth 3, width 1024) with a heap of 32. -fn cms_heap_params(config: &AggregationConfig) -> (usize, usize, usize) { - let read = |names: &[&str], default: u64| -> usize { +/// then falls back to the `row_num`/`col_num`/`heap_size` aliases. All three +/// parameters are required — the planner always emits them and their absence +/// indicates a malformed config. +fn cms_heap_params(config: &AggregationConfig) -> Result<(usize, usize, usize), String> { + let read = |names: &[&str]| -> Result { names .iter() .find_map(|n| config.parameters.get(*n).and_then(|v| v.as_u64())) - .unwrap_or(default) as usize + .map(|v| v as usize) + .ok_or_else(|| { + format!( + "CountMinSketchWithHeap config missing required parameter (tried: {})", + names.join(", ") + ) + }) }; - let row_num = read(&["depth", "row_num"], 3); - let col_num = read(&["width", "col_num"], 1024); - let heap_size = read(&["heapsize", "heap_size"], 32); - (row_num, col_num, heap_size) + let row_num = read(&["depth", "row_num"])?; + let col_num = read(&["width", "col_num"])?; + let heap_size = read(&["heapsize", "heap_size"])?; + Ok((row_num, col_num, heap_size)) } /// Whether a CountMinSketchWithHeap config should count events (weight 1 per @@ -787,84 +794,97 @@ fn hll_precision_param(config: &AggregationConfig) -> u32 { // --------------------------------------------------------------------------- /// Create an appropriate `AccumulatorUpdater` from an `AggregationConfig`. -pub fn create_accumulator_updater(config: &AggregationConfig) -> Box { +/// +/// Returns `Err` if the config is of a type that requires specific parameters +/// (e.g. `CountMinSketchWithHeap`) but those parameters are absent or invalid. +pub fn create_accumulator_updater( + config: &AggregationConfig, +) -> Result, String> { let sub_type = config.aggregation_sub_type.as_str(); match config.aggregation_type { AggregationType::SingleSubpopulation => match sub_type { - "Sum" | "sum" => Box::new(SumAccumulatorUpdater::new()), - "Min" | "min" => Box::new(MinMaxAccumulatorUpdater::new(false)), - "Max" | "max" => Box::new(MinMaxAccumulatorUpdater::new(true)), - "Increase" | "increase" => Box::new(IncreaseAccumulatorUpdater::new()), + "Sum" | "sum" => Ok(Box::new(SumAccumulatorUpdater::new())), + "Min" | "min" => Ok(Box::new(MinMaxAccumulatorUpdater::new(false))), + "Max" | "max" => Ok(Box::new(MinMaxAccumulatorUpdater::new(true))), + "Increase" | "increase" => Ok(Box::new(IncreaseAccumulatorUpdater::new())), "DatasketchesKLL" | "datasketches_kll" | "KLL" | "kll" => { - Box::new(KllAccumulatorUpdater::new(kll_k_param(config))) + Ok(Box::new(KllAccumulatorUpdater::new(kll_k_param(config)))) } other => { tracing::warn!( "Unknown SingleSubpopulation sub_type '{}', defaulting to Sum", other ); - Box::new(SumAccumulatorUpdater::new()) + Ok(Box::new(SumAccumulatorUpdater::new())) } }, AggregationType::MultipleSubpopulation => match sub_type { - "Sum" | "sum" => Box::new(MultipleSumAccumulatorUpdater::new()), - "Min" | "min" => Box::new(MultipleMinMaxAccumulatorUpdater::new(false)), - "Max" | "max" => Box::new(MultipleMinMaxAccumulatorUpdater::new(true)), - "Increase" | "increase" => Box::new(MultipleIncreaseAccumulatorUpdater::new()), + "Sum" | "sum" => Ok(Box::new(MultipleSumAccumulatorUpdater::new())), + "Min" | "min" => Ok(Box::new(MultipleMinMaxAccumulatorUpdater::new(false))), + "Max" | "max" => Ok(Box::new(MultipleMinMaxAccumulatorUpdater::new(true))), + "Increase" | "increase" => Ok(Box::new(MultipleIncreaseAccumulatorUpdater::new())), "CountMinSketch" | "count_min_sketch" | "CMS" | "cms" => { let (row_num, col_num) = cms_params(config); - Box::new(CmsAccumulatorUpdater::new(row_num, col_num)) + Ok(Box::new(CmsAccumulatorUpdater::new(row_num, col_num))) } "HydraKLL" | "hydra_kll" => { let (row_num, col_num, k) = hydra_kll_params(config); - Box::new(HydraKllAccumulatorUpdater::new(row_num, col_num, k)) + Ok(Box::new(HydraKllAccumulatorUpdater::new( + row_num, col_num, k, + ))) } other => { tracing::warn!( "Unknown MultipleSubpopulation sub_type '{}', defaulting to Sum", other ); - Box::new(MultipleSumAccumulatorUpdater::new()) + Ok(Box::new(MultipleSumAccumulatorUpdater::new())) } }, AggregationType::DatasketchesKLL => { - Box::new(KllAccumulatorUpdater::new(kll_k_param(config))) + Ok(Box::new(KllAccumulatorUpdater::new(kll_k_param(config)))) + } + AggregationType::MultipleSum => Ok(Box::new(MultipleSumAccumulatorUpdater::new())), + AggregationType::MultipleIncrease => { + Ok(Box::new(MultipleIncreaseAccumulatorUpdater::new())) } - AggregationType::MultipleSum => Box::new(MultipleSumAccumulatorUpdater::new()), - AggregationType::MultipleIncrease => Box::new(MultipleIncreaseAccumulatorUpdater::new()), - AggregationType::MultipleMinMax => Box::new(MultipleMinMaxAccumulatorUpdater::new( + AggregationType::MultipleMinMax => Ok(Box::new(MultipleMinMaxAccumulatorUpdater::new( sub_type.eq_ignore_ascii_case("max"), - )), - AggregationType::Sum => Box::new(SumAccumulatorUpdater::new()), - AggregationType::MinMax => Box::new(MinMaxAccumulatorUpdater::new( + ))), + AggregationType::Sum => Ok(Box::new(SumAccumulatorUpdater::new())), + AggregationType::MinMax => Ok(Box::new(MinMaxAccumulatorUpdater::new( sub_type.eq_ignore_ascii_case("max"), - )), - AggregationType::Increase => Box::new(IncreaseAccumulatorUpdater::new()), + ))), + AggregationType::Increase => Ok(Box::new(IncreaseAccumulatorUpdater::new())), AggregationType::CountMinSketch => { let (row_num, col_num) = cms_params(config); - Box::new(CmsAccumulatorUpdater::new(row_num, col_num)) + Ok(Box::new(CmsAccumulatorUpdater::new(row_num, col_num))) } AggregationType::CountMinSketchWithHeap => { - let (row_num, col_num, heap_size) = cms_heap_params(config); - Box::new(CmsWithHeapAccumulatorUpdater::new( + let (row_num, col_num, heap_size) = cms_heap_params(config)?; + Ok(Box::new(CmsWithHeapAccumulatorUpdater::new( row_num, col_num, heap_size, cms_count_events(config), - )) + ))) } AggregationType::HydraKLL => { let (row_num, col_num, k) = hydra_kll_params(config); - Box::new(HydraKllAccumulatorUpdater::new(row_num, col_num, k)) + Ok(Box::new(HydraKllAccumulatorUpdater::new( + row_num, col_num, k, + ))) } - AggregationType::HLL => Box::new(HllAccumulatorUpdater::new(hll_precision_param(config))), + AggregationType::HLL => Ok(Box::new(HllAccumulatorUpdater::new(hll_precision_param( + config, + )))), other => { tracing::warn!( "Unknown aggregation_type '{:?}', defaulting to SingleSubpopulation Sum", other ); - Box::new(SumAccumulatorUpdater::new()) + Ok(Box::new(SumAccumulatorUpdater::new())) } } } @@ -1017,7 +1037,7 @@ mod tests { (AggregationType::CountMinSketch, ""), ] { let config = make_config(*agg_type, sub_type); - let updater = create_accumulator_updater(&config); + let updater = create_accumulator_updater(&config).unwrap(); assert_eq!( config_is_keyed(&config), updater.is_keyed(), @@ -1052,7 +1072,7 @@ mod tests { None, None, ); - let mut updater = create_accumulator_updater(&config); + let mut updater = create_accumulator_updater(&config).unwrap(); assert!( !updater.is_keyed(), "HLL is single-population per grouping key (like KLL), not keyed", @@ -1107,7 +1127,7 @@ mod tests { None, None, ); - let updater = create_accumulator_updater(&config); + let updater = create_accumulator_updater(&config).unwrap(); let acc = updater.snapshot_accumulator(); let hll = acc .as_any() @@ -1140,7 +1160,7 @@ mod tests { None, None, ); - let updater = create_accumulator_updater(&config); + let updater = create_accumulator_updater(&config).unwrap(); let acc = updater.snapshot_accumulator(); let hll = acc .as_any() @@ -1174,7 +1194,7 @@ mod tests { None, None, ); - let mut updater = create_accumulator_updater(&config); + let mut updater = create_accumulator_updater(&config).unwrap(); for i in 0..50 { updater.update_single(i as f64, 0); } @@ -1212,7 +1232,7 @@ mod tests { None, None, ); - let updater = create_accumulator_updater(&config); + let updater = create_accumulator_updater(&config).unwrap(); let acc = updater.snapshot_accumulator(); let kll = acc .as_any() @@ -1221,6 +1241,14 @@ mod tests { assert_eq!(kll.inner.k, 50, "k should be 50 from capital-K param"); } + fn cms_heap_params_required() -> std::collections::HashMap { + let mut p = std::collections::HashMap::new(); + p.insert("depth".to_string(), serde_json::json!(3_u64)); + p.insert("width".to_string(), serde_json::json!(1024_u64)); + p.insert("heapsize".to_string(), serde_json::json!(32_u64)); + p + } + fn cms_heap_config( parameters: std::collections::HashMap, ) -> AggregationConfig { @@ -1251,8 +1279,8 @@ mod tests { fn test_cms_with_heap_factory_routes_to_heap_accumulator_and_is_keyed() { // CountMinSketchWithHeap must build a CmsWithHeapAccumulatorUpdater whose // accumulator exposes the heap (get_keys), NOT a plain CMS (no heap). - let config = cms_heap_config(std::collections::HashMap::new()); - let updater = create_accumulator_updater(&config); + let config = cms_heap_config(cms_heap_params_required()); + let updater = create_accumulator_updater(&config).unwrap(); assert!(updater.is_keyed(), "CMS-with-heap top-k is keyed by srcip"); let acc = updater.snapshot_accumulator(); @@ -1271,8 +1299,8 @@ mod tests { fn test_cms_with_heap_count_events_uses_unit_weight() { // count_events (the default) → each observation contributes weight 1, so // the per-key estimate is the EVENT COUNT, not the sum of sample values. - let config = cms_heap_config(std::collections::HashMap::new()); - let mut updater = create_accumulator_updater(&config); + let config = cms_heap_config(cms_heap_params_required()); + let mut updater = create_accumulator_updater(&config).unwrap(); let key = KeyByLabelValues::new_with_labels(vec!["10.0.0.1".to_string()]); // Feed 5 events with large values; count semantics must yield ~5, not ~Σvalue. @@ -1294,10 +1322,10 @@ mod tests { #[test] fn test_cms_with_heap_count_events_false_sums_values() { // count_events=false → weight is the sample value, giving SUM semantics. - let mut params = std::collections::HashMap::new(); + let mut params = cms_heap_params_required(); params.insert("count_events".to_string(), serde_json::json!(false)); let config = cms_heap_config(params); - let mut updater = create_accumulator_updater(&config); + let mut updater = create_accumulator_updater(&config).unwrap(); let key = KeyByLabelValues::new_with_labels(vec!["10.0.0.1".to_string()]); for _ in 0..5 { @@ -1318,14 +1346,14 @@ mod tests { params.insert("width".to_string(), serde_json::json!(2048)); params.insert("heapsize".to_string(), serde_json::json!(40)); let config = cms_heap_config(params); - assert_eq!(cms_heap_params(&config), (4, 2048, 40)); + assert_eq!(cms_heap_params(&config).unwrap(), (4, 2048, 40)); assert!(cms_count_events(&config), "count_events defaults to true"); } #[test] fn test_cms_with_heap_reset_clears_state() { - let config = cms_heap_config(std::collections::HashMap::new()); - let mut updater = create_accumulator_updater(&config); + let config = cms_heap_config(cms_heap_params_required()); + let mut updater = create_accumulator_updater(&config).unwrap(); let key = KeyByLabelValues::new_with_labels(vec!["k".to_string()]); for _ in 0..10 { updater.update_keyed(&key, 1.0, 0); diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index 0220799..928c123 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -370,7 +370,7 @@ impl Worker { continue; } LateDataPolicy::ForwardToStore => { - let mut updater = create_accumulator_updater(&state.config); + let mut updater = create_accumulator_updater(&state.config)?; apply_sample(&mut *updater, series_key, *val, *ts, &state.config); let key = build_group_key_label_values(group_key); let output = PrecomputedOutput::new( @@ -390,10 +390,12 @@ impl Worker { } // Normal path: route sample to its single pane accumulator - let updater = state - .active_panes - .entry(pane_start) - .or_insert_with(|| create_accumulator_updater(&state.config)); + let updater = match state.active_panes.entry(pane_start) { + std::collections::btree_map::Entry::Occupied(e) => e.into_mut(), + std::collections::btree_map::Entry::Vacant(e) => { + e.insert(create_accumulator_updater(&state.config)?) + } + }; apply_sample(&mut **updater, series_key, *val, *ts, &state.config); }