Skip to content

Commit a2ca071

Browse files
feat(tools): run precompute engine on dedicated named tokio runtime and add per-thread CPU attribution to process monitor
1 parent 600dfa5 commit a2ca071

2 files changed

Lines changed: 93 additions & 11 deletions

File tree

asap-query-engine/src/main.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ async fn main() -> Result<()> {
196196
// check_config() already enforces the ingest source is compatible (http_remote_write or csv).
197197
let mut pe_engine_handle: Option<PrecomputeEngineHandle> = None;
198198

199-
let precompute_handle = if config.streaming_engine == StreamingEngine::Precompute {
199+
let _precompute_runtime = if config.streaming_engine == StreamingEngine::Precompute {
200200
let precompute_config = PrecomputeEngineConfig {
201201
num_workers: config.precompute_engine.num_workers,
202202
allowed_lateness_ms: config.precompute_engine.allowed_lateness_ms,
@@ -260,11 +260,18 @@ async fn main() -> Result<()> {
260260
spawn_memory_diagnostics(diag_store, Some(worker_diagnostics)).await;
261261
});
262262

263-
Some(tokio::spawn(async move {
263+
let rt = tokio::runtime::Builder::new_multi_thread()
264+
.thread_name("pc-worker")
265+
.worker_threads(config.precompute_engine.num_workers)
266+
.enable_all()
267+
.build()
268+
.expect("failed to build precompute runtime");
269+
rt.spawn(async move {
264270
if let Err(e) = pe.run().await {
265271
error!("Precompute engine error: {}", e);
266272
}
267-
}))
273+
});
274+
Some(rt)
268275
} else {
269276
let diag_store = store.clone();
270277
tokio::spawn(async move {
@@ -457,12 +464,6 @@ async fn main() -> Result<()> {
457464
let _ = handle.await;
458465
}
459466

460-
if let Some(handle) = precompute_handle {
461-
info!("Shutting down precompute engine...");
462-
handle.abort();
463-
let _ = handle.await;
464-
}
465-
466467
info!("Shutdown complete");
467468
Ok(())
468469
}

asap-tools/experiments/classes/process_monitor.py

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,33 @@
11
import multiprocessing
2+
import time
23
import psutil
34
import traceback
45
from typing import List, Any
56
from classes.ProcessMonitorHook import ProcessMonitorHook, ProcessMetricSnapshot
67

8+
_PRECOMPUTE_THREAD_PREFIX = "pc-worker"
9+
10+
11+
def _read_thread_cpu(pid: int) -> dict:
12+
"""
13+
Returns {tid: (thread_name, cpu_seconds)} for all threads of pid.
14+
cpu_seconds = user_time + system_time from psutil; name from /proc/[pid]/task/[tid]/comm.
15+
Silently skips threads that disappear mid-read.
16+
"""
17+
result = {}
18+
try:
19+
threads = psutil.Process(pid).threads()
20+
except (psutil.NoSuchProcess, psutil.AccessDenied):
21+
return result
22+
for t in threads:
23+
try:
24+
with open(f"/proc/{pid}/task/{t.id}/comm") as f:
25+
name = f.read().strip()
26+
result[t.id] = (name, t.user_time + t.system_time)
27+
except (FileNotFoundError, psutil.NoSuchProcess):
28+
pass
29+
return result
30+
731

832
class MyMonitor(multiprocessing.Process):
933
def __init__(
@@ -34,11 +58,20 @@ def __init__(
3458
self.pid_monitor_map[pid] = {m: [] for m in self.monitors}
3559
self.pid_monitor_map[pid]["keyword"] = keyword
3660

61+
self._prev_thread_jiffies: dict = {}
62+
self._prev_poll_monotonic: float = 0.0
63+
64+
for pid in self.pids_to_monitor:
65+
self.pid_monitor_map[pid]["precompute_cpu_percent"] = []
66+
self.pid_monitor_map[pid]["query_cpu_percent"] = []
67+
3768
def add_child_pid_to_map(self, pid, child_pid):
3869
self.pid_monitor_map[child_pid] = {m: [] for m in self.monitors}
3970
self.pid_monitor_map[child_pid]["keyword"] = self.pid_monitor_map[pid][
4071
"keyword"
4172
]
73+
self.pid_monitor_map[child_pid]["precompute_cpu_percent"] = []
74+
self.pid_monitor_map[child_pid]["query_cpu_percent"] = []
4275

4376
def init_hooks(self):
4477
"""
@@ -70,6 +103,45 @@ def close_hooks(self):
70103
hook.close()
71104
return
72105

106+
def _compute_thread_group_cpu(self, pid: int, elapsed: float):
107+
"""
108+
Reads current per-thread CPU seconds for pid, diffs against previous snapshot,
109+
and appends precompute_cpu_percent / query_cpu_percent to pid_monitor_map.
110+
111+
CPU% is on the same scale as psutil's cpu_percent: can exceed 100% on
112+
multi-core systems (e.g. 2 fully loaded cores → ~200%).
113+
"""
114+
current = _read_thread_cpu(pid)
115+
prev = self._prev_thread_jiffies.get(pid, {})
116+
117+
if not prev:
118+
self._prev_thread_jiffies[pid] = current
119+
self.pid_monitor_map[pid]["precompute_cpu_percent"].append(0.0)
120+
self.pid_monitor_map[pid]["query_cpu_percent"].append(0.0)
121+
return
122+
123+
precompute_seconds = 0.0
124+
query_seconds = 0.0
125+
126+
for tid, (name, cpu_secs) in current.items():
127+
prev_secs = prev.get(tid, (None, 0.0))[1]
128+
delta = max(0.0, cpu_secs - prev_secs)
129+
if name.startswith(_PRECOMPUTE_THREAD_PREFIX):
130+
precompute_seconds += delta
131+
else:
132+
query_seconds += delta
133+
134+
if elapsed > 0:
135+
precompute_pct = (precompute_seconds / elapsed) * 100.0
136+
query_pct = (query_seconds / elapsed) * 100.0
137+
else:
138+
precompute_pct = 0.0
139+
query_pct = 0.0
140+
141+
self.pid_monitor_map[pid]["precompute_cpu_percent"].append(precompute_pct)
142+
self.pid_monitor_map[pid]["query_cpu_percent"].append(query_pct)
143+
self._prev_thread_jiffies[pid] = current
144+
73145
def update_pid_monitor_map(self, p) -> List[ProcessMetricSnapshot]:
74146
# if p.pid not in self.pid_monitor_map:
75147
# self.pid_monitor_map[p.pid] = {m: [] for m in self.monitors}
@@ -97,18 +169,27 @@ def run(self):
97169
# of the list of hooks
98170
self.init_hooks()
99171
self.pipe.send("ready")
100-
stop = False
172+
173+
self._prev_poll_monotonic = time.monotonic()
174+
for pid in self.pids_to_monitor:
175+
self._prev_thread_jiffies[pid] = _read_thread_cpu(pid)
101176

102177
try:
103178
while True:
104-
iteration_info = [] # list of process snapshots from this iteration
179+
now = time.monotonic()
180+
elapsed = now - self._prev_poll_monotonic
181+
self._prev_poll_monotonic = now
182+
183+
iteration_info = []
105184
for pid, p in self.psutil_handles.items():
106185
iteration_info += self.update_pid_monitor_map(p)
186+
self._compute_thread_group_cpu(pid, elapsed)
107187
if self.include_children:
108188
for child in p.children(recursive=True):
109189
if child.pid not in self.pid_monitor_map:
110190
self.add_child_pid_to_map(pid, child.pid)
111191
iteration_info += self.update_pid_monitor_map(child)
192+
self._compute_thread_group_cpu(child.pid, elapsed)
112193

113194
self.update_hooks(iteration_info)
114195
stop = self.pipe.poll(self.interval)

0 commit comments

Comments
 (0)