diff --git a/asap-query-engine/src/precompute_engine/accumulator_factory.rs b/asap-query-engine/src/precompute_engine/accumulator_factory.rs index d3b41dc..e6005e9 100644 --- a/asap-query-engine/src/precompute_engine/accumulator_factory.rs +++ b/asap-query-engine/src/precompute_engine/accumulator_factory.rs @@ -6,11 +6,12 @@ use crate::precompute_operators::{ }; use asap_types::aggregation_config::AggregationConfig; -/// Generate the two boilerplate clone-based `AccumulatorUpdater` methods +/// Generate the boilerplate `AccumulatorUpdater` extraction methods +/// (`take_accumulator`/`snapshot_accumulator` clone, `into_accumulator` moves) /// for updaters whose inner `acc` field implements `Clone + AggregateCore`. /// Not applicable to `IncreaseAccumulatorUpdater` (its `acc` is `Option<_>` /// with non-trivial `None` handling). -macro_rules! impl_clone_accumulator_methods { +macro_rules! impl_accumulator_methods { ($acc_field:ident) => { fn take_accumulator(&mut self) -> Box { let result = Box::new(self.$acc_field.clone()); @@ -21,6 +22,15 @@ macro_rules! impl_clone_accumulator_methods { fn snapshot_accumulator(&self) -> Box { Box::new(self.$acc_field.clone()) } + + fn into_accumulator(self: Box) -> Box { + // Consume the updater and MOVE the accumulator out — no clone. + // Avoids the expensive `Clone` (a full msgpack serialize/deserialize + // round-trip for sketch accumulators) when a pane is evicted at + // window close. + let this = *self; + Box::new(this.$acc_field) + } }; } @@ -42,6 +52,16 @@ pub trait AccumulatorUpdater: Send { /// Used by pane-based sliding windows to read shared panes. fn snapshot_accumulator(&self) -> Box; + /// Consume the updater and return its accumulator BY MOVE, avoiding the + /// `Clone` that `take_accumulator`/`snapshot_accumulator` pay (for sketch + /// accumulators that clone is a full msgpack serialize/deserialize + /// round-trip). Used by `merge_panes_for_window` when a pane is evicted at + /// window close. Default falls back to a clone for updaters that can't + /// cheaply move their inner accumulator out. + fn into_accumulator(self: Box) -> Box { + self.snapshot_accumulator() + } + /// Reset internal state for reuse (avoids re-allocation). fn reset(&mut self); @@ -83,7 +103,7 @@ impl AccumulatorUpdater for SumAccumulatorUpdater { self.update_single(value, timestamp_ms); } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = SumAccumulator::new(); @@ -129,7 +149,7 @@ impl AccumulatorUpdater for MinMaxAccumulatorUpdater { self.update_single(value, timestamp_ms); } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = if self.is_max { @@ -210,6 +230,17 @@ impl AccumulatorUpdater for IncreaseAccumulatorUpdater { } } + // Hand-written: consume the updater and MOVE the accumulator out (no clone), + // mirroring `take_accumulator`'s `Option::take`. Overriding the default + // (which clones via `snapshot_accumulator`) keeps this type consistent with + // the macro-generated updaters at window close. + fn into_accumulator(self: Box) -> Box { + let this = *self; + Box::new(this.acc.unwrap_or_else(|| { + IncreaseAccumulator::new(Measurement::new(0.0), 0, Measurement::new(0.0), 0) + })) + } + fn reset(&mut self) { self.acc = None; } @@ -250,7 +281,7 @@ impl AccumulatorUpdater for KllAccumulatorUpdater { self.update_single(value, timestamp_ms); } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = DatasketchesKLLAccumulator::new(self.k); @@ -297,7 +328,7 @@ impl AccumulatorUpdater for HllAccumulatorUpdater { self.update_single(value, timestamp_ms); } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = HllAccumulator::new(self.precision); @@ -349,7 +380,7 @@ impl AccumulatorUpdater for MultipleSumAccumulatorUpdater { self.acc.update(key.clone(), value); } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = MultipleSumAccumulator::new(); @@ -399,7 +430,7 @@ impl AccumulatorUpdater for MultipleMinMaxAccumulatorUpdater { self.acc.update(key.clone(), value); } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = if self.is_max { @@ -466,7 +497,7 @@ impl AccumulatorUpdater for MultipleIncreaseAccumulatorUpdater { } } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = MultipleIncreaseAccumulator::new(); @@ -516,7 +547,7 @@ impl AccumulatorUpdater for CmsAccumulatorUpdater { self.acc.inner.update(&key.to_semicolon_str(), value); } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = CountMinSketchAccumulator::new(self.row_num, self.col_num); @@ -566,7 +597,7 @@ impl AccumulatorUpdater for HydraKllAccumulatorUpdater { self.acc.update(key, value); } - impl_clone_accumulator_methods!(acc); + impl_accumulator_methods!(acc); fn reset(&mut self) { self.acc = HydraKllSketchAccumulator::new(self.row_num, self.col_num, self.k); diff --git a/asap-query-engine/src/precompute_engine/worker.rs b/asap-query-engine/src/precompute_engine/worker.rs index 37fc8f3..e72e604 100644 --- a/asap-query-engine/src/precompute_engine/worker.rs +++ b/asap-query-engine/src/precompute_engine/worker.rs @@ -704,10 +704,10 @@ fn merge_panes_for_window( for (i, &ps) in pane_starts.iter().enumerate() { let pane_acc = if i == 0 { - // Oldest pane: destructive take + evict + // Oldest pane: evict and MOVE the accumulator out (no clone). active_panes .remove(&ps) - .map(|mut updater| updater.take_accumulator()) + .map(|updater| updater.into_accumulator()) } else { // Shared pane: non-destructive snapshot active_panes