From c430204b53ee4194c8da23206185a44d18464886 Mon Sep 17 00:00:00 2001 From: zzylol Date: Tue, 26 May 2026 13:47:13 -0600 Subject: [PATCH 1/2] precompute: avoid cloning the accumulator at window close merge_panes_for_window evicts the oldest pane destructively (it is removed from the map and dropped), but then called take_accumulator(), which clones the inner accumulator before the updater is dropped. For sketch accumulators that clone is expensive (a serialize -> deserialize round-trip), so it is pure waste when the pane is being evicted. Add AccumulatorUpdater::into_accumulator(self: Box<Self>) that moves the inner accumulator out (no clone) and use it for the evicted (oldest) pane. Sliding-window shared panes still snapshot_accumulator() (clone), since a later window may still read them. A default impl falls back to a clone for updaters that can't cheaply move their inner accumulator out. For tumbling windows this removes one accumulator clone per window close. Co-Authored-By: Claude Opus 4.7 --- .../precompute_engine/accumulator_factory.rs | 19 +++++++++++++++++++ .../src/precompute_engine/worker.rs | 4 ++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/asap-query-engine/src/precompute_engine/accumulator_factory.rs b/asap-query-engine/src/precompute_engine/accumulator_factory.rs index d3b41dc3..565fcd8c 100644 --- a/asap-query-engine/src/precompute_engine/accumulator_factory.rs +++ b/asap-query-engine/src/precompute_engine/accumulator_factory.rs @@ -21,6 +21,15 @@ macro_rules! impl_clone_accumulator_methods { fn snapshot_accumulator(&self) -> Box { Box::new(self.$acc_field.clone()) } + + fn into_accumulator(self: Box) -> Box { + // Consume the updater and MOVE the accumulator out — no clone. + // Avoids the expensive `Clone` (a full msgpack serialize/deserialize + // round-trip for sketch accumulators) when a pane is evicted at + // window close. 
+ let this = *self; + Box::new(this.$acc_field) + } }; } @@ -42,6 +51,16 @@ pub trait AccumulatorUpdater: Send { /// Used by pane-based sliding windows to read shared panes. fn snapshot_accumulator(&self) -> Box; + /// Consume the updater and return its accumulator BY MOVE, avoiding the + /// `Clone` that `take_accumulator`/`snapshot_accumulator` pay (for sketch + /// accumulators that clone is a full msgpack serialize/deserialize + /// round-trip). Used by `merge_panes_for_window` when a pane is evicted at + /// window close. Default falls back to a clone for updaters that can't + /// cheaply move their inner accumulator out. + fn into_accumulator(self: Box) -> Box { + self.snapshot_accumulator() + } + /// Reset internal state for reuse (avoids re-allocation). fn reset(&mut self); diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index 37fc8f3c..e72e6041 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -704,10 +704,10 @@ fn merge_panes_for_window( for (i, &ps) in pane_starts.iter().enumerate() { let pane_acc = if i == 0 { - // Oldest pane: destructive take + evict + // Oldest pane: evict and MOVE the accumulator out (no clone). 
active_panes .remove(&ps) - .map(|mut updater| updater.take_accumulator()) + .map(|updater| updater.into_accumulator()) } else { // Shared pane: non-destructive snapshot active_panes From e60c2593d32973146d01703bb6f4bc29fa309534 Mon Sep 17 00:00:00 2001 From: zz_y Date: Thu, 28 May 2026 13:44:17 -0600 Subject: [PATCH 2/2] perf(precompute): move accumulator out for IncreaseAccumulatorUpdater; rename macro MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Override into_accumulator for IncreaseAccumulatorUpdater so it moves its inner accumulator out (mirroring take_accumulator) instead of falling through to the cloning trait default — consistent with the macro-generated updaters at window close. Rename impl_clone_accumulator_methods -> impl_accumulator_methods, since the macro now also emits the move-based into_accumulator (not just clones). Co-Authored-By: Claude Opus 4.8 (1M context) --- .../precompute_engine/accumulator_factory.rs | 34 +++++++++++++------ 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/asap-query-engine/src/precompute_engine/accumulator_factory.rs b/asap-query-engine/src/precompute_engine/accumulator_factory.rs index 565fcd8c..e6005e92 100644 --- a/asap-query-engine/src/precompute_engine/accumulator_factory.rs +++ b/asap-query-engine/src/precompute_engine/accumulator_factory.rs @@ -6,11 +6,12 @@ use crate::precompute_operators::{ }; use asap_types::aggregation_config::AggregationConfig; -/// Generate the two boilerplate clone-based `AccumulatorUpdater` methods +/// Generate the boilerplate `AccumulatorUpdater` extraction methods +/// (`take_accumulator`/`snapshot_accumulator` clone, `into_accumulator` moves) /// for updaters whose inner `acc` field implements `Clone + AggregateCore`. /// Not applicable to `IncreaseAccumulatorUpdater` (its `acc` is `Option<_>` /// with non-trivial `None` handling). -macro_rules! impl_clone_accumulator_methods { +macro_rules! 
impl_accumulator_methods { ($acc_field:ident) => { fn take_accumulator(&mut self) -> Box { let result = Box::new(self.$acc_field.clone()); @@ -102,7 +103,7 @@ impl AccumulatorUpdater for SumAccumulatorUpdater { self.update_single(value, timestamp_ms); } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = SumAccumulator::new(); @@ -148,7 +149,7 @@ impl AccumulatorUpdater for MinMaxAccumulatorUpdater { self.update_single(value, timestamp_ms); } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = if self.is_max { @@ -229,6 +230,17 @@ impl AccumulatorUpdater for IncreaseAccumulatorUpdater { } } + // Hand-written: consume the updater and MOVE the accumulator out (no clone), + // mirroring `take_accumulator`'s `Option::take`. Overriding the default + // (which clones via `snapshot_accumulator`) keeps this type consistent with + // the macro-generated updaters at window close. + fn into_accumulator(self: Box) -> Box { + let this = *self; + Box::new(this.acc.unwrap_or_else(|| { + IncreaseAccumulator::new(Measurement::new(0.0), 0, Measurement::new(0.0), 0) + })) + } + fn reset(&mut self) { self.acc = None; } @@ -269,7 +281,7 @@ impl AccumulatorUpdater for KllAccumulatorUpdater { self.update_single(value, timestamp_ms); } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = DatasketchesKLLAccumulator::new(self.k); @@ -316,7 +328,7 @@ impl AccumulatorUpdater for HllAccumulatorUpdater { self.update_single(value, timestamp_ms); } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = HllAccumulator::new(self.precision); @@ -368,7 +380,7 @@ impl AccumulatorUpdater for MultipleSumAccumulatorUpdater { self.acc.update(key.clone(), value); } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = 
MultipleSumAccumulator::new(); @@ -418,7 +430,7 @@ impl AccumulatorUpdater for MultipleMinMaxAccumulatorUpdater { self.acc.update(key.clone(), value); } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = if self.is_max { @@ -485,7 +497,7 @@ impl AccumulatorUpdater for MultipleIncreaseAccumulatorUpdater { } } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = MultipleIncreaseAccumulator::new(); @@ -535,7 +547,7 @@ impl AccumulatorUpdater for CmsAccumulatorUpdater { self.acc.inner.update(&key.to_semicolon_str(), value); } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = CountMinSketchAccumulator::new(self.row_num, self.col_num); @@ -585,7 +597,7 @@ impl AccumulatorUpdater for HydraKllAccumulatorUpdater { self.acc.update(key, value); } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = HydraKllSketchAccumulator::new(self.row_num, self.col_num, self.k);