Skip to content

Commit 8af309b

Browse files
zzylolclaude
andcommitted
fix: resolve build errors in precompute engine (E0432, E0609, E0061) and apply fmt
- Restore decode_prometheus_remote_write and related types that were fully commented out in prometheus_remote_write.rs - Fix CountMinSketchAccumulator field access via .inner. in accumulator_factory - Fix AggregationConfig::new call in worker test (missing slide_interval arg, wrong types for spatial_filter/metric, extra trailing None) - Apply cargo fmt across precompute engine files Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 3239648 commit 8af309b

9 files changed

Lines changed: 204 additions & 231 deletions

File tree

asap-query-engine/src/bin/precompute_engine.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use clap::Parser;
2+
use query_engine_rust::data_model::QueryLanguage;
23
use query_engine_rust::data_model::{
34
CleanupPolicy, InferenceConfig, LockStrategy, StreamingConfig,
45
};
@@ -9,7 +10,6 @@ use query_engine_rust::precompute_engine::output_sink::{RawPassthroughSink, Stor
910
use query_engine_rust::precompute_engine::PrecomputeEngine;
1011
use query_engine_rust::stores::SimpleMapStore;
1112
use query_engine_rust::{HttpServer, HttpServerConfig};
12-
use query_engine_rust::data_model::QueryLanguage;
1313
use std::sync::Arc;
1414
use tracing::info;
1515
use tracing_subscriber::fmt::format::FmtSpan;
@@ -85,20 +85,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
8585
);
8686

8787
// Create the store
88-
let store: Arc<dyn query_engine_rust::stores::Store> = Arc::new(
89-
SimpleMapStore::new_with_strategy(
88+
let store: Arc<dyn query_engine_rust::stores::Store> =
89+
Arc::new(SimpleMapStore::new_with_strategy(
9090
streaming_config.clone(),
9191
CleanupPolicy::CircularBuffer,
9292
args.lock_strategy,
93-
),
94-
);
93+
));
9594

9695
// Optionally start the query HTTP server
9796
if args.query_port > 0 {
98-
let inference_config = InferenceConfig::new(
99-
QueryLanguage::promql,
100-
CleanupPolicy::CircularBuffer,
101-
);
97+
let inference_config =
98+
InferenceConfig::new(QueryLanguage::promql, CleanupPolicy::CircularBuffer);
10299
let query_engine = Arc::new(SimpleEngine::new(
103100
store.clone(),
104101
inference_config,

asap-query-engine/src/bin/test_e2e_precompute.rs

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
7878
.init();
7979

8080
// Load configs the same way main.rs does
81-
let inference_config =
82-
read_inference_config("examples/promql/inference_config.yaml", QueryLanguage::promql)?;
81+
let inference_config = read_inference_config(
82+
"examples/promql/inference_config.yaml",
83+
QueryLanguage::promql,
84+
)?;
8385
println!(
8486
"Loaded inference config with {} query configs",
8587
inference_config.query_configs.len()
@@ -101,13 +103,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
101103
println!("\n=== Starting precompute engine (ingest={INGEST_PORT}, query={QUERY_PORT}) ===");
102104

103105
// Create store
104-
let store: Arc<dyn query_engine_rust::stores::Store> = Arc::new(
105-
SimpleMapStore::new_with_strategy(
106+
let store: Arc<dyn query_engine_rust::stores::Store> =
107+
Arc::new(SimpleMapStore::new_with_strategy(
106108
streaming_config.clone(),
107109
cleanup_policy,
108110
LockStrategy::PerKey,
109-
),
110-
);
111+
));
111112

112113
// Start query server
113114
let query_engine = Arc::new(SimpleEngine::new(
@@ -207,9 +208,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
207208

208209
// Use the exact query pattern from inference_config
209210
let queries_instant = vec![
210-
("quantile by (label_0) (0.99, fake_metric)", "10", "Configured query at t=10"),
211-
("quantile by (label_0) (0.99, fake_metric)", "15", "Configured query at t=15"),
212-
("sum_over_time(fake_metric[1s])", "10", "Temporal: sum_over_time at t=10"),
211+
(
212+
"quantile by (label_0) (0.99, fake_metric)",
213+
"10",
214+
"Configured query at t=10",
215+
),
216+
(
217+
"quantile by (label_0) (0.99, fake_metric)",
218+
"15",
219+
"Configured query at t=15",
220+
),
221+
(
222+
"sum_over_time(fake_metric[1s])",
223+
"10",
224+
"Temporal: sum_over_time at t=10",
225+
),
213226
("sum(fake_metric)", "10", "Spatial: sum at t=10"),
214227
];
215228

@@ -272,11 +285,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
272285
raw_mode_aggregation_id: raw_agg_id,
273286
};
274287
let raw_sink = Arc::new(RawPassthroughSink::new(store.clone()));
275-
let raw_engine = PrecomputeEngine::new(
276-
raw_engine_config,
277-
streaming_config.clone(),
278-
raw_sink,
279-
);
288+
let raw_engine = PrecomputeEngine::new(raw_engine_config, streaming_config.clone(), raw_sink);
280289
tokio::spawn(async move {
281290
if let Err(e) = raw_engine.run().await {
282291
eprintln!("Raw precompute engine error: {e}");
@@ -299,20 +308,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
299308
.body(body)
300309
.send()
301310
.await?;
302-
println!(" Sent raw t={ts}ms v={val} -> HTTP {}", resp.status().as_u16());
311+
println!(
312+
" Sent raw t={ts}ms v={val} -> HTTP {}",
313+
resp.status().as_u16()
314+
);
303315
}
304316

305317
// Short wait for processing (no watermark advancement needed)
306318
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
307319

308320
// Verify raw samples appeared in the store
309321
println!("\n=== Verifying raw samples in store ===");
310-
let results = store.query_precomputed_output(
311-
"fake_metric",
312-
raw_agg_id,
313-
100_000,
314-
103_000,
315-
)?;
322+
let results = store.query_precomputed_output("fake_metric", raw_agg_id, 100_000, 103_000)?;
316323
let total_buckets: usize = results.values().map(|v| v.len()).sum();
317324
println!(" Found {total_buckets} buckets for aggregation_id={raw_agg_id} in [100000, 103000)");
318325
assert!(

0 commit comments

Comments
 (0)