Skip to content

Commit 8a1cd59

Browse files
zzylol and claude committed
Add correctness tests: new store vs legacy per all Store trait methods
Tests in store_correctness_tests.rs verify that SimpleMapStorePerKey and SimpleMapStoreGlobal produce identical results to LegacySimpleMapStorePerKey for range queries, exact queries, earliest-timestamp lookups, and batch insert. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 6dd9ff4 commit 8a1cd59

2 files changed

Lines changed: 380 additions & 0 deletions

File tree

asap-query-engine/src/tests/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ pub mod datafusion;
33
pub mod elastic_forwarding_tests;
44
pub mod prometheus_forwarding_tests;
55
pub mod query_equivalence_tests;
6+
pub mod store_correctness_tests;
67
pub mod trait_design_tests;
78

89
#[cfg(test)]
Lines changed: 379 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,379 @@
1+
//! Store Correctness Tests
2+
//!
3+
//! Verifies that `SimpleMapStorePerKey` and `SimpleMapStoreGlobal` produce
4+
//! identical results to `LegacySimpleMapStorePerKey` for range queries,
5+
//! exact queries, and earliest-timestamp lookups. Also checks that batch insert matches single insert.
6+
7+
#![allow(deprecated)]
8+
9+
use crate::data_model::{CleanupPolicy, KeyByLabelValues, LockStrategy, PrecomputedOutput};
10+
use crate::precompute_operators::sum_accumulator::SumAccumulator;
11+
use crate::stores::simple_map_store::per_key_legacy::LegacySimpleMapStorePerKey;
12+
use crate::stores::simple_map_store::SimpleMapStore;
13+
use crate::stores::{Store, TimestampedBucketsMap};
14+
use crate::AggregateCore;
15+
use promql_utilities::data_model::KeyByLabelNames;
16+
use sketch_db_common::aggregation_config::AggregationConfig;
17+
use std::collections::HashMap;
18+
use std::sync::Arc;
19+
20+
// ---------------------------------------------------------------------------
21+
// Helpers
22+
// ---------------------------------------------------------------------------
23+
24+
fn make_streaming_config(
25+
num_aggregates_to_retain: Option<u64>,
26+
) -> Arc<crate::data_model::StreamingConfig> {
27+
let mut configs = HashMap::new();
28+
configs.insert(
29+
1u64,
30+
AggregationConfig {
31+
aggregation_id: 1,
32+
aggregation_type: "SumAccumulator".to_string(),
33+
aggregation_sub_type: String::new(),
34+
parameters: HashMap::new(),
35+
grouping_labels: KeyByLabelNames::empty(),
36+
aggregated_labels: KeyByLabelNames::empty(),
37+
rollup_labels: KeyByLabelNames::empty(),
38+
original_yaml: String::new(),
39+
window_size: 1000,
40+
slide_interval: 1000,
41+
window_type: "tumbling".to_string(),
42+
tumbling_window_size: 1000,
43+
spatial_filter: String::new(),
44+
spatial_filter_normalized: String::new(),
45+
metric: "test_metric".to_string(),
46+
num_aggregates_to_retain,
47+
read_count_threshold: None,
48+
table_name: None,
49+
value_column: None,
50+
},
51+
);
52+
Arc::new(crate::data_model::StreamingConfig::new(configs))
53+
}
54+
55+
/// A single test-data record: one summed value for one label over one time window.
struct Row {
    /// Window start timestamp.
    start: u64,
    /// Window end timestamp.
    end: u64,
    /// The single label value used to build the row's key.
    label: String,
    /// Sum value the row's accumulator is seeded with.
    value: f64,
}
62+
63+
fn populate_new(store: &SimpleMapStore, rows: &[Row]) {
64+
for r in rows {
65+
let key = KeyByLabelValues::new_with_labels(vec![r.label.clone()]);
66+
let output = PrecomputedOutput::new(r.start, r.end, Some(key), 1);
67+
let acc: Box<dyn AggregateCore> = Box::new(SumAccumulator::with_sum(r.value));
68+
store.insert_precomputed_output(output, acc).unwrap();
69+
}
70+
}
71+
72+
fn populate_legacy(store: &LegacySimpleMapStorePerKey, rows: &[Row]) {
73+
for r in rows {
74+
let key = KeyByLabelValues::new_with_labels(vec![r.label.clone()]);
75+
let output = PrecomputedOutput::new(r.start, r.end, Some(key), 1);
76+
let acc: Box<dyn AggregateCore> = Box::new(SumAccumulator::with_sum(r.value));
77+
store.insert_precomputed_output(output, acc).unwrap();
78+
}
79+
}
80+
81+
/// Extract (label, sum) pairs from a TimestampedBucketsMap, sorted for comparison.
82+
fn extract_sums(map: &TimestampedBucketsMap) -> Vec<(Vec<String>, u64, u64, f64)> {
83+
let mut out = Vec::new();
84+
for (key, buckets) in map {
85+
let labels = key
86+
.as_ref()
87+
.map(|k| k.labels.clone())
88+
.unwrap_or_default();
89+
for ((start, end), acc) in buckets {
90+
let sum = acc
91+
.as_any()
92+
.downcast_ref::<SumAccumulator>()
93+
.map(|s| s.sum)
94+
.unwrap_or(f64::NAN);
95+
out.push((labels.clone(), *start, *end, sum));
96+
}
97+
}
98+
out.sort_by(|a, b| {
99+
a.0.cmp(&b.0)
100+
.then(a.1.cmp(&b.1))
101+
.then(a.2.cmp(&b.2))
102+
});
103+
out
104+
}
105+
106+
/// Assert that two `TimestampedBucketsMap` results are identical.
107+
fn assert_maps_equal(new: &TimestampedBucketsMap, legacy: &TimestampedBucketsMap, ctx: &str) {
108+
let new_sums = extract_sums(new);
109+
let legacy_sums = extract_sums(legacy);
110+
assert_eq!(
111+
new_sums, legacy_sums,
112+
"{ctx}: result mismatch\n new: {new_sums:?}\n legacy: {legacy_sums:?}",
113+
);
114+
}
115+
116+
/// Build a standard set of rows: 5 time windows × 3 labels.
117+
fn make_rows() -> Vec<Row> {
118+
let mut rows = Vec::new();
119+
for i in 0u64..5 {
120+
let start = i * 1000;
121+
let end = start + 1000;
122+
for (j, label) in ["host-a", "host-b", "host-c"].iter().enumerate() {
123+
rows.push(Row {
124+
start,
125+
end,
126+
label: label.to_string(),
127+
value: (i * 3 + j as u64) as f64 + 1.0,
128+
});
129+
}
130+
}
131+
rows
132+
}
133+
134+
// ---------------------------------------------------------------------------
135+
// Tests
136+
// ---------------------------------------------------------------------------
137+
138+
#[cfg(test)]
mod tests {
    use super::*;

    /// Shared fixture for the new-vs-legacy comparisons.
    ///
    /// Builds the standard rows and config, creates and populates the legacy
    /// store, then creates and populates a new store using `strategy`, and
    /// finally hands both stores to `check`.
    fn with_populated_stores<F>(strategy: LockStrategy, check: F)
    where
        F: FnOnce(&SimpleMapStore, &LegacySimpleMapStorePerKey),
    {
        let rows = make_rows();
        let config = make_streaming_config(None);

        let legacy = LegacySimpleMapStorePerKey::new(config.clone(), CleanupPolicy::NoCleanup);
        populate_legacy(&legacy, &rows);

        let new = SimpleMapStore::new_with_strategy(config, CleanupPolicy::NoCleanup, strategy);
        populate_new(&new, &rows);

        check(&new, &legacy);
    }

    // --- range query (query_precomputed_output) ---

    #[test]
    fn per_key_range_query_matches_legacy_full_range() {
        with_populated_stores(LockStrategy::PerKey, |new, legacy| {
            let (start, end) = (0, 5000);
            let new_result = new
                .query_precomputed_output("test_metric", 1, start, end)
                .unwrap();
            let legacy_result = legacy
                .query_precomputed_output("test_metric", 1, start, end)
                .unwrap();

            assert_maps_equal(&new_result, &legacy_result, "per_key range full");
        });
    }

    #[test]
    fn global_range_query_matches_legacy_full_range() {
        with_populated_stores(LockStrategy::Global, |new, legacy| {
            let (start, end) = (0, 5000);
            let new_result = new
                .query_precomputed_output("test_metric", 1, start, end)
                .unwrap();
            let legacy_result = legacy
                .query_precomputed_output("test_metric", 1, start, end)
                .unwrap();

            assert_maps_equal(&new_result, &legacy_result, "global range full");
        });
    }

    #[test]
    fn per_key_range_query_matches_legacy_partial_range() {
        with_populated_stores(LockStrategy::PerKey, |new, legacy| {
            // Sub-range that starts at the second window and stops short of 4000.
            // NOTE(review): whether the window ending at 4000 is returned depends on
            // the stores' range semantics, which are not visible here; the test only
            // requires that both implementations agree.
            let (start, end) = (1000, 3999);
            let new_result = new
                .query_precomputed_output("test_metric", 1, start, end)
                .unwrap();
            let legacy_result = legacy
                .query_precomputed_output("test_metric", 1, start, end)
                .unwrap();

            assert_maps_equal(&new_result, &legacy_result, "per_key range partial");
        });
    }

    // --- exact query (query_precomputed_output_exact) ---

    #[test]
    fn per_key_exact_query_matches_legacy() {
        with_populated_stores(LockStrategy::PerKey, |new, legacy| {
            // Exact match on the third fixture window (start 2000, end 3000).
            let (exact_start, exact_end) = (2000, 3000);
            let new_result = new
                .query_precomputed_output_exact("test_metric", 1, exact_start, exact_end)
                .unwrap();
            let legacy_result = legacy
                .query_precomputed_output_exact("test_metric", 1, exact_start, exact_end)
                .unwrap();

            assert_maps_equal(&new_result, &legacy_result, "per_key exact");
        });
    }

    #[test]
    fn global_exact_query_matches_legacy() {
        with_populated_stores(LockStrategy::Global, |new, legacy| {
            let (exact_start, exact_end) = (2000, 3000);
            let new_result = new
                .query_precomputed_output_exact("test_metric", 1, exact_start, exact_end)
                .unwrap();
            let legacy_result = legacy
                .query_precomputed_output_exact("test_metric", 1, exact_start, exact_end)
                .unwrap();

            assert_maps_equal(&new_result, &legacy_result, "global exact");
        });
    }

    #[test]
    fn exact_query_returns_empty_when_no_match() {
        let rows = make_rows();
        let config = make_streaming_config(None);

        let new =
            SimpleMapStore::new_with_strategy(config, CleanupPolicy::NoCleanup, LockStrategy::PerKey);
        populate_new(&new, &rows);

        // The fixture has no window with start 9000 and end 10000.
        let result = new
            .query_precomputed_output_exact("test_metric", 1, 9000, 10000)
            .unwrap();

        assert!(result.is_empty(), "Expected empty result for unmatched exact query");
    }

    // --- earliest timestamp ---

    #[test]
    fn per_key_earliest_timestamp_matches_legacy() {
        with_populated_stores(LockStrategy::PerKey, |new, legacy| {
            let new_ts = new.get_earliest_timestamp_per_aggregation_id().unwrap();
            let legacy_ts = legacy.get_earliest_timestamp_per_aggregation_id().unwrap();

            assert_eq!(new_ts, legacy_ts, "earliest timestamp mismatch (per_key)");
        });
    }

    #[test]
    fn global_earliest_timestamp_matches_legacy() {
        with_populated_stores(LockStrategy::Global, |new, legacy| {
            let new_ts = new.get_earliest_timestamp_per_aggregation_id().unwrap();
            let legacy_ts = legacy.get_earliest_timestamp_per_aggregation_id().unwrap();

            assert_eq!(new_ts, legacy_ts, "earliest timestamp mismatch (global)");
        });
    }

    // --- batch insert produces same results as single insert ---

    #[test]
    fn batch_insert_matches_single_insert_per_key() {
        let rows = make_rows();
        let config = make_streaming_config(None);

        // Reference store: rows inserted one at a time.
        let single = SimpleMapStore::new_with_strategy(
            config.clone(),
            CleanupPolicy::NoCleanup,
            LockStrategy::PerKey,
        );
        populate_new(&single, &rows);

        // Store under test: the same rows inserted through the batch API.
        let batch =
            SimpleMapStore::new_with_strategy(config, CleanupPolicy::NoCleanup, LockStrategy::PerKey);
        let batch_inputs: Vec<_> = rows
            .iter()
            .map(|r| {
                let key = KeyByLabelValues::new_with_labels(vec![r.label.clone()]);
                let output = PrecomputedOutput::new(r.start, r.end, Some(key), 1);
                let acc: Box<dyn AggregateCore> = Box::new(SumAccumulator::with_sum(r.value));
                (output, acc)
            })
            .collect();
        batch.insert_precomputed_output_batch(batch_inputs).unwrap();

        let (start, end) = (0, 5000);
        let single_result = single
            .query_precomputed_output("test_metric", 1, start, end)
            .unwrap();
        let batch_result = batch
            .query_precomputed_output("test_metric", 1, start, end)
            .unwrap();

        assert_maps_equal(&single_result, &batch_result, "batch vs single insert");
    }
}

0 commit comments

Comments
 (0)