Skip to content

Commit bf87a83

Browse files
zzylolclaude
andcommitted
Fix sliding window aggregation: feed samples into all overlapping windows
Previously each sample was assigned to only one window via window_start_for(), which is incorrect for sliding windows where window_size > slide_interval. Added window_starts_containing() that returns all window starts whose range covers the timestamp, and use it in the worker aggregation loop. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 63619dd commit bf87a83

2 files changed

Lines changed: 58 additions & 12 deletions

File tree

asap-query-engine/src/precompute_engine/window_manager.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,20 @@ impl WindowManager {
7373
closed
7474
}
7575

76+
/// Return all window starts whose window `[start, start + window_size_ms)`
77+
/// contains the given timestamp. For tumbling windows this returns exactly
78+
/// one start; for sliding windows it returns `ceil(window_size / slide)`
79+
/// starts.
80+
pub fn window_starts_containing(&self, timestamp_ms: i64) -> Vec<i64> {
81+
let mut starts = Vec::new();
82+
let mut start = self.window_start_for(timestamp_ms);
83+
while start + self.window_size_ms > timestamp_ms {
84+
starts.push(start);
85+
start -= self.slide_interval_ms;
86+
}
87+
starts
88+
}
89+
7690
/// Return the window `[start, end)` boundaries for a given window start.
7791
pub fn window_bounds(&self, window_start: i64) -> (i64, i64) {
7892
(window_start, window_start + self.window_size_ms)
@@ -151,4 +165,34 @@ mod tests {
151165
let closed = wm.closed_windows(15_000, 35_000);
152166
assert_eq!(closed, vec![0]);
153167
}
168+
169+
#[test]
170+
fn test_window_starts_containing_tumbling() {
171+
// 60s tumbling windows — each sample belongs to exactly one window
172+
let wm = WindowManager::new(60, 0);
173+
let mut starts = wm.window_starts_containing(15_000);
174+
starts.sort();
175+
assert_eq!(starts, vec![0]);
176+
177+
let mut starts = wm.window_starts_containing(60_000);
178+
starts.sort();
179+
assert_eq!(starts, vec![60_000]);
180+
}
181+
182+
#[test]
183+
fn test_window_starts_containing_sliding() {
184+
// 30s window, 10s slide — each sample belongs to 3 windows
185+
let wm = WindowManager::new(30, 10);
186+
187+
// t=15_000 belongs to [0, 30_000), [10_000, 40_000)
188+
// and [-10_000, 20_000) which starts negative — still returned
189+
let mut starts = wm.window_starts_containing(15_000);
190+
starts.sort();
191+
assert_eq!(starts, vec![-10_000, 0, 10_000]);
192+
193+
// t=30_000 belongs to [10_000, 40_000), [20_000, 50_000), [30_000, 60_000)
194+
let mut starts = wm.window_starts_containing(30_000);
195+
starts.sort();
196+
assert_eq!(starts, vec![10_000, 20_000, 30_000]);
197+
}
154198
}

asap-query-engine/src/precompute_engine/worker.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -221,18 +221,20 @@ impl Worker {
221221
continue; // already dropped
222222
}
223223

224-
let window_start = agg_state.window_manager.window_start_for(ts);
225-
226-
let updater = agg_state
227-
.active_windows
228-
.entry(window_start)
229-
.or_insert_with(|| create_accumulator_updater(&agg_state.config));
230-
231-
if updater.is_keyed() {
232-
let key = extract_key_from_series(series_key, &agg_state.config);
233-
updater.update_keyed(&key, val, ts);
234-
} else {
235-
updater.update_single(val, ts);
224+
let window_starts = agg_state.window_manager.window_starts_containing(ts);
225+
226+
for window_start in window_starts {
227+
let updater = agg_state
228+
.active_windows
229+
.entry(window_start)
230+
.or_insert_with(|| create_accumulator_updater(&agg_state.config));
231+
232+
if updater.is_keyed() {
233+
let key = extract_key_from_series(series_key, &agg_state.config);
234+
updater.update_keyed(&key, val, ts);
235+
} else {
236+
updater.update_single(val, ts);
237+
}
236238
}
237239
}
238240

0 commit comments

Comments
 (0)