Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub fn map_statistic_to_precompute_operator(
Ok((AggregationType::MultipleIncrease, "".to_string()))
}
Statistic::Topk => Ok((AggregationType::CountMinSketchWithHeap, "topk".to_string())),
_ => Err(format!("Statistic {statistic:?} not supported")),
Comment thread
akanksha-akkihal marked this conversation as resolved.
Statistic::Cardinality => Ok((AggregationType::HLL, "".to_string())),
}
}

Expand Down Expand Up @@ -82,6 +82,8 @@ pub fn does_precompute_operator_support_subpopulations(
// CountMinSketchWithHeap is only supported for Topk — does not support subpopulations
AggregationType::CountMinSketchWithHeap if matches!(statistic, Statistic::Topk) => false,

AggregationType::HLL => false,

// Default: not supported
_ => panic!("Unexpected precompute operator: {}", precompute_operator),
}
Expand Down Expand Up @@ -169,6 +171,16 @@ mod tests {
));
}

/// `Statistic::Cardinality` with approximate treatment must map to the HLL
/// aggregation type paired with an empty string.
#[test]
fn test_cardinality_maps_to_hll() {
    let expected = (AggregationType::HLL, "".to_string());

    let mapped = map_statistic_to_precompute_operator(
        Statistic::Cardinality,
        QueryTreatmentType::Approximate,
    )
    .unwrap();

    assert_eq!(mapped, expected);
}

#[test]
fn test_topk_maps_to_count_min_sketch_with_heap() {
let result =
Expand Down
7 changes: 7 additions & 0 deletions asap-planner-rs/src/config/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ pub struct SketchParameterOverrides {
pub datasketches_kll: Option<KllParams>,
#[serde(rename = "HydraKLL")]
pub hydra_kll: Option<HydraParams>,
#[serde(rename = "HLL")]
pub hll: Option<HllParams>,
}

#[derive(Debug, Clone, Deserialize)]
Expand Down Expand Up @@ -114,6 +116,11 @@ pub struct HydraParams {
pub k: u64,
}

/// User-supplied parameters for the HLL sketch.
///
/// Deserialized from the `HLL` entry of the sketch parameter overrides; the
/// entry is optional there, so this struct is only present when configured.
#[derive(Debug, Clone, Deserialize)]
pub struct HllParams {
/// Value forwarded as the sketch's `precision` parameter.
/// NOTE(review): presumably the HyperLogLog precision (register-index bit
/// count); no range validation is visible here — confirm accepted bounds.
pub precision: u64,
}

#[derive(Debug, Clone, Deserialize)]
pub struct SQLControllerConfig {
pub query_groups: Vec<SQLQueryGroup>,
Expand Down
14 changes: 14 additions & 0 deletions asap-planner-rs/src/planner/sketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const DEFAULT_KLL_K: u64 = 500;
const DEFAULT_HYDRA_ROW: u64 = 3;
const DEFAULT_HYDRA_COL: u64 = 1024;
const DEFAULT_HYDRA_K: u64 = 20;
/// Precision used for HLL sketches when no `HLL` override is configured.
const DEFAULT_HLL_PRECISION: u64 = 14;

/// Shared sketch parameter builder used by both PromQL and SQL paths.
///
Expand Down Expand Up @@ -89,6 +90,19 @@ pub fn build_sketch_parameters(
Ok(m)
}

AggregationType::HLL => {
let precision = sketch_params
.and_then(|p| p.hll.as_ref())
.map(|p| p.precision)
.unwrap_or(DEFAULT_HLL_PRECISION);
let mut m = HashMap::new();
m.insert(
"precision".to_string(),
serde_json::Value::Number(precision.into()),
);
Ok(m)
}

AggregationType::HydraKLL => {
let row_num = sketch_params
.and_then(|p| p.hydra_kll.as_ref())
Expand Down
9 changes: 7 additions & 2 deletions asap-planner-rs/src/planner/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,14 @@ impl SQLSingleQueryProcessor {

// Label routing
let spatial_output = KeyByLabelNames::new(labels.iter().cloned().collect::<Vec<_>>());
let rollup = all_metadata.difference(&spatial_output);

let treatment_type = get_sql_treatment_type(agg_info.get_name());
let statistics = get_sql_statistics(agg_info.get_name())?;
let rollup = if statistics.contains(&Statistic::Cardinality) {
// Distinct target is value_column, not a rollup label dimension.
KeyByLabelNames::empty()
} else {
all_metadata.difference(&spatial_output)
};

let configs = build_agg_configs_for_statistics(
&statistics,
Expand Down Expand Up @@ -179,6 +183,7 @@ fn get_sql_statistics(name: &str) -> Result<Vec<Statistic>, ControllerError> {
"AVG" => Ok(vec![Statistic::Sum, Statistic::Count]),
"MIN" => Ok(vec![Statistic::Min]),
"MAX" => Ok(vec![Statistic::Max]),
"CARDINALITY" => Ok(vec![Statistic::Cardinality]),
other => Err(ControllerError::SqlParse(format!(
"Unsupported aggregation: {}",
other
Expand Down
69 changes: 69 additions & 0 deletions asap-planner-rs/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,75 @@ fn temporal_quantile_cast_datetime_bounds() {
assert_eq!(out.inference_cleanup_param(q), Some(1));
}

// ── COUNT(DISTINCT) / HLL (spatial 1 s) ───────────────────────────────────────

/// Renders a YAML controller configuration containing one table
/// (`netflow_table`) and one query group holding exactly one query.
///
/// * `query` — SQL text inserted as the group's only query. It is placed under
///   a `>-` block scalar, so it is expected to be passed as a single line.
/// * `t_repeat` — value written to the group's `repetition_delay`.
///
/// The table declares `time` as its time column, `pkt_len` and `dstip` as value
/// columns, and `srcip`, `dstip`, `proto` as metadata columns; cleanup policy
/// is `no_cleanup`.
fn netflow_one_query_config(query: &str, t_repeat: u64) -> String {
format!(
r#"
tables:
- name: netflow_table
time_column: time
value_columns: [pkt_len, dstip]
metadata_columns: [srcip, dstip, proto]
query_groups:
- id: 1
repetition_delay: {t_repeat}
controller_options:
accuracy_sla: 0.95
latency_sla: 100.0
queries:
- >-
{query}
aggregate_cleanup:
policy: no_cleanup
"#
)
}

/// Runtime options for the HLL tests: Arroyo streaming engine, a fixed query
/// evaluation time, and a data ingestion interval of 1.
// NOTE(review): the function name suggests the interval unit is seconds — confirm.
fn sql_opts_1s_ingest() -> SQLRuntimeOptions {
    let data_ingestion_interval = 1;
    let query_evaluation_time = Some(1_000_000.0);

    SQLRuntimeOptions {
        streaming_engine: StreamingEngine::Arroyo,
        query_evaluation_time,
        data_ingestion_interval,
    }
}

/// `COUNT(DISTINCT dstip) ... GROUP BY srcip` over a 1 s window must plan a
/// single HLL aggregation: grouped by `srcip`, reading `dstip` as the value
/// column, with empty rollup and aggregated label sets.
#[test]
fn spatial_count_distinct_hll() {
    let sql = "SELECT srcip, COUNT(DISTINCT dstip) AS unique_peers FROM netflow_table WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) GROUP BY srcip";
    let yaml = netflow_one_query_config(sql, 1);
    let plan = SQLController::from_yaml(&yaml, sql_opts_1s_ingest())
        .unwrap()
        .generate()
        .unwrap();

    // Exactly one aggregation / one inference query, and it is HLL rather than
    // a DeltaSetAggregator.
    assert_eq!(plan.streaming_aggregation_count(), 1);
    assert_eq!(plan.inference_query_count(), 1);
    assert!(plan.has_aggregation_type("HLL"));
    assert!(!plan.has_aggregation_type("DeltaSetAggregator"));
    assert!(plan.all_tumbling_window_sizes_eq(1));

    // The sketch reads the distinct target from the source table's value column.
    let expected_table = Some(String::from("netflow_table"));
    assert_eq!(plan.aggregation_table_name("HLL"), expected_table);
    let expected_value_column = Some(String::from("dstip"));
    assert_eq!(plan.aggregation_value_column("HLL"), expected_value_column);

    // Only the GROUP BY column is a label dimension; nothing is rolled up or aggregated.
    let expected_grouping: Vec<String> = vec![String::from("srcip")];
    assert_eq!(plan.aggregation_labels("HLL", "grouping"), expected_grouping);

    let no_labels: Vec<String> = Vec::new();
    assert_eq!(plan.aggregation_labels("HLL", "rollup"), no_labels);
    assert_eq!(plan.aggregation_labels("HLL", "aggregated"), no_labels);
}

// ── T-value variants for SUM (range = 300 s fixed) ───────────────────────────
//
// These three tests use the same query and differ only in repetition_delay (T).
Expand Down
Loading