diff --git a/examples/10_console_example.py b/examples/10_console_example.py new file mode 100644 index 0000000..b579843 --- /dev/null +++ b/examples/10_console_example.py @@ -0,0 +1,311 @@ +""" +Example demonstrating the MCP Web Console feature. + +This example shows how to: +1. Create a FastAPI app with some endpoints +2. Add MCP server capabilities +3. Mount the Web Console for visualization and debugging + +Access the console at: http://localhost:8000/mcp-console +""" + +import sys +import os +import site +from pathlib import Path + +user_site = site.getusersitepackages() +if user_site and user_site not in sys.path: + sys.path.insert(0, user_site) + +project_root = Path(__file__).parent.parent +if str(project_root) not in sys.path: + sys.path.insert(0, str(project_root)) + +print("=" * 60) +print("Python Path Debug Info") +print("=" * 60) +print(f"Project root: {project_root}") +print(f"User site-packages: {user_site}") +print(f"Current file: {__file__}") +print() +print("sys.path:") +for i, p in enumerate(sys.path): + print(f" [{i}] {p}") +print() +print("Checking fastapi_mcp module location...") + +try: + import fastapi_mcp + print(f"fastapi_mcp imported from: {fastapi_mcp.__file__}") + print(f"fastapi_mcp version: {fastapi_mcp.__version__}") + + expected_server_py = project_root / 'fastapi_mcp' / 'server.py' + actual_server_py = Path(fastapi_mcp.__file__).parent / 'server.py' + + print() + print(f"Expected server.py: {expected_server_py}") + print(f"Actual module path: {Path(fastapi_mcp.__file__).parent}") + + if str(expected_server_py.parent) != str(Path(fastapi_mcp.__file__).parent): + print() + print("WARNING: Importing from different location!") + print("This may be an older installed version.") + + print() + print("Checking FastApiMCP methods...") + from fastapi_mcp import FastApiMCP + methods = [m for m in dir(FastApiMCP) if not m.startswith('_')] + print(f"FastApiMCP public methods: {methods}") + + if 'mount_console' not in methods: + print() + print("=" * 60) + print("ERROR: 'mount_console' method not found!") + print("=" * 60) + print() + print("This means Python is importing an older version of fastapi-mcp") + print("that doesn't have the new console feature.") + print() + print(f"Expected location: {expected_server_py}") + print(f"Actual location: {actual_server_py}") + print() + print("Solutions:") + print(" 1. Force reinstall local version:") + print(" pip install -e . --user --force-reinstall --no-deps") + print() + print(" 2. Or check what's in actual location:") + print(f" dir {actual_server_py.parent}") + print() + print(" 3. Or remove the older version from site-packages:") + print(f" pip uninstall fastapi-mcp") + print(f" Then run: pip install -e . --user") + sys.exit(1) + else: + print("✓ 'mount_console' method found!") + +except ImportError as e: + print() + print("=" * 60) + print(f"ERROR: Failed to import: {e}") + print("=" * 60) + print() + print("This usually means dependencies are not installed or not in path.") + print() + print("Try installing dependencies:") + print(" pip install fastapi uvicorn pydantic mcp httpx --user") + sys.exit(1) + +from fastapi import FastAPI, HTTPException, Query +from pydantic import BaseModel +from typing import List, Optional + + +app = FastAPI( + title="MCP Console Demo", + description="A demo FastAPI app with MCP Web Console", + version="1.0.0", +) + + +class User(BaseModel): + id: int + name: str + email: str + age: Optional[int] = None + active: bool = True + + +class CreateUserRequest(BaseModel): + name: str + email: str + age: Optional[int] = None + + +class UpdateUserRequest(BaseModel): + name: Optional[str] = None + email: Optional[str] = None + age: Optional[int] = None + active: Optional[bool] = None + + +users_db: dict[int, User] = {} + + +sample_users = [ + User(id=1, name="Alice Johnson", email="alice@example.com", age=30), + User(id=2, name="Bob Smith", email="bob@example.com", age=25), + User(id=3, name="Charlie Brown", email="charlie@example.com", age=35), +] +for user in sample_users: + users_db[user.id] = user + + +@app.get("/users/", response_model=List[User], tags=["users"], operation_id="list_users") +async def list_users( + skip: int = Query(0, ge=0, description="Number of users to skip"), + limit: int = Query(10, ge=1, le=100, description="Maximum number of users to return"), + active_only: bool = Query(False, description="Only return active users"), +): + """ + List all users with pagination support. + + Returns a paginated list of users from the database. + """ + results = list(users_db.values()) + + if active_only: + results = [u for u in results if u.active] + + return results[skip : skip + limit] + + +@app.get("/users/{user_id}", response_model=User, tags=["users"], operation_id="get_user") +async def get_user(user_id: int): + """ + Get a specific user by ID. + + Returns the user details if found, otherwise raises a 404 error. + """ + if user_id not in users_db: + raise HTTPException(status_code=404, detail="User not found") + return users_db[user_id] + + +@app.post("/users/", response_model=User, tags=["users"], operation_id="create_user") +async def create_user(user: CreateUserRequest): + """ + Create a new user. + + Generates a new unique ID for the user and adds them to the database. + Returns the created user with the assigned ID. + """ + new_id = max(users_db.keys(), default=0) + 1 + new_user = User( + id=new_id, + name=user.name, + email=user.email, + age=user.age, + active=True, + ) + users_db[new_id] = new_user + return new_user + + +@app.put("/users/{user_id}", response_model=User, tags=["users"], operation_id="update_user") +async def update_user(user_id: int, user: UpdateUserRequest): + """ + Update an existing user. + + Only updates the fields that are provided in the request. + Raises a 404 error if the user does not exist. + """ + if user_id not in users_db: + raise HTTPException(status_code=404, detail="User not found") + + existing = users_db[user_id] + + if user.name is not None: + existing.name = user.name + if user.email is not None: + existing.email = user.email + if user.age is not None: + existing.age = user.age + if user.active is not None: + existing.active = user.active + + return existing + + +@app.delete("/users/{user_id}", tags=["users"], operation_id="delete_user") +async def delete_user(user_id: int): + """ + Delete a user. + + Removes the user from the database. + Raises a 404 error if the user does not exist. + """ + if user_id not in users_db: + raise HTTPException(status_code=404, detail="User not found") + + del users_db[user_id] + return {"message": "User deleted successfully", "user_id": user_id} + + +@app.get("/users/search/", response_model=List[User], tags=["search"], operation_id="search_users") +async def search_users( + q: Optional[str] = Query(None, description="Search query for name or email"), + min_age: Optional[int] = Query(None, description="Minimum age filter"), + max_age: Optional[int] = Query(None, description="Maximum age filter"), + active: Optional[bool] = Query(None, description="Filter by active status"), +): + """ + Search users with various filter options. + + Allows searching by name/email, and filtering by age range and active status. + """ + results = list(users_db.values()) + + if q: + q = q.lower() + results = [ + user for user in results + if q in user.name.lower() or q in user.email.lower() + ] + + if min_age is not None: + results = [user for user in results if user.age is not None and user.age >= min_age] + + if max_age is not None: + results = [user for user in results if user.age is not None and user.age <= max_age] + + if active is not None: + results = [user for user in results if user.active == active] + + return results + + +@app.get("/health", tags=["system"], operation_id="health_check") +async def health_check(): + """ + Health check endpoint. + + Returns the current health status of the application. + """ + return { + "status": "healthy", + "users_count": len(users_db), + } + + +mcp = FastApiMCP(app) + +mcp.mount_http() + +mcp.mount_console( + mount_path="/mcp-console", + log_max_size=1000, +) + +print("=" * 60) +print("FastAPI-MCP Console Demo") +print("=" * 60) +print() +print("Available endpoints:") +print(" - Swagger UI: http://localhost:8000/docs") +print(" - MCP HTTP: http://localhost:8000/mcp") +print(" - MCP Console: http://localhost:8000/mcp-console") +print() +print("Access the Web Console to:") +print(" - View all registered MCP tools") +print(" - See tool descriptions, parameters, and responses") +print(" - Test tool executions with custom parameters") +print(" - View real-time call logs with timings") +print() +print("=" * 60) + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/fastapi_mcp/__init__.py b/fastapi_mcp/__init__.py index f748712..9a1f2a8 100644 --- a/fastapi_mcp/__init__.py +++ b/fastapi_mcp/__init__.py @@ -14,10 +14,16 @@ from .server import FastApiMCP from .types import AuthConfig, OAuthMetadata - +from .console.logs import LogStore, LogEntry, LogStatus, get_global_log_store +from .console.router import ConsoleConfig __all__ = [ "FastApiMCP", "AuthConfig", "OAuthMetadata", + "LogStore", + "LogEntry", + "LogStatus", + "get_global_log_store", + "ConsoleConfig", ] diff --git a/fastapi_mcp/console/__init__.py b/fastapi_mcp/console/__init__.py new file mode 100644 index 0000000..b7caa53 --- /dev/null +++ b/fastapi_mcp/console/__init__.py @@ -0,0 +1,24 @@ +""" +FastAPI-MCP Web Console Module. + +Provides a lightweight web management console for visualizing and debugging MCP tools. +""" + +from .logs import ( + ToolCallLog, + LogStore, + LogEntry, + LogStatus, + get_global_log_store, +) +from .router import ConsoleConfig, get_console_router + +__all__ = [ + "ToolCallLog", + "LogStore", + "LogEntry", + "LogStatus", + "get_global_log_store", + "ConsoleConfig", + "get_console_router", +] diff --git a/fastapi_mcp/console/logs.py b/fastapi_mcp/console/logs.py new file mode 100644 index 0000000..bf28dfc --- /dev/null +++ b/fastapi_mcp/console/logs.py @@ -0,0 +1,203 @@ +""" +Log storage and types for MCP tool call logging. + +Provides in-memory storage for tool call logs with configurable max size. +""" + +import json +import time +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from typing import Any, Dict, List, Optional + + +class LogStatus(str, Enum): + """Status of a tool call log entry.""" + + PENDING = "pending" + SUCCESS = "success" + ERROR = "error" + + +@dataclass +class LogEntry: + """A single log entry for an MCP tool call.""" + + id: str + tool_name: str + arguments: Dict[str, Any] + status: LogStatus = LogStatus.PENDING + result: Optional[str] = None + error: Optional[str] = None + start_time: float = field(default_factory=time.time) + end_time: Optional[float] = None + duration_ms: Optional[float] = None + method: Optional[str] = None + path: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert log entry to a dictionary for JSON serialization.""" + start_datetime = datetime.fromtimestamp(self.start_time).isoformat() + end_datetime = datetime.fromtimestamp(self.end_time).isoformat() if self.end_time else None + + return { + "id": self.id, + "tool_name": self.tool_name, + "arguments": self.arguments, + "status": self.status.value, + "result": self.result, + "error": self.error, + "start_time": self.start_time, + "start_datetime": start_datetime, + "end_time": self.end_time, + "end_datetime": end_datetime, + "duration_ms": self.duration_ms, + "method": self.method, + "path": self.path, + } + + def complete(self, result: Any, error: Optional[str] = None) -> None: + """Mark the log entry as completed.""" + self.end_time = time.time() + self.duration_ms = (self.end_time - self.start_time) * 1000 + + if error: + self.status = LogStatus.ERROR + self.error = error + else: + self.status = LogStatus.SUCCESS + try: + self.result = json.dumps(result, indent=2, ensure_ascii=False, default=str) + except Exception: + self.result = str(result) + + +ToolCallLog = LogEntry + + +class LogStore: + """In-memory storage for tool call logs.""" + + def __init__(self, max_size: int = 1000): + """Initialize the log store. + + Args: + max_size: Maximum number of log entries to keep. Oldest entries are removed when limit is reached. + """ + self._logs: List[LogEntry] = [] + self._max_size = max_size + self._counter: int = 0 + + def _generate_id(self) -> str: + """Generate a unique log entry ID.""" + self._counter += 1 + return f"log_{self._counter}_{int(time.time() * 1000)}" + + def create_entry( + self, + tool_name: str, + arguments: Dict[str, Any], + method: Optional[str] = None, + path: Optional[str] = None, + ) -> LogEntry: + """Create a new log entry for a pending tool call. + + Args: + tool_name: Name of the tool being called + arguments: Arguments passed to the tool + method: HTTP method (if applicable) + path: API path (if applicable) + + Returns: + The created LogEntry + """ + entry = LogEntry( + id=self._generate_id(), + tool_name=tool_name, + arguments=arguments, + method=method, + path=path, + ) + + self._logs.append(entry) + + if len(self._logs) > self._max_size: + self._logs = self._logs[-self._max_size :] + + return entry + + def get_all(self, limit: Optional[int] = None) -> List[LogEntry]: + """Get all log entries, newest first. + + Args: + limit: Maximum number of entries to return. If None, returns all. + + Returns: + List of log entries, newest first + """ + entries = list(reversed(self._logs)) + if limit is not None: + entries = entries[:limit] + return entries + + def get_by_id(self, log_id: str) -> Optional[LogEntry]: + """Get a log entry by its ID. + + Args: + log_id: The log entry ID + + Returns: + The LogEntry if found, None otherwise + """ + for entry in self._logs: + if entry.id == log_id: + return entry + return None + + def clear(self) -> int: + """Clear all log entries. + + Returns: + Number of entries cleared + """ + count = len(self._logs) + self._logs.clear() + return count + + def get_stats(self) -> Dict[str, Any]: + """Get statistics about the logs. + + Returns: + Dictionary with statistics + """ + total = len(self._logs) + success = sum(1 for e in self._logs if e.status == LogStatus.SUCCESS) + error = sum(1 for e in self._logs if e.status == LogStatus.ERROR) + pending = sum(1 for e in self._logs if e.status == LogStatus.PENDING) + + durations = [e.duration_ms for e in self._logs if e.duration_ms is not None] + avg_duration = sum(durations) / len(durations) if durations else 0 + + return { + "total": total, + "success": success, + "error": error, + "pending": pending, + "avg_duration_ms": round(avg_duration, 2), + } + + +_global_log_store: Optional[LogStore] = None + + +def get_global_log_store() -> LogStore: + """Get or create the global log store instance. + + Returns: + The global LogStore instance + """ + global _global_log_store + if _global_log_store is None: + _global_log_store = LogStore() + return _global_log_store diff --git a/fastapi_mcp/console/router.py b/fastapi_mcp/console/router.py new file mode 100644 index 0000000..432cf66 --- /dev/null +++ b/fastapi_mcp/console/router.py @@ -0,0 +1,1407 @@ +""" +Console API Router for FastAPI-MCP. + +Provides API endpoints for the web management console and serves the frontend page. +""" + +import json +from typing import Any, Callable, Dict, List, Optional, Sequence + +from fastapi import APIRouter, Depends, HTTPException, Request, params +from fastapi.responses import HTMLResponse, JSONResponse +from pydantic import BaseModel + +from fastapi_mcp.console.logs import LogStore, get_global_log_store +from fastapi_mcp.types import AuthConfig + + +class ConsoleConfig(BaseModel): + """Configuration for the MCP Console.""" + + enabled: bool = True + mount_path: str = "/mcp-console" + log_max_size: int = 1000 + dependencies: Optional[Sequence[params.Depends]] = None + + +class ToolExecuteRequest(BaseModel): + """Request model for executing a tool.""" + + arguments: Dict[str, Any] = {} + + +CONSOLE_HTML = """ + +
+ + +MCP 工具管理与调试面板
+