Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion asap-planner-rs/src/planner/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl SQLSingleQueryProcessor {
all_metadata.difference(&spatial_output)
};

let configs = build_agg_configs_for_statistics(
let mut configs = build_agg_configs_for_statistics(
&statistics,
treatment_type,
&spatial_output,
Expand All @@ -135,6 +135,23 @@ impl SQLSingleQueryProcessor {
)
.map_err(ControllerError::SqlParse)?;

// 1s spatial MIN/MAX: single MinMax per GROUP BY key (not MultipleMinMax subpopulations).
if matches!(query_type, QueryType::Spatial) {
match agg_info.get_name().to_uppercase().as_str() {
"MIN" | "MAX" => {
let sub_type = agg_info.get_name().to_lowercase();
for cfg in &mut configs {
cfg.aggregation_type = AggregationType::MinMax;
cfg.aggregation_sub_type = sub_type.clone();
cfg.grouping_labels = spatial_output.clone();
cfg.aggregated_labels = KeyByLabelNames::empty();
cfg.rollup_labels = KeyByLabelNames::empty();
}
}
_ => {}
}
}

let t_lookback = match query_type {
QueryType::Spatial => self.data_ingestion_interval,
_ => sql_query.query_data[0].time_info.get_duration() as u64,
Expand Down
33 changes: 33 additions & 0 deletions asap-planner-rs/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,39 @@ fn spatial_count_distinct_hll() {
);
}

/// 1s spatial MAX(pkt_len) GROUP BY dstip → MinMax (max), grouping [dstip], empty rollup.
#[test]
fn spatial_max_pkt_len() {
let q = "SELECT dstip, MAX(pkt_len) AS max_pkt_len FROM netflow_table WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) GROUP BY dstip";
let out = SQLController::from_yaml(&netflow_one_query_config(q, 1), sql_opts_1s_ingest())
.unwrap()
.generate()
.unwrap();

assert_eq!(out.streaming_aggregation_count(), 1);
assert_eq!(out.inference_query_count(), 1);
assert!(out.has_aggregation_type("MinMax"));
assert!(out.has_aggregation_type_and_sub_type("MinMax", "max"));
assert!(!out.has_aggregation_type("MultipleMinMax"));
assert!(out.all_tumbling_window_sizes_eq(1));
assert_eq!(
out.aggregation_value_column("MinMax"),
Some("pkt_len".to_string())
);
assert_eq!(
out.aggregation_labels("MinMax", "grouping"),
vec!["dstip".to_string()]
);
assert_eq!(
out.aggregation_labels("MinMax", "rollup"),
Vec::<String>::new()
);
assert_eq!(
out.aggregation_labels("MinMax", "aggregated"),
Vec::<String>::new()
);
}

// ── T-value variants for SUM (range = 300 s fixed) ───────────────────────────
//
// These three tests use the same query and differ only in repetition_delay (T).
Expand Down
Loading