Skip to content

Commit 258f207

Browse files
refactor: accumulator factory in precompute engine
1 parent 5a22b3c commit 258f207

10 files changed

Lines changed: 119 additions & 96 deletions

asap-query-engine/benches/simple_store_bench.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -111,7 +111,7 @@ impl AccumulatorKind {
111111
Self::Kll => {
112112
let mut acc = DatasketchesKLLAccumulator::new(200);
113113
for v in 0..20 {
114-
acc._update(v as f64 * (value + 1.0));
114+
acc.update(v as f64 * (value + 1.0));
115115
}
116116
Box::new(acc)
117117
}

asap-query-engine/src/engines/physical/summary_merge_multiple_exec.rs

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -466,11 +466,11 @@ mod tests {
466466
#[test]
467467
fn test_merge_kll_sketches() {
468468
let mut kll1 = DatasketchesKLLAccumulator::new(200);
469-
kll1._update(1.0);
470-
kll1._update(2.0);
469+
kll1.update(1.0);
470+
kll1.update(2.0);
471471
let mut kll2 = DatasketchesKLLAccumulator::new(200);
472-
kll2._update(3.0);
473-
kll2._update(4.0);
472+
kll2.update(3.0);
473+
kll2.update(4.0);
474474

475475
let bytes1 = serialize_accumulator_arroyo(&kll1);
476476
let bytes2 = serialize_accumulator_arroyo(&kll2);

asap-query-engine/src/precompute_engine/accumulator_factory.rs

Lines changed: 88 additions & 65 deletions
Original file line number | Diff line number | Diff line change
@@ -104,14 +104,18 @@ impl AccumulatorUpdater for SumAccumulatorUpdater {
104104

105105
pub struct MinMaxAccumulatorUpdater {
106106
acc: MinMaxAccumulator,
107-
sub_type: String,
107+
is_max: bool,
108108
}
109109

110110
impl MinMaxAccumulatorUpdater {
111-
pub fn new(sub_type: String) -> Self {
111+
pub fn new(is_max: bool) -> Self {
112112
Self {
113-
acc: MinMaxAccumulator::new(sub_type.clone()),
114-
sub_type,
113+
acc: if is_max {
114+
MinMaxAccumulator::new_max()
115+
} else {
116+
MinMaxAccumulator::new_min()
117+
},
118+
is_max,
115119
}
116120
}
117121
}
@@ -128,7 +132,11 @@ impl AccumulatorUpdater for MinMaxAccumulatorUpdater {
128132
impl_clone_accumulator_methods!(acc);
129133

130134
fn reset(&mut self) {
131-
self.acc = MinMaxAccumulator::new(self.sub_type.clone());
135+
self.acc = if self.is_max {
136+
MinMaxAccumulator::new_max()
137+
} else {
138+
MinMaxAccumulator::new_min()
139+
};
132140
}
133141

134142
fn is_keyed(&self) -> bool {
@@ -235,7 +243,7 @@ impl KllAccumulatorUpdater {
235243

236244
impl AccumulatorUpdater for KllAccumulatorUpdater {
237245
fn update_single(&mut self, value: f64, _timestamp_ms: i64) {
238-
self.acc._update(value);
246+
self.acc.update(value);
239247
}
240248

241249
fn update_keyed(&mut self, _key: &KeyByLabelValues, value: f64, timestamp_ms: i64) {
@@ -259,30 +267,33 @@ impl AccumulatorUpdater for KllAccumulatorUpdater {
259267
}
260268

261269
// ---------------------------------------------------------------------------
262-
// MultipleSumUpdater
270+
// MultipleSumAccumulatorUpdater
263271
// ---------------------------------------------------------------------------
264272

265-
pub struct MultipleSumUpdater {
273+
pub struct MultipleSumAccumulatorUpdater {
266274
acc: MultipleSumAccumulator,
267275
}
268276

269-
impl MultipleSumUpdater {
277+
impl MultipleSumAccumulatorUpdater {
270278
pub fn new() -> Self {
271279
Self {
272280
acc: MultipleSumAccumulator::new(),
273281
}
274282
}
275283
}
276284

277-
impl Default for MultipleSumUpdater {
285+
impl Default for MultipleSumAccumulatorUpdater {
278286
fn default() -> Self {
279287
Self::new()
280288
}
281289
}
282290

283-
impl AccumulatorUpdater for MultipleSumUpdater {
291+
impl AccumulatorUpdater for MultipleSumAccumulatorUpdater {
284292
fn update_single(&mut self, _value: f64, _timestamp_ms: i64) {
285-
// Multiple-subpopulation — use update_keyed instead
293+
debug_assert!(
294+
false,
295+
"update_single called on keyed updater; use update_keyed"
296+
);
286297
}
287298

288299
fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) {
@@ -306,26 +317,33 @@ impl AccumulatorUpdater for MultipleSumUpdater {
306317
}
307318

308319
// ---------------------------------------------------------------------------
309-
// MultipleMinMaxUpdater
320+
// MultipleMinMaxAccumulatorUpdater
310321
// ---------------------------------------------------------------------------
311322

312-
pub struct MultipleMinMaxUpdater {
323+
pub struct MultipleMinMaxAccumulatorUpdater {
313324
acc: MultipleMinMaxAccumulator,
314-
sub_type: String,
325+
is_max: bool,
315326
}
316327

317-
impl MultipleMinMaxUpdater {
318-
pub fn new(sub_type: String) -> Self {
328+
impl MultipleMinMaxAccumulatorUpdater {
329+
pub fn new(is_max: bool) -> Self {
319330
Self {
320-
acc: MultipleMinMaxAccumulator::new(sub_type.clone()),
321-
sub_type,
331+
acc: if is_max {
332+
MultipleMinMaxAccumulator::new_max()
333+
} else {
334+
MultipleMinMaxAccumulator::new_min()
335+
},
336+
is_max,
322337
}
323338
}
324339
}
325340

326-
impl AccumulatorUpdater for MultipleMinMaxUpdater {
341+
impl AccumulatorUpdater for MultipleMinMaxAccumulatorUpdater {
327342
fn update_single(&mut self, _value: f64, _timestamp_ms: i64) {
328-
// Multiple-subpopulation — use update_keyed instead
343+
debug_assert!(
344+
false,
345+
"update_single called on keyed updater; use update_keyed"
346+
);
329347
}
330348

331349
fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) {
@@ -335,7 +353,11 @@ impl AccumulatorUpdater for MultipleMinMaxUpdater {
335353
impl_clone_accumulator_methods!(acc);
336354

337355
fn reset(&mut self) {
338-
self.acc = MultipleMinMaxAccumulator::new(self.sub_type.clone());
356+
self.acc = if self.is_max {
357+
MultipleMinMaxAccumulator::new_max()
358+
} else {
359+
MultipleMinMaxAccumulator::new_min()
360+
};
339361
}
340362

341363
fn is_keyed(&self) -> bool {
@@ -349,47 +371,49 @@ impl AccumulatorUpdater for MultipleMinMaxUpdater {
349371
}
350372

351373
// ---------------------------------------------------------------------------
352-
// MultipleIncreaseUpdater
374+
// MultipleIncreaseAccumulatorUpdater
353375
// ---------------------------------------------------------------------------
354376

355-
pub struct MultipleIncreaseUpdater {
377+
pub struct MultipleIncreaseAccumulatorUpdater {
356378
acc: MultipleIncreaseAccumulator,
357379
}
358380

359-
impl MultipleIncreaseUpdater {
381+
impl MultipleIncreaseAccumulatorUpdater {
360382
pub fn new() -> Self {
361383
Self {
362384
acc: MultipleIncreaseAccumulator::new(),
363385
}
364386
}
365387
}
366388

367-
impl Default for MultipleIncreaseUpdater {
389+
impl Default for MultipleIncreaseAccumulatorUpdater {
368390
fn default() -> Self {
369391
Self::new()
370392
}
371393
}
372394

373-
impl AccumulatorUpdater for MultipleIncreaseUpdater {
395+
impl AccumulatorUpdater for MultipleIncreaseAccumulatorUpdater {
374396
fn update_single(&mut self, _value: f64, _timestamp_ms: i64) {
375-
// Multiple-subpopulation — use update_keyed instead
397+
debug_assert!(
398+
false,
399+
"update_single called on keyed updater; use update_keyed"
400+
);
376401
}
377402

378403
fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, timestamp_ms: i64) {
379404
let measurement = Measurement::new(value);
380-
// If key already exists, update it; otherwise create new
381-
if self.acc.increases.contains_key(key) {
382-
if let Some(existing) = self.acc.increases.get_mut(key) {
383-
existing.update(measurement, timestamp_ms);
405+
match self.acc.increases.entry(key.clone()) {
406+
std::collections::hash_map::Entry::Occupied(mut e) => {
407+
e.get_mut().update(measurement, timestamp_ms);
408+
}
409+
std::collections::hash_map::Entry::Vacant(e) => {
410+
e.insert(IncreaseAccumulator::new(
411+
measurement.clone(),
412+
timestamp_ms,
413+
measurement,
414+
timestamp_ms,
415+
));
384416
}
385-
} else {
386-
let new_acc = IncreaseAccumulator::new(
387-
measurement.clone(),
388-
timestamp_ms,
389-
measurement,
390-
timestamp_ms,
391-
);
392-
self.acc.update(key.clone(), new_acc);
393417
}
394418
}
395419

@@ -433,7 +457,10 @@ impl CmsAccumulatorUpdater {
433457

434458
impl AccumulatorUpdater for CmsAccumulatorUpdater {
435459
fn update_single(&mut self, _value: f64, _timestamp_ms: i64) {
436-
// CMS is keyed — use update_keyed
460+
debug_assert!(
461+
false,
462+
"update_single called on keyed updater; use update_keyed"
463+
);
437464
}
438465

439466
fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) {
@@ -480,7 +507,10 @@ impl HydraKllAccumulatorUpdater {
480507

481508
impl AccumulatorUpdater for HydraKllAccumulatorUpdater {
482509
fn update_single(&mut self, _value: f64, _timestamp_ms: i64) {
483-
// HydraKLL is keyed — use update_keyed
510+
debug_assert!(
511+
false,
512+
"update_single called on keyed updater; use update_keyed"
513+
);
484514
}
485515

486516
fn update_keyed(&mut self, key: &KeyByLabelValues, value: f64, _timestamp_ms: i64) {
@@ -534,7 +564,8 @@ fn kll_k_param(config: &AggregationConfig) -> u16 {
534564
.get("K")
535565
.or_else(|| config.parameters.get("k"))
536566
.and_then(|v| v.as_u64())
537-
.unwrap_or(200) as u16
567+
.and_then(|v| u16::try_from(v).ok())
568+
.unwrap_or(200)
538569
}
539570

540571
/// Extract `(row_num, col_num)` for CMS / HydraKLL configs.
@@ -569,8 +600,8 @@ pub fn create_accumulator_updater(config: &AggregationConfig) -> Box<dyn Accumul
569600
match config.aggregation_type {
570601
AggregationType::SingleSubpopulation => match sub_type {
571602
"Sum" | "sum" => Box::new(SumAccumulatorUpdater::new()),
572-
"Min" | "min" => Box::new(MinMaxAccumulatorUpdater::new("min".to_string())),
573-
"Max" | "max" => Box::new(MinMaxAccumulatorUpdater::new("max".to_string())),
603+
"Min" | "min" => Box::new(MinMaxAccumulatorUpdater::new(false)),
604+
"Max" | "max" => Box::new(MinMaxAccumulatorUpdater::new(true)),
574605
"Increase" | "increase" => Box::new(IncreaseAccumulatorUpdater::new()),
575606
"DatasketchesKLL" | "datasketches_kll" | "KLL" | "kll" => {
576607
Box::new(KllAccumulatorUpdater::new(kll_k_param(config)))
@@ -584,10 +615,10 @@ pub fn create_accumulator_updater(config: &AggregationConfig) -> Box<dyn Accumul
584615
}
585616
},
586617
AggregationType::MultipleSubpopulation => match sub_type {
587-
"Sum" | "sum" => Box::new(MultipleSumUpdater::new()),
588-
"Min" | "min" => Box::new(MultipleMinMaxUpdater::new("min".to_string())),
589-
"Max" | "max" => Box::new(MultipleMinMaxUpdater::new("max".to_string())),
590-
"Increase" | "increase" => Box::new(MultipleIncreaseUpdater::new()),
618+
"Sum" | "sum" => Box::new(MultipleSumAccumulatorUpdater::new()),
619+
"Min" | "min" => Box::new(MultipleMinMaxAccumulatorUpdater::new(false)),
620+
"Max" | "max" => Box::new(MultipleMinMaxAccumulatorUpdater::new(true)),
621+
"Increase" | "increase" => Box::new(MultipleIncreaseAccumulatorUpdater::new()),
591622
"CountMinSketch" | "count_min_sketch" | "CMS" | "cms" => {
592623
let (row_num, col_num) = cms_params(config);
593624
Box::new(CmsAccumulatorUpdater::new(row_num, col_num))
@@ -601,28 +632,20 @@ pub fn create_accumulator_updater(config: &AggregationConfig) -> Box<dyn Accumul
601632
"Unknown MultipleSubpopulation sub_type '{}', defaulting to Sum",
602633
other
603634
);
604-
Box::new(MultipleSumUpdater::new())
635+
Box::new(MultipleSumAccumulatorUpdater::new())
605636
}
606637
},
607638
AggregationType::DatasketchesKLL => {
608639
Box::new(KllAccumulatorUpdater::new(kll_k_param(config)))
609640
}
610-
AggregationType::MultipleSum => Box::new(MultipleSumUpdater::new()),
611-
AggregationType::MultipleIncrease => Box::new(MultipleIncreaseUpdater::new()),
612-
AggregationType::MultipleMinMax => Box::new(MultipleMinMaxUpdater::new(
613-
if sub_type.eq_ignore_ascii_case("max") {
614-
"max".to_string()
615-
} else {
616-
"min".to_string()
617-
},
641+
AggregationType::MultipleSum => Box::new(MultipleSumAccumulatorUpdater::new()),
642+
AggregationType::MultipleIncrease => Box::new(MultipleIncreaseAccumulatorUpdater::new()),
643+
AggregationType::MultipleMinMax => Box::new(MultipleMinMaxAccumulatorUpdater::new(
644+
sub_type.eq_ignore_ascii_case("max"),
618645
)),
619646
AggregationType::Sum => Box::new(SumAccumulatorUpdater::new()),
620647
AggregationType::MinMax => Box::new(MinMaxAccumulatorUpdater::new(
621-
if sub_type.eq_ignore_ascii_case("max") {
622-
"max".to_string()
623-
} else {
624-
"min".to_string()
625-
},
648+
sub_type.eq_ignore_ascii_case("max"),
626649
)),
627650
AggregationType::Increase => Box::new(IncreaseAccumulatorUpdater::new()),
628651
AggregationType::CountMinSketch | AggregationType::CountMinSketchWithHeap => {
@@ -663,7 +686,7 @@ mod tests {
663686

664687
#[test]
665688
fn test_minmax_updater() {
666-
let mut updater = MinMaxAccumulatorUpdater::new("max".to_string());
689+
let mut updater = MinMaxAccumulatorUpdater::new(true);
667690
updater.update_single(5.0, 1000);
668691
updater.update_single(3.0, 2000);
669692
updater.update_single(7.0, 3000);
@@ -695,7 +718,7 @@ mod tests {
695718

696719
#[test]
697720
fn test_multiple_sum_updater() {
698-
let mut updater = MultipleSumUpdater::new();
721+
let mut updater = MultipleSumAccumulatorUpdater::new();
699722
assert!(updater.is_keyed());
700723

701724
let key_a = KeyByLabelValues::new_with_labels(vec!["a".to_string()]);

0 commit comments

Comments
 (0)