Skip to content

Commit fed9eb5

Browse files
committed
Add process management and shell MCP tools
ac_launch_process spawns a detached subprocess from an argv list (no shell expansion); ac_shell runs a command line via shlex.split and reports exit_code/stdout/stderr; ac_list_processes and ac_kill_process wrap psutil for inspection and cleanup. Working directories are validated against os.path.realpath; missing psutil returns a clear runtime error rather than ImportError.
1 parent c83d250 commit fed9eb5

3 files changed

Lines changed: 196 additions & 1 deletion

File tree

je_auto_control/utils/mcp_server/tools/_factories.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,57 @@ def trigger_tools() -> List[MCPTool]:
751751
]
752752

753753

754+
def process_and_shell_tools() -> List[MCPTool]:
755+
return [
756+
MCPTool(
757+
name="ac_launch_process",
758+
description=("Spawn a subprocess with the given argv list "
759+
"(detached, stdio piped to /dev/null). Returns "
760+
"{pid, argv}. Optional working_directory."),
761+
input_schema=schema({
762+
"argv": {"type": "array", "items": {"type": "string"}},
763+
"working_directory": {"type": "string"},
764+
}, required=["argv"]),
765+
handler=h.launch_process,
766+
annotations=DESTRUCTIVE,
767+
),
768+
MCPTool(
769+
name="ac_list_processes",
770+
description=("List running processes (psutil required). "
771+
"Optionally filter by case-insensitive substring."),
772+
input_schema=schema({
773+
"name_contains": {"type": "string"},
774+
}),
775+
handler=h.list_processes,
776+
annotations=READ_ONLY,
777+
),
778+
MCPTool(
779+
name="ac_kill_process",
780+
description=("Terminate a PID gracefully, escalating to "
781+
"SIGKILL after ``timeout``. Returns 'terminated' "
782+
"/ 'killed' / 'not-found'. psutil required."),
783+
input_schema=schema({
784+
"pid": {"type": "integer"},
785+
"timeout": {"type": "number"},
786+
}, required=["pid"]),
787+
handler=h.kill_process,
788+
annotations=DESTRUCTIVE,
789+
),
790+
MCPTool(
791+
name="ac_shell",
792+
description=("Run a shell-style command line via shlex.split "
793+
"(NO shell expansion). Returns {exit_code, "
794+
"stdout, stderr}."),
795+
input_schema=schema({
796+
"command": {"type": "string"},
797+
"timeout": {"type": "number"},
798+
}, required=["command"]),
799+
handler=h.shell_command,
800+
annotations=DESTRUCTIVE,
801+
),
802+
]
803+
804+
754805
def hotkey_tools() -> List[MCPTool]:
755806
return [
756807
MCPTool(
@@ -802,5 +853,5 @@ def hotkey_tools() -> List[MCPTool]:
802853
mouse_tools, keyboard_tools, screen_tools, image_and_ocr_tools,
803854
window_tools, system_tools, recording_tools, drag_and_send_tools,
804855
semantic_locator_tools, scheduler_tools, trigger_tools, hotkey_tools,
805-
screen_record_tools,
856+
screen_record_tools, process_and_shell_tools,
806857
)

je_auto_control/utils/mcp_server/tools/_handlers.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,96 @@ def window_restore(title_substring: str,
472472
return _show_command(title_substring, bool(case_sensitive), cmd_show=9)
473473

474474

475+
def launch_process(argv: List[str],
476+
working_directory: Optional[str] = None,
477+
) -> Dict[str, Any]:
478+
"""Spawn a detached subprocess with a sanitised argv list."""
479+
import subprocess # nosec B404 # reason: required to spawn child processes
480+
if not isinstance(argv, list) or not argv:
481+
raise ValueError("argv must be a non-empty list")
482+
cleaned = [str(part) for part in argv]
483+
cwd = None
484+
if working_directory is not None:
485+
cwd = os.path.realpath(os.fspath(working_directory))
486+
if not os.path.isdir(cwd):
487+
raise ValueError(f"working_directory does not exist: {cwd}")
488+
process = subprocess.Popen( # nosec B603 # reason: argv list, no shell expansion
489+
cleaned, cwd=cwd, stdout=subprocess.DEVNULL,
490+
stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL,
491+
)
492+
return {"pid": int(process.pid), "argv": cleaned}
493+
494+
495+
def list_processes(name_contains: Optional[str] = None,
496+
) -> List[Dict[str, Any]]:
497+
"""List running processes via ``psutil`` if installed; raise otherwise."""
498+
try:
499+
import psutil # type: ignore[import-untyped]
500+
except ImportError as error:
501+
raise RuntimeError(
502+
"ac_list_processes requires psutil — pip install psutil"
503+
) from error
504+
needle = name_contains.lower() if name_contains else None
505+
out: List[Dict[str, Any]] = []
506+
for proc in psutil.process_iter(["pid", "name", "username"]):
507+
info = proc.info or {}
508+
name = (info.get("name") or "")
509+
if needle and needle not in name.lower():
510+
continue
511+
out.append({
512+
"pid": int(info.get("pid") or 0),
513+
"name": name,
514+
"username": info.get("username") or "",
515+
})
516+
return out
517+
518+
519+
def kill_process(pid: int, timeout: float = 5.0) -> str:
520+
"""Terminate a PID gracefully, escalating to SIGKILL after ``timeout``."""
521+
try:
522+
import psutil # type: ignore[import-untyped]
523+
except ImportError as error:
524+
raise RuntimeError(
525+
"ac_kill_process requires psutil — pip install psutil"
526+
) from error
527+
try:
528+
proc = psutil.Process(int(pid))
529+
except psutil.NoSuchProcess:
530+
return "not-found"
531+
proc.terminate()
532+
try:
533+
proc.wait(timeout=float(timeout))
534+
return "terminated"
535+
except psutil.TimeoutExpired:
536+
proc.kill()
537+
return "killed"
538+
539+
540+
def shell_command(command: str, timeout: float = 30.0
541+
) -> Dict[str, Any]:
542+
"""Run a shell-style command line and return stdout/stderr/exit_code.
543+
544+
Uses argv-list parsing via ``shlex.split`` so we never enable a
545+
shell — protects against the parameterised command injection
546+
classes Bandit B602 / B605 cover.
547+
"""
548+
import shlex
549+
import subprocess # nosec B404 # reason: required for child execution
550+
551+
if not command or not command.strip():
552+
raise ValueError("command must be a non-empty string")
553+
argv = shlex.split(command, posix=False) if os.name == "nt" \
554+
else shlex.split(command)
555+
proc = subprocess.run( # nosec B603 # reason: argv from shlex.split, no shell
556+
argv, capture_output=True, text=True,
557+
timeout=float(timeout), check=False,
558+
)
559+
return {
560+
"exit_code": int(proc.returncode),
561+
"stdout": proc.stdout, "stderr": proc.stderr,
562+
}
563+
564+
475565
def get_clipboard() -> str:
476566
from je_auto_control.utils.clipboard.clipboard import get_clipboard as _get
477567
return _get()

test/unit_test/headless/test_mcp_server.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1350,6 +1350,60 @@ def fake_show(hwnd, cmd_show):
13501350
assert seen["call"] == (456, 6)
13511351

13521352

1353+
def test_process_tools_present_in_default_registry():
1354+
names = {tool.name for tool in build_default_tool_registry()}
1355+
assert {"ac_launch_process", "ac_list_processes",
1356+
"ac_kill_process", "ac_shell"}.issubset(names)
1357+
1358+
1359+
def test_shell_command_returns_exit_code_and_stdout():
1360+
"""Run a portable command and verify the shape."""
1361+
import sys as _sys
1362+
by_name = {tool.name: tool for tool in build_default_tool_registry()}
1363+
result = by_name["ac_shell"].invoke({
1364+
"command": f"{_sys.executable} -V",
1365+
"timeout": 5.0,
1366+
})
1367+
assert result["exit_code"] == 0
1368+
# Python 3.4+ prints the version to stdout.
1369+
assert "Python" in (result["stdout"] + result["stderr"])
1370+
1371+
1372+
def test_shell_command_rejects_empty():
1373+
by_name = {tool.name: tool for tool in build_default_tool_registry()}
1374+
try:
1375+
by_name["ac_shell"].invoke({"command": " "})
1376+
except ValueError:
1377+
pass
1378+
else:
1379+
raise AssertionError("expected ValueError for empty command")
1380+
1381+
1382+
def test_launch_process_validates_working_directory(tmp_path):
1383+
import sys as _sys
1384+
by_name = {tool.name: tool for tool in build_default_tool_registry()}
1385+
missing = tmp_path / "ghost"
1386+
try:
1387+
by_name["ac_launch_process"].invoke({
1388+
"argv": [_sys.executable, "-V"],
1389+
"working_directory": str(missing),
1390+
})
1391+
except ValueError as error:
1392+
assert "does not exist" in str(error)
1393+
else:
1394+
raise AssertionError("expected ValueError")
1395+
1396+
1397+
def test_launch_process_rejects_empty_argv():
1398+
by_name = {tool.name: tool for tool in build_default_tool_registry()}
1399+
try:
1400+
by_name["ac_launch_process"].invoke({"argv": []})
1401+
except ValueError:
1402+
pass
1403+
else:
1404+
raise AssertionError("expected ValueError for empty argv")
1405+
1406+
13531407
def test_default_registry_lists_core_automation_tools():
13541408
names = {tool.name for tool in build_default_tool_registry()}
13551409
expected = {

0 commit comments

Comments
 (0)