Skip to content

Commit e840200

Browse files
Added some more test
1 parent 934c43d commit e840200

12 files changed

Lines changed: 617 additions & 11 deletions

File tree

asap-planner-rs/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ anyhow.workspace = true
2222
tracing.workspace = true
2323
tracing-subscriber.workspace = true
2424
clap.workspace = true
25-
indexmap = { version = "2.0", features = ["serde"] }
25+
indexmap.workspace = true
2626
promql-parser = "0.5.0"
2727

2828
[dev-dependencies]

asap-planner-rs/src/lib.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,69 @@ impl PlannerOutput {
9292
false
9393
}
9494

95+
/// Returns the sorted labels for the first aggregation matching `agg_type`,
96+
/// for the given `label_kind` ("rollup", "grouping", or "aggregated").
97+
pub fn aggregation_labels(&self, agg_type: &str, label_kind: &str) -> Vec<String> {
98+
if let YamlValue::Mapping(root) = &self.streaming_yaml {
99+
if let Some(YamlValue::Sequence(aggs)) = root.get("aggregations") {
100+
for agg in aggs {
101+
if let YamlValue::Mapping(m) = agg {
102+
if let Some(YamlValue::String(t)) = m.get("aggregationType") {
103+
if t == agg_type {
104+
if let Some(YamlValue::Mapping(labels)) = m.get("labels") {
105+
if let Some(YamlValue::Sequence(seq)) = labels.get(label_kind) {
106+
let mut result: Vec<String> = seq
107+
.iter()
108+
.filter_map(|v| {
109+
if let YamlValue::String(s) = v {
110+
Some(s.clone())
111+
} else {
112+
None
113+
}
114+
})
115+
.collect();
116+
result.sort();
117+
return result;
118+
}
119+
}
120+
}
121+
}
122+
}
123+
}
124+
}
125+
}
126+
vec![]
127+
}
128+
129+
/// Returns the cleanup param (read_count_threshold or num_aggregates_to_retain)
130+
/// for the first aggregation entry of the given query string.
131+
pub fn inference_cleanup_param(&self, query: &str) -> Option<u64> {
132+
if let YamlValue::Mapping(root) = &self.inference_yaml {
133+
if let Some(YamlValue::Sequence(queries)) = root.get("queries") {
134+
for q in queries {
135+
if let YamlValue::Mapping(qm) = q {
136+
if let Some(YamlValue::String(qs)) = qm.get("query") {
137+
if qs == query {
138+
if let Some(YamlValue::Sequence(aggs)) = qm.get("aggregations") {
139+
if let Some(YamlValue::Mapping(agg)) = aggs.first() {
140+
for key in ["read_count_threshold", "num_aggregates_to_retain"] {
141+
if let Some(v) = agg.get(key) {
142+
if let YamlValue::Number(n) = v {
143+
return n.as_u64();
144+
}
145+
}
146+
}
147+
}
148+
}
149+
}
150+
}
151+
}
152+
}
153+
}
154+
}
155+
None
156+
}
157+
95158
pub fn to_streaming_yaml_string(&self) -> Result<String, anyhow::Error> {
96159
Ok(serde_yaml::to_string(&self.streaming_yaml)?)
97160
}

asap-planner-rs/src/output/generator.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,13 @@ pub fn generate_plan(
7474

7575
if should_process {
7676
let (configs, cleanup_param) = processor.get_streaming_aggregation_configs()?;
77-
query_keys_map.insert(query_string.clone(), Vec::new());
77+
let mut keys_for_query = Vec::new();
7878
for config in configs {
7979
let key = config.identifying_key();
80-
query_keys_map
81-
.get_mut(query_string)
82-
.unwrap()
83-
.push((key.clone(), cleanup_param));
80+
keys_for_query.push((key.clone(), cleanup_param));
8481
dedup_map.entry(key).or_insert(config);
8582
}
83+
query_keys_map.insert(query_string.clone(), keys_for_query);
8684
}
8785
}
8886
}

asap-planner-rs/src/planner/logics.rs

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,117 @@ pub fn get_cleanup_param(
250250
Ok(result)
251251
}
252252

253+
#[cfg(test)]
254+
mod tests {
255+
use super::*;
256+
use crate::planner::patterns::build_patterns;
257+
use promql_utilities::ast_matching::PromQLMatchResult;
258+
use promql_utilities::query_logics::enums::QueryPatternType;
259+
260+
fn match_query(query: &str) -> (QueryPatternType, PromQLMatchResult) {
261+
let ast = promql_parser::parser::parse(query).unwrap();
262+
let patterns = build_patterns();
263+
for (pt, pattern) in &patterns {
264+
let result = pattern.matches(&ast);
265+
if result.matches {
266+
return (*pt, result);
267+
}
268+
}
269+
panic!("no pattern matched query: {}", query);
270+
}
271+
272+
// --- get_effective_repeat ---
273+
274+
#[test]
275+
fn effective_repeat_no_step() {
276+
assert_eq!(get_effective_repeat(300, 0), 300);
277+
}
278+
279+
#[test]
280+
fn effective_repeat_step_smaller_than_t_repeat() {
281+
assert_eq!(get_effective_repeat(300, 30), 30);
282+
}
283+
284+
#[test]
285+
fn effective_repeat_step_larger_than_t_repeat() {
286+
assert_eq!(get_effective_repeat(30, 300), 30);
287+
}
288+
289+
// --- get_cleanup_param ---
290+
291+
#[test]
292+
fn cleanup_param_circular_buffer_spatial_instant_query() {
293+
let (pt, mr) = match_query("sum(some_metric)");
294+
assert_eq!(pt, QueryPatternType::OnlySpatial);
295+
// t_lookback = t_repeat = 300 (OnlySpatial path)
296+
// effective_repeat = 300 (step=0)
297+
// ceil((300 + 0) / 300) = 1
298+
let result =
299+
get_cleanup_param(CleanupPolicy::CircularBuffer, pt, &mr, 300, "tumbling", 0, 0)
300+
.unwrap();
301+
assert_eq!(result, 1);
302+
}
303+
304+
#[test]
305+
fn cleanup_param_circular_buffer_spatial_range_query() {
306+
let (pt, mr) = match_query("sum(some_metric)");
307+
// t_lookback = t_repeat = 300, effective_repeat = min(300, 30) = 30
308+
// ceil((300 + 3600) / 30) = ceil(130) = 130
309+
let result =
310+
get_cleanup_param(CleanupPolicy::CircularBuffer, pt, &mr, 300, "tumbling", 3600, 30)
311+
.unwrap();
312+
assert_eq!(result, 130);
313+
}
314+
315+
#[test]
316+
fn cleanup_param_read_based_spatial_instant_query() {
317+
let (pt, mr) = match_query("sum(some_metric)");
318+
// lookback_buckets = ceil(300/300) = 1, num_steps = 1 → result = 1
319+
let result =
320+
get_cleanup_param(CleanupPolicy::ReadBased, pt, &mr, 300, "tumbling", 0, 0).unwrap();
321+
assert_eq!(result, 1);
322+
}
323+
324+
#[test]
325+
fn cleanup_param_read_based_spatial_range_query() {
326+
let (pt, mr) = match_query("sum(some_metric)");
327+
// lookback_buckets = ceil(300/30) = 10, num_steps = 3600/30 + 1 = 121
328+
// result = 10 * 121 = 1210
329+
let result =
330+
get_cleanup_param(CleanupPolicy::ReadBased, pt, &mr, 300, "tumbling", 3600, 30)
331+
.unwrap();
332+
assert_eq!(result, 1210);
333+
}
334+
335+
#[test]
336+
fn cleanup_param_circular_buffer_temporal_instant_query() {
337+
let (pt, mr) = match_query("rate(some_metric[5m])");
338+
assert_eq!(pt, QueryPatternType::OnlyTemporal);
339+
// t_lookback = 5m = 300s (from [5m] range vector), range_duration=0, step=0
340+
// effective_repeat = 60, ceil((300 + 0) / 60) = 5
341+
let result =
342+
get_cleanup_param(CleanupPolicy::CircularBuffer, pt, &mr, 60, "tumbling", 0, 0)
343+
.unwrap();
344+
assert_eq!(result, 5);
345+
}
346+
347+
#[test]
348+
fn cleanup_param_no_cleanup_returns_error() {
349+
let (pt, mr) = match_query("sum(some_metric)");
350+
let result = get_cleanup_param(CleanupPolicy::NoCleanup, pt, &mr, 300, "tumbling", 0, 0);
351+
assert!(result.is_err());
352+
}
353+
354+
#[test]
355+
fn cleanup_param_mismatched_range_and_step_returns_error() {
356+
let (pt, mr) = match_query("sum(some_metric)");
357+
// range_duration > 0 but step == 0 is invalid
358+
let result =
359+
get_cleanup_param(CleanupPolicy::CircularBuffer, pt, &mr, 300, "tumbling", 3600, 0);
360+
assert!(result.is_err());
361+
}
362+
}
363+
253364
pub fn set_subpopulation_labels(
254365
statistic: Statistic,
255366
aggregation_type: &str,

asap-planner-rs/src/planner/single_query.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,3 +413,75 @@ fn compute_labels(
413413

414414
(rollup, grouping, aggregated)
415415
}
416+
417+
#[cfg(test)]
418+
mod tests {
419+
use super::*;
420+
use serde_json::Value;
421+
use std::collections::HashMap;
422+
423+
fn base_config() -> IntermediateAggConfig {
424+
IntermediateAggConfig {
425+
aggregation_type: "MultipleIncrease".to_string(),
426+
aggregation_sub_type: "rate".to_string(),
427+
window_type: "tumbling".to_string(),
428+
window_size: 300,
429+
slide_interval: 300,
430+
tumbling_window_size: 300,
431+
spatial_filter: String::new(),
432+
metric: "http_requests_total".to_string(),
433+
table_name: None,
434+
value_column: None,
435+
parameters: HashMap::new(),
436+
rollup_labels: KeyByLabelNames::new(vec!["instance".to_string()]),
437+
grouping_labels: KeyByLabelNames::empty(),
438+
aggregated_labels: KeyByLabelNames::empty(),
439+
}
440+
}
441+
442+
#[test]
443+
fn identifying_key_is_stable() {
444+
let cfg = base_config();
445+
assert_eq!(cfg.identifying_key(), cfg.identifying_key());
446+
}
447+
448+
#[test]
449+
fn identical_configs_have_same_key() {
450+
assert_eq!(base_config().identifying_key(), base_config().identifying_key());
451+
}
452+
453+
#[test]
454+
fn different_aggregation_type_produces_different_key() {
455+
let cfg1 = base_config();
456+
let mut cfg2 = base_config();
457+
cfg2.aggregation_type = "DatasketchesKLL".to_string();
458+
assert_ne!(cfg1.identifying_key(), cfg2.identifying_key());
459+
}
460+
461+
#[test]
462+
fn different_window_size_produces_different_key() {
463+
let cfg1 = base_config();
464+
let mut cfg2 = base_config();
465+
cfg2.window_size = 60;
466+
assert_ne!(cfg1.identifying_key(), cfg2.identifying_key());
467+
}
468+
469+
#[test]
470+
fn different_rollup_labels_produce_different_key() {
471+
let cfg1 = base_config();
472+
let mut cfg2 = base_config();
473+
cfg2.rollup_labels = KeyByLabelNames::new(vec!["job".to_string()]);
474+
assert_ne!(cfg1.identifying_key(), cfg2.identifying_key());
475+
}
476+
477+
#[test]
478+
fn parameter_insertion_order_does_not_affect_key() {
479+
let mut cfg1 = base_config();
480+
let mut cfg2 = base_config();
481+
cfg1.parameters.insert("depth".to_string(), Value::Number(3.into()));
482+
cfg1.parameters.insert("width".to_string(), Value::Number(1024.into()));
483+
cfg2.parameters.insert("width".to_string(), Value::Number(1024.into()));
484+
cfg2.parameters.insert("depth".to_string(), Value::Number(3.into()));
485+
assert_eq!(cfg1.identifying_key(), cfg2.identifying_key());
486+
}
487+
}

asap-planner-rs/tests/comparison/test_cases.json

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,51 @@
8080
"enable_punting": false,
8181
"range_duration": 0,
8282
"step": 0
83+
},
84+
{
85+
"id": "increase",
86+
"input_config": "test_data/configs/increase.yaml",
87+
"prometheus_scrape_interval": 15,
88+
"streaming_engine": "arroyo",
89+
"enable_punting": false,
90+
"range_duration": 0,
91+
"step": 0
92+
},
93+
{
94+
"id": "sum_over_time",
95+
"input_config": "test_data/configs/sum_over_time.yaml",
96+
"prometheus_scrape_interval": 15,
97+
"streaming_engine": "arroyo",
98+
"enable_punting": false,
99+
"range_duration": 0,
100+
"step": 0
101+
},
102+
{
103+
"id": "sum_by",
104+
"input_config": "test_data/configs/sum_by.yaml",
105+
"prometheus_scrape_interval": 15,
106+
"streaming_engine": "arroyo",
107+
"enable_punting": false,
108+
"range_duration": 0,
109+
"step": 0
110+
},
111+
{
112+
"id": "temporal_overlapping",
113+
"input_config": "test_data/configs/temporal_overlapping.yaml",
114+
"prometheus_scrape_interval": 15,
115+
"streaming_engine": "arroyo",
116+
"enable_punting": false,
117+
"range_duration": 0,
118+
"step": 0
119+
},
120+
{
121+
"id": "sum_by_overlapping",
122+
"input_config": "test_data/configs/sum_by_overlapping.yaml",
123+
"prometheus_scrape_interval": 15,
124+
"streaming_engine": "arroyo",
125+
"enable_punting": false,
126+
"range_duration": 0,
127+
"step": 0
83128
}
84129
]
85130
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
query_groups:
2+
- id: 1
3+
queries:
4+
- "increase(http_requests_total[5m])"
5+
repetition_delay: 300
6+
controller_options:
7+
accuracy_sla: 0.99
8+
latency_sla: 1.0
9+
metrics:
10+
- metric: "http_requests_total"
11+
labels: ["instance", "job", "method", "status"]
12+
aggregate_cleanup:
13+
policy: "read_based"
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
query_groups:
2+
- id: 1
3+
queries:
4+
- "sum by (job, method) (http_requests_total)"
5+
repetition_delay: 300
6+
controller_options:
7+
accuracy_sla: 0.99
8+
latency_sla: 1.0
9+
metrics:
10+
- metric: "http_requests_total"
11+
labels: ["instance", "job", "method", "status"]
12+
aggregate_cleanup:
13+
policy: "read_based"
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
query_groups:
2+
- id: 1
3+
queries:
4+
- "sum by (job, method) (http_requests_total)"
5+
repetition_delay: 60
6+
controller_options:
7+
accuracy_sla: 0.99
8+
latency_sla: 1.0
9+
metrics:
10+
- metric: "http_requests_total"
11+
labels: ["instance", "job", "method", "status"]
12+
aggregate_cleanup:
13+
policy: "read_based"
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
query_groups:
2+
- id: 1
3+
queries:
4+
- "sum_over_time(http_requests_total[5m])"
5+
repetition_delay: 300
6+
controller_options:
7+
accuracy_sla: 0.99
8+
latency_sla: 1.0
9+
metrics:
10+
- metric: "http_requests_total"
11+
labels: ["instance", "job", "method", "status"]
12+
aggregate_cleanup:
13+
policy: "read_based"

0 commit comments

Comments
 (0)