diff --git a/asap-query-engine/src/main.rs b/asap-query-engine/src/main.rs index af0134e7..c29ca2a8 100644 --- a/asap-query-engine/src/main.rs +++ b/asap-query-engine/src/main.rs @@ -196,7 +196,7 @@ async fn main() -> Result<()> { // check_config() already enforces the ingest source is compatible (http_remote_write or csv). let mut pe_engine_handle: Option = None; - let precompute_handle = if config.streaming_engine == StreamingEngine::Precompute { + let _precompute_runtime = if config.streaming_engine == StreamingEngine::Precompute { let precompute_config = PrecomputeEngineConfig { num_workers: config.precompute_engine.num_workers, allowed_lateness_ms: config.precompute_engine.allowed_lateness_ms, @@ -260,11 +260,18 @@ async fn main() -> Result<()> { spawn_memory_diagnostics(diag_store, Some(worker_diagnostics)).await; }); - Some(tokio::spawn(async move { + let rt = tokio::runtime::Builder::new_multi_thread() + .thread_name("pc-worker") + .worker_threads(config.precompute_engine.num_workers) + .enable_all() + .build() + .expect("failed to build precompute runtime"); + rt.spawn(async move { if let Err(e) = pe.run().await { error!("Precompute engine error: {}", e); } - })) + }); + Some(rt) } else { let diag_store = store.clone(); tokio::spawn(async move { @@ -457,12 +464,6 @@ async fn main() -> Result<()> { let _ = handle.await; } - if let Some(handle) = precompute_handle { - info!("Shutting down precompute engine..."); - handle.abort(); - let _ = handle.await; - } - info!("Shutdown complete"); Ok(()) } diff --git a/asap-tools/experiments/classes/process_monitor.py b/asap-tools/experiments/classes/process_monitor.py index 228c8779..8cea4124 100644 --- a/asap-tools/experiments/classes/process_monitor.py +++ b/asap-tools/experiments/classes/process_monitor.py @@ -1,9 +1,33 @@ import multiprocessing +import time import psutil import traceback -from typing import List, Any +from typing import List, Any, Optional from classes.ProcessMonitorHook import ProcessMonitorHook, ProcessMetricSnapshot +_PRECOMPUTE_THREAD_PREFIX = "pc-worker" + + +def _read_thread_cpu(pid: int) -> dict: + """ + Returns {tid: (thread_name, cpu_seconds)} for all threads of pid. + cpu_seconds = user_time + system_time from psutil; name from /proc/[pid]/task/[tid]/comm. + Silently skips threads that disappear mid-read. + """ + result = {} + try: + threads = psutil.Process(pid).threads() + except (psutil.NoSuchProcess, psutil.AccessDenied): + return result + for t in threads: + try: + with open(f"/proc/{pid}/task/{t.id}/comm") as f: + name = f.read().strip() + result[t.id] = (name, t.user_time + t.system_time) + except (FileNotFoundError, psutil.NoSuchProcess): + pass + return result + class MyMonitor(multiprocessing.Process): def __init__( @@ -15,6 +39,7 @@ def __init__( monitors, hooks: List[ProcessMonitorHook], include_children=False, + thread_attribution_keyword: Optional[str] = None, ): super(MyMonitor, self).__init__() self.pids_to_monitor = pids_to_monitor @@ -24,6 +49,7 @@ def __init__( self.monitors = monitors self.hooks = hooks self.include_children = include_children + self.thread_attribution_keyword = thread_attribution_keyword assert len(self.pids_to_monitor) == len(self.keywords) @@ -34,11 +60,24 @@ def __init__( self.pid_monitor_map[pid] = {m: [] for m in self.monitors} self.pid_monitor_map[pid]["keyword"] = keyword + if self.thread_attribution_keyword is not None: + self._prev_thread_jiffies: dict = {} + self._prev_poll_monotonic: float = 0.0 + for pid, keyword in zip(self.pids_to_monitor, self.keywords): + if keyword == self.thread_attribution_keyword: + self.pid_monitor_map[pid]["precompute_cpu_percent"] = [] + self.pid_monitor_map[pid]["query_cpu_percent"] = [] + def add_child_pid_to_map(self, pid, child_pid): self.pid_monitor_map[child_pid] = {m: [] for m in self.monitors} - self.pid_monitor_map[child_pid]["keyword"] = self.pid_monitor_map[pid][ - "keyword" - ] + keyword = self.pid_monitor_map[pid]["keyword"] + self.pid_monitor_map[child_pid]["keyword"] = keyword + if ( + self.thread_attribution_keyword is not None + and keyword == self.thread_attribution_keyword + ): + self.pid_monitor_map[child_pid]["precompute_cpu_percent"] = [] + self.pid_monitor_map[child_pid]["query_cpu_percent"] = [] def init_hooks(self): """ @@ -70,6 +109,45 @@ def close_hooks(self): hook.close() return + def _compute_thread_group_cpu(self, pid: int, elapsed: float): + """ + Reads current per-thread CPU seconds for pid, diffs against previous snapshot, + and appends precompute_cpu_percent / query_cpu_percent to pid_monitor_map. + + CPU% is on the same scale as psutil's cpu_percent: can exceed 100% on + multi-core systems (e.g. 2 fully loaded cores → ~200%). + """ + current = _read_thread_cpu(pid) + prev = self._prev_thread_jiffies.get(pid, {}) + + if not prev: + self._prev_thread_jiffies[pid] = current + self.pid_monitor_map[pid]["precompute_cpu_percent"].append(0.0) + self.pid_monitor_map[pid]["query_cpu_percent"].append(0.0) + return + + precompute_seconds = 0.0 + query_seconds = 0.0 + + for tid, (name, cpu_secs) in current.items(): + prev_secs = prev.get(tid, (None, 0.0))[1] + delta = max(0.0, cpu_secs - prev_secs) + if name.startswith(_PRECOMPUTE_THREAD_PREFIX): + precompute_seconds += delta + else: + query_seconds += delta + + if elapsed > 0: + precompute_pct = (precompute_seconds / elapsed) * 100.0 + query_pct = (query_seconds / elapsed) * 100.0 + else: + precompute_pct = 0.0 + query_pct = 0.0 + + self.pid_monitor_map[pid]["precompute_cpu_percent"].append(precompute_pct) + self.pid_monitor_map[pid]["query_cpu_percent"].append(query_pct) + self._prev_thread_jiffies[pid] = current + def update_pid_monitor_map(self, p) -> List[ProcessMetricSnapshot]: # if p.pid not in self.pid_monitor_map: # self.pid_monitor_map[p.pid] = {m: [] for m in self.monitors} @@ -97,18 +175,40 @@ def run(self): # of the list of hooks self.init_hooks() self.pipe.send("ready") - stop = False + + if self.thread_attribution_keyword is not None: + self._prev_poll_monotonic = time.monotonic() + for pid, keyword in zip(self.pids_to_monitor, self.keywords): + if keyword == self.thread_attribution_keyword: + self._prev_thread_jiffies[pid] = _read_thread_cpu(pid) try: while True: - iteration_info = [] # list of process snapshots from this iteration + if self.thread_attribution_keyword is not None: + now = time.monotonic() + elapsed = now - self._prev_poll_monotonic + self._prev_poll_monotonic = now + + iteration_info = [] for pid, p in self.psutil_handles.items(): iteration_info += self.update_pid_monitor_map(p) + if ( + self.thread_attribution_keyword is not None + and self.pid_monitor_map[pid]["keyword"] + == self.thread_attribution_keyword + ): + self._compute_thread_group_cpu(pid, elapsed) if self.include_children: for child in p.children(recursive=True): if child.pid not in self.pid_monitor_map: self.add_child_pid_to_map(pid, child.pid) iteration_info += self.update_pid_monitor_map(child) + if ( + self.thread_attribution_keyword is not None + and self.pid_monitor_map[child.pid]["keyword"] + == self.thread_attribution_keyword + ): + self._compute_thread_group_cpu(child.pid, elapsed) self.update_hooks(iteration_info) stop = self.pipe.poll(self.interval) @@ -132,6 +232,7 @@ def start_monitor( monitor_metrics, include_children, hooks: List[ProcessMonitorHook], + thread_attribution_keyword: Optional[str] = None, ): control_pipe, monitor_pipe = multiprocessing.Pipe() monitor = MyMonitor( @@ -142,6 +243,7 @@ def start_monitor( monitor_metrics, hooks, include_children=include_children, + thread_attribution_keyword=thread_attribution_keyword, ) monitor.start() control_pipe.recv() diff --git a/asap-tools/experiments/experiment_only_ingest_path.py b/asap-tools/experiments/experiment_only_ingest_path.py index de424ff2..eb0c9b05 100644 --- a/asap-tools/experiments/experiment_only_ingest_path.py +++ b/asap-tools/experiments/experiment_only_ingest_path.py @@ -387,6 +387,7 @@ def main(cfg: DictConfig): local_experiment_dir, experiment_duration, is_v2, + args.streaming_engine, ) print("-" * 60) @@ -457,6 +458,7 @@ def start_resource_monitoring( local_experiment_dir: str, duration: int, is_v2: bool, + streaming_engine: str, ): """ Start resource monitoring using remote_monitor.py in timed mode. @@ -520,6 +522,7 @@ def start_resource_monitoring( "--monitor_output_file monitor_output.json " f"--time_to_run {duration} " f"--node_offset {node_offset} " + f"--streaming_engine {streaming_engine} " ) cmd_dir = os.path.join(provider.get_home_dir(), "code", "asap-tools", "experiments") diff --git a/asap-tools/experiments/experiment_utils/services/remote_monitor_service.py b/asap-tools/experiments/experiment_utils/services/remote_monitor_service.py index bb07d44f..773f1cbe 100644 --- a/asap-tools/experiments/experiment_utils/services/remote_monitor_service.py +++ b/asap-tools/experiments/experiment_utils/services/remote_monitor_service.py @@ -105,6 +105,7 @@ def start( "--monitor_output_file {} " "--time_to_run {} " "--node_offset {} " + "--streaming_engine {} " ).format( experiment_mode, ",".join(keywords), @@ -117,6 +118,7 @@ def start( "monitor_output.json", timed_duration, self.node_offset, + streaming_engine, ) cmd_dir = os.path.join( @@ -176,6 +178,7 @@ def start( "--monitor_output_file {} " "--prometheus_client_output_file {} " "--node_offset {} " + "--streaming_engine {} " ).format( experiment_mode, ",".join(keywords), @@ -188,6 +191,7 @@ def start( "monitor_output.json", "prometheus_client_output.txt", self.node_offset, + streaming_engine, ) # Add container flag if enabled diff --git a/asap-tools/experiments/post_experiment/analyze_monitor_output.py b/asap-tools/experiments/post_experiment/analyze_monitor_output.py index 661da775..d81e29ad 100755 --- a/asap-tools/experiments/post_experiment/analyze_monitor_output.py +++ b/asap-tools/experiments/post_experiment/analyze_monitor_output.py @@ -61,17 +61,34 @@ def plot_resource_usage(data, file_path, args): keyword_to_pids[keyword].append(pid) get_line_style_for_keyword(keyword, keyword_to_style) - # Create plots for each resource type + # Create plots for each resource type; add thread-attributed CPU if present resources = [ ("cpu_percent", "CPU Usage (%)", "cpu"), ("memory_info", "Memory Usage (MB)", "memory"), ] + if any("precompute_cpu_percent" in pid_info for pid_info in data.values()): + resources.append( + ( + "precompute_cpu_percent", + "Precompute CPU Usage (%) [pc-worker threads]", + "precompute_cpu", + ) + ) + resources.append( + ( + "query_cpu_percent", + "Query CPU Usage (%) [non-precompute threads]", + "query_cpu", + ) + ) for resource_key, resource_label, resource_name in resources: plt.figure(figsize=(20, 8)) # Plot data for each PID for pid, pid_info in data.items(): + if resource_key not in pid_info: + continue keyword = pid_info["keyword"] line_style = keyword_to_style[keyword] @@ -144,6 +161,10 @@ def analyze_monitor_output(file_path: str, args=None): keyword_data[keyword] = {"cpu_percent": [], "memory_info": []} keyword_data[keyword]["cpu_percent"].append(pid_info["cpu_percent"]) keyword_data[keyword]["memory_info"].append(pid_info["memory_info"]) + for thread_field in ("precompute_cpu_percent", "query_cpu_percent"): + if thread_field in pid_info: + keyword_data[keyword].setdefault(thread_field, []) + keyword_data[keyword][thread_field].append(pid_info[thread_field]) # Skip printing if --print not specified if not args or not args.print: @@ -220,6 +241,26 @@ def analyze_monitor_output(file_path: str, args=None): else: print(f" All values: {mem_sum_mb}") + # Thread-attributed CPU stats (only present for precompute engine PIDs) + for thread_field, label in [ + ("precompute_cpu_percent", "Precompute CPU (pc-worker threads)"), + ("query_cpu_percent", "Query CPU (non-precompute threads)"), + ]: + if thread_field not in metrics: + continue + arrays = [np.array(a) for a in metrics[thread_field]] + max_len = max(len(a) for a in arrays) + padded = np.zeros((len(arrays), max_len)) + for i, a in enumerate(arrays): + padded[i, : len(a)] = a + summed = np.sum(padded, axis=0) + + print(f"\n{label} (sum across {len(arrays)} PIDs):") + print(f" Median: {np.median(summed):.2f}%") + print(f" P95: {np.percentile(summed, 95):.2f}%") + print(f" P99: {np.percentile(summed, 99):.2f}%") + print(f" Max: {np.max(summed):.2f}%") + # Optional: Print full time series to a separate file # output_dir = Path(file_path).parent # keyword_clean = keyword.replace('/', '_').replace('\\', '_') diff --git a/asap-tools/experiments/post_experiment/compare_costs.py b/asap-tools/experiments/post_experiment/compare_costs.py index a6d487bf..287cc489 100644 --- a/asap-tools/experiments/post_experiment/compare_costs.py +++ b/asap-tools/experiments/post_experiment/compare_costs.py @@ -44,14 +44,17 @@ def calculate_query_cpu(monitor_info, experiment_mode): pids = [pid for pid in monitor_info.keys() if pid != "all"] if experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME: - # Sum CPU across all PIDs except those with keyword="prometheus" - query_cpu = [0 for _ in range(len(monitor_info[pids[0]]["cpu_percent"]))] + # Use query_cpu_percent (thread-attributed) when available; fall back to cpu_percent. + query_cpu = [0.0 for _ in range(len(monitor_info[pids[0]]["cpu_percent"]))] for pid in pids: keyword = monitor_info[pid]["keyword"] if keyword != PROMETHEUS_PROCESS_KEYWORD: - for i in range(len(monitor_info[pid]["cpu_percent"])): - query_cpu[i] += monitor_info[pid]["cpu_percent"][i] + source = monitor_info[pid].get( + "query_cpu_percent", monitor_info[pid]["cpu_percent"] + ) + for i in range(len(source)): + query_cpu[i] += source[i] return query_cpu @@ -80,6 +83,24 @@ def calculate_query_cpu(monitor_info, experiment_mode): ) +def calculate_precompute_cpu(monitor_info): + """ + Sum precompute_cpu_percent across all PIDs that have thread attribution data. + Returns None if no thread attribution data is present. + """ + pids = [pid for pid in monitor_info.keys() if pid != "all"] + precompute_cpu = None + for pid in pids: + if "precompute_cpu_percent" in monitor_info[pid]: + if precompute_cpu is None: + precompute_cpu = [0.0] * len( + monitor_info[pid]["precompute_cpu_percent"] + ) + for i, v in enumerate(monitor_info[pid]["precompute_cpu_percent"]): + precompute_cpu[i] += v + return precompute_cpu + + def plot_resource_usage(monitor_info, experiment_mode, args): """ Plot raw resource usage data for each resource type. @@ -265,6 +286,7 @@ def main(args): experiment_mode_to_overall_resource_usage = {} experiment_mode_to_query_cpu = {} + experiment_mode_to_precompute_cpu = {} for experiment_mode in experiment_modes: if not args.machine_readable: @@ -330,6 +352,11 @@ def main(args): if not args.machine_readable: print(f"Skipping Query CPU calculation for {experiment_mode}: {e}") + # Calculate Precompute CPU (thread-attributed, sketchdb only) + precompute_cpu = calculate_precompute_cpu(monitor_info) + if precompute_cpu is not None: + experiment_mode_to_precompute_cpu[experiment_mode] = precompute_cpu + # Initialize mode data for machine-readable output if args.machine_readable: machine_readable_output["experiment_modes"][experiment_mode] = { @@ -456,6 +483,31 @@ def main(args): if args.machine_readable: machine_readable_output["query_cpu_benefit"] = query_cpu_benefit + # Handle Precompute CPU statistics + if experiment_mode_to_precompute_cpu: + if not args.machine_readable and args.print: + print("\n" + "=" * 60) + print("Precompute CPU Statistics (pc-worker threads)") + print("=" * 60) + + precompute_cpu_stats = {} + for ( + experiment_mode, + precompute_cpu, + ) in experiment_mode_to_precompute_cpu.items(): + precompute_cpu_stats[experiment_mode] = {} + if not args.machine_readable and args.print: + print(f"\n{experiment_mode}:") + + for stat, agg_func in relevant_stats.items(): + value = agg_func(precompute_cpu) + precompute_cpu_stats[experiment_mode][stat] = value + if not args.machine_readable and args.print: + print(f" {stat}: {round(value, 2)}%") + + if args.machine_readable: + machine_readable_output["precompute_cpu"] = precompute_cpu_stats + # Output machine-readable results if args.machine_readable: print(json.dumps(machine_readable_output, indent=2)) diff --git a/asap-tools/experiments/remote_monitor.py b/asap-tools/experiments/remote_monitor.py index 34a5c2c4..1cd8c5a7 100644 --- a/asap-tools/experiments/remote_monitor.py +++ b/asap-tools/experiments/remote_monitor.py @@ -270,6 +270,12 @@ def main(args): ["memory_info", "cpu_percent"], include_children=True, hooks=monitor_hooks, + thread_attribution_keyword=( + constants.QUERY_ENGINE_RS_CONTAINER_NAME + if args.streaming_engine == "precompute" + and args.experiment_mode == constants.SKETCHDB_EXPERIMENT_NAME + else None + ), ) if args.profile_flink_pids: @@ -417,6 +423,12 @@ def main(args): type=int, required=True, ) + parser.add_argument( + "--streaming_engine", + type=str, + required=True, + help="Streaming engine type (e.g. precompute, arroyo)", + ) args = parser.parse_args() args.keywords = args.keywords.strip().split(",") if args.profile_flink_pids: