Skip to content

Commit eb036d1

Browse files
added a separate error type
1 parent bbaa6d5 commit eb036d1

5 files changed

Lines changed: 57 additions & 14 deletions

File tree

asap-query-engine/src/engines/simple_engine/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::sync::{Arc, RwLock};
1616
use std::time::Instant;
1717
use tracing::{debug, warn};
1818

19+
use crate::precompute_operators::AccumulatorError;
1920
use crate::AggregateCore;
2021

2122
use asap_types::enums::WindowType;
@@ -1094,9 +1095,9 @@ impl SimpleEngine {
10941095
fn merge_accumulators(
10951096
&self,
10961097
accumulators: &[Box<dyn crate::data_model::AggregateCore>],
1097-
) -> Result<Box<dyn crate::data_model::AggregateCore>, String> {
1098+
) -> Result<Box<dyn crate::data_model::AggregateCore>, AccumulatorError> {
10981099
if accumulators.is_empty() {
1099-
return Err("merge_accumulators called with empty slice".to_string());
1100+
return Err(AccumulatorError::EmptySlice);
11001101
}
11011102

11021103
if accumulators.len() == 1 {

asap-query-engine/src/precompute_operators/delta_set_aggregator_accumulator.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::data_model::{
55
use asap_sketchlib::sketches::delta_set_aggregator::{deserialize_msgpack, serialize_msgpack};
66
use serde_json::Value;
77
use std::collections::{HashMap, HashSet};
8+
use tracing::warn;
89

910
use promql_utilities::query_logics::enums::Statistic;
1011

@@ -247,6 +248,10 @@ impl AggregateCore for DeltaSetAggregatorAccumulator {
247248

248249
fn get_keys(&self) -> Option<Vec<KeyByLabelValues>> {
249250
if !self.removed.is_empty() {
251+
warn!(
252+
"DeltaSetAggregatorAccumulator::get_keys called with {} removed items; returning None",
253+
self.removed.len()
254+
);
250255
return None;
251256
}
252257
Some(self.added.iter().cloned().collect())

asap-query-engine/src/precompute_operators/min_max_accumulator.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use super::error::AccumulatorError;
12
use crate::data_model::{
23
AggregateCore, AggregationType, MergeableAccumulator, SerializableToSink,
34
SingleSubpopulationAggregate, SingleSubpopulationAggregateFactory,
@@ -29,17 +30,17 @@ impl MinMaxAccumulator {
2930
}
3031
}
3132

32-
pub fn new(sub_type: String) -> Result<Self, String> {
33+
pub fn new(sub_type: String) -> Result<Self, AccumulatorError> {
3334
match sub_type.as_str() {
3435
"min" => Ok(Self::new_min()),
3536
"max" => Ok(Self::new_max()),
36-
_ => Err(format!("sub_type must be 'min' or 'max', got '{sub_type}'")),
37+
_ => Err(AccumulatorError::InvalidSubType(sub_type)),
3738
}
3839
}
3940

40-
pub fn with_value(value: f64, sub_type: String) -> Result<Self, String> {
41+
pub fn with_value(value: f64, sub_type: String) -> Result<Self, AccumulatorError> {
4142
if sub_type != "min" && sub_type != "max" {
42-
return Err(format!("sub_type must be 'min' or 'max', got '{sub_type}'"));
43+
return Err(AccumulatorError::InvalidSubType(sub_type));
4344
}
4445
Ok(Self { value, sub_type })
4546
}
@@ -120,15 +121,19 @@ impl MergeableAccumulator<MinMaxAccumulator> for MinMaxAccumulator {
120121
accumulators: Vec<MinMaxAccumulator>,
121122
) -> Result<MinMaxAccumulator, Box<dyn std::error::Error + Send + Sync>> {
122123
if accumulators.is_empty() {
123-
return Err("No accumulators to merge".into());
124+
return Err(AccumulatorError::EmptySlice.into());
124125
}
125126

126127
let sub_type = &accumulators[0].sub_type;
127128

128129
// Verify all accumulators have the same sub_type
129130
for acc in &accumulators {
130131
if acc.sub_type != *sub_type {
131-
return Err("Cannot merge accumulators with different sub_types".into());
132+
return Err(AccumulatorError::MergeTypeMismatch {
133+
expected: sub_type.clone(),
134+
got: acc.sub_type.clone(),
135+
}
136+
.into());
132137
}
133138
}
134139

@@ -397,4 +402,16 @@ mod tests {
397402
assert!(acc.query(Statistic::Min, None).is_err());
398403
assert_eq!(acc.type_name(), "MinMaxAccumulator");
399404
}
405+
406+
#[test]
407+
fn test_new_invalid_sub_type() {
408+
let err = MinMaxAccumulator::new("mean".to_string()).unwrap_err();
409+
assert!(matches!(err, AccumulatorError::InvalidSubType(s) if s == "mean"));
410+
}
411+
412+
#[test]
413+
fn test_with_value_invalid_sub_type() {
414+
let err = MinMaxAccumulator::with_value(1.0, "sum".to_string()).unwrap_err();
415+
assert!(matches!(err, AccumulatorError::InvalidSubType(s) if s == "sum"));
416+
}
400417
}

asap-query-engine/src/precompute_operators/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ pub mod count_min_sketch_accumulator;
22
pub mod count_min_sketch_with_heap_accumulator;
33
pub mod datasketches_kll_accumulator;
44
pub mod delta_set_aggregator_accumulator;
5+
pub mod error;
56
pub mod hydra_kll_accumulator;
67
pub mod increase_accumulator;
78
pub mod min_max_accumulator;
@@ -15,6 +16,7 @@ pub use count_min_sketch_accumulator::*;
1516
pub use count_min_sketch_with_heap_accumulator::*;
1617
pub use datasketches_kll_accumulator::*;
1718
pub use delta_set_aggregator_accumulator::*;
19+
pub use error::AccumulatorError;
1820
pub use hydra_kll_accumulator::*;
1921
pub use increase_accumulator::*;
2022
pub use min_max_accumulator::*;

asap-query-engine/src/precompute_operators/multiple_min_max_accumulator.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use super::error::AccumulatorError;
12
use crate::data_model::{
23
AggregateCore, AggregationType, KeyByLabelValues, MergeableAccumulator,
34
MultipleSubpopulationAggregate, SerializableToSink,
@@ -17,9 +18,9 @@ pub struct MultipleMinMaxAccumulator {
1718
}
1819

1920
impl MultipleMinMaxAccumulator {
20-
pub fn new(sub_type: String) -> Result<Self, String> {
21+
pub fn new(sub_type: String) -> Result<Self, AccumulatorError> {
2122
if sub_type != "min" && sub_type != "max" {
22-
return Err(format!("sub_type must be 'min' or 'max', got '{sub_type}'"));
23+
return Err(AccumulatorError::InvalidSubType(sub_type));
2324
}
2425

2526
Ok(Self {
@@ -45,9 +46,9 @@ impl MultipleMinMaxAccumulator {
4546
pub fn new_with_values(
4647
values: HashMap<KeyByLabelValues, f64>,
4748
sub_type: String,
48-
) -> Result<Self, String> {
49+
) -> Result<Self, AccumulatorError> {
4950
if sub_type != "min" && sub_type != "max" {
50-
return Err(format!("sub_type must be 'min' or 'max', got '{sub_type}'"));
51+
return Err(AccumulatorError::InvalidSubType(sub_type));
5152
}
5253

5354
Ok(Self { values, sub_type })
@@ -310,15 +311,19 @@ impl MergeableAccumulator<MultipleMinMaxAccumulator> for MultipleMinMaxAccumulat
310311
accumulators: Vec<MultipleMinMaxAccumulator>,
311312
) -> Result<MultipleMinMaxAccumulator, Box<dyn std::error::Error + Send + Sync>> {
312313
if accumulators.is_empty() {
313-
return Err("No accumulators to merge".into());
314+
return Err(AccumulatorError::EmptySlice.into());
314315
}
315316

316317
let sub_type = accumulators[0].sub_type.clone();
317318

318319
// Verify all accumulators have the same sub_type
319320
for acc in &accumulators {
320321
if acc.sub_type != sub_type {
321-
return Err("Cannot merge accumulators with different sub_types".into());
322+
return Err(AccumulatorError::MergeTypeMismatch {
323+
expected: sub_type.clone(),
324+
got: acc.sub_type.clone(),
325+
}
326+
.into());
322327
}
323328
}
324329

@@ -490,4 +495,17 @@ mod tests {
490495
// Test type name through trait object
491496
assert_eq!(trait_obj.type_name(), "MultipleMinMaxAccumulator");
492497
}
498+
499+
#[test]
500+
fn test_new_invalid_sub_type() {
501+
let err = MultipleMinMaxAccumulator::new("mean".to_string()).unwrap_err();
502+
assert!(matches!(err, AccumulatorError::InvalidSubType(s) if s == "mean"));
503+
}
504+
505+
#[test]
506+
fn test_new_with_values_invalid_sub_type() {
507+
let err = MultipleMinMaxAccumulator::new_with_values(HashMap::new(), "sum".to_string())
508+
.unwrap_err();
509+
assert!(matches!(err, AccumulatorError::InvalidSubType(s) if s == "sum"));
510+
}
493511
}

0 commit comments

Comments
 (0)