Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 42 additions & 11 deletions asap-query-engine/src/precompute_engine/accumulator_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use crate::precompute_operators::{
};
use asap_types::aggregation_config::AggregationConfig;

/// Generate the two boilerplate clone-based `AccumulatorUpdater` methods
/// Generate the boilerplate `AccumulatorUpdater` extraction methods
/// (`take_accumulator`/`snapshot_accumulator` clone, `into_accumulator` moves)
/// for updaters whose inner `acc` field implements `Clone + AggregateCore`.
/// Not applicable to `IncreaseAccumulatorUpdater` (its `acc` is `Option<_>`
/// with non-trivial `None` handling).
macro_rules! impl_clone_accumulator_methods {
macro_rules! impl_accumulator_methods {
($acc_field:ident) => {
fn take_accumulator(&mut self) -> Box<dyn AggregateCore> {
let result = Box::new(self.$acc_field.clone());
Expand All @@ -21,6 +22,15 @@ macro_rules! impl_clone_accumulator_methods {
fn snapshot_accumulator(&self) -> Box<dyn AggregateCore> {
Box::new(self.$acc_field.clone())
}

fn into_accumulator(self: Box<Self>) -> Box<dyn AggregateCore> {
// Consume the updater and MOVE the accumulator out — no clone.
// Avoids the expensive `Clone` (a full msgpack serialize/deserialize
// round-trip for sketch accumulators) when a pane is evicted at
// window close.
let this = *self;
Box::new(this.$acc_field)
}
};
}

Expand All @@ -42,6 +52,16 @@ pub trait AccumulatorUpdater: Send {
/// Used by pane-based sliding windows to read shared panes.
fn snapshot_accumulator(&self) -> Box<dyn AggregateCore>;

/// Consume the updater and return its accumulator BY MOVE, avoiding the
/// `Clone` that `take_accumulator`/`snapshot_accumulator` pay (for sketch
/// accumulators that clone is a full msgpack serialize/deserialize
/// round-trip). Used by `merge_panes_for_window` when a pane is evicted at
/// window close. Default falls back to a clone for updaters that can't
/// cheaply move their inner accumulator out.
fn into_accumulator(self: Box<Self>) -> Box<dyn AggregateCore> {
self.snapshot_accumulator()
}

/// Reset internal state for reuse (avoids re-allocation).
fn reset(&mut self);

Expand Down Expand Up @@ -83,7 +103,7 @@ impl AccumulatorUpdater for SumAccumulatorUpdater {
self.update_single(value, timestamp_ms);
}

impl_clone_accumulator_methods!(acc);
impl_accumulator_methods!(acc);

fn reset(&mut self) {
self.acc = SumAccumulator::new();
Expand Down Expand Up @@ -129,7 +149,7 @@ impl AccumulatorUpdater for MinMaxAccumulatorUpdater {
self.update_single(value, timestamp_ms);
}

impl_clone_accumulator_methods!(acc);
impl_accumulator_methods!(acc);

fn reset(&mut self) {
self.acc = if self.is_max {
Expand Down Expand Up @@ -210,6 +230,17 @@ impl AccumulatorUpdater for IncreaseAccumulatorUpdater {
}
}

// Hand-written: consume the updater and MOVE the accumulator out (no clone),
// mirroring `take_accumulator`'s `Option::take`. Overriding the default
// (which clones via `snapshot_accumulator`) keeps this type consistent with
// the macro-generated updaters at window close.
fn into_accumulator(self: Box<Self>) -> Box<dyn AggregateCore> {
let this = *self;
Box::new(this.acc.unwrap_or_else(|| {
IncreaseAccumulator::new(Measurement::new(0.0), 0, Measurement::new(0.0), 0)
}))
}

fn reset(&mut self) {
self.acc = None;
}
Expand Down Expand Up @@ -250,7 +281,7 @@ impl AccumulatorUpdater for KllAccumulatorUpdater {
self.update_single(value, timestamp_ms);
}

impl_clone_accumulator_methods!(acc);
impl_accumulator_methods!(acc);

fn reset(&mut self) {
self.acc = DatasketchesKLLAccumulator::new(self.k);
Expand Down Expand Up @@ -297,7 +328,7 @@ impl AccumulatorUpdater for HllAccumulatorUpdater {
self.update_single(value, timestamp_ms);
}

impl_clone_accumulator_methods!(acc);
impl_accumulator_methods!(acc);

fn reset(&mut self) {
self.acc = HllAccumulator::new(self.precision);
Expand Down Expand Up @@ -349,7 +380,7 @@ impl AccumulatorUpdater for MultipleSumAccumulatorUpdater {
self.acc.update(key.clone(), value);
}

impl_clone_accumulator_methods!(acc);
impl_accumulator_methods!(acc);

fn reset(&mut self) {
self.acc = MultipleSumAccumulator::new();
Expand Down Expand Up @@ -399,7 +430,7 @@ impl AccumulatorUpdater for MultipleMinMaxAccumulatorUpdater {
self.acc.update(key.clone(), value);
}

impl_clone_accumulator_methods!(acc);
impl_accumulator_methods!(acc);

fn reset(&mut self) {
self.acc = if self.is_max {
Expand Down Expand Up @@ -466,7 +497,7 @@ impl AccumulatorUpdater for MultipleIncreaseAccumulatorUpdater {
}
}

impl_clone_accumulator_methods!(acc);
impl_accumulator_methods!(acc);

fn reset(&mut self) {
self.acc = MultipleIncreaseAccumulator::new();
Expand Down Expand Up @@ -516,7 +547,7 @@ impl AccumulatorUpdater for CmsAccumulatorUpdater {
self.acc.inner.update(&key.to_semicolon_str(), value);
}

impl_clone_accumulator_methods!(acc);
impl_accumulator_methods!(acc);

fn reset(&mut self) {
self.acc = CountMinSketchAccumulator::new(self.row_num, self.col_num);
Expand Down Expand Up @@ -566,7 +597,7 @@ impl AccumulatorUpdater for HydraKllAccumulatorUpdater {
self.acc.update(key, value);
}

impl_clone_accumulator_methods!(acc);
impl_accumulator_methods!(acc);

fn reset(&mut self) {
self.acc = HydraKllSketchAccumulator::new(self.row_num, self.col_num, self.k);
Expand Down
4 changes: 2 additions & 2 deletions asap-query-engine/src/precompute_engine/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,10 +704,10 @@ fn merge_panes_for_window(

for (i, &ps) in pane_starts.iter().enumerate() {
let pane_acc = if i == 0 {
// Oldest pane: destructive take + evict
// Oldest pane: evict and MOVE the accumulator out (no clone).
active_panes
.remove(&ps)
.map(|mut updater| updater.take_accumulator())
.map(|updater| updater.into_accumulator())
} else {
// Shared pane: non-destructive snapshot
active_panes
Expand Down
Loading