From 2b31820ca9fcfe538cf63c75cd35115ded664972 Mon Sep 17 00:00:00 2001 From: GaoXiang233 <1679562189@qq.com> Date: Fri, 24 Apr 2026 14:17:51 +0800 Subject: [PATCH 1/3] =?UTF-8?q?feat:=20=E9=A6=96=E6=AC=A1=E6=8F=90?= =?UTF-8?q?=E4=BA=A4=E6=88=91=E7=9A=84=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/10_console_example.py | 226 +++++ fastapi_mcp/__init__.py | 8 +- fastapi_mcp/console/__init__.py | 24 + fastapi_mcp/console/logs.py | 203 +++++ fastapi_mcp/console/router.py | 1407 +++++++++++++++++++++++++++++++ fastapi_mcp/server.py | 190 +++++ 6 files changed, 2057 insertions(+), 1 deletion(-) create mode 100644 examples/10_console_example.py create mode 100644 fastapi_mcp/console/__init__.py create mode 100644 fastapi_mcp/console/logs.py create mode 100644 fastapi_mcp/console/router.py diff --git a/examples/10_console_example.py b/examples/10_console_example.py new file mode 100644 index 0000000..47ce2ab --- /dev/null +++ b/examples/10_console_example.py @@ -0,0 +1,226 @@ +""" +Example demonstrating the MCP Web Console feature. + +This example shows how to: +1. Create a FastAPI app with some endpoints +2. Add MCP server capabilities +3. Mount the Web Console for visualization and debugging + +Access the console at: http://localhost:8000/mcp-console +""" + +from fastapi import FastAPI, HTTPException, Query +from pydantic import BaseModel +from typing import List, Optional + +from fastapi_mcp import FastApiMCP + + +app = FastAPI( + title="MCP Console Demo", + description="A demo FastAPI app with MCP Web Console", + version="1.0.0", +) + + +class User(BaseModel): + id: int + name: str + email: str + age: Optional[int] = None + active: bool = True + + +class CreateUserRequest(BaseModel): + name: str + email: str + age: Optional[int] = None + + +class UpdateUserRequest(BaseModel): + name: Optional[str] = None + email: Optional[str] = None + age: Optional[int] = None + active: Optional[bool] = None + + +users_db: dict[int, User] = {} + + +sample_users = [ + User(id=1, name="Alice Johnson", email="alice@example.com", age=30), + User(id=2, name="Bob Smith", email="bob@example.com", age=25), + User(id=3, name="Charlie Brown", email="charlie@example.com", age=35), +] +for user in sample_users: + users_db[user.id] = user + + +@app.get("/users/", response_model=List[User], tags=["users"], operation_id="list_users") +async def list_users( + skip: int = Query(0, ge=0, description="Number of users to skip"), + limit: int = Query(10, ge=1, le=100, description="Maximum number of users to return"), + active_only: bool = Query(False, description="Only return active users"), +): + """ + List all users with pagination support. + + Returns a paginated list of users from the database. + """ + results = list(users_db.values()) + + if active_only: + results = [u for u in results if u.active] + + return results[skip : skip + limit] + + +@app.get("/users/{user_id}", response_model=User, tags=["users"], operation_id="get_user") +async def get_user(user_id: int): + """ + Get a specific user by ID. + + Returns the user details if found, otherwise raises a 404 error. + """ + if user_id not in users_db: + raise HTTPException(status_code=404, detail="User not found") + return users_db[user_id] + + +@app.post("/users/", response_model=User, tags=["users"], operation_id="create_user") +async def create_user(user: CreateUserRequest): + """ + Create a new user. + + Generates a new unique ID for the user and adds them to the database. + Returns the created user with the assigned ID. + """ + new_id = max(users_db.keys(), default=0) + 1 + new_user = User( + id=new_id, + name=user.name, + email=user.email, + age=user.age, + active=True, + ) + users_db[new_id] = new_user + return new_user + + +@app.put("/users/{user_id}", response_model=User, tags=["users"], operation_id="update_user") +async def update_user(user_id: int, user: UpdateUserRequest): + """ + Update an existing user. + + Only updates the fields that are provided in the request. + Raises a 404 error if the user does not exist. + """ + if user_id not in users_db: + raise HTTPException(status_code=404, detail="User not found") + + existing = users_db[user_id] + + if user.name is not None: + existing.name = user.name + if user.email is not None: + existing.email = user.email + if user.age is not None: + existing.age = user.age + if user.active is not None: + existing.active = user.active + + return existing + + +@app.delete("/users/{user_id}", tags=["users"], operation_id="delete_user") +async def delete_user(user_id: int): + """ + Delete a user. + + Removes the user from the database. + Raises a 404 error if the user does not exist. + """ + if user_id not in users_db: + raise HTTPException(status_code=404, detail="User not found") + + del users_db[user_id] + return {"message": "User deleted successfully", "user_id": user_id} + + +@app.get("/users/search/", response_model=List[User], tags=["search"], operation_id="search_users") +async def search_users( + q: Optional[str] = Query(None, description="Search query for name or email"), + min_age: Optional[int] = Query(None, description="Minimum age filter"), + max_age: Optional[int] = Query(None, description="Maximum age filter"), + active: Optional[bool] = Query(None, description="Filter by active status"), +): + """ + Search users with various filter options. + + Allows searching by name/email, and filtering by age range and active status. + """ + results = list(users_db.values()) + + if q: + q = q.lower() + results = [ + user for user in results + if q in user.name.lower() or q in user.email.lower() + ] + + if min_age is not None: + results = [user for user in results if user.age is not None and user.age >= min_age] + + if max_age is not None: + results = [user for user in results if user.age is not None and user.age <= max_age] + + if active is not None: + results = [user for user in results if user.active == active] + + return results + + +@app.get("/health", tags=["system"], operation_id="health_check") +async def health_check(): + """ + Health check endpoint. + + Returns the current health status of the application. + """ + return { + "status": "healthy", + "users_count": len(users_db), + } + + +mcp = FastApiMCP(app) + +mcp.mount_http() + +mcp.mount_console( + mount_path="/mcp-console", + log_max_size=1000, +) + +print("=" * 60) +print("FastAPI-MCP Console Demo") +print("=" * 60) +print() +print("Available endpoints:") +print(" - Swagger UI: http://localhost:8000/docs") +print(" - MCP HTTP: http://localhost:8000/mcp") +print(" - MCP Console: http://localhost:8000/mcp-console") +print() +print("Access the Web Console to:") +print(" - View all registered MCP tools") +print(" - See tool descriptions, parameters, and responses") +print(" - Test tool executions with custom parameters") +print(" - View real-time call logs with timings") +print() +print("=" * 60) + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/fastapi_mcp/__init__.py b/fastapi_mcp/__init__.py index f748712..9a1f2a8 100644 --- a/fastapi_mcp/__init__.py +++ b/fastapi_mcp/__init__.py @@ -14,10 +14,16 @@ from .server import FastApiMCP from .types import AuthConfig, OAuthMetadata - +from .console.logs import LogStore, LogEntry, LogStatus, get_global_log_store +from .console.router import ConsoleConfig __all__ = [ "FastApiMCP", "AuthConfig", "OAuthMetadata", + "LogStore", + "LogEntry", + "LogStatus", + "get_global_log_store", + "ConsoleConfig", ] diff --git a/fastapi_mcp/console/__init__.py b/fastapi_mcp/console/__init__.py new file mode 100644 index 0000000..b7caa53 --- /dev/null +++ b/fastapi_mcp/console/__init__.py @@ -0,0 +1,24 @@ +""" +FastAPI-MCP Web Console Module. + +Provides a lightweight web management console for visualizing and debugging MCP tools. +""" + +from .logs import ( + ToolCallLog, + LogStore, + LogEntry, + LogStatus, + get_global_log_store, +) +from .router import ConsoleConfig, get_console_router + +__all__ = [ + "ToolCallLog", + "LogStore", + "LogEntry", + "LogStatus", + "get_global_log_store", + "ConsoleConfig", + "get_console_router", +] diff --git a/fastapi_mcp/console/logs.py b/fastapi_mcp/console/logs.py new file mode 100644 index 0000000..bf28dfc --- /dev/null +++ b/fastapi_mcp/console/logs.py @@ -0,0 +1,203 @@ +""" +Log storage and types for MCP tool call logging. + +Provides in-memory storage for tool call logs with configurable max size. +""" + +import json +import time +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from typing import Any, Dict, List, Optional + + +class LogStatus(str, Enum): + """Status of a tool call log entry.""" + + PENDING = "pending" + SUCCESS = "success" + ERROR = "error" + + +@dataclass +class LogEntry: + """A single log entry for an MCP tool call.""" + + id: str + tool_name: str + arguments: Dict[str, Any] + status: LogStatus = LogStatus.PENDING + result: Optional[str] = None + error: Optional[str] = None + start_time: float = field(default_factory=time.time) + end_time: Optional[float] = None + duration_ms: Optional[float] = None + method: Optional[str] = None + path: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert log entry to a dictionary for JSON serialization.""" + start_datetime = datetime.fromtimestamp(self.start_time).isoformat() + end_datetime = datetime.fromtimestamp(self.end_time).isoformat() if self.end_time else None + + return { + "id": self.id, + "tool_name": self.tool_name, + "arguments": self.arguments, + "status": self.status.value, + "result": self.result, + "error": self.error, + "start_time": self.start_time, + "start_datetime": start_datetime, + "end_time": self.end_time, + "end_datetime": end_datetime, + "duration_ms": self.duration_ms, + "method": self.method, + "path": self.path, + } + + def complete(self, result: Any, error: Optional[str] = None) -> None: + """Mark the log entry as completed.""" + self.end_time = time.time() + self.duration_ms = (self.end_time - self.start_time) * 1000 + + if error: + self.status = LogStatus.ERROR + self.error = error + else: + self.status = LogStatus.SUCCESS + try: + self.result = json.dumps(result, indent=2, ensure_ascii=False, default=str) + except Exception: + self.result = str(result) + + +ToolCallLog = LogEntry + + +class LogStore: + """In-memory storage for tool call logs.""" + + def __init__(self, max_size: int = 1000): + """Initialize the log store. + + Args: + max_size: Maximum number of log entries to keep. Oldest entries are removed when limit is reached. + """ + self._logs: List[LogEntry] = [] + self._max_size = max_size + self._counter: int = 0 + + def _generate_id(self) -> str: + """Generate a unique log entry ID.""" + self._counter += 1 + return f"log_{self._counter}_{int(time.time() * 1000)}" + + def create_entry( + self, + tool_name: str, + arguments: Dict[str, Any], + method: Optional[str] = None, + path: Optional[str] = None, + ) -> LogEntry: + """Create a new log entry for a pending tool call. + + Args: + tool_name: Name of the tool being called + arguments: Arguments passed to the tool + method: HTTP method (if applicable) + path: API path (if applicable) + + Returns: + The created LogEntry + """ + entry = LogEntry( + id=self._generate_id(), + tool_name=tool_name, + arguments=arguments, + method=method, + path=path, + ) + + self._logs.append(entry) + + if len(self._logs) > self._max_size: + self._logs = self._logs[-self._max_size :] + + return entry + + def get_all(self, limit: Optional[int] = None) -> List[LogEntry]: + """Get all log entries, newest first. + + Args: + limit: Maximum number of entries to return. If None, returns all. + + Returns: + List of log entries, newest first + """ + entries = list(reversed(self._logs)) + if limit is not None: + entries = entries[:limit] + return entries + + def get_by_id(self, log_id: str) -> Optional[LogEntry]: + """Get a log entry by its ID. + + Args: + log_id: The log entry ID + + Returns: + The LogEntry if found, None otherwise + """ + for entry in self._logs: + if entry.id == log_id: + return entry + return None + + def clear(self) -> int: + """Clear all log entries. + + Returns: + Number of entries cleared + """ + count = len(self._logs) + self._logs.clear() + return count + + def get_stats(self) -> Dict[str, Any]: + """Get statistics about the logs. + + Returns: + Dictionary with statistics + """ + total = len(self._logs) + success = sum(1 for e in self._logs if e.status == LogStatus.SUCCESS) + error = sum(1 for e in self._logs if e.status == LogStatus.ERROR) + pending = sum(1 for e in self._logs if e.status == LogStatus.PENDING) + + durations = [e.duration_ms for e in self._logs if e.duration_ms is not None] + avg_duration = sum(durations) / len(durations) if durations else 0 + + return { + "total": total, + "success": success, + "error": error, + "pending": pending, + "avg_duration_ms": round(avg_duration, 2), + } + + +_global_log_store: Optional[LogStore] = None + + +def get_global_log_store() -> LogStore: + """Get or create the global log store instance. + + Returns: + The global LogStore instance + """ + global _global_log_store + if _global_log_store is None: + _global_log_store = LogStore() + return _global_log_store diff --git a/fastapi_mcp/console/router.py b/fastapi_mcp/console/router.py new file mode 100644 index 0000000..432cf66 --- /dev/null +++ b/fastapi_mcp/console/router.py @@ -0,0 +1,1407 @@ +""" +Console API Router for FastAPI-MCP. + +Provides API endpoints for the web management console and serves the frontend page. +""" + +import json +from typing import Any, Callable, Dict, List, Optional, Sequence + +from fastapi import APIRouter, Depends, HTTPException, Request, params +from fastapi.responses import HTMLResponse, JSONResponse +from pydantic import BaseModel + +from fastapi_mcp.console.logs import LogStore, get_global_log_store +from fastapi_mcp.types import AuthConfig + + +class ConsoleConfig(BaseModel): + """Configuration for the MCP Console.""" + + enabled: bool = True + mount_path: str = "/mcp-console" + log_max_size: int = 1000 + dependencies: Optional[Sequence[params.Depends]] = None + + +class ToolExecuteRequest(BaseModel): + """Request model for executing a tool.""" + + arguments: Dict[str, Any] = {} + + +CONSOLE_HTML = """ + + + + + MCP Console - FastAPI-MCP + + + +
+
+ + +
+
+ +
+
+
0
+
已注册工具
+
+
+
0
+
总调用次数
+
+
+
0
+
成功调用
+
+
+
0
+
失败调用
+
+
+ +
+ +
+
+ 已注册的 MCP 工具 + +
+
+
+ + +
+
+ 工具调用日志 +
+ + +
+
+
+
+
+ + + + +""" + + +def get_console_router( + log_store: LogStore, + tools_getter: Callable[[], List[Any]], + operation_map_getter: Callable[[], Dict[str, Any]], + execute_tool_callback: Callable[[str, Dict[str, Any]], Any], + config: ConsoleConfig, +) -> APIRouter: + """Create the console API router. + + Args: + log_store: The log store instance + tools_getter: Function that returns the list of tools + operation_map_getter: Function that returns the operation map + execute_tool_callback: Callback to execute a tool + config: Console configuration + + Returns: + Configured APIRouter + """ + router = APIRouter() + dependencies = config.dependencies or [] + + @router.get("/", response_class=HTMLResponse, include_in_schema=False) + async def get_console_page() -> HTMLResponse: + """Serve the console HTML page.""" + return HTMLResponse(content=CONSOLE_HTML) + + @router.get("/api/tools", include_in_schema=False, dependencies=dependencies) + async def get_tools() -> Dict[str, Any]: + """Get all registered MCP tools. + + Returns: + Dictionary with tools and operation_map + """ + tools = tools_getter() + operation_map = operation_map_getter() + + serializable_tools = [] + for tool in tools: + if hasattr(tool, "model_dump"): + serializable_tools.append(tool.model_dump()) + elif hasattr(tool, "__dict__"): + serializable_tools.append(tool.__dict__) + else: + serializable_tools.append(dict(tool)) + + return {"tools": serializable_tools, "operation_map": operation_map} + + @router.get("/api/tools/{tool_name}", include_in_schema=False, dependencies=dependencies) + async def get_tool(tool_name: str) -> Dict[str, Any]: + """Get a specific tool by name. + + Args: + tool_name: Name of the tool + + Returns: + Tool information + """ + tools = tools_getter() + operation_map = operation_map_getter() + + tool = next((t for t in tools if t.name == tool_name), None) + if tool is None: + raise HTTPException(status_code=404, detail=f"Tool not found: {tool_name}") + + tool_dict = tool.model_dump() if hasattr(tool, "model_dump") else dict(tool) + op_info = operation_map.get(tool_name, {}) + + return {"tool": tool_dict, "operation": op_info} + + @router.post("/api/tools/{tool_name}/execute", include_in_schema=False, dependencies=dependencies) + async def execute_tool(tool_name: str, request: Request, body: ToolExecuteRequest) -> Any: + """Execute an MCP tool. + + Args: + tool_name: Name of the tool to execute + request: FastAPI request object + body: Request body containing arguments + + Returns: + Tool execution result + """ + tools = tools_getter() + operation_map = operation_map_getter() + + tool = next((t for t in tools if t.name == tool_name), None) + if tool is None: + raise HTTPException(status_code=404, detail=f"Tool not found: {tool_name}") + + op_info = operation_map.get(tool_name, {}) + + log_entry = log_store.create_entry( + tool_name=tool_name, + arguments=body.arguments, + method=op_info.get("method"), + path=op_info.get("path"), + ) + + try: + result = await execute_tool_callback(tool_name, body.arguments) + log_entry.complete(result=result) + return result + except Exception as e: + log_entry.complete(result=None, error=str(e)) + raise HTTPException(status_code=500, detail=str(e)) + + @router.get("/api/logs", include_in_schema=False, dependencies=dependencies) + async def get_logs(limit: Optional[int] = 100) -> Dict[str, Any]: + """Get tool call logs. + + Args: + limit: Maximum number of logs to return + + Returns: + List of log entries + """ + entries = log_store.get_all(limit=limit) + return {"logs": [entry.to_dict() for entry in entries]} + + @router.get("/api/logs/{log_id}", include_in_schema=False, dependencies=dependencies) + async def get_log(log_id: str) -> Dict[str, Any]: + """Get a specific log entry by ID. + + Args: + log_id: The log entry ID + + Returns: + Log entry + """ + entry = log_store.get_by_id(log_id) + if entry is None: + raise HTTPException(status_code=404, detail=f"Log not found: {log_id}") + return entry.to_dict() + + @router.delete("/api/logs", include_in_schema=False, dependencies=dependencies) + async def clear_logs() -> Dict[str, Any]: + """Clear all log entries. + + Returns: + Count of cleared entries + """ + count = log_store.clear() + return {"cleared": count} + + @router.get("/api/stats", include_in_schema=False, dependencies=dependencies) + async def get_stats() -> Dict[str, Any]: + """Get statistics about tool calls. + + Returns: + Statistics dictionary + """ + return log_store.get_stats() + + return router diff --git a/fastapi_mcp/server.py b/fastapi_mcp/server.py index bb75106..4b859e1 100644 --- a/fastapi_mcp/server.py +++ b/fastapi_mcp/server.py @@ -12,6 +12,8 @@ from fastapi_mcp.transport.sse import FastApiSseTransport from fastapi_mcp.transport.http import FastApiHttpSessionManager from fastapi_mcp.types import HTTPRequestInfo, AuthConfig +from fastapi_mcp.console.logs import LogStore, get_global_log_store, LogEntry +from fastapi_mcp.console.router import ConsoleConfig, get_console_router import logging @@ -120,6 +122,8 @@ def __init__( self._forward_headers = {h.lower() for h in headers} self._http_transport: FastApiHttpSessionManager | None = None # Store reference to HTTP transport for cleanup + self._log_store: LogStore | None = None + self._console_mounted: bool = False self.setup_server() @@ -481,6 +485,192 @@ def mount( f"Unsupported transport: {transport}. Use mount_sse() or mount_http() instead." ) + def mount_console( + self, + router: Annotated[ + Optional[FastAPI | APIRouter], + Doc( + """ + The FastAPI app or APIRouter to mount the console to. If not provided, the console + will be mounted to the FastAPI app. + """ + ), + ] = None, + mount_path: Annotated[ + str, + Doc( + """ + Path where the MCP console will be mounted. + Defaults to '/mcp-console'. + """ + ), + ] = "/mcp-console", + log_max_size: Annotated[ + int, + Doc( + """ + Maximum number of log entries to keep. + Defaults to 1000. + """ + ), + ] = 1000, + dependencies: Annotated[ + Optional[Sequence[params.Depends]], + Doc( + """ + FastAPI dependencies to apply to all console endpoints. + Can be used for authentication and authorization. + """ + ), + ] = None, + ) -> None: + """ + Mount the MCP Web Console to **any** FastAPI app or APIRouter. + + The console provides: + - A web UI for viewing all registered MCP tools + - Online debugging capability to test tool executions + - Real-time logging of tool calls with arguments, responses, and execution times + + Args: + router: The FastAPI app or APIRouter to mount the console to + mount_path: Path where the console will be accessible + log_max_size: Maximum number of log entries to keep in memory + dependencies: FastAPI dependencies for authentication/authorization + + Example: + ```python + from fastapi import FastAPI + from fastapi_mcp import FastApiMCP + + app = FastAPI() + + # ... define your routes ... + + mcp = FastApiMCP(app) + mcp.mount_http() + mcp.mount_console() # Access at http://localhost:8000/mcp-console + + if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) + ``` + """ + if not mount_path.startswith("/"): + mount_path = f"/{mount_path}" + if mount_path.endswith("/"): + mount_path = mount_path[:-1] + + if not router: + router = self.fastapi + + assert isinstance(router, (FastAPI, APIRouter)), f"Invalid router type: {type(router)}" + + self._log_store = LogStore(max_size=log_max_size) + + config = ConsoleConfig( + enabled=True, + mount_path=mount_path, + log_max_size=log_max_size, + dependencies=dependencies, + ) + + def tools_getter() -> List[Any]: + return self.tools + + def operation_map_getter() -> Dict[str, Any]: + return self.operation_map + + async def execute_callback(tool_name: str, arguments: Dict[str, Any]) -> Any: + return await self.execute_tool_direct(tool_name, arguments) + + console_router = get_console_router( + log_store=self._log_store, + tools_getter=tools_getter, + operation_map_getter=operation_map_getter, + execute_tool_callback=execute_callback, + config=config, + ) + + if isinstance(router, FastAPI): + router.include_router(console_router, prefix=mount_path) + else: + base_prefix = router.prefix if router.prefix else "" + full_prefix = base_prefix + mount_path + self.fastapi.include_router(console_router, prefix=full_prefix) + + self._console_mounted = True + logger.info(f"MCP Console mounted at {mount_path}") + + async def execute_tool_direct( + self, + tool_name: str, + arguments: Dict[str, Any], + ) -> Any: + """ + Execute a tool directly without going through the MCP protocol. + + This is used by the web console for direct tool execution. + + Args: + tool_name: Name of the tool to execute + arguments: Arguments for the tool + + Returns: + The tool execution result (parsed JSON if possible) + """ + if tool_name not in self.operation_map: + raise ValueError(f"Unknown tool: {tool_name}") + + operation = self.operation_map[tool_name] + path: str = operation["path"] + method: str = operation["method"] + parameters: List[Dict[str, Any]] = operation.get("parameters", []) + arguments = arguments.copy() if arguments else {} + + for param in parameters: + if param.get("in") == "path" and param.get("name") in arguments: + param_name = param.get("name", None) + if param_name is None: + raise ValueError(f"Parameter name is None for parameter: {param}") + path = path.replace(f"{{{param_name}}}", str(arguments.pop(param_name))) + + query = {} + for param in parameters: + if param.get("in") == "query" and param.get("name") in arguments: + param_name = param.get("name", None) + if param_name is None: + raise ValueError(f"Parameter name is None for parameter: {param}") + query[param_name] = arguments.pop(param_name) + + headers = {} + for param in parameters: + if param.get("in") == "header" and param.get("name") in arguments: + param_name = param.get("name", None) + if param_name is None: + raise ValueError(f"Parameter name is None for parameter: {param}") + headers[param_name] = arguments.pop(param_name) + + body = arguments if arguments else None + + logger.debug(f"Making {method.upper()} request to {path}") + response = await self._request(self._http_client, method, path, query, headers, body) + + try: + result = response.json() + except json.JSONDecodeError: + if hasattr(response, "text"): + result = response.text + else: + result = response.content + + if 400 <= response.status_code < 600: + raise ValueError( + f"Error calling {tool_name}. Status code: {response.status_code}. Response: {response.text}" + ) + + return result + async def _execute_api_tool( self, client: Annotated[httpx.AsyncClient, Doc("httpx client to use in API calls")], From 78316354b16be924e7011313cf956cfda0fb1051 Mon Sep 17 00:00:00 2001 From: GaoXiang233 <1679562189@qq.com> Date: Fri, 24 Apr 2026 15:36:29 +0800 Subject: [PATCH 2/3] =?UTF-8?q?feat:=20=E9=A6=96=E6=AC=A1=E6=8F=90?= =?UTF-8?q?=E4=BA=A4=E6=88=91=E7=9A=84=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/10_console_example.py | 89 ++++++++++++++++++++++++- verify_console.py | 115 +++++++++++++++++++++++++++++++++ 2 files changed, 202 insertions(+), 2 deletions(-) create mode 100644 verify_console.py diff --git a/examples/10_console_example.py b/examples/10_console_example.py index 47ce2ab..b579843 100644 --- a/examples/10_console_example.py +++ b/examples/10_console_example.py @@ -9,12 +9,97 @@ Access the console at: http://localhost:8000/mcp-console """ +import sys +import os +import site +from pathlib import Path + +user_site = site.getusersitepackages() +if user_site and user_site not in sys.path: + sys.path.insert(0, user_site) + +project_root = Path(__file__).parent.parent +if str(project_root) not in sys.path: + sys.path.insert(0, str(project_root)) + +print("=" * 60) +print("Python Path Debug Info") +print("=" * 60) +print(f"Project root: {project_root}") +print(f"User site-packages: {user_site}") +print(f"Current file: {__file__}") +print() +print("sys.path:") +for i, p in enumerate(sys.path): + print(f" [{i}] {p}") +print() +print("Checking fastapi_mcp module location...") + +try: + import fastapi_mcp + print(f"fastapi_mcp imported from: {fastapi_mcp.__file__}") + print(f"fastapi_mcp version: {fastapi_mcp.__version__}") + + expected_server_py = project_root / 'fastapi_mcp' / 'server.py' + actual_server_py = Path(fastapi_mcp.__file__).parent / 'server.py' + + print() + print(f"Expected server.py: {expected_server_py}") + print(f"Actual module path: {Path(fastapi_mcp.__file__).parent}") + + if str(expected_server_py.parent) != str(Path(fastapi_mcp.__file__).parent): + print() + print("WARNING: Importing from different location!") + print("This may be an older installed version.") + + print() + print("Checking FastApiMCP methods...") + from fastapi_mcp import FastApiMCP + methods = [m for m in dir(FastApiMCP) if not m.startswith('_')] + print(f"FastApiMCP public methods: {methods}") + + if 'mount_console' not in methods: + print() + print("=" * 60) + print("ERROR: 'mount_console' method not found!") + print("=" * 60) + print() + print("This means Python is importing an older version of fastapi-mcp") + print("that doesn't have the new console feature.") + print() + print(f"Expected location: {expected_server_py}") + print(f"Actual location: {actual_server_py}") + print() + print("Solutions:") + print(" 1. Force reinstall local version:") + print(" pip install -e . --user --force-reinstall --no-deps") + print() + print(" 2. Or check what's in actual location:") + print(f" dir {actual_server_py.parent}") + print() + print(" 3. Or remove the older version from site-packages:") + print(f" pip uninstall fastapi-mcp") + print(f" Then run: pip install -e . --user") + sys.exit(1) + else: + print("✓ 'mount_console' method found!") + +except ImportError as e: + print() + print("=" * 60) + print(f"ERROR: Failed to import: {e}") + print("=" * 60) + print() + print("This usually means dependencies are not installed or not in path.") + print() + print("Try installing dependencies:") + print(" pip install fastapi uvicorn pydantic mcp httpx --user") + sys.exit(1) + from fastapi import FastAPI, HTTPException, Query from pydantic import BaseModel from typing import List, Optional -from fastapi_mcp import FastApiMCP - app = FastAPI( title="MCP Console Demo", diff --git a/verify_console.py b/verify_console.py new file mode 100644 index 0000000..c5f3fcf --- /dev/null +++ b/verify_console.py @@ -0,0 +1,115 @@ +""" +Simple test script to verify the console feature is correctly implemented. +This doesn't require running the full server, just verifies the code structure. +""" + +import sys +import site +from pathlib import Path + +user_site = site.getusersitepackages() +if user_site and user_site not in sys.path: + sys.path.insert(0, user_site) + +project_root = Path(__file__).parent +if str(project_root) not in sys.path: + sys.path.insert(0, str(project_root)) + +print("=" * 60) +print("Verifying fastapi_mcp console implementation") +print("=" * 60) +print() + +print("1. Checking if console module exists...") +console_path = project_root / 'fastapi_mcp' / 'console' +if console_path.exists(): + print(f" [OK] Console directory exists: {console_path}") + + init_file = console_path / '__init__.py' + logs_file = console_path / 'logs.py' + router_file = console_path / 'router.py' + + for f in [init_file, logs_file, router_file]: + if f.exists(): + print(f" [OK] {f.name} exists") + else: + print(f" [FAIL] {f.name} MISSING!") +else: + print(f" [FAIL] Console directory NOT FOUND: {console_path}") + sys.exit(1) + +print() +print("2. Checking server.py for mount_console method...") +server_file = project_root / 'fastapi_mcp' / 'server.py' + +if not server_file.exists(): + print(f" [FAIL] server.py NOT FOUND: {server_file}") + sys.exit(1) + +with open(server_file, 'r', encoding='utf-8') as f: + server_content = f.read() + +checks = [ + ('mount_console', 'mount_console method'), + ('execute_tool_direct', 'execute_tool_direct method'), + ('from fastapi_mcp.console', 'console module import'), + ('LogStore', 'LogStore usage'), + ('ConsoleConfig', 'ConsoleConfig usage'), + ('from fastapi_mcp.console.logs import', 'logs module import'), + ('from fastapi_mcp.console.router import', 'router module import'), + ('_log_store', '_log_store attribute'), + ('_console_mounted', '_console_mounted flag'), +] + +all_pass = True +for keyword, description in checks: + if keyword in server_content: + print(f" [OK] {description} found") + else: + print(f" [FAIL] {description} NOT FOUND!") + all_pass = False + +print() +print("3. Checking __init__.py for new exports...") +init_file = project_root / 'fastapi_mcp' / '__init__.py' + +if not init_file.exists(): + print(f" [FAIL] __init__.py NOT FOUND") + sys.exit(1) + +with open(init_file, 'r', encoding='utf-8') as f: + init_content = f.read() + +export_checks = [ + ('LogStore', 'LogStore export'), + ('LogEntry', 'LogEntry export'), + ('LogStatus', 'LogStatus export'), + ('ConsoleConfig', 'ConsoleConfig export'), + ('get_global_log_store', 'get_global_log_store export'), +] + +for keyword, description in export_checks: + if keyword in init_content: + print(f" [OK] {description} found") + else: + print(f" [FAIL] {description} NOT FOUND!") + all_pass = False + +print() +print("=" * 60) +if all_pass: + print("[OK] All checks passed! The console feature is correctly implemented.") + print() + print("To run the example, you need to fix the pywin32 DLL issue.") + print() + print("Try running from project root with Python module mode:") + print(" cd e:\\test\\fastapi_mcp") + print(" python -c \"import sys; import site; sys.path.insert(0, '.'); sys.path.insert(0, site.getusersitepackages()); from fastapi_mcp.server import FastApiMCP; print('mount_console:', hasattr(FastApiMCP, 'mount_console'))\"") + print() + print("Or try reinstalling pywin32:") + print(" pip uninstall pywin32 -y") + print(" pip install pywin32 --user") +else: + print("[FAIL] Some checks failed. Please review the errors above.") + sys.exit(1) +print("=" * 60) From 92901981585942d4cc75723f88dfe1a21acb70b0 Mon Sep 17 00:00:00 2001 From: GaoXiang233 <1679562189@qq.com> Date: Fri, 24 Apr 2026 15:50:15 +0800 Subject: [PATCH 3/3] =?UTF-8?q?feat:=20=E9=A6=96=E6=AC=A1=E6=8F=90?= =?UTF-8?q?=E4=BA=A4=E6=88=91=E7=9A=84=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/10_console_example.py | 314 +++++++++++++++++---------------- 1 file changed, 166 insertions(+), 148 deletions(-) diff --git a/examples/10_console_example.py b/examples/10_console_example.py index b579843..303aa35 100644 --- a/examples/10_console_example.py +++ b/examples/10_console_example.py @@ -1,16 +1,19 @@ """ -Example demonstrating the MCP Web Console feature. +FastAPI-MCP Console Example +============================ -This example shows how to: -1. Create a FastAPI app with some endpoints -2. Add MCP server capabilities -3. Mount the Web Console for visualization and debugging +IMPORTANT: Browser Access Instructions +---------------------------------------- +DO NOT use http://0.0.0.0:8000 in your browser! -Access the console at: http://localhost:8000/mcp-console +- '0.0.0.0' means the server listens on ALL network interfaces +- For browser access, use: http://localhost:8000/mcp-console +- Or use: http://127.0.0.1:8000/mcp-console + +This example also adds a redirect from root path '/' to '/mcp-console'. """ import sys -import os import site from pathlib import Path @@ -22,84 +25,48 @@ if str(project_root) not in sys.path: sys.path.insert(0, str(project_root)) -print("=" * 60) -print("Python Path Debug Info") -print("=" * 60) -print(f"Project root: {project_root}") -print(f"User site-packages: {user_site}") -print(f"Current file: {__file__}") -print() -print("sys.path:") -for i, p in enumerate(sys.path): - print(f" [{i}] {p}") +print("=" * 70) +print("FastAPI-MCP Console Example - Starting Up") +print("=" * 70) print() -print("Checking fastapi_mcp module location...") try: - import fastapi_mcp - print(f"fastapi_mcp imported from: {fastapi_mcp.__file__}") - print(f"fastapi_mcp version: {fastapi_mcp.__version__}") - - expected_server_py = project_root / 'fastapi_mcp' / 'server.py' - actual_server_py = Path(fastapi_mcp.__file__).parent / 'server.py' - - print() - print(f"Expected server.py: {expected_server_py}") - print(f"Actual module path: {Path(fastapi_mcp.__file__).parent}") - - if str(expected_server_py.parent) != str(Path(fastapi_mcp.__file__).parent): - print() - print("WARNING: Importing from different location!") - print("This may be an older installed version.") - - print() - print("Checking FastApiMCP methods...") from fastapi_mcp import FastApiMCP methods = [m for m in dir(FastApiMCP) if not m.startswith('_')] - print(f"FastApiMCP public methods: {methods}") if 'mount_console' not in methods: - print() - print("=" * 60) print("ERROR: 'mount_console' method not found!") - print("=" * 60) - print() - print("This means Python is importing an older version of fastapi-mcp") - print("that doesn't have the new console feature.") print() - print(f"Expected location: {expected_server_py}") - print(f"Actual location: {actual_server_py}") + print("This means Python is importing an older installed version,") + print("not the local development version with console feature.") print() - print("Solutions:") - print(" 1. Force reinstall local version:") - print(" pip install -e . --user --force-reinstall --no-deps") + print("Try this to fix:") + print(" 1. pip uninstall fastapi-mcp -y") + print(" 2. pip install -e . --user --force-reinstall --no-deps") print() - print(" 2. Or check what's in actual location:") - print(f" dir {actual_server_py.parent}") - print() - print(" 3. Or remove the older version from site-packages:") - print(f" pip uninstall fastapi-mcp") - print(f" Then run: pip install -e . --user") + print("Or check where it's importing from:") + import fastapi_mcp + print(f" Imported from: {fastapi_mcp.__file__}") + print(f" Expected from: {project_root / 'fastapi_mcp'}") sys.exit(1) - else: - print("✓ 'mount_console' method found!") - -except ImportError as e: - print() - print("=" * 60) - print(f"ERROR: Failed to import: {e}") - print("=" * 60) + + print("[OK] FastApiMCP loaded with mount_console method") + print(f" Available methods: {methods}") print() - print("This usually means dependencies are not installed or not in path.") + +except ImportError as e: + print(f"ERROR: Failed to import fastapi_mcp: {e}") print() print("Try installing dependencies:") print(" pip install fastapi uvicorn pydantic mcp httpx --user") sys.exit(1) -from fastapi import FastAPI, HTTPException, Query +from fastapi import FastAPI, HTTPException, Query, Request +from fastapi.responses import RedirectResponse, HTMLResponse from pydantic import BaseModel from typing import List, Optional +CONSOLE_PATH = "/mcp-console" app = FastAPI( title="MCP Console Demo", @@ -107,7 +74,6 @@ version="1.0.0", ) - class User(BaseModel): id: int name: str @@ -115,23 +81,19 @@ class User(BaseModel): age: Optional[int] = None active: bool = True - class CreateUserRequest(BaseModel): name: str email: str age: Optional[int] = None - class UpdateUserRequest(BaseModel): name: Optional[str] = None email: Optional[str] = None age: Optional[int] = None active: Optional[bool] = None - users_db: dict[int, User] = {} - sample_users = [ User(id=1, name="Alice Johnson", email="alice@example.com", age=30), User(id=2, name="Bob Smith", email="bob@example.com", age=25), @@ -140,6 +102,109 @@ class UpdateUserRequest(BaseModel): for user in sample_users: users_db[user.id] = user +@app.get("/", include_in_schema=False) +async def redirect_to_console(): + """ + Redirect root path to the MCP Console. + + This makes it easier for users - if they accidentally visit + the root path, they'll be redirected to the console. + """ + return RedirectResponse(url=CONSOLE_PATH) + +@app.get("/welcome", include_in_schema=False) +async def welcome_page(): + """ + Welcome page that shows all available endpoints. + """ + html = f""" + + + + FastAPI-MCP Console Demo + + + +

FastAPI-MCP Console Demo

+ +
+ Important: '0.0.0.0' is not a valid browser address! +
Use localhost or 127.0.0.1 instead. +
+ +

Welcome to the FastAPI-MCP Console Demo!

+ +

Available Endpoints:

+ +
+ Recommended: + Open MCP Console + - View tools, test executions, see logs +
+ +
+ Swagger UI - Interactive API documentation +
+ +
+ OpenAPI Schema - Raw API schema +
+ +
+ Health Check - Server status +
+ +

Access URLs (for browser):

+ + +

Server Info:

+ + + + """ + return HTMLResponse(content=html) @app.get("/users/", response_model=List[User], tags=["users"], operation_id="list_users") async def list_users( @@ -147,39 +212,22 @@ async def list_users( limit: int = Query(10, ge=1, le=100, description="Maximum number of users to return"), active_only: bool = Query(False, description="Only return active users"), ): - """ - List all users with pagination support. - - Returns a paginated list of users from the database. - """ + """List all users with pagination support.""" results = list(users_db.values()) - if active_only: results = [u for u in results if u.active] - return results[skip : skip + limit] - @app.get("/users/{user_id}", response_model=User, tags=["users"], operation_id="get_user") async def get_user(user_id: int): - """ - Get a specific user by ID. - - Returns the user details if found, otherwise raises a 404 error. - """ + """Get a specific user by ID.""" if user_id not in users_db: raise HTTPException(status_code=404, detail="User not found") return users_db[user_id] - @app.post("/users/", response_model=User, tags=["users"], operation_id="create_user") async def create_user(user: CreateUserRequest): - """ - Create a new user. - - Generates a new unique ID for the user and adds them to the database. - Returns the created user with the assigned ID. - """ + """Create a new user.""" new_id = max(users_db.keys(), default=0) + 1 new_user = User( id=new_id, @@ -191,20 +239,12 @@ async def create_user(user: CreateUserRequest): users_db[new_id] = new_user return new_user - @app.put("/users/{user_id}", response_model=User, tags=["users"], operation_id="update_user") async def update_user(user_id: int, user: UpdateUserRequest): - """ - Update an existing user. - - Only updates the fields that are provided in the request. - Raises a 404 error if the user does not exist. - """ + """Update an existing user.""" if user_id not in users_db: raise HTTPException(status_code=404, detail="User not found") - existing = users_db[user_id] - if user.name is not None: existing.name = user.name if user.email is not None: @@ -213,25 +253,16 @@ async def update_user(user_id: int, user: UpdateUserRequest): existing.age = user.age if user.active is not None: existing.active = user.active - return existing - @app.delete("/users/{user_id}", tags=["users"], operation_id="delete_user") async def delete_user(user_id: int): - """ - Delete a user. - - Removes the user from the database. - Raises a 404 error if the user does not exist. - """ + """Delete a user.""" if user_id not in users_db: raise HTTPException(status_code=404, detail="User not found") - del users_db[user_id] return {"message": "User deleted successfully", "user_id": user_id} - @app.get("/users/search/", response_model=List[User], tags=["search"], operation_id="search_users") async def search_users( q: Optional[str] = Query(None, description="Search query for name or email"), @@ -239,73 +270,60 @@ async def search_users( max_age: Optional[int] = Query(None, description="Maximum age filter"), active: Optional[bool] = Query(None, description="Filter by active status"), ): - """ - Search users with various filter options. - - Allows searching by name/email, and filtering by age range and active status. - """ + """Search users with various filter options.""" results = list(users_db.values()) - if q: q = q.lower() - results = [ - user for user in results - if q in user.name.lower() or q in user.email.lower() - ] - + results = [u for u in results if q in u.name.lower() or q in u.email.lower()] if min_age is not None: - results = [user for user in results if user.age is not None and user.age >= min_age] - + results = [u for u in results if u.age is not None and u.age >= min_age] if max_age is not None: - results = [user for user in results if user.age is not None and user.age <= max_age] - + results = [u for u in results if u.age is not None and u.age <= max_age] if active is not None: - results = [user for user in results if user.active == active] - + results = [u for u in results if u.active == active] return results - @app.get("/health", tags=["system"], operation_id="health_check") async def health_check(): - """ - Health check endpoint. - - Returns the current health status of the application. - """ + """Health check endpoint.""" return { "status": "healthy", "users_count": len(users_db), } - mcp = FastApiMCP(app) - -mcp.mount_http() - +mcp.mount_http(mount_path="/mcp") mcp.mount_console( - mount_path="/mcp-console", + mount_path=CONSOLE_PATH, log_max_size=1000, ) -print("=" * 60) -print("FastAPI-MCP Console Demo") -print("=" * 60) print() -print("Available endpoints:") -print(" - Swagger UI: http://localhost:8000/docs") -print(" - MCP HTTP: http://localhost:8000/mcp") -print(" - MCP Console: http://localhost:8000/mcp-console") +print("=" * 70) +print("Server is Ready!") +print("=" * 70) +print() +print("IMPORTANT: Browser Access URLs") +print(" - DO NOT use: http://0.0.0.0:8000") +print(" - USE THIS: http://localhost:8000" + CONSOLE_PATH) +print(" - OR: http://127.0.0.1:8000" + CONSOLE_PATH) +print() +print("Available Endpoints (localhost):") +print(" - MCP Console: http://localhost:8000" + CONSOLE_PATH) +print(" - Welcome Page: http://localhost:8000/welcome") +print(" - Swagger UI: http://localhost:8000/docs") +print(" - MCP HTTP: http://localhost:8000/mcp") print() -print("Access the Web Console to:") +print("Features:") +print(" - Root path '/' redirects to console") print(" - View all registered MCP tools") -print(" - See tool descriptions, parameters, and responses") print(" - Test tool executions with custom parameters") print(" - View real-time call logs with timings") print() -print("=" * 60) - +print("=" * 70) +print("Press Ctrl+C to stop the server") +print("=" * 70) if __name__ == "__main__": import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8000)