diff --git a/examples/10_console_example.py b/examples/10_console_example.py new file mode 100644 index 0000000..47ce2ab --- /dev/null +++ b/examples/10_console_example.py @@ -0,0 +1,226 @@ +""" +Example demonstrating the MCP Web Console feature. + +This example shows how to: +1. Create a FastAPI app with some endpoints +2. Add MCP server capabilities +3. Mount the Web Console for visualization and debugging + +Access the console at: http://localhost:8000/mcp-console +""" + +from fastapi import FastAPI, HTTPException, Query +from pydantic import BaseModel +from typing import List, Optional + +from fastapi_mcp import FastApiMCP + + +app = FastAPI( + title="MCP Console Demo", + description="A demo FastAPI app with MCP Web Console", + version="1.0.0", +) + + +class User(BaseModel): + id: int + name: str + email: str + age: Optional[int] = None + active: bool = True + + +class CreateUserRequest(BaseModel): + name: str + email: str + age: Optional[int] = None + + +class UpdateUserRequest(BaseModel): + name: Optional[str] = None + email: Optional[str] = None + age: Optional[int] = None + active: Optional[bool] = None + + +users_db: dict[int, User] = {} + + +sample_users = [ + User(id=1, name="Alice Johnson", email="alice@example.com", age=30), + User(id=2, name="Bob Smith", email="bob@example.com", age=25), + User(id=3, name="Charlie Brown", email="charlie@example.com", age=35), +] +for user in sample_users: + users_db[user.id] = user + + +@app.get("/users/", response_model=List[User], tags=["users"], operation_id="list_users") +async def list_users( + skip: int = Query(0, ge=0, description="Number of users to skip"), + limit: int = Query(10, ge=1, le=100, description="Maximum number of users to return"), + active_only: bool = Query(False, description="Only return active users"), +): + """ + List all users with pagination support. + + Returns a paginated list of users from the database. + """ + results = list(users_db.values()) + + if active_only: + results = [u for u in results if u.active] + + return results[skip : skip + limit] + + +@app.get("/users/{user_id}", response_model=User, tags=["users"], operation_id="get_user") +async def get_user(user_id: int): + """ + Get a specific user by ID. + + Returns the user details if found, otherwise raises a 404 error. + """ + if user_id not in users_db: + raise HTTPException(status_code=404, detail="User not found") + return users_db[user_id] + + +@app.post("/users/", response_model=User, tags=["users"], operation_id="create_user") +async def create_user(user: CreateUserRequest): + """ + Create a new user. + + Generates a new unique ID for the user and adds them to the database. + Returns the created user with the assigned ID. + """ + new_id = max(users_db.keys(), default=0) + 1 + new_user = User( + id=new_id, + name=user.name, + email=user.email, + age=user.age, + active=True, + ) + users_db[new_id] = new_user + return new_user + + +@app.put("/users/{user_id}", response_model=User, tags=["users"], operation_id="update_user") +async def update_user(user_id: int, user: UpdateUserRequest): + """ + Update an existing user. + + Only updates the fields that are provided in the request. + Raises a 404 error if the user does not exist. + """ + if user_id not in users_db: + raise HTTPException(status_code=404, detail="User not found") + + existing = users_db[user_id] + + if user.name is not None: + existing.name = user.name + if user.email is not None: + existing.email = user.email + if user.age is not None: + existing.age = user.age + if user.active is not None: + existing.active = user.active + + return existing + + +@app.delete("/users/{user_id}", tags=["users"], operation_id="delete_user") +async def delete_user(user_id: int): + """ + Delete a user. + + Removes the user from the database. + Raises a 404 error if the user does not exist. + """ + if user_id not in users_db: + raise HTTPException(status_code=404, detail="User not found") + + del users_db[user_id] + return {"message": "User deleted successfully", "user_id": user_id} + + +@app.get("/users/search/", response_model=List[User], tags=["search"], operation_id="search_users") +async def search_users( + q: Optional[str] = Query(None, description="Search query for name or email"), + min_age: Optional[int] = Query(None, description="Minimum age filter"), + max_age: Optional[int] = Query(None, description="Maximum age filter"), + active: Optional[bool] = Query(None, description="Filter by active status"), +): + """ + Search users with various filter options. + + Allows searching by name/email, and filtering by age range and active status. + """ + results = list(users_db.values()) + + if q: + q = q.lower() + results = [ + user for user in results + if q in user.name.lower() or q in user.email.lower() + ] + + if min_age is not None: + results = [user for user in results if user.age is not None and user.age >= min_age] + + if max_age is not None: + results = [user for user in results if user.age is not None and user.age <= max_age] + + if active is not None: + results = [user for user in results if user.active == active] + + return results + + +@app.get("/health", tags=["system"], operation_id="health_check") +async def health_check(): + """ + Health check endpoint. + + Returns the current health status of the application. + """ + return { + "status": "healthy", + "users_count": len(users_db), + } + + +mcp = FastApiMCP(app) + +mcp.mount_http() + +mcp.mount_console( + mount_path="/mcp-console", + log_max_size=1000, +) + +print("=" * 60) +print("FastAPI-MCP Console Demo") +print("=" * 60) +print() +print("Available endpoints:") +print(" - Swagger UI: http://localhost:8000/docs") +print(" - MCP HTTP: http://localhost:8000/mcp") +print(" - MCP Console: http://localhost:8000/mcp-console") +print() +print("Access the Web Console to:") +print(" - View all registered MCP tools") +print(" - See tool descriptions, parameters, and responses") +print(" - Test tool executions with custom parameters") +print(" - View real-time call logs with timings") +print() +print("=" * 60) + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/fastapi_mcp/__init__.py b/fastapi_mcp/__init__.py index f748712..9a1f2a8 100644 --- a/fastapi_mcp/__init__.py +++ b/fastapi_mcp/__init__.py @@ -14,10 +14,16 @@ from .server import FastApiMCP from .types import AuthConfig, OAuthMetadata - +from .console.logs import LogStore, LogEntry, LogStatus, get_global_log_store +from .console.router import ConsoleConfig __all__ = [ "FastApiMCP", "AuthConfig", "OAuthMetadata", + "LogStore", + "LogEntry", + "LogStatus", + "get_global_log_store", + "ConsoleConfig", ] diff --git a/fastapi_mcp/console/__init__.py b/fastapi_mcp/console/__init__.py new file mode 100644 index 0000000..b7caa53 --- /dev/null +++ b/fastapi_mcp/console/__init__.py @@ -0,0 +1,24 @@ +""" +FastAPI-MCP Web Console Module. + +Provides a lightweight web management console for visualizing and debugging MCP tools. +""" + +from .logs import ( + ToolCallLog, + LogStore, + LogEntry, + LogStatus, + get_global_log_store, +) +from .router import ConsoleConfig, get_console_router + +__all__ = [ + "ToolCallLog", + "LogStore", + "LogEntry", + "LogStatus", + "get_global_log_store", + "ConsoleConfig", + "get_console_router", +] diff --git a/fastapi_mcp/console/logs.py b/fastapi_mcp/console/logs.py new file mode 100644 index 0000000..bf28dfc --- /dev/null +++ b/fastapi_mcp/console/logs.py @@ -0,0 +1,203 @@ +""" +Log storage and types for MCP tool call logging. + +Provides in-memory storage for tool call logs with configurable max size. +""" + +import json +import time +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from typing import Any, Dict, List, Optional + + +class LogStatus(str, Enum): + """Status of a tool call log entry.""" + + PENDING = "pending" + SUCCESS = "success" + ERROR = "error" + + +@dataclass +class LogEntry: + """A single log entry for an MCP tool call.""" + + id: str + tool_name: str + arguments: Dict[str, Any] + status: LogStatus = LogStatus.PENDING + result: Optional[str] = None + error: Optional[str] = None + start_time: float = field(default_factory=time.time) + end_time: Optional[float] = None + duration_ms: Optional[float] = None + method: Optional[str] = None + path: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert log entry to a dictionary for JSON serialization.""" + start_datetime = datetime.fromtimestamp(self.start_time).isoformat() + end_datetime = datetime.fromtimestamp(self.end_time).isoformat() if self.end_time else None + + return { + "id": self.id, + "tool_name": self.tool_name, + "arguments": self.arguments, + "status": self.status.value, + "result": self.result, + "error": self.error, + "start_time": self.start_time, + "start_datetime": start_datetime, + "end_time": self.end_time, + "end_datetime": end_datetime, + "duration_ms": self.duration_ms, + "method": self.method, + "path": self.path, + } + + def complete(self, result: Any, error: Optional[str] = None) -> None: + """Mark the log entry as completed.""" + self.end_time = time.time() + self.duration_ms = (self.end_time - self.start_time) * 1000 + + if error: + self.status = LogStatus.ERROR + self.error = error + else: + self.status = LogStatus.SUCCESS + try: + self.result = json.dumps(result, indent=2, ensure_ascii=False, default=str) + except Exception: + self.result = str(result) + + +ToolCallLog = LogEntry + + +class LogStore: + """In-memory storage for tool call logs.""" + + def __init__(self, max_size: int = 1000): + """Initialize the log store. + + Args: + max_size: Maximum number of log entries to keep. Oldest entries are removed when limit is reached. + """ + self._logs: List[LogEntry] = [] + self._max_size = max_size + self._counter: int = 0 + + def _generate_id(self) -> str: + """Generate a unique log entry ID.""" + self._counter += 1 + return f"log_{self._counter}_{int(time.time() * 1000)}" + + def create_entry( + self, + tool_name: str, + arguments: Dict[str, Any], + method: Optional[str] = None, + path: Optional[str] = None, + ) -> LogEntry: + """Create a new log entry for a pending tool call. + + Args: + tool_name: Name of the tool being called + arguments: Arguments passed to the tool + method: HTTP method (if applicable) + path: API path (if applicable) + + Returns: + The created LogEntry + """ + entry = LogEntry( + id=self._generate_id(), + tool_name=tool_name, + arguments=arguments, + method=method, + path=path, + ) + + self._logs.append(entry) + + if len(self._logs) > self._max_size: + self._logs = self._logs[-self._max_size :] + + return entry + + def get_all(self, limit: Optional[int] = None) -> List[LogEntry]: + """Get all log entries, newest first. + + Args: + limit: Maximum number of entries to return. If None, returns all. + + Returns: + List of log entries, newest first + """ + entries = list(reversed(self._logs)) + if limit is not None: + entries = entries[:limit] + return entries + + def get_by_id(self, log_id: str) -> Optional[LogEntry]: + """Get a log entry by its ID. + + Args: + log_id: The log entry ID + + Returns: + The LogEntry if found, None otherwise + """ + for entry in self._logs: + if entry.id == log_id: + return entry + return None + + def clear(self) -> int: + """Clear all log entries. + + Returns: + Number of entries cleared + """ + count = len(self._logs) + self._logs.clear() + return count + + def get_stats(self) -> Dict[str, Any]: + """Get statistics about the logs. + + Returns: + Dictionary with statistics + """ + total = len(self._logs) + success = sum(1 for e in self._logs if e.status == LogStatus.SUCCESS) + error = sum(1 for e in self._logs if e.status == LogStatus.ERROR) + pending = sum(1 for e in self._logs if e.status == LogStatus.PENDING) + + durations = [e.duration_ms for e in self._logs if e.duration_ms is not None] + avg_duration = sum(durations) / len(durations) if durations else 0 + + return { + "total": total, + "success": success, + "error": error, + "pending": pending, + "avg_duration_ms": round(avg_duration, 2), + } + + +_global_log_store: Optional[LogStore] = None + + +def get_global_log_store() -> LogStore: + """Get or create the global log store instance. + + Returns: + The global LogStore instance + """ + global _global_log_store + if _global_log_store is None: + _global_log_store = LogStore() + return _global_log_store diff --git a/fastapi_mcp/console/router.py b/fastapi_mcp/console/router.py new file mode 100644 index 0000000..432cf66 --- /dev/null +++ b/fastapi_mcp/console/router.py @@ -0,0 +1,1407 @@ +""" +Console API Router for FastAPI-MCP. + +Provides API endpoints for the web management console and serves the frontend page. +""" + +import json +from typing import Any, Callable, Dict, List, Optional, Sequence + +from fastapi import APIRouter, Depends, HTTPException, Request, params +from fastapi.responses import HTMLResponse, JSONResponse +from pydantic import BaseModel + +from fastapi_mcp.console.logs import LogStore, get_global_log_store +from fastapi_mcp.types import AuthConfig + + +class ConsoleConfig(BaseModel): + """Configuration for the MCP Console.""" + + enabled: bool = True + mount_path: str = "/mcp-console" + log_max_size: int = 1000 + dependencies: Optional[Sequence[params.Depends]] = None + + +class ToolExecuteRequest(BaseModel): + """Request model for executing a tool.""" + + arguments: Dict[str, Any] = {} + + +CONSOLE_HTML = """ + + + + + MCP Console - FastAPI-MCP + + + +
+
+ + +
+
+ +
+
+
0
+
已注册工具
+
+
+
0
+
总调用次数
+
+
+
0
+
成功调用
+
+
+
0
+
失败调用
+
+
+ +
+ +
+
+ 已注册的 MCP 工具 + +
+
+
+ + +
+
+ 工具调用日志 +
+ + +
+
+
+
+
+ + + + +""" + + +def get_console_router( + log_store: LogStore, + tools_getter: Callable[[], List[Any]], + operation_map_getter: Callable[[], Dict[str, Any]], + execute_tool_callback: Callable[[str, Dict[str, Any]], Any], + config: ConsoleConfig, +) -> APIRouter: + """Create the console API router. + + Args: + log_store: The log store instance + tools_getter: Function that returns the list of tools + operation_map_getter: Function that returns the operation map + execute_tool_callback: Callback to execute a tool + config: Console configuration + + Returns: + Configured APIRouter + """ + router = APIRouter() + dependencies = config.dependencies or [] + + @router.get("/", response_class=HTMLResponse, include_in_schema=False) + async def get_console_page() -> HTMLResponse: + """Serve the console HTML page.""" + return HTMLResponse(content=CONSOLE_HTML) + + @router.get("/api/tools", include_in_schema=False, dependencies=dependencies) + async def get_tools() -> Dict[str, Any]: + """Get all registered MCP tools. + + Returns: + Dictionary with tools and operation_map + """ + tools = tools_getter() + operation_map = operation_map_getter() + + serializable_tools = [] + for tool in tools: + if hasattr(tool, "model_dump"): + serializable_tools.append(tool.model_dump()) + elif hasattr(tool, "__dict__"): + serializable_tools.append(tool.__dict__) + else: + serializable_tools.append(dict(tool)) + + return {"tools": serializable_tools, "operation_map": operation_map} + + @router.get("/api/tools/{tool_name}", include_in_schema=False, dependencies=dependencies) + async def get_tool(tool_name: str) -> Dict[str, Any]: + """Get a specific tool by name. + + Args: + tool_name: Name of the tool + + Returns: + Tool information + """ + tools = tools_getter() + operation_map = operation_map_getter() + + tool = next((t for t in tools if t.name == tool_name), None) + if tool is None: + raise HTTPException(status_code=404, detail=f"Tool not found: {tool_name}") + + tool_dict = tool.model_dump() if hasattr(tool, "model_dump") else dict(tool) + op_info = operation_map.get(tool_name, {}) + + return {"tool": tool_dict, "operation": op_info} + + @router.post("/api/tools/{tool_name}/execute", include_in_schema=False, dependencies=dependencies) + async def execute_tool(tool_name: str, request: Request, body: ToolExecuteRequest) -> Any: + """Execute an MCP tool. + + Args: + tool_name: Name of the tool to execute + request: FastAPI request object + body: Request body containing arguments + + Returns: + Tool execution result + """ + tools = tools_getter() + operation_map = operation_map_getter() + + tool = next((t for t in tools if t.name == tool_name), None) + if tool is None: + raise HTTPException(status_code=404, detail=f"Tool not found: {tool_name}") + + op_info = operation_map.get(tool_name, {}) + + log_entry = log_store.create_entry( + tool_name=tool_name, + arguments=body.arguments, + method=op_info.get("method"), + path=op_info.get("path"), + ) + + try: + result = await execute_tool_callback(tool_name, body.arguments) + log_entry.complete(result=result) + return result + except Exception as e: + log_entry.complete(result=None, error=str(e)) + raise HTTPException(status_code=500, detail=str(e)) + + @router.get("/api/logs", include_in_schema=False, dependencies=dependencies) + async def get_logs(limit: Optional[int] = 100) -> Dict[str, Any]: + """Get tool call logs. + + Args: + limit: Maximum number of logs to return + + Returns: + List of log entries + """ + entries = log_store.get_all(limit=limit) + return {"logs": [entry.to_dict() for entry in entries]} + + @router.get("/api/logs/{log_id}", include_in_schema=False, dependencies=dependencies) + async def get_log(log_id: str) -> Dict[str, Any]: + """Get a specific log entry by ID. + + Args: + log_id: The log entry ID + + Returns: + Log entry + """ + entry = log_store.get_by_id(log_id) + if entry is None: + raise HTTPException(status_code=404, detail=f"Log not found: {log_id}") + return entry.to_dict() + + @router.delete("/api/logs", include_in_schema=False, dependencies=dependencies) + async def clear_logs() -> Dict[str, Any]: + """Clear all log entries. + + Returns: + Count of cleared entries + """ + count = log_store.clear() + return {"cleared": count} + + @router.get("/api/stats", include_in_schema=False, dependencies=dependencies) + async def get_stats() -> Dict[str, Any]: + """Get statistics about tool calls. + + Returns: + Statistics dictionary + """ + return log_store.get_stats() + + return router diff --git a/fastapi_mcp/server.py b/fastapi_mcp/server.py index bb75106..4b859e1 100644 --- a/fastapi_mcp/server.py +++ b/fastapi_mcp/server.py @@ -12,6 +12,8 @@ from fastapi_mcp.transport.sse import FastApiSseTransport from fastapi_mcp.transport.http import FastApiHttpSessionManager from fastapi_mcp.types import HTTPRequestInfo, AuthConfig +from fastapi_mcp.console.logs import LogStore, get_global_log_store, LogEntry +from fastapi_mcp.console.router import ConsoleConfig, get_console_router import logging @@ -120,6 +122,8 @@ def __init__( self._forward_headers = {h.lower() for h in headers} self._http_transport: FastApiHttpSessionManager | None = None # Store reference to HTTP transport for cleanup + self._log_store: LogStore | None = None + self._console_mounted: bool = False self.setup_server() @@ -481,6 +485,192 @@ def mount( f"Unsupported transport: {transport}. Use mount_sse() or mount_http() instead." ) + def mount_console( + self, + router: Annotated[ + Optional[FastAPI | APIRouter], + Doc( + """ + The FastAPI app or APIRouter to mount the console to. If not provided, the console + will be mounted to the FastAPI app. + """ + ), + ] = None, + mount_path: Annotated[ + str, + Doc( + """ + Path where the MCP console will be mounted. + Defaults to '/mcp-console'. + """ + ), + ] = "/mcp-console", + log_max_size: Annotated[ + int, + Doc( + """ + Maximum number of log entries to keep. + Defaults to 1000. + """ + ), + ] = 1000, + dependencies: Annotated[ + Optional[Sequence[params.Depends]], + Doc( + """ + FastAPI dependencies to apply to all console endpoints. + Can be used for authentication and authorization. + """ + ), + ] = None, + ) -> None: + """ + Mount the MCP Web Console to **any** FastAPI app or APIRouter. + + The console provides: + - A web UI for viewing all registered MCP tools + - Online debugging capability to test tool executions + - Real-time logging of tool calls with arguments, responses, and execution times + + Args: + router: The FastAPI app or APIRouter to mount the console to + mount_path: Path where the console will be accessible + log_max_size: Maximum number of log entries to keep in memory + dependencies: FastAPI dependencies for authentication/authorization + + Example: + ```python + from fastapi import FastAPI + from fastapi_mcp import FastApiMCP + + app = FastAPI() + + # ... define your routes ... + + mcp = FastApiMCP(app) + mcp.mount_http() + mcp.mount_console() # Access at http://localhost:8000/mcp-console + + if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) + ``` + """ + if not mount_path.startswith("/"): + mount_path = f"/{mount_path}" + if mount_path.endswith("/"): + mount_path = mount_path[:-1] + + if not router: + router = self.fastapi + + assert isinstance(router, (FastAPI, APIRouter)), f"Invalid router type: {type(router)}" + + self._log_store = LogStore(max_size=log_max_size) + + config = ConsoleConfig( + enabled=True, + mount_path=mount_path, + log_max_size=log_max_size, + dependencies=dependencies, + ) + + def tools_getter() -> List[Any]: + return self.tools + + def operation_map_getter() -> Dict[str, Any]: + return self.operation_map + + async def execute_callback(tool_name: str, arguments: Dict[str, Any]) -> Any: + return await self.execute_tool_direct(tool_name, arguments) + + console_router = get_console_router( + log_store=self._log_store, + tools_getter=tools_getter, + operation_map_getter=operation_map_getter, + execute_tool_callback=execute_callback, + config=config, + ) + + if isinstance(router, FastAPI): + router.include_router(console_router, prefix=mount_path) + else: + base_prefix = router.prefix if router.prefix else "" + full_prefix = base_prefix + mount_path + self.fastapi.include_router(console_router, prefix=full_prefix) + + self._console_mounted = True + logger.info(f"MCP Console mounted at {mount_path}") + + async def execute_tool_direct( + self, + tool_name: str, + arguments: Dict[str, Any], + ) -> Any: + """ + Execute a tool directly without going through the MCP protocol. + + This is used by the web console for direct tool execution. + + Args: + tool_name: Name of the tool to execute + arguments: Arguments for the tool + + Returns: + The tool execution result (parsed JSON if possible) + """ + if tool_name not in self.operation_map: + raise ValueError(f"Unknown tool: {tool_name}") + + operation = self.operation_map[tool_name] + path: str = operation["path"] + method: str = operation["method"] + parameters: List[Dict[str, Any]] = operation.get("parameters", []) + arguments = arguments.copy() if arguments else {} + + for param in parameters: + if param.get("in") == "path" and param.get("name") in arguments: + param_name = param.get("name", None) + if param_name is None: + raise ValueError(f"Parameter name is None for parameter: {param}") + path = path.replace(f"{{{param_name}}}", str(arguments.pop(param_name))) + + query = {} + for param in parameters: + if param.get("in") == "query" and param.get("name") in arguments: + param_name = param.get("name", None) + if param_name is None: + raise ValueError(f"Parameter name is None for parameter: {param}") + query[param_name] = arguments.pop(param_name) + + headers = {} + for param in parameters: + if param.get("in") == "header" and param.get("name") in arguments: + param_name = param.get("name", None) + if param_name is None: + raise ValueError(f"Parameter name is None for parameter: {param}") + headers[param_name] = arguments.pop(param_name) + + body = arguments if arguments else None + + logger.debug(f"Making {method.upper()} request to {path}") + response = await self._request(self._http_client, method, path, query, headers, body) + + try: + result = response.json() + except json.JSONDecodeError: + if hasattr(response, "text"): + result = response.text + else: + result = response.content + + if 400 <= response.status_code < 600: + raise ValueError( + f"Error calling {tool_name}. Status code: {response.status_code}. Response: {response.text}" + ) + + return result + async def _execute_api_tool( self, client: Annotated[httpx.AsyncClient, Doc("httpx client to use in API calls")],