Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions asap-planner-rs/src/planner/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ pub fn get_sql_cleanup_param(
) -> Result<u64, String> {
match cleanup_policy {
CleanupPolicy::CircularBuffer | CleanupPolicy::ReadBased => {
if t_repeat == 0 {
return Err(
"repetition_delay must be > 0 for cleanup param calculation; \
set a non-zero repetition_delay in your query group config"
.to_string(),
);
}
Ok(t_lookback.div_ceil(t_repeat))
}
CleanupPolicy::NoCleanup => {
Expand Down
23 changes: 18 additions & 5 deletions asap-query-engine/src/drivers/query/servers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ async fn handle_instant_query_post(
.and_then(|v| v.to_str().ok())
.unwrap_or("");

debug!("Content-Type: {}", content_type);
debug!("POST content-type: '{}'", content_type);

let parsed_request = if content_type.contains("application/json") {
// Handle JSON POST (Elasticsearch)
Expand Down Expand Up @@ -369,11 +369,24 @@ async fn handle_instant_query_post(
}
};

// Parse form parameters
let params: HashMap<String, String> = form_urlencoded::parse(body_str.as_bytes())
debug!(
"POST body prefix (first 200 chars): '{}'",
&body_str.chars().take(200).collect::<String>()
);

// Parse form parameters, falling back to treating the raw body as the
// SQL query if no "query" key is found (ClickHouse native HTTP style).
let mut params: HashMap<String, String> = form_urlencoded::parse(body_str.as_bytes())
.into_owned()
.collect();
debug!("Form params extracted: {:?}", params);
if !params.contains_key("query") && !body_str.is_empty() {
params.clear();
params.insert("query".to_string(), body_str.clone());
}
debug!(
"POST form params keys: {:?}",
params.keys().collect::<Vec<_>>()
);

// Use adapter to parse POST request (handles form-encoded parameters)
match state.adapter.parse_post_request(Form(params)).await {
Expand All @@ -385,7 +398,7 @@ async fn handle_instant_query_post(
req
}
Err(parse_error) => {
debug!("Failed to parse POST request: {:?}", parse_error);
debug!("POST parse failed: {}", parse_error);
return match state.adapter.format_error_response(&parse_error).await {
Ok(json) => json.into_response(),
Err(status) => status.into_response(),
Expand Down
42 changes: 41 additions & 1 deletion asap-query-engine/src/engine_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ pub fn check_config(config: &EngineConfig) -> Result<(), String> {
match (&config.ingest, &config.streaming_engine) {
(IngestConfig::Kafka { .. }, StreamingEngine::Arroyo) => {}
(
IngestConfig::HttpRemoteWrite { .. } | IngestConfig::Csv { .. },
IngestConfig::HttpRemoteWrite { .. }
| IngestConfig::Csv { .. }
| IngestConfig::Json { .. },
StreamingEngine::Precompute,
) => {}
(IngestConfig::Otlp { .. }, StreamingEngine::Arroyo) => {}
Expand Down Expand Up @@ -241,6 +243,18 @@ pub enum IngestConfig {
#[serde(default = "default_otlp_http_port")]
http_port: u16,
},
Json {
path: String,
metric_name: String,
value_col: String,
#[serde(default)]
label_cols: Vec<String>,
timestamp_col: String,
#[serde(default = "default_timestamp_unit")]
timestamp_unit: String,
#[serde(default = "default_json_batch_size")]
batch_size: usize,
},
}

impl Default for IngestConfig {
Expand Down Expand Up @@ -271,6 +285,14 @@ fn default_otlp_http_port() -> u16 {
4318
}

/// Serde default for the JSON ingest `timestamp_unit` field: `"seconds"`.
fn default_timestamp_unit() -> String {
    String::from("seconds")
}

/// Serde default for the JSON ingest `batch_size` field.
fn default_json_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 1_000;
    DEFAULT_BATCH_SIZE
}

#[derive(Debug, serde::Deserialize)]
#[serde(default)]
pub struct PrecomputeSettings {
Expand Down Expand Up @@ -435,6 +457,24 @@ output_dir: "./output"
assert!(check_config(&config).is_ok());
}

/// A JSON ingest source combined with the precompute streaming engine must be
/// accepted by `check_config`.
#[test]
fn check_config_valid_json_precompute() {
    // The ingest settings must be nested under `ingest:`; with the keys left
    // flat at the top level, `ingest` would be an empty (null) mapping.
    // `batch_size` is intentionally omitted so its serde default is used.
    // NOTE(review): presumably `type` is the serde tag selecting the `Json`
    // variant — confirm against the enum's `#[serde(tag = ...)]` attribute.
    let yaml = r#"
streaming_engine: "precompute"
ingest:
  type: "json"
  path: "hits.json"
  metric_name: "hits"
  value_col: "ResolutionWidth"
  label_cols: ["OS", "RegionID"]
  timestamp_col: "EventTime"
  timestamp_unit: "seconds"
output_dir: "./output"
"#;
    let config: EngineConfig = Figment::new().merge(Yaml::string(yaml)).extract().unwrap();
    assert!(check_config(&config).is_ok());
}

#[test]
fn check_config_valid_otlp_arroyo() {
let yaml = r#"
Expand Down
30 changes: 29 additions & 1 deletion asap-query-engine/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ use query_engine_rust::data_model::enums::{CleanupPolicy, StreamingEngine};
use query_engine_rust::drivers::AdapterConfig;
use query_engine_rust::precompute_engine::config::LateDataPolicy;
use query_engine_rust::precompute_engine::csv_ingest::{CsvFileIngestConfig, CsvFileIngestSource};
use query_engine_rust::precompute_engine::json_ingest::{
JsonFileIngestConfig, JsonFileIngestSource, TimestampUnit,
};
use query_engine_rust::precompute_engine::PrecomputeWorkerDiagnostics;
use query_engine_rust::utils::file_io::{read_inference_config, read_streaming_config};
use query_engine_rust::InferenceConfig;
Expand Down Expand Up @@ -238,13 +241,38 @@ async fn main() -> Result<()> {
batch_size: *batch_size,
}))]
}
IngestConfig::Json {
path,
metric_name,
value_col,
label_cols,
timestamp_col,
timestamp_unit,
batch_size,
} => {
let unit = timestamp_unit
.parse::<TimestampUnit>()
.map_err(|e| format!("Invalid timestamp_unit: {e}"))?;
info!("JSON file ingest mode: {}", path);
vec![Box::new(JsonFileIngestSource::new(JsonFileIngestConfig {
path: path.clone(),
metric_name: metric_name.clone(),
value_col: value_col.clone(),
label_cols: label_cols.clone(),
timestamp_col: timestamp_col.clone(),
timestamp_unit: unit,
batch_size: *batch_size,
}))]
}
IngestConfig::HttpRemoteWrite { port } => {
info!("Starting precompute engine on port {}", port);
vec![Box::new(HttpIngestSource::new(HttpIngestConfig {
port: *port,
}))]
}
_ => unreachable!("check_config enforces precompute requires http_remote_write or csv"),
_ => unreachable!(
"check_config enforces precompute requires http_remote_write, csv, or json"
),
};
let pe = PrecomputeEngine::new(
precompute_config,
Expand Down
36 changes: 36 additions & 0 deletions asap-query-engine/src/precompute_engine/ingest_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use crate::precompute_engine::worker::{extract_metric_name, parse_labels_from_se
use arc_swap::ArcSwap;
use asap_types::aggregation_config::AggregationConfig;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tracing::{debug, warn};

/// Everything a source needs to push decoded samples into the worker pool.
#[derive(Clone)]
Expand Down Expand Up @@ -89,15 +91,42 @@ pub(crate) async fn route_decoded_samples(

// Load agg_configs once per request (lock-free ArcSwap read).
let agg_configs = ctx.agg_configs.load();

// On first batch: log config metrics vs sample metric to diagnose mismatches.
static FIRST_BATCH_LOGGED: AtomicBool = AtomicBool::new(false);
if !FIRST_BATCH_LOGGED.swap(true, Ordering::Relaxed) {
if let Some(first) = samples.first() {
let sample_metric = extract_metric_name(&first.labels);
warn!(
sample_metric,
sample_labels = %first.labels,
num_agg_configs = agg_configs.len(),
"routing: first batch diagnostic"
);
for cfg in agg_configs.iter() {
warn!(
agg_id = cfg.aggregation_id,
config_metric = %cfg.metric,
config_spatial_filter = %cfg.spatial_filter,
table_name = ?cfg.table_name,
"routing: agg config metric"
);
}
}
}

let mut matched_samples: usize = 0;
for s in &samples {
let metric_name = extract_metric_name(&s.labels);
for config in agg_configs.iter() {
if config.metric != metric_name
&& config.spatial_filter_normalized != metric_name
&& config.spatial_filter != metric_name
&& config.table_name.as_deref() != Some(metric_name)
{
continue;
}
matched_samples += 1;
let group_key = extract_group_key(&s.labels, config);
by_group
.entry((config.aggregation_id, group_key))
Expand All @@ -106,6 +135,13 @@ pub(crate) async fn route_decoded_samples(
}
}

debug!(
total_samples = samples.len(),
matched_samples,
groups_formed = by_group.len(),
"routing: batch match summary"
);

let messages: Vec<WorkerMessage> = by_group
.into_iter()
.map(
Expand Down
Loading
Loading