Skip to content

Commit b5da069

Browse files
zzylolclaude
andauthored
Remove unused Flink code pieces (#190)
* Remove unused Flink code pieces from query engine - Delete commenting_out_flink_diff temp file - Remove StreamingEngine enum (Flink/Arroyo variants no longer needed) - Remove --streaming-engine CLI arg from main.rs - Remove streaming_engine field from KafkaConsumerConfig - Remove StreamingEngine::Flink match arm and all commented-out Flink message handling from kafka.rs - Remove deserialize_from_json_flink commented-out method from precomputed_output.rs - Simplify create_precompute_from_bytes: remove streaming_engine param and replace all Flink/Arroyo conditionals with direct arroyo calls - Clean up residual Flink references in accumulator files Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Restore StreamingEngine type with only Arroyo variant Keep StreamingEngine enum and --streaming-engine CLI arg but with only the Arroyo variant (Flink variant removed). Also fix cargo fmt issues. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 5946aa9 commit b5da069

7 files changed

Lines changed: 65 additions & 1281 deletions

File tree

asap-query-engine/src/commenting_out_flink_diff

Lines changed: 0 additions & 1032 deletions
This file was deleted.

asap-query-engine/src/data_model/enums.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ pub enum InputFormat {
66

77
#[derive(clap::ValueEnum, Clone, Debug)]
88
pub enum StreamingEngine {
9-
Flink,
109
Arroyo,
1110
}
1211

asap-query-engine/src/data_model/precomputed_output.rs

Lines changed: 20 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -138,56 +138,6 @@ impl PrecomputedOutput {
138138
// })
139139
// }
140140

141-
// /// Deserialization for Flink streaming engine
142-
// pub fn deserialize_from_json_flink(
143-
// data: &serde_json::Value,
144-
// streaming_config: &HashMap<u64, AggregationConfig>,
145-
// ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
146-
// let aggregation_id = data
147-
// .get("aggregation_id")
148-
// .and_then(|v| v.as_u64())
149-
// .ok_or("Missing or invalid 'aggregation_id' field")?;
150-
151-
// let start_timestamp = data
152-
// .get("start_timestamp")
153-
// .and_then(|v| v.as_u64())
154-
// .ok_or("Missing or invalid 'start_timestamp' field")?;
155-
156-
// let end_timestamp = data
157-
// .get("end_timestamp")
158-
// .and_then(|v| v.as_u64())
159-
// .ok_or("Missing or invalid 'end_timestamp' field")?;
160-
161-
// let key = if let Some(key_data) = data.get("key") {
162-
// if key_data.is_null() {
163-
// None
164-
// } else {
165-
// Some(KeyByLabelValues::deserialize_from_json(key_data).map_err(
166-
// |e| -> Box<dyn std::error::Error + Send + Sync> {
167-
// format!("Failed to deserialize key: {e}").into()
168-
// },
169-
// )?)
170-
// }
171-
// } else {
172-
// None
173-
// };
174-
175-
// // Get aggregation type from streaming config lookup
176-
// let config = streaming_config
177-
// .get(&aggregation_id)
178-
// .ok_or_else(|| {
179-
// format!("Aggregation ID {aggregation_id} not found in streaming config")
180-
// })?
181-
// .clone();
182-
183-
// Ok(Self {
184-
// start_timestamp,
185-
// end_timestamp,
186-
// key,
187-
// config,
188-
// })
189-
// }
190-
191141
/// Deserialization for Arroyo streaming engine
192142
pub fn deserialize_from_json_arroyo(
193143
data: &serde_json::Value,
@@ -283,7 +233,6 @@ impl PrecomputedOutput {
283233
let precompute = Self::create_precompute_from_bytes(
284234
&config.aggregation_type,
285235
Vec::as_slice(&precompute_bytes),
286-
"arroyo",
287236
)?;
288237

289238
Ok((precomputed_output, precompute))
@@ -415,22 +364,14 @@ impl PrecomputedOutput {
415364
fn create_precompute_from_bytes(
416365
precompute_type: &str,
417366
buffer: &[u8],
418-
streaming_engine: &str,
419367
) -> Result<Box<dyn crate::data_model::AggregateCore>, Box<dyn std::error::Error + Send + Sync>>
420368
{
421369
use crate::precompute_operators::*;
422370

423-
// TODO: add arroyo methods in each operator
424-
// TODO: remove flink methods
425-
426371
match precompute_type {
427372
"Sum" | "sum" => {
428-
let accumulator = if streaming_engine == "flink" {
429-
SumAccumulator::deserialize_from_bytes(buffer)
430-
} else {
431-
SumAccumulator::deserialize_from_bytes_arroyo(buffer)
432-
}
433-
.map_err(|e| format!("Failed to deserialize SumAccumulator: {e}"))?;
373+
let accumulator = SumAccumulator::deserialize_from_bytes_arroyo(buffer)
374+
.map_err(|e| format!("Failed to deserialize SumAccumulator: {e}"))?;
434375
Ok(Box::new(accumulator))
435376
}
436377
"MinMax" => {
@@ -457,58 +398,41 @@ impl PrecomputedOutput {
457398
Ok(Box::new(accumulator))
458399
}
459400
"MultipleIncrease" => {
460-
let accumulator = if streaming_engine == "flink" {
461-
MultipleIncreaseAccumulator::deserialize_from_bytes(buffer)
462-
} else {
463-
MultipleIncreaseAccumulator::deserialize_from_bytes_arroyo(buffer)
464-
}
401+
let accumulator = MultipleIncreaseAccumulator::deserialize_from_bytes_arroyo(
402+
buffer,
403+
)
465404
.map_err(|e| format!("Failed to deserialize MultipleIncreaseAccumulator: {e}"))?;
466405
Ok(Box::new(accumulator))
467406
}
468407
"CountMinSketch" => {
469-
let accumulator = if streaming_engine == "flink" {
470-
CountMinSketchAccumulator::deserialize_from_bytes(buffer)
471-
} else {
472-
CountMinSketchAccumulator::deserialize_from_bytes_arroyo(buffer)
473-
}
474-
.map_err(|e| format!("Failed to deserialize CountMinSketchAccumulator: {e}"))?;
408+
let accumulator = CountMinSketchAccumulator::deserialize_from_bytes_arroyo(buffer)
409+
.map_err(|e| format!("Failed to deserialize CountMinSketchAccumulator: {e}"))?;
475410
Ok(Box::new(accumulator))
476411
}
477412
"CountMinSketchWithHeap" => {
478-
let accumulator = if streaming_engine == "flink" {
479-
CountMinSketchWithHeapAccumulator::deserialize_from_bytes(buffer)
480-
} else {
413+
let accumulator =
481414
CountMinSketchWithHeapAccumulator::deserialize_from_bytes_arroyo(buffer)
482-
}
483-
.map_err(|e| {
484-
format!("Failed to deserialize CountMinSketchWithHeapAccumulator: {e}")
485-
})?;
415+
.map_err(|e| {
416+
format!("Failed to deserialize CountMinSketchWithHeapAccumulator: {e}")
417+
})?;
486418
Ok(Box::new(accumulator))
487419
}
488420
"DatasketchesKLL" => {
489-
let accumulator = if streaming_engine == "flink" {
490-
DatasketchesKLLAccumulator::deserialize_from_bytes(buffer)
491-
} else {
492-
DatasketchesKLLAccumulator::deserialize_from_bytes_arroyo(buffer)
493-
}
494-
.map_err(|e| format!("Failed to deserialize DatasketchesKLLAccumulator: {e}"))?;
421+
let accumulator = DatasketchesKLLAccumulator::deserialize_from_bytes_arroyo(buffer)
422+
.map_err(|e| {
423+
format!("Failed to deserialize DatasketchesKLLAccumulator: {e}")
424+
})?;
495425
Ok(Box::new(accumulator))
496426
}
497427
"HydraKLL" => {
498-
let accumulator = if streaming_engine == "flink" {
499-
return Err("HydraKLL not supported for Flink".into());
500-
} else {
501-
HydraKllSketchAccumulator::deserialize_from_bytes_arroyo(buffer)
502-
}
503-
.map_err(|e| format!("Failed to deserialize HydraKllSketchAccumulator: {e}"))?;
428+
let accumulator = HydraKllSketchAccumulator::deserialize_from_bytes_arroyo(buffer)
429+
.map_err(|e| format!("Failed to deserialize HydraKllSketchAccumulator: {e}"))?;
504430
Ok(Box::new(accumulator))
505431
}
506432
"DeltaSetAggregator" => {
507-
let accumulator = if streaming_engine == "flink" {
508-
DeltaSetAggregatorAccumulator::deserialize_from_bytes(buffer)
509-
} else {
510-
DeltaSetAggregatorAccumulator::deserialize_from_bytes_arroyo(buffer)
511-
}
433+
let accumulator = DeltaSetAggregatorAccumulator::deserialize_from_bytes_arroyo(
434+
buffer,
435+
)
512436
.map_err(|e| format!("Failed to deserialize DeltaSetAggregatorAccumulator: {e}"))?;
513437
Ok(Box::new(accumulator))
514438
}

asap-query-engine/src/drivers/ingest/kafka.rs

Lines changed: 43 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -273,155 +273,51 @@ impl<T: Store + Send + Sync + 'static> KafkaConsumer<T> {
273273
Err("Binary input format with precompute not implemented".into())
274274
}
275275
InputFormat::Json => {
276-
// Handle streaming engine specific logic
277-
match self.config.streaming_engine {
278-
StreamingEngine::Flink => {
279-
// debug!("Received message of length: {}", payload.len());
280-
281-
// let json_data = if self.config.decompress_json {
282-
// // Decompress using gzip
283-
// let mut decoder = GzDecoder::new(payload);
284-
// let mut decompressed = Vec::new();
285-
// match decoder.read_to_end(&mut decompressed) {
286-
// Ok(_) => {
287-
// debug!(
288-
// "Decompressed JSON message of length: {}",
289-
// decompressed.len()
290-
// );
291-
// decompressed
292-
// }
293-
// Err(e) => {
294-
// error!("Error decompressing gzip data: {}", e);
295-
// return Err(format!("Gzip decompression error: {e}").into());
296-
// }
297-
// }
298-
// } else {
299-
// payload.to_vec()
300-
// };
301-
302-
// let json_str = match String::from_utf8(json_data) {
303-
// Ok(s) => s,
304-
// Err(e) => {
305-
// error!("Error converting bytes to UTF-8: {}", e);
306-
// return Err(format!("UTF-8 conversion error: {e}").into());
307-
// }
308-
// };
309-
310-
// let json_parse_start_time = Instant::now();
311-
312-
// let json_dict: serde_json::Value = match serde_json::from_str(&json_str) {
313-
// Ok(dict) => {
314-
// let json_parse_duration = json_parse_start_time.elapsed();
315-
// debug!(
316-
// "JSON parsing took: {:.2}ms",
317-
// json_parse_duration.as_secs_f64() * 1000.0
318-
// );
319-
// dict
320-
// }
321-
// Err(e) => {
322-
// error!("Error parsing JSON: {}", e);
323-
// debug!("JSON content: {}", json_str);
324-
// return Err(format!("JSON parsing error: {e}").into());
325-
// }
326-
// };
327-
328-
// debug!(
329-
// "Deserializing JSON message: {}, {}, {}",
330-
// json_dict
331-
// .get("aggregation_id")
332-
// .and_then(|v| v.as_u64())
333-
// .unwrap_or(0),
334-
// json_dict
335-
// .get("start_timestamp")
336-
// .and_then(|v| v.as_u64())
337-
// .unwrap_or(0),
338-
// json_dict
339-
// .get("end_timestamp")
340-
// .and_then(|v| v.as_u64())
341-
// .unwrap_or(0)
342-
// );
343-
344-
// let deserialize_start_time = Instant::now();
345-
346-
// match PrecomputedOutput::deserialize_from_json_with_precompute(&json_dict) {
347-
// Ok((output, precompute)) => {
348-
// let deserialize_duration = deserialize_start_time.elapsed();
349-
// debug!(
350-
// "Deserialization took: {:.2}ms",
351-
// deserialize_duration.as_secs_f64() * 1000.0
352-
// );
353-
// debug!(
354-
// "Deserialized item: {}, {}, {}",
355-
// output.config.aggregation_id,
356-
// output.start_timestamp,
357-
// output.end_timestamp
358-
// );
359-
// debug!("Successfully deserialized Flink JSON message with precompute data");
360-
// let total_message_duration = message_start_time.elapsed();
361-
// debug!(
362-
// "Total message processing took: {:.2}ms",
363-
// total_message_duration.as_secs_f64() * 1000.0
364-
// );
365-
// Ok(Some((output, precompute)))
366-
// }
367-
// Err(e) => {
368-
// error!(
369-
// "Error deserializing Flink PrecomputedOutput from JSON with precompute: {}",
370-
// e
371-
// );
372-
// debug!("JSON content: {}", json_str);
373-
// Err(e)
374-
// }
375-
// }
376-
error!("Flink input format with precompute not implemented");
377-
Err("Flink input format with precompute not implemented".into())
276+
// Arroyo messages - gzip decompression is applied at precompute level, not message level
277+
let json_str = match String::from_utf8(payload.to_vec()) {
278+
Ok(s) => s,
279+
Err(e) => {
280+
error!("Error converting bytes to UTF-8: {}", e);
281+
return Err(format!("UTF-8 conversion error: {e}").into());
378282
}
379-
StreamingEngine::Arroyo => {
380-
// Arroyo messages - gzip decompression is applied at precompute level, not message level
381-
let json_str = match String::from_utf8(payload.to_vec()) {
382-
Ok(s) => s,
383-
Err(e) => {
384-
error!("Error converting bytes to UTF-8: {}", e);
385-
return Err(format!("UTF-8 conversion error: {e}").into());
386-
}
387-
};
388-
389-
let json_dict: serde_json::Value = match serde_json::from_str(&json_str) {
390-
Ok(dict) => dict,
391-
Err(e) => {
392-
error!("Error parsing Arroyo JSON: {}", e);
393-
debug!("JSON content: {}", json_str);
394-
return Err(format!("JSON parsing error: {e}").into());
395-
}
396-
};
283+
};
397284

398-
let deserialize_start_time = Instant::now();
399-
match PrecomputedOutput::deserialize_from_json_arroyo(
400-
&json_dict,
401-
&self.streaming_config,
402-
) {
403-
Ok((output, precompute)) => {
404-
let deserialize_duration = deserialize_start_time.elapsed();
405-
debug!(
406-
"Arroyo deserialization took: {:.2}ms",
407-
deserialize_duration.as_secs_f64() * 1000.0
408-
);
409-
debug!("Successfully deserialized Arroyo JSON message with precompute data");
410-
let total_message_duration = message_start_time.elapsed();
411-
debug!(
412-
"Total Arroyo message processing took: {:.2}ms",
413-
total_message_duration.as_secs_f64() * 1000.0
414-
);
415-
Ok(Some((output, precompute)))
416-
}
417-
Err(e) => {
418-
error!(
419-
"Error deserializing Arroyo PrecomputedOutput from JSON with precompute: {e}"
420-
);
421-
debug!("JSON content: {}", json_str);
422-
Err(e)
423-
}
424-
}
285+
let json_dict: serde_json::Value = match serde_json::from_str(&json_str) {
286+
Ok(dict) => dict,
287+
Err(e) => {
288+
error!("Error parsing Arroyo JSON: {}", e);
289+
debug!("JSON content: {}", json_str);
290+
return Err(format!("JSON parsing error: {e}").into());
291+
}
292+
};
293+
294+
let deserialize_start_time = Instant::now();
295+
match PrecomputedOutput::deserialize_from_json_arroyo(
296+
&json_dict,
297+
&self.streaming_config,
298+
) {
299+
Ok((output, precompute)) => {
300+
let deserialize_duration = deserialize_start_time.elapsed();
301+
debug!(
302+
"Arroyo deserialization took: {:.2}ms",
303+
deserialize_duration.as_secs_f64() * 1000.0
304+
);
305+
debug!(
306+
"Successfully deserialized Arroyo JSON message with precompute data"
307+
);
308+
let total_message_duration = message_start_time.elapsed();
309+
debug!(
310+
"Total Arroyo message processing took: {:.2}ms",
311+
total_message_duration.as_secs_f64() * 1000.0
312+
);
313+
Ok(Some((output, precompute)))
314+
}
315+
Err(e) => {
316+
error!(
317+
"Error deserializing Arroyo PrecomputedOutput from JSON with precompute: {e}"
318+
);
319+
debug!("JSON content: {}", json_str);
320+
Err(e)
425321
}
426322
}
427323
}

asap-query-engine/src/precompute_operators/datasketches_kll_accumulator.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,13 @@ impl DatasketchesKLLAccumulator {
4242
.decode(sketch_b64)
4343
.map_err(|e| format!("Failed to decode base64 sketch data: {e}"))?;
4444

45-
// TODO: remove this hardcoding once FlinkSketch serializes k in its output
4645
Ok(Self {
4746
inner: KllSketch::from_dsrs_bytes(&sketch_bytes, 200)?,
4847
})
4948
}
5049

5150
pub fn deserialize_from_bytes(buffer: &[u8]) -> Result<Self, Box<dyn std::error::Error>> {
5251
// Mirror Python implementation: deserialize sketch directly from bytes
53-
// TODO: remove this hardcoding once FlinkSketch serializes k in its output
5452
Ok(Self {
5553
inner: KllSketch::from_dsrs_bytes(buffer, 200)?,
5654
})

asap-query-engine/src/precompute_operators/hydra_kll_accumulator.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ impl HydraKllSketchAccumulator {
3030
}
3131

3232
pub fn deserialize_from_bytes(_buffer: &[u8]) -> Result<Self, Box<dyn std::error::Error>> {
33-
// HydraKLLSketch is only used with Arroyo, not Flink
34-
Err("deserialize_from_bytes for HydraKllSketchAccumulator not implemented for Flink".into())
33+
Err("deserialize_from_bytes for HydraKllSketchAccumulator not implemented".into())
3534
}
3635

3736
pub fn deserialize_from_bytes_arroyo(

0 commit comments

Comments
 (0)