Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,23 @@ api_auth:
- "/assistant"
- "/vaults"

# Security-related settings
security:
# Allow-list for the /api/documents/upload endpoint. Any file path passed
# to this endpoint must resolve to a location under one of these roots,
# otherwise the upload is rejected. This prevents the endpoint from being
# abused to read arbitrary files (e.g. ~/.ssh/id_rsa) and ship their
# contents to the configured VLM/embedding provider.
#
# Paths under capture.folder_monitor.watch_folder_paths are implicitly
# allowed and do not need to be repeated here.
#
# If both this list and watch_folder_paths are empty, the upload endpoint
# falls back to ~/Documents, ~/Downloads and ~/Desktop (when they exist).
document_upload_allowed_paths: []
# - "${CONTEXT_PATH:.}/uploads"
# - "~/Documents/MineContext"

# Prompts configuration
prompts:
language: "zh"
Expand Down
171 changes: 168 additions & 3 deletions opencontext/server/context_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

import datetime
import os
from typing import Any, Dict, List, Optional
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from opencontext.models.context import ProcessedContext, RawContextProperties, Vectorize
from opencontext.models.enums import (
Expand All @@ -26,6 +27,135 @@
logger = get_logger(__name__)


# Path components / locations that must never be ingested by /api/documents/upload,
# regardless of the configured allow-list. These hold credentials, keys, or system
# state whose contents would be sent to the configured (potentially remote)
# VLM/embedding provider for processing.
_SENSITIVE_PATH_COMPONENTS = frozenset(
{
".ssh",
".aws",
".gnupg",
".azure",
".gcloud",
".kube",
".docker",
".password-store",
"Keychains",
}
)

_SENSITIVE_FILENAMES = frozenset(
{
".env",
"id_rsa",
"id_ed25519",
"id_ecdsa",
"id_dsa",
"shadow",
}
)

_SENSITIVE_PATH_PREFIXES = (
"/etc",
"/proc",
"/sys",
"/dev",
"/root",
"/var/log",
"/var/db",
"/private/etc",
"/private/var/log",
"/private/var/db",
)


def _is_sensitive_path(path: Path) -> Tuple[bool, str]:
"""Defense-in-depth deny check for clearly sensitive locations."""
for part in path.parts:
if part in _SENSITIVE_PATH_COMPONENTS:
return True, f"path contains sensitive directory '{part}'"
if path.name in _SENSITIVE_FILENAMES:
return True, f"file '{path.name}' is on sensitive-filename deny list"
path_str = str(path)
for prefix in _SENSITIVE_PATH_PREFIXES:
if path_str == prefix or path_str.startswith(prefix + os.sep):
return True, f"path is under sensitive system directory '{prefix}'"
return False, ""


def _resolve_paths_from_config(values: Any) -> List[Path]:
"""Coerce a config value (string, list of strings, or None) into a list of
resolved absolute Paths. Silently skips entries that fail to resolve."""
if values is None:
return []
if isinstance(values, str):
values = [values]
if not isinstance(values, list):
return []
out: List[Path] = []
for raw in values:
if not isinstance(raw, str) or not raw:
continue
try:
out.append(Path(raw).expanduser().resolve())
except Exception:
continue
return out


def _resolve_allowed_upload_roots(config: Optional[Dict[str, Any]]) -> List[Path]:
    """Compute the set of directories under which a path passed to
    /api/documents/upload is permitted to live.

    Sources, in order:
      1. capture.folder_monitor.watch_folder_paths — directories the user has
         already opted in to having watched/processed.
      2. security.document_upload_allowed_paths — explicit allow-list extension
         point for the upload endpoint.
      3. Fallback if neither yields any path: ~/Documents, ~/Downloads and
         ~/Desktop, limited to those that exist as directories.

    Args:
        config: The loaded configuration mapping, or ``None``.

    Returns:
        Resolved root directories, de-duplicated with first-seen order kept.
        The list may be empty, in which case no upload path is allowed.
    """
    cfg = config or {}

    capture_cfg = cfg.get("capture") or {}
    folder_monitor_cfg = capture_cfg.get("folder_monitor") or {}
    security_cfg = cfg.get("security") or {}

    roots: List[Path] = []
    roots.extend(_resolve_paths_from_config(folder_monitor_cfg.get("watch_folder_paths")))
    roots.extend(_resolve_paths_from_config(security_cfg.get("document_upload_allowed_paths")))

    if not roots:
        try:
            home: Optional[Path] = Path.home()
        except (RuntimeError, KeyError, OSError):
            # No determinable home directory: there is nothing to fall back to.
            home = None
        if home is not None:
            for sub in ("Documents", "Downloads", "Desktop"):
                try:
                    # Resolve each candidate itself (not just the home
                    # directory): uploaded paths are fully resolved before
                    # comparison, so a symlinked ~/Documents would otherwise
                    # never match anything beneath it.
                    candidate = (home / sub).resolve()
                except (OSError, RuntimeError):
                    continue
                if candidate.is_dir():
                    roots.append(candidate)

    # De-duplicate while preserving order.
    return list(dict.fromkeys(roots))


def _is_path_under_any_root(path: Path, roots: List[Path]) -> bool:
for root in roots:
try:
path.relative_to(root)
return True
except ValueError:
continue
return False


class ContextOperations:
"""Handles context CRUD and search operations."""

Expand Down Expand Up @@ -115,19 +245,54 @@ def add_screenshot(
def add_document(self, file_path: str, context_processor_callback) -> Optional[str]:
"""Add a document to the system."""
import uuid
from pathlib import Path

from opencontext.config.global_config import get_config

# Validate inputs
if not file_path:
return "Document path cannot be empty"

path = Path(file_path).expanduser()
expanded = Path(file_path).expanduser()
if not expanded.is_absolute():
return "Document path must be absolute"

try:
path = expanded.resolve(strict=False)
except Exception as e:
return f"Cannot resolve document path: {e}"

if not path.exists():
return f"Document path {file_path} does not exist"

if not path.is_file():
return f"Path {file_path} is not a file"

# The contents of any file accepted here are forwarded to the configured
# VLM / embedding provider for processing, so this endpoint must not be
# usable as an arbitrary file-read primitive against the host.
sensitive, reason = _is_sensitive_path(path)
if sensitive:
logger.warning(
"Rejected document upload from sensitive path: %s (%s)", file_path, reason
)
return f"Document path is not allowed: {reason}"

allowed_roots = _resolve_allowed_upload_roots(get_config())
if not _is_path_under_any_root(path, allowed_roots):
roots_pretty = ", ".join(str(r) for r in allowed_roots) if allowed_roots else "<none>"
logger.warning(
"Rejected document upload from path outside allow-list: %s "
"(allowed roots: [%s])",
file_path,
roots_pretty,
)
return (
f"Document path is not within an allowed directory. "
f"Allowed roots: [{roots_pretty}]. "
f"To permit additional directories, set "
f"'security.document_upload_allowed_paths' in config.yaml."
)

try:
# Create RawContextProperties
object_id = f"doc_{uuid.uuid4()}"
Expand Down