Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 156 additions & 1 deletion asap-common/dependencies/rs/asap_types/src/capability_matching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,39 @@ pub fn spatial_filter_compatible(config_filter: &str, req_filter: &str) -> bool
config_norm == req_norm
}

/// Returns the `count_events` flag stored in the parameters of `config`.
///
/// A missing entry, or one whose value is not a boolean, yields `true`
/// (COUNT semantics), so count top-k configs written without the flag keep
/// matching COUNT top-k queries.
fn config_count_events(config: &AggregationConfig) -> bool {
    match config.parameters.get("count_events") {
        Some(flag) => flag.as_bool().unwrap_or(true),
        None => true,
    }
}

/// Checks whether `config` provides the heavy-hitter weighting a query needs.
///
/// The check only applies when `stat` is `Statistic::Topk`; any other
/// statistic is accepted without looking at the config.
///
/// * `req_count_events == Some(true)` (COUNT top-k) requires a sketch whose
///   `count_events` flag is `true`.
/// * `req_count_events == Some(false)` (SUM top-k) requires a sketch whose
///   `count_events` flag is `false`, i.e. a value-weighted sketch.
/// * `req_count_events == None` (non-top-k, or PromQL top-k which does not
///   pin the weighting) places no constraint on the config.
///
/// This is the discriminator that separates two `CountMinSketchWithHeap`
/// configs defined on the same metric.
pub fn topk_weighting_compatible(
    stat: Statistic,
    config: &AggregationConfig,
    req_count_events: Option<bool>,
) -> bool {
    let is_topk = stat == Statistic::Topk;
    match req_count_events {
        // Only a top-k request with a pinned weighting inspects the config.
        Some(required) if is_topk => config_count_events(config) == required,
        _ => true,
    }
}

/// Aggregation priority comparator: prefer larger `window_size` (descending).
/// This is a separate function so callers can swap the policy without touching matching logic.
pub fn aggregation_priority(a: &AggregationConfig, b: &AggregationConfig) -> Ordering {
Expand Down Expand Up @@ -157,7 +190,8 @@ pub fn find_compatible_aggregation(
&& spatial_filter_compatible(
&c.spatial_filter_normalized,
&requirements.spatial_filter_normalized,
);
)
&& topk_weighting_compatible(stat, c, requirements.topk_count_events);
if !ok {
debug!(
agg_id = c.aggregation_id,
Expand Down Expand Up @@ -216,6 +250,10 @@ pub fn find_compatible_aggregation(
}

// If value type is multi-population, find the paired key aggregation.
// Top-k (CountMinSketchWithHeap) follows the same path as plain COUNT: the
// self-keyed case is expressed via the query_config path (a single
// aggregation reference), while the capability-matching fallback resolves a
// separate key aggregation just like any other multi-population value type.
let key_agg: &AggregationConfig = if is_multi_population_value_type(value_agg.aggregation_type)
{
let ka = configs
Expand Down Expand Up @@ -310,6 +348,7 @@ mod tests {
data_range_ms,
grouping_labels: KeyByLabelNames::new(grouping.iter().map(|s| s.to_string()).collect()),
spatial_filter_normalized: normalize_spatial_filter(spatial_filter),
topk_count_events: None,
}
}

Expand Down Expand Up @@ -839,4 +878,120 @@ mod tests {
);
assert!(result.is_none());
}

// --- top-k count vs sum weighting ---
//
// Top-k follows the same capability-matching path as plain COUNT: a
// CountMinSketchWithHeap is a multi-population value type, so the fallback
// pairs it with a key aggregation. These tests therefore always provision a
// DeltaSetAggregator and focus on which *value* heap (count- vs sum-weighted)
// is selected via the count_events discriminator.

/// Builds the `DeltaSetAggregator` config that the multi-population fallback
/// pairs with a top-k value aggregation.
fn make_key_agg(id: u64, metric: &str) -> AggregationConfig {
    let key_agg_type = "DeltaSetAggregator";
    make_config(id, metric, key_agg_type, "", 1, "tumbling", &[], "")
}

/// Builds a `CountMinSketchWithHeap` config whose `count_events` parameter is
/// set explicitly to the given value.
fn make_topk_config(id: u64, metric: &str, count_events: bool) -> AggregationConfig {
    let flag = serde_json::Value::Bool(count_events);
    let mut sketch = make_config(id, metric, "CountMinSketchWithHeap", "", 1, "tumbling", &[], "");
    sketch.parameters.insert("count_events".to_string(), flag);
    sketch
}

fn topk_req(metric: &str, count_events: Option<bool>) -> QueryRequirements {
let mut r = req(metric, &[Statistic::Topk], Some(1_000), &[], "");
r.topk_count_events = count_events;
r
}

#[test]
fn topk_count_query_picks_count_events_sketch() {
    // The metric carries both a count-weighted (id 1) and a sum-weighted
    // (id 2) heap sketch; a COUNT top-k query has to land on the former.
    let configs = HashMap::from([
        (1, make_topk_config(1, "netflow_table", true)),
        (2, make_topk_config(2, "netflow_table", false)),
        (9, make_key_agg(9, "netflow_table")),
    ]);
    let requirements = topk_req("netflow_table", Some(true));

    let matched = find_compatible_aggregation(&configs, &requirements)
        .expect("COUNT top-k should match the count_events sketch");

    assert_eq!(matched.aggregation_id_for_value, 1);
    assert_eq!(matched.aggregation_id_for_key, 9);
}

#[test]
fn topk_sum_query_picks_value_weighted_sketch() {
    // Same pair of sketches as the COUNT case; a SUM top-k query has to land
    // on the value-weighted one (id 2).
    let configs = HashMap::from([
        (1, make_topk_config(1, "netflow_table", true)),
        (2, make_topk_config(2, "netflow_table", false)),
        (9, make_key_agg(9, "netflow_table")),
    ]);
    let requirements = topk_req("netflow_table", Some(false));

    let matched = find_compatible_aggregation(&configs, &requirements)
        .expect("SUM top-k should match the count_events: false sketch");

    assert_eq!(matched.aggregation_id_for_value, 2);
    assert_eq!(matched.aggregation_id_for_key, 9);
}

#[test]
fn topk_sum_query_rejects_count_only_sketch() {
    // With only a count-weighted sketch provisioned, a SUM top-k query has
    // nothing to resolve to, even though a key aggregation is available.
    let configs = HashMap::from([
        (1, make_topk_config(1, "netflow_table", true)),
        (9, make_key_agg(9, "netflow_table")),
    ]);

    let outcome = find_compatible_aggregation(&configs, &topk_req("netflow_table", Some(false)));

    assert!(outcome.is_none(), "SUM top-k must not fall back to a count_events: true sketch");
}

#[test]
fn topk_count_query_matches_sketch_without_explicit_flag() {
    // Backwards compatibility: a heap sketch that omits `count_events` is
    // treated as count-weighted, so a COUNT top-k query still resolves to it.
    let unflagged_sketch = make_config(
        7,
        "netflow_table",
        "CountMinSketchWithHeap",
        "",
        1,
        "tumbling",
        &[],
        "",
    );
    let configs = HashMap::from([
        (7, unflagged_sketch),
        (9, make_key_agg(9, "netflow_table")),
    ]);
    let requirements = topk_req("netflow_table", Some(true));

    let matched = find_compatible_aggregation(&configs, &requirements)
        .expect("default (no flag) sketch should serve COUNT top-k");

    assert_eq!(matched.aggregation_id_for_value, 7);
}

#[test]
fn topk_unconstrained_weighting_matches_any_sketch() {
    // A requirement with `topk_count_events: None` (e.g. PromQL top-k) does
    // not pin the weighting, so even a sum-weighted sketch is acceptable.
    let configs = HashMap::from([
        (3, make_topk_config(3, "netflow_table", false)),
        (9, make_key_agg(9, "netflow_table")),
    ]);
    let requirements = topk_req("netflow_table", None);

    let matched = find_compatible_aggregation(&configs, &requirements)
        .expect("unconstrained top-k should match regardless of count_events");

    assert_eq!(matched.aggregation_id_for_value, 3);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,13 @@ pub struct QueryRequirements {
pub grouping_labels: KeyByLabelNames,
/// Normalized label filter (produced by normalize_spatial_filter).
pub spatial_filter_normalized: String,
/// For `Statistic::Topk` requirements, the heavy-hitter weighting the query
/// needs, used to disambiguate two `CountMinSketchWithHeap` configs on the
/// same metric:
/// * `Some(true)` → COUNT semantics (`count_events: true`, weight 1/event),
/// * `Some(false)` → SUM semantics (`count_events: false`, weight = value).
///
/// `None` for non-top-k requirements (and for PromQL top-k, which does not
/// constrain the sketch weighting); matching ignores it when `None`.
pub topk_count_events: Option<bool>,
}
3 changes: 3 additions & 0 deletions asap-query-engine/src/engines/simple_engine/promql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,9 @@ impl SimpleEngine {
data_range_ms,
grouping_labels,
spatial_filter_normalized: normalize_spatial_filter(&spatial_filter),
// PromQL top-k does not constrain the sketch weighting; leave the
// count/sum discriminator unset so matching does not over-filter.
topk_count_events: None,
}
}

Expand Down
Loading
Loading