-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbuiltin_tools.py
More file actions
175 lines (150 loc) · 5.99 KB
/
Copy pathbuiltin_tools.py
File metadata and controls
175 lines (150 loc) · 5.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""
Built-in mcpproxy utility tools — registered automatically at startup,
no YAML config file required.
mcpproxy__listfiles List files/directories inside the mcpproxy files dir.
mcpproxy__getfile Read a file from the mcpproxy files dir (text or base64).
The *base directory* defaults to ``/app/files`` (mounted as a Docker volume so
artefacts persist across container restarts) and can be overridden at runtime with
the ``MCPPROXY_FILES_DIR`` environment variable. Only files **inside** the base
directory are accessible — path-traversal attempts are rejected.
"""
import base64
import os
from pathlib import Path
from typing import Any
def _base_dir() -> Path:
    """Compute the absolute root directory served by the built-in file tools.

    The location comes from the ``MCPPROXY_FILES_DIR`` environment variable
    and falls back to ``/app/files`` when that variable is unset.  The
    environment is consulted on every call rather than cached, so a test can
    change the variable (e.g. with monkeypatch) and see the new value
    without restarting the process.
    """
    configured = os.getenv("MCPPROXY_FILES_DIR", "/app/files")
    root = Path(configured)
    return root.resolve()
def _safe_resolve(relative: str | None) -> Path:
    """Resolve *relative* under the base directory, refusing to escape it.

    Args:
        relative: Path relative to the base directory.  ``None`` or ``""``
            selects the base directory itself.

    Returns:
        The fully resolved, absolute path.

    Raises:
        ValueError: If the resolved path is not located inside the base
            directory (for example ``"../x"`` or an absolute path).
    """
    base = _base_dir()
    # Resolve first and check containment second: resolve() collapses ".."
    # segments and follows symlinks, so the check sees the real location.
    target = (base / (relative or "")).resolve()
    # A plain guard (instead of raising from inside an ``except ValueError``
    # handler) keeps the traceback free of a misleading chained exception.
    if not target.is_relative_to(base):
        raise ValueError(
            f"Path '{relative}' is outside the allowed directory '{base}'"
        )
    return target
# ---------------------------------------------------------------------------
# Tool handlers
# ---------------------------------------------------------------------------
async def list_files(
    context: dict[str, Any],
    path: str | None = None,
    recursive: bool = True,
    max_depth: int | None = None,
) -> dict[str, Any]:
    """Enumerate the contents of *path* inside the files base directory.

    Args:
        context: Accepted for the tool-handler signature; not read here.
        path: Directory to list, relative to the base directory.  ``None``
            or ``""`` lists the base directory itself.
        recursive: When true (the default), descend into subdirectories.
        max_depth: Maximum number of levels to list.  ``1`` yields only the
            immediate children (same as ``recursive=False``); ``None``
            means no limit.

    Returns:
        On success, ``{"ok": True, "base_dir", "path", "recursive",
        "entries"}``.  Each entry carries ``name`` (basename), ``path``
        (relative to the base directory, ``/``-separated -- the value to
        hand to ``mcpproxy__getfile``), ``type`` (``"directory"`` or
        ``"file"``) and ``size`` (bytes for regular files, else ``None``).
        A directory's own entry precedes those of its children, and
        siblings are listed in sorted order.  A directory that does not
        exist produces an empty ``entries`` list rather than an error.

        On failure (the target is not a directory, the path escapes the
        base directory, or any other exception), ``{"ok": False, "error"}``.

    Symlinked directories are never descended into, which avoids cycles;
    they are reported with ``type="file"`` and ``size=None``.
    """
    try:
        root = _safe_resolve(path)
        files_root = _base_dir()

        def _listing(found: list[dict[str, Any]]) -> dict[str, Any]:
            # Shared success payload for the empty and the populated case.
            return {
                "ok": True,
                "base_dir": str(files_root),
                "path": path or "",
                "recursive": recursive,
                "entries": found,
            }

        if not root.exists():
            return _listing([])
        if not root.is_dir():
            return {"ok": False, "error": f"'{path}' is not a directory"}

        collected: list[dict[str, Any]] = []

        def _descend(folder: Path, level: int) -> None:
            for child in sorted(folder.iterdir()):
                # Only genuine directories count; a symlink to one does not.
                real_dir = child.is_dir() and not child.is_symlink()
                rel_path = child.relative_to(files_root).as_posix()
                kind = "directory" if real_dir else "file"
                byte_count = child.stat().st_size if child.is_file() else None
                collected.append(
                    {
                        "name": child.name,
                        "path": rel_path,
                        "type": kind,
                        "size": byte_count,
                    }
                )
                if not (recursive and real_dir):
                    continue
                # ``level`` is zero-based, so the children of this directory
                # would sit at level + 1; stop once that reaches max_depth.
                if max_depth is None or level + 1 < max_depth:
                    _descend(child, level + 1)

        _descend(root, 0)
        return _listing(collected)
    except Exception as err:
        return {"ok": False, "error": str(err)}
async def get_file(
    context: dict[str, Any],
    path: str,
    encoding: str = "auto",
) -> dict[str, Any]:
    """Return the contents of one file from the files base directory.

    Args:
        context: Accepted for the tool-handler signature; not read here.
        path: File to read, relative to the base directory.
        encoding: How the content is delivered:

            * ``"base64"`` -- always base64-encode the raw bytes.
            * ``"text"`` -- decode as UTF-8; fail if the bytes are not
              valid UTF-8.
            * ``"auto"`` (default) -- try UTF-8 first and fall back to
              base64 when decoding fails.

            Any other value is handled the same way as ``"auto"``.

    Returns:
        On success, ``{"ok": True, "path", "size", "content", "encoding"}``
        where ``size`` is the byte length of the file and ``encoding`` is
        the representation actually used (``"text"`` or ``"base64"``).

        On failure (missing file, not a regular file, path outside the
        base directory, undecodable text, or any other exception),
        ``{"ok": False, "error"}``.
    """
    try:
        source = _safe_resolve(path)
        if not source.exists():
            return {"ok": False, "error": f"File not found: {path}"}
        if not source.is_file():
            return {"ok": False, "error": f"Not a file: {path}"}

        payload = source.read_bytes()
        byte_count = len(payload)

        def _as_base64() -> dict[str, Any]:
            # Used for an explicit "base64" request and the "auto" fallback.
            return {
                "ok": True,
                "path": path,
                "size": byte_count,
                "content": base64.b64encode(payload).decode(),
                "encoding": "base64",
            }

        if encoding == "base64":
            return _as_base64()

        # From here on the request is "text" or "auto": attempt UTF-8.
        try:
            decoded = payload.decode("utf-8")
        except UnicodeDecodeError:
            if encoding == "text":
                return {
                    "ok": False,
                    "error": (
                        f"File '{path}' is not valid UTF-8 text. "
                        "Try encoding='base64'."
                    ),
                }
            return _as_base64()

        return {
            "ok": True,
            "path": path,
            "size": byte_count,
            "content": decoded,
            "encoding": "text",
        }
    except Exception as err:
        return {"ok": False, "error": str(err)}