diff --git a/Cargo.toml b/Cargo.toml index c5b0afe8..6698e11c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,3 +38,6 @@ datafusion_summary_library = { path = "asap-common/dependencies/rs/datafusion_su elastic_dsl_utilities = { path = "asap-common/dependencies/rs/elastic_dsl_utilities" } asap_planner = { path = "asap-planner-rs" } indexmap = { version = "2.0", features = ["serde"] } + +[profile.release] +debug = 1 # line table + symbol names for flamegraph; no effect on codegen or runtime perf diff --git a/asap-query-engine/src/engine_config.rs b/asap-query-engine/src/engine_config.rs index e39be190..80bd2a21 100644 --- a/asap-query-engine/src/engine_config.rs +++ b/asap-query-engine/src/engine_config.rs @@ -47,7 +47,6 @@ pub struct EngineConfig { pub log_level: String, pub prometheus_scrape_interval: u64, pub streaming_engine: StreamingEngine, - pub do_profiling: bool, pub http_server: HttpServerSettings, pub backend: BackendConfig, pub store: StoreSettings, @@ -66,7 +65,6 @@ impl Default for EngineConfig { log_level: "INFO".to_string(), prometheus_scrape_interval: 15, streaming_engine: StreamingEngine::Precompute, - do_profiling: false, http_server: HttpServerSettings::default(), backend: BackendConfig::default(), store: StoreSettings::default(), diff --git a/asap-tools/experiments/config/config.yaml b/asap-tools/experiments/config/config.yaml index 8b6894be..a62495a6 100644 --- a/asap-tools/experiments/config/config.yaml +++ b/asap-tools/experiments/config/config.yaml @@ -18,7 +18,7 @@ logging: # Profiling options profiling: - query_engine: false + query_engine: false # Rust query engine only; requires use_container.query_engine: false prometheus_time: null # Optional[int] flink: false arroyo: false diff --git a/asap-tools/experiments/experiment_utils/config.py b/asap-tools/experiments/experiment_utils/config.py index 69b01340..eda2df43 100644 --- a/asap-tools/experiments/experiment_utils/config.py +++ b/asap-tools/experiments/experiment_utils/config.py @@ -521,6 +521,18 @@ def validate_config(cfg: DictConfig, script_name: str = "experiment_run_e2e"): "--no_teardown can only be used with a single experiment mode" ) + # Profiling the Rust query engine requires bare-metal mode (debug symbols unavailable in container) + if ( + hasattr(cfg, "profiling") + and cfg.profiling.get("query_engine", False) + and hasattr(cfg, "use_container") + and cfg.use_container.get("query_engine", True) + ): + raise ValueError( + "profiling.query_engine=true requires use_container.query_engine=false. " + "Container builds discard debug symbols, making flamegraph output unreadable." + ) + # Validate aggregate cleanup policy valid_policies = ["circular_buffer", "read_based", "no_cleanup"] if hasattr(cfg, "aggregate_cleanup") and hasattr(cfg.aggregate_cleanup, "policy"): diff --git a/asap-tools/experiments/experiment_utils/services/query_engine.py b/asap-tools/experiments/experiment_utils/services/query_engine.py index fe5e3b6a..9e97ad02 100644 --- a/asap-tools/experiments/experiment_utils/services/query_engine.py +++ b/asap-tools/experiments/experiment_utils/services/query_engine.py @@ -106,7 +106,6 @@ def _build_engine_config( should match streaming.remote_write.base_port in the Hydra config dump_precomputes: Whether to dump received precomputes to output_dir for debugging lock_strategy: Lock strategy for SimpleMapStore ('global' or 'per-key') - profile_query_engine: Whether to enable do_profiling in the engine kafka_broker: Kafka broker address, e.g. '10.10.1.1:9092' (arroyo only) Returns: @@ -138,7 +137,6 @@ def _build_engine_config( "log_level": log_level, "prometheus_scrape_interval": prometheus_scrape_interval, "streaming_engine": streaming_engine, - "do_profiling": profile_query_engine, "http_server": {"port": http_port}, "backend": backend, # already fully resolved by caller "store": {"lock_strategy": lock_strategy}, diff --git a/asap-tools/experiments/remote_monitor.py b/asap-tools/experiments/remote_monitor.py index 1cd8c5a7..ef9582c4 100644 --- a/asap-tools/experiments/remote_monitor.py +++ b/asap-tools/experiments/remote_monitor.py @@ -162,6 +162,56 @@ def stop_profiling_arroyo_pids( logger.debug("Stopped profiling for arroyo pids") +def start_profiling_query_engine_pids(qe_pids, experiment_output_dir): + qe_perf_procs = [] + qe_profiles_dir = os.path.join(experiment_output_dir, "query_engine_profiles") + os.makedirs(qe_profiles_dir, exist_ok=True) + + for pid in qe_pids: + output_file = os.path.join(qe_profiles_dir, f"perf_{pid}.data") + cmd = [ + "perf", + "record", + "-g", + "--call-graph", + "dwarf", + "-F", + "997", + "-o", + output_file, + "--pid", + str(pid), + ] + logger.debug(f"Starting perf record for PID {pid} with command: {cmd}") + proc = subprocess.Popen(cmd) + qe_perf_procs.append(proc) + + logger.debug( + f"Started perf record processes with PIDs: {[p.pid for p in qe_perf_procs]}" + ) + return qe_perf_procs + + +def stop_profiling_query_engine_pids(qe_perf_procs, store: bool): + for proc in qe_perf_procs: + try: + os.kill(proc.pid, signal.SIGTERM) + logger.debug(f"Stopped perf record process PID: {proc.pid}") + except ProcessLookupError: + logger.debug(f"Perf record process PID {proc.pid} already terminated") + for proc in qe_perf_procs: + try: + proc.wait(timeout=60) + logger.debug( + f"Perf record process PID {proc.pid} exited with code {proc.returncode}" + ) + except subprocess.TimeoutExpired: + logger.debug( + f"Perf record process PID {proc.pid} did not terminate within 60s" + ) + logger.debug("Stopped profiling for query engine pids") + + # TODO Provide some way of specifying which hooks will be used def get_process_monitor_hooks( export_cost: bool, provider, node_offset: int @@ -233,14 +283,23 @@ def main(args): logger.error("No matching processes found.") return - profile_query_engine_pid = None + profile_query_engine_pid = ( + None # unused for Rust QE; kept for PrometheusClientService compat + ) + qe_flamegraph_procs = None if args.profile_query_engine: - if ( - constants.QUERY_ENGINE_RS_PROCESS_KEYWORD in args.keywords - or constants.QUERY_ENGINE_RS_CONTAINER_NAME in args.keywords - ): - raise NotImplementedError( - "Profiling for Rust query engine is not implemented yet" + if constants.QUERY_ENGINE_RS_CONTAINER_NAME in args.keywords: + raise ValueError( + "Rust query engine profiling requires bare-metal mode. " + "Set use_container.query_engine: false in config." + ) + if constants.QUERY_ENGINE_RS_PROCESS_KEYWORD in args.keywords: + qe_pids = get_pids(constants.QUERY_ENGINE_RS_PROCESS_KEYWORD) + stop_profiling_query_engine_pids( + [], store=False + ) # clear any stale profilers + qe_flamegraph_procs = start_profiling_query_engine_pids( + qe_pids, args.experiment_output_dir ) logger.debug("Starting process monitors") @@ -345,6 +404,10 @@ def main(args): arroyo_flamegraph_pids, args.experiment_output_dir, store=True ) + if qe_flamegraph_procs: + logger.debug("Stopping profiling for query engine pids") + stop_profiling_query_engine_pids(qe_flamegraph_procs, store=True) + logger.debug("Stopping process monitors") monitor_info = process_monitor.stop_monitor(monitor, control_pipe, monitor_pipe) diff --git a/asap-tools/installation/flamegraph/install.sh b/asap-tools/installation/flamegraph/install.sh new file mode 100644 index 00000000..f17f2229 --- /dev/null +++ b/asap-tools/installation/flamegraph/install.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +cargo install flamegraph diff --git a/asap-tools/installation/flamegraph/setup_dependencies.sh b/asap-tools/installation/flamegraph/setup_dependencies.sh new file mode 100644 index 00000000..8957637e --- /dev/null +++ b/asap-tools/installation/flamegraph/setup_dependencies.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +sudo apt-get install -y linux-tools-common "linux-tools-$(uname -r)" +sudo sh -c 'echo -1 > /proc/sys/kernel/perf_event_paranoid' diff --git a/asap-tools/installation/install_external_components.sh b/asap-tools/installation/install_external_components.sh index 5713435f..151239e3 100755 --- a/asap-tools/installation/install_external_components.sh +++ b/asap-tools/installation/install_external_components.sh @@ -1,7 +1,7 @@ #!/bin/bash # PREDEFINED_COMPONENTS=("benchmarks" "exporters" "flink" "grafana" "kafka" "prometheus" "prometheus_kafka_adapter" "asprof") -PREDEFINED_COMPONENTS=("benchmarks" "exporters" "flink" "grafana" "kafka" "prometheus" "asprof" "arroyo") +PREDEFINED_COMPONENTS=("benchmarks" "exporters" "flink" "grafana" "kafka" "prometheus" "asprof" "arroyo" "flamegraph") if [ "$#" -lt 2 ]; then echo "Usage: $0 [ ...]"