@@ -138,56 +138,6 @@ impl PrecomputedOutput {
138138 // })
139139 // }
140140
141- // /// Deserialization for Flink streaming engine
142- // pub fn deserialize_from_json_flink(
143- // data: &serde_json::Value,
144- // streaming_config: &HashMap<u64, AggregationConfig>,
145- // ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
146- // let aggregation_id = data
147- // .get("aggregation_id")
148- // .and_then(|v| v.as_u64())
149- // .ok_or("Missing or invalid 'aggregation_id' field")?;
150-
151- // let start_timestamp = data
152- // .get("start_timestamp")
153- // .and_then(|v| v.as_u64())
154- // .ok_or("Missing or invalid 'start_timestamp' field")?;
155-
156- // let end_timestamp = data
157- // .get("end_timestamp")
158- // .and_then(|v| v.as_u64())
159- // .ok_or("Missing or invalid 'end_timestamp' field")?;
160-
161- // let key = if let Some(key_data) = data.get("key") {
162- // if key_data.is_null() {
163- // None
164- // } else {
165- // Some(KeyByLabelValues::deserialize_from_json(key_data).map_err(
166- // |e| -> Box<dyn std::error::Error + Send + Sync> {
167- // format!("Failed to deserialize key: {e}").into()
168- // },
169- // )?)
170- // }
171- // } else {
172- // None
173- // };
174-
175- // // Get aggregation type from streaming config lookup
176- // let config = streaming_config
177- // .get(&aggregation_id)
178- // .ok_or_else(|| {
179- // format!("Aggregation ID {aggregation_id} not found in streaming config")
180- // })?
181- // .clone();
182-
183- // Ok(Self {
184- // start_timestamp,
185- // end_timestamp,
186- // key,
187- // config,
188- // })
189- // }
190-
191141 /// Deserialization for Arroyo streaming engine
192142 pub fn deserialize_from_json_arroyo (
193143 data : & serde_json:: Value ,
@@ -283,7 +233,6 @@ impl PrecomputedOutput {
283233 let precompute = Self :: create_precompute_from_bytes (
284234 & config. aggregation_type ,
285235 Vec :: as_slice ( & precompute_bytes) ,
286- "arroyo" ,
287236 ) ?;
288237
289238 Ok ( ( precomputed_output, precompute) )
@@ -415,22 +364,14 @@ impl PrecomputedOutput {
415364 fn create_precompute_from_bytes (
416365 precompute_type : & str ,
417366 buffer : & [ u8 ] ,
418- streaming_engine : & str ,
419367 ) -> Result < Box < dyn crate :: data_model:: AggregateCore > , Box < dyn std:: error:: Error + Send + Sync > >
420368 {
421369 use crate :: precompute_operators:: * ;
422370
423- // TODO: add arroyo methods in each operator
424- // TODO: remove flink methods
425-
426371 match precompute_type {
427372 "Sum" | "sum" => {
428- let accumulator = if streaming_engine == "flink" {
429- SumAccumulator :: deserialize_from_bytes ( buffer)
430- } else {
431- SumAccumulator :: deserialize_from_bytes_arroyo ( buffer)
432- }
433- . map_err ( |e| format ! ( "Failed to deserialize SumAccumulator: {e}" ) ) ?;
373+ let accumulator = SumAccumulator :: deserialize_from_bytes_arroyo ( buffer)
374+ . map_err ( |e| format ! ( "Failed to deserialize SumAccumulator: {e}" ) ) ?;
434375 Ok ( Box :: new ( accumulator) )
435376 }
436377 "MinMax" => {
@@ -457,59 +398,50 @@ impl PrecomputedOutput {
457398 Ok ( Box :: new ( accumulator) )
458399 }
459400 "MultipleIncrease" => {
460- let accumulator = if streaming_engine == "flink" {
461- MultipleIncreaseAccumulator :: deserialize_from_bytes ( buffer)
462- } else {
401+ let accumulator =
463402 MultipleIncreaseAccumulator :: deserialize_from_bytes_arroyo ( buffer)
464- }
465- . map_err ( |e| format ! ( "Failed to deserialize MultipleIncreaseAccumulator: {e}" ) ) ?;
403+ . map_err ( |e| {
404+ format ! ( "Failed to deserialize MultipleIncreaseAccumulator: {e}" )
405+ } ) ?;
466406 Ok ( Box :: new ( accumulator) )
467407 }
468408 "CountMinSketch" => {
469- let accumulator = if streaming_engine == "flink" {
470- CountMinSketchAccumulator :: deserialize_from_bytes ( buffer)
471- } else {
472- CountMinSketchAccumulator :: deserialize_from_bytes_arroyo ( buffer)
473- }
474- . map_err ( |e| format ! ( "Failed to deserialize CountMinSketchAccumulator: {e}" ) ) ?;
409+ let accumulator = CountMinSketchAccumulator :: deserialize_from_bytes_arroyo ( buffer)
410+ . map_err ( |e| format ! ( "Failed to deserialize CountMinSketchAccumulator: {e}" ) ) ?;
475411 Ok ( Box :: new ( accumulator) )
476412 }
477413 "CountMinSketchWithHeap" => {
478- let accumulator = if streaming_engine == "flink" {
479- CountMinSketchWithHeapAccumulator :: deserialize_from_bytes ( buffer)
480- } else {
414+ let accumulator =
481415 CountMinSketchWithHeapAccumulator :: deserialize_from_bytes_arroyo ( buffer)
482- }
483- . map_err ( |e| {
484- format ! ( "Failed to deserialize CountMinSketchWithHeapAccumulator: {e}" )
485- } ) ?;
416+ . map_err ( |e| {
417+ format ! (
418+ "Failed to deserialize CountMinSketchWithHeapAccumulator: {e}"
419+ )
420+ } ) ?;
486421 Ok ( Box :: new ( accumulator) )
487422 }
488423 "DatasketchesKLL" => {
489- let accumulator = if streaming_engine == "flink" {
490- DatasketchesKLLAccumulator :: deserialize_from_bytes ( buffer)
491- } else {
424+ let accumulator =
492425 DatasketchesKLLAccumulator :: deserialize_from_bytes_arroyo ( buffer)
493- }
494- . map_err ( |e| format ! ( "Failed to deserialize DatasketchesKLLAccumulator: {e}" ) ) ?;
426+ . map_err ( |e| {
427+ format ! ( "Failed to deserialize DatasketchesKLLAccumulator: {e}" )
428+ } ) ?;
495429 Ok ( Box :: new ( accumulator) )
496430 }
497431 "HydraKLL" => {
498- let accumulator = if streaming_engine == "flink" {
499- return Err ( "HydraKLL not supported for Flink" . into ( ) ) ;
500- } else {
432+ let accumulator =
501433 HydraKllSketchAccumulator :: deserialize_from_bytes_arroyo ( buffer)
502- }
503- . map_err ( |e| format ! ( "Failed to deserialize HydraKllSketchAccumulator: {e}" ) ) ?;
434+ . map_err ( |e| {
435+ format ! ( "Failed to deserialize HydraKllSketchAccumulator: {e}" )
436+ } ) ?;
504437 Ok ( Box :: new ( accumulator) )
505438 }
506439 "DeltaSetAggregator" => {
507- let accumulator = if streaming_engine == "flink" {
508- DeltaSetAggregatorAccumulator :: deserialize_from_bytes ( buffer)
509- } else {
440+ let accumulator =
510441 DeltaSetAggregatorAccumulator :: deserialize_from_bytes_arroyo ( buffer)
511- }
512- . map_err ( |e| format ! ( "Failed to deserialize DeltaSetAggregatorAccumulator: {e}" ) ) ?;
442+ . map_err ( |e| {
443+ format ! ( "Failed to deserialize DeltaSetAggregatorAccumulator: {e}" )
444+ } ) ?;
513445 Ok ( Box :: new ( accumulator) )
514446 }
515447 _ => Err ( format ! ( "Unknown precompute type: {precompute_type}" ) . into ( ) ) ,
0 commit comments