diff --git a/asap-query-engine/src/drivers/query/adapters/elastic_http.rs b/asap-query-engine/src/drivers/query/adapters/elastic_http.rs index 11824474..66495916 100644 --- a/asap-query-engine/src/drivers/query/adapters/elastic_http.rs +++ b/asap-query-engine/src/drivers/query/adapters/elastic_http.rs @@ -91,7 +91,7 @@ impl QueryRequestAdapter for ElasticHttpAdapter { match self.config.language { QueryLanguage::elastic_sql => "/_sql", QueryLanguage::elastic_querydsl => "/_search", - _ => panic!("Invalid query language configured for Elastic"), + _ => unreachable!("Elastic adapter config is validated at startup"), } } diff --git a/asap-query-engine/src/engines/simple_engine/mod.rs b/asap-query-engine/src/engines/simple_engine/mod.rs index 7d477d8b..ff01e98e 100644 --- a/asap-query-engine/src/engines/simple_engine/mod.rs +++ b/asap-query-engine/src/engines/simple_engine/mod.rs @@ -16,6 +16,7 @@ use std::sync::{Arc, RwLock}; use std::time::Instant; use tracing::{debug, warn}; +use crate::precompute_operators::AccumulatorError; use crate::AggregateCore; use asap_types::enums::WindowType; @@ -1050,16 +1051,22 @@ impl SimpleEngine { debug!(" Merging accumulators (should_merge=true)"); #[cfg(feature = "extra_debugging")] let merge_start = Instant::now(); - let merged_accumulator = self.merge_accumulators(&precomputes); - #[cfg(feature = "extra_debugging")] - let merge_duration = merge_start.elapsed(); - #[cfg(feature = "extra_debugging")] - debug!( - " Merge completed in {:.2}ms, result type: {}", - merge_duration.as_secs_f64() * 1000.0, - merged_accumulator.get_accumulator_type() - ); - merged.insert(key.clone(), merged_accumulator); + match self.merge_accumulators(&precomputes) { + Ok(merged_accumulator) => { + #[cfg(feature = "extra_debugging")] + let merge_duration = merge_start.elapsed(); + #[cfg(feature = "extra_debugging")] + debug!( + " Merge completed in {:.2}ms, result type: {}", + merge_duration.as_secs_f64() * 1000.0, + merged_accumulator.get_accumulator_type() + ); + merged.insert(key.clone(), merged_accumulator); + } + Err(e) => { + warn!("Failed to merge accumulators for key {:?}: {}", key, e); + } + } } else { assert_eq!( precomputes.len(), @@ -1088,13 +1095,13 @@ impl SimpleEngine { fn merge_accumulators( &self, accumulators: &[Box], - ) -> Box { + ) -> Result, AccumulatorError> { if accumulators.is_empty() { - panic!("No accumulators to merge"); + return Err(AccumulatorError::EmptySlice); } if accumulators.len() == 1 { - return accumulators[0].clone_boxed_core(); + return Ok(accumulators[0].clone_boxed_core()); } // Try to use optimized batch merge for KLL accumulators @@ -1102,7 +1109,7 @@ impl SimpleEngine { use crate::precompute_operators::datasketches_kll_accumulator::DatasketchesKLLAccumulator; match DatasketchesKLLAccumulator::merge_multiple(accumulators) { - Ok(merged) => return Box::new(merged), + Ok(merged) => return Ok(Box::new(merged)), Err(e) => { warn!( "Batch merge failed: {}. Falling back to sequential merge.", @@ -1118,7 +1125,7 @@ impl SimpleEngine { use crate::precompute_operators::count_min_sketch_accumulator::CountMinSketchAccumulator; match CountMinSketchAccumulator::merge_multiple(accumulators) { - Ok(merged) => return Box::new(merged), + Ok(merged) => return Ok(Box::new(merged)), Err(e) => { warn!( "Batch merge failed: {}. Falling back to sequential merge.", @@ -1145,7 +1152,7 @@ impl SimpleEngine { } } - result + Ok(result) } /// Collects results when key and value use different aggregations diff --git a/asap-query-engine/src/engines/simple_engine/sql.rs b/asap-query-engine/src/engines/simple_engine/sql.rs index 9ef4dd53..eb299013 100644 --- a/asap-query-engine/src/engines/simple_engine/sql.rs +++ b/asap-query-engine/src/engines/simple_engine/sql.rs @@ -265,9 +265,9 @@ impl SimpleEngine { (QueryType::Spatial, QueryType::TemporalQuantile) => { QueryPatternType::OneTemporalOneSpatial } - _ => panic!("Unsupported query type found"), + _ => return None, }, - _ => panic!("Unsupported query type found"), + _ => return None, }; // For nested queries (spatial of temporal), the outer query has no time clause, diff --git a/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs b/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs index f323426a..cc9a8580 100644 --- a/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs @@ -5,6 +5,7 @@ use crate::data_model::{ use asap_sketchlib::sketches::delta_set_aggregator::{deserialize_msgpack, serialize_msgpack}; use serde_json::Value; use std::collections::{HashMap, HashSet}; +use tracing::warn; use promql_utilities::query_logics::enums::Statistic; @@ -247,7 +248,11 @@ impl AggregateCore for DeltaSetAggregatorAccumulator { fn get_keys(&self) -> Option> { if !self.removed.is_empty() { - panic!("DeltaSetAggregatorAccumulator does not support get_keys when removed items are present"); + warn!( + "DeltaSetAggregatorAccumulator::get_keys called with {} removed items; returning None", + self.removed.len() + ); + return None; } Some(self.added.iter().cloned().collect()) } diff --git a/asap-query-engine/src/precompute_operators/error.rs b/asap-query-engine/src/precompute_operators/error.rs new file mode 100644 index 00000000..739fda66 --- /dev/null +++ b/asap-query-engine/src/precompute_operators/error.rs @@ -0,0 +1,30 @@ +use std::fmt; + +#[derive(Debug, PartialEq)] +pub enum AccumulatorError { + /// Returned by constructors when `sub_type` is not "min" or "max". + InvalidSubType(String), + /// Returned by `SimpleEngine::merge_accumulators` when called with an empty + /// slice. This is a programming error (violated precondition), not a domain + /// error from the `MergeableAccumulator` trait impls — those erase their + /// errors into `Box` to accommodate heterogeneous accumulator + /// types (KLL, CMS, etc.) that produce library-specific errors. + EmptySlice, + /// Returned when merging accumulators whose `sub_type` fields disagree. + MergeTypeMismatch { expected: String, got: String }, +} + +impl fmt::Display for AccumulatorError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::InvalidSubType(s) => write!(f, "sub_type must be 'min' or 'max', got '{s}'"), + Self::EmptySlice => write!(f, "merge_accumulators called with empty slice"), + Self::MergeTypeMismatch { expected, got } => write!( + f, + "cannot merge accumulators: expected sub_type '{expected}', got '{got}'" + ), + } + } +} + +impl std::error::Error for AccumulatorError {} diff --git a/asap-query-engine/src/precompute_operators/min_max_accumulator.rs b/asap-query-engine/src/precompute_operators/min_max_accumulator.rs index b4763626..96f08133 100644 --- a/asap-query-engine/src/precompute_operators/min_max_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/min_max_accumulator.rs @@ -1,3 +1,4 @@ +use super::error::AccumulatorError; use crate::data_model::{ AggregateCore, AggregationType, MergeableAccumulator, SerializableToSink, SingleSubpopulationAggregate, SingleSubpopulationAggregateFactory, @@ -29,19 +30,19 @@ impl MinMaxAccumulator { } } - pub fn new(sub_type: String) -> Self { + pub fn new(sub_type: String) -> Result { match sub_type.as_str() { - "min" => Self::new_min(), - "max" => Self::new_max(), - _ => panic!("sub_type must be 'min' or 'max'"), + "min" => Ok(Self::new_min()), + "max" => Ok(Self::new_max()), + _ => Err(AccumulatorError::InvalidSubType(sub_type)), } } - pub fn with_value(value: f64, sub_type: String) -> Self { + pub fn with_value(value: f64, sub_type: String) -> Result { if sub_type != "min" && sub_type != "max" { - panic!("sub_type must be 'min' or 'max'"); + return Err(AccumulatorError::InvalidSubType(sub_type)); } - Self { value, sub_type } + Ok(Self { value, sub_type }) } pub fn update(&mut self, value: f64) { @@ -56,7 +57,7 @@ impl MinMaxAccumulator { self.value = value; } } - _ => panic!("Invalid sub_type"), + _ => unreachable!("MinMaxAccumulator sub_type is always 'min' or 'max'"), } } @@ -73,7 +74,7 @@ impl MinMaxAccumulator { return Err("sub_type must be 'min' or 'max'".into()); } - Ok(Self::with_value(value, sub_type)) + Ok(Self::with_value(value, sub_type)?) } pub fn deserialize_from_bytes(buffer: &[u8]) -> Result> { @@ -91,7 +92,7 @@ impl MinMaxAccumulator { _ => return Err("Invalid sub_type byte".into()), }; - Ok(Self::with_value(value, sub_type)) + Ok(Self::with_value(value, sub_type)?) } } @@ -108,7 +109,7 @@ impl SerializableToSink for MinMaxAccumulator { let sub_type_byte = match self.sub_type.as_str() { "min" => 0u8, "max" => 1u8, - _ => panic!("Invalid sub_type"), + _ => unreachable!("MinMaxAccumulator sub_type is always 'min' or 'max'"), }; bytes.push(sub_type_byte); bytes @@ -120,7 +121,7 @@ impl MergeableAccumulator for MinMaxAccumulator { accumulators: Vec, ) -> Result> { if accumulators.is_empty() { - return Err("No accumulators to merge".into()); + return Err(AccumulatorError::EmptySlice.into()); } let sub_type = &accumulators[0].sub_type; @@ -128,11 +129,15 @@ impl MergeableAccumulator for MinMaxAccumulator { // Verify all accumulators have the same sub_type for acc in &accumulators { if acc.sub_type != *sub_type { - return Err("Cannot merge accumulators with different sub_types".into()); + return Err(AccumulatorError::MergeTypeMismatch { + expected: sub_type.clone(), + got: acc.sub_type.clone(), + } + .into()); } } - let mut result = MinMaxAccumulator::new(sub_type.clone()); + let mut result = MinMaxAccumulator::new(sub_type.clone())?; for acc in accumulators { result.update(acc.value); @@ -282,7 +287,7 @@ impl SingleSubpopulationAggregateFactory for MinMaxAccumulatorFactory { Ok(Box::new(MinMaxAccumulator::with_value( result_value, self.sub_type.clone(), - ))) + )?)) } fn create_default(&self) -> Box { @@ -330,9 +335,9 @@ mod tests { #[test] fn test_merge_min_accumulators() { - let acc1 = MinMaxAccumulator::with_value(10.0, "min".to_string()); - let acc2 = MinMaxAccumulator::with_value(5.0, "min".to_string()); - let acc3 = MinMaxAccumulator::with_value(15.0, "min".to_string()); + let acc1 = MinMaxAccumulator::with_value(10.0, "min".to_string()).unwrap(); + let acc2 = MinMaxAccumulator::with_value(5.0, "min".to_string()).unwrap(); + let acc3 = MinMaxAccumulator::with_value(15.0, "min".to_string()).unwrap(); let merged = >::merge_accumulators( @@ -345,9 +350,9 @@ mod tests { #[test] fn test_merge_max_accumulators() { - let acc1 = MinMaxAccumulator::with_value(10.0, "max".to_string()); - let acc2 = MinMaxAccumulator::with_value(5.0, "max".to_string()); - let acc3 = MinMaxAccumulator::with_value(15.0, "max".to_string()); + let acc1 = MinMaxAccumulator::with_value(10.0, "max".to_string()).unwrap(); + let acc2 = MinMaxAccumulator::with_value(5.0, "max".to_string()).unwrap(); + let acc3 = MinMaxAccumulator::with_value(15.0, "max".to_string()).unwrap(); let merged = >::merge_accumulators( @@ -360,8 +365,8 @@ mod tests { #[test] fn test_merge_different_types_error() { - let acc1 = MinMaxAccumulator::with_value(10.0, "min".to_string()); - let acc2 = MinMaxAccumulator::with_value(5.0, "max".to_string()); + let acc1 = MinMaxAccumulator::with_value(10.0, "min".to_string()).unwrap(); + let acc2 = MinMaxAccumulator::with_value(5.0, "max".to_string()).unwrap(); assert!( >::merge_accumulators( @@ -373,7 +378,7 @@ mod tests { #[test] fn test_serialization() { - let acc = MinMaxAccumulator::with_value(42.5, "min".to_string()); + let acc = MinMaxAccumulator::with_value(42.5, "min".to_string()).unwrap(); // Test JSON serialization let json = acc.serialize_to_json(); @@ -391,10 +396,22 @@ mod tests { #[test] fn test_single_subpopulation_aggregate_trait() { let acc: Box = - Box::new(MinMaxAccumulator::with_value(42.0, "max".to_string())); + Box::new(MinMaxAccumulator::with_value(42.0, "max".to_string()).unwrap()); assert_eq!(acc.query(Statistic::Max, None).unwrap(), 42.0); assert!(acc.query(Statistic::Min, None).is_err()); assert_eq!(acc.type_name(), "MinMaxAccumulator"); } + + #[test] + fn test_new_invalid_sub_type() { + let err = MinMaxAccumulator::new("mean".to_string()).unwrap_err(); + assert!(matches!(err, AccumulatorError::InvalidSubType(s) if s == "mean")); + } + + #[test] + fn test_with_value_invalid_sub_type() { + let err = MinMaxAccumulator::with_value(1.0, "sum".to_string()).unwrap_err(); + assert!(matches!(err, AccumulatorError::InvalidSubType(s) if s == "sum")); + } } diff --git a/asap-query-engine/src/precompute_operators/mod.rs b/asap-query-engine/src/precompute_operators/mod.rs index fbbff1cc..76ed652d 100644 --- a/asap-query-engine/src/precompute_operators/mod.rs +++ b/asap-query-engine/src/precompute_operators/mod.rs @@ -2,6 +2,7 @@ pub mod count_min_sketch_accumulator; pub mod count_min_sketch_with_heap_accumulator; pub mod datasketches_kll_accumulator; pub mod delta_set_aggregator_accumulator; +pub mod error; pub mod hydra_kll_accumulator; pub mod increase_accumulator; pub mod min_max_accumulator; @@ -15,6 +16,7 @@ pub use count_min_sketch_accumulator::*; pub use count_min_sketch_with_heap_accumulator::*; pub use datasketches_kll_accumulator::*; pub use delta_set_aggregator_accumulator::*; +pub use error::AccumulatorError; pub use hydra_kll_accumulator::*; pub use increase_accumulator::*; pub use min_max_accumulator::*; diff --git a/asap-query-engine/src/precompute_operators/multiple_min_max_accumulator.rs b/asap-query-engine/src/precompute_operators/multiple_min_max_accumulator.rs index 2984cf30..5bc96f60 100644 --- a/asap-query-engine/src/precompute_operators/multiple_min_max_accumulator.rs +++ b/asap-query-engine/src/precompute_operators/multiple_min_max_accumulator.rs @@ -1,3 +1,4 @@ +use super::error::AccumulatorError; use crate::data_model::{ AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator, MultipleSubpopulationAggregate, SerializableToSink, @@ -17,31 +18,40 @@ pub struct MultipleMinMaxAccumulator { } impl MultipleMinMaxAccumulator { - pub fn new(sub_type: String) -> Self { + pub fn new(sub_type: String) -> Result { if sub_type != "min" && sub_type != "max" { - panic!("sub_type must be 'min' or 'max'"); + return Err(AccumulatorError::InvalidSubType(sub_type)); } - Self { + Ok(Self { values: HashMap::new(), sub_type, - } + }) } pub fn new_min() -> Self { - Self::new("min".to_string()) + Self { + values: HashMap::new(), + sub_type: "min".to_string(), + } } pub fn new_max() -> Self { - Self::new("max".to_string()) + Self { + values: HashMap::new(), + sub_type: "max".to_string(), + } } - pub fn new_with_values(values: HashMap, sub_type: String) -> Self { + pub fn new_with_values( + values: HashMap, + sub_type: String, + ) -> Result { if sub_type != "min" && sub_type != "max" { - panic!("sub_type must be 'min' or 'max'"); + return Err(AccumulatorError::InvalidSubType(sub_type)); } - Self { values, sub_type } + Ok(Self { values, sub_type }) } pub fn update(&mut self, key: KeyByLabelValues, value: f64) { @@ -58,7 +68,7 @@ impl MultipleMinMaxAccumulator { *current = value; } } - _ => panic!("Invalid sub_type"), + _ => unreachable!("MultipleMinMaxAccumulator sub_type is always 'min' or 'max'"), } } @@ -301,7 +311,7 @@ impl MergeableAccumulator for MultipleMinMaxAccumulat accumulators: Vec, ) -> Result> { if accumulators.is_empty() { - return Err("No accumulators to merge".into()); + return Err(AccumulatorError::EmptySlice.into()); } let sub_type = accumulators[0].sub_type.clone(); @@ -309,11 +319,15 @@ impl MergeableAccumulator for MultipleMinMaxAccumulat // Verify all accumulators have the same sub_type for acc in &accumulators { if acc.sub_type != sub_type { - return Err("Cannot merge accumulators with different sub_types".into()); + return Err(AccumulatorError::MergeTypeMismatch { + expected: sub_type.clone(), + got: acc.sub_type.clone(), + } + .into()); } } - let mut result = MultipleMinMaxAccumulator::new(sub_type.clone()); + let mut result = MultipleMinMaxAccumulator::new(sub_type.clone())?; for acc in accumulators { for (key, value) in acc.values { @@ -481,4 +495,17 @@ mod tests { // Test type name through trait object assert_eq!(trait_obj.type_name(), "MultipleMinMaxAccumulator"); } + + #[test] + fn test_new_invalid_sub_type() { + let err = MultipleMinMaxAccumulator::new("mean".to_string()).unwrap_err(); + assert!(matches!(err, AccumulatorError::InvalidSubType(s) if s == "mean")); + } + + #[test] + fn test_new_with_values_invalid_sub_type() { + let err = MultipleMinMaxAccumulator::new_with_values(HashMap::new(), "sum".to_string()) + .unwrap_err(); + assert!(matches!(err, AccumulatorError::InvalidSubType(s) if s == "sum")); + } } diff --git a/asap-query-engine/src/tests/datafusion/plan_execution_tests.rs b/asap-query-engine/src/tests/datafusion/plan_execution_tests.rs index 169d4f32..7f1c6135 100644 --- a/asap-query-engine/src/tests/datafusion/plan_execution_tests.rs +++ b/asap-query-engine/src/tests/datafusion/plan_execution_tests.rs @@ -333,8 +333,8 @@ mod tests { #[tokio::test] #[ignore = "Blocked: MinMaxAccumulator has no arroyo serde"] async fn test_old_vs_new_minmax_min() { - let mm_a = MinMaxAccumulator::with_value(10.0, "min".to_string()); - let mm_b = MinMaxAccumulator::with_value(5.0, "min".to_string()); + let mm_a = MinMaxAccumulator::with_value(10.0, "min".to_string()).unwrap(); + let mm_b = MinMaxAccumulator::with_value(5.0, "min".to_string()).unwrap(); let engine = create_engine_single_pop( "temperature", @@ -353,8 +353,8 @@ mod tests { #[tokio::test] #[ignore = "Blocked: MinMaxAccumulator has no arroyo serde"] async fn test_old_vs_new_minmax_max() { - let mm_a = MinMaxAccumulator::with_value(90.0, "max".to_string()); - let mm_b = MinMaxAccumulator::with_value(95.0, "max".to_string()); + let mm_a = MinMaxAccumulator::with_value(90.0, "max".to_string()).unwrap(); + let mm_b = MinMaxAccumulator::with_value(95.0, "max".to_string()).unwrap(); let engine = create_engine_single_pop( "temperature", @@ -392,7 +392,7 @@ mod tests { #[tokio::test] #[ignore = "Blocked: MultipleMinMaxAccumulator has no arroyo serde"] async fn test_old_vs_new_multiple_minmax() { - let mm = MultipleMinMaxAccumulator::new("min".to_string()); + let mm = MultipleMinMaxAccumulator::new("min".to_string()).unwrap(); let engine = create_engine_single_pop( "latency", @@ -564,7 +564,7 @@ mod tests { #[tokio::test] async fn test_execute_plan_not_implemented_minmax() { - let mm = MinMaxAccumulator::with_value(42.0, "min".to_string()); + let mm = MinMaxAccumulator::with_value(42.0, "min".to_string()).unwrap(); let engine = create_engine_single_pop( "temperature", diff --git a/asap-query-engine/src/tests/store_correctness_tests.rs b/asap-query-engine/src/tests/store_correctness_tests.rs index ef9425bb..b15ce460 100644 --- a/asap-query-engine/src/tests/store_correctness_tests.rs +++ b/asap-query-engine/src/tests/store_correctness_tests.rs @@ -829,7 +829,7 @@ fn test_clone_fidelity_sum(strategy: LockStrategy) { } fn test_clone_fidelity_min_max(strategy: LockStrategy) { - let acc = MinMaxAccumulator::with_value(42.0, "max".to_string()); + let acc = MinMaxAccumulator::with_value(42.0, "max".to_string()).unwrap(); roundtrip(strategy, acc); } @@ -858,7 +858,7 @@ fn test_clone_fidelity_multiple_min_max(strategy: LockStrategy) { let mut values = HashMap::new(); values.insert(key(&["dc", "east"]), 77.7); values.insert(key(&["dc", "west"]), 33.3); - let acc = MultipleMinMaxAccumulator::new_with_values(values, "max".to_string()); + let acc = MultipleMinMaxAccumulator::new_with_values(values, "max".to_string()).unwrap(); roundtrip(strategy, acc); }