diff --git a/examples/10_console_example.py b/examples/10_console_example.py new file mode 100644 index 0000000..303aa35 --- /dev/null +++ b/examples/10_console_example.py @@ -0,0 +1,329 @@ +""" +FastAPI-MCP Console Example +============================ + +IMPORTANT: Browser Access Instructions +---------------------------------------- +DO NOT use http://0.0.0.0:8000 in your browser! + +- '0.0.0.0' means the server listens on ALL network interfaces +- For browser access, use: http://localhost:8000/mcp-console +- Or use: http://127.0.0.1:8000/mcp-console + +This example also adds a redirect from root path '/' to '/mcp-console'. +""" + +import sys +import site +from pathlib import Path + +user_site = site.getusersitepackages() +if user_site and user_site not in sys.path: + sys.path.insert(0, user_site) + +project_root = Path(__file__).parent.parent +if str(project_root) not in sys.path: + sys.path.insert(0, str(project_root)) + +print("=" * 70) +print("FastAPI-MCP Console Example - Starting Up") +print("=" * 70) +print() + +try: + from fastapi_mcp import FastApiMCP + methods = [m for m in dir(FastApiMCP) if not m.startswith('_')] + + if 'mount_console' not in methods: + print("ERROR: 'mount_console' method not found!") + print() + print("This means Python is importing an older installed version,") + print("not the local development version with console feature.") + print() + print("Try this to fix:") + print(" 1. pip uninstall fastapi-mcp -y") + print(" 2. pip install -e . --user --force-reinstall --no-deps") + print() + print("Or check where it's importing from:") + import fastapi_mcp + print(f" Imported from: {fastapi_mcp.__file__}") + print(f" Expected from: {project_root / 'fastapi_mcp'}") + sys.exit(1) + + print("[OK] FastApiMCP loaded with mount_console method") + print(f" Available methods: {methods}") + print() + +except ImportError as e: + print(f"ERROR: Failed to import fastapi_mcp: {e}") + print() + print("Try installing dependencies:") + print(" pip install fastapi uvicorn pydantic mcp httpx --user") + sys.exit(1) + +from fastapi import FastAPI, HTTPException, Query, Request +from fastapi.responses import RedirectResponse, HTMLResponse +from pydantic import BaseModel +from typing import List, Optional + +CONSOLE_PATH = "/mcp-console" + +app = FastAPI( + title="MCP Console Demo", + description="A demo FastAPI app with MCP Web Console", + version="1.0.0", +) + +class User(BaseModel): + id: int + name: str + email: str + age: Optional[int] = None + active: bool = True + +class CreateUserRequest(BaseModel): + name: str + email: str + age: Optional[int] = None + +class UpdateUserRequest(BaseModel): + name: Optional[str] = None + email: Optional[str] = None + age: Optional[int] = None + active: Optional[bool] = None + +users_db: dict[int, User] = {} + +sample_users = [ + User(id=1, name="Alice Johnson", email="alice@example.com", age=30), + User(id=2, name="Bob Smith", email="bob@example.com", age=25), + User(id=3, name="Charlie Brown", email="charlie@example.com", age=35), +] +for user in sample_users: + users_db[user.id] = user + +@app.get("/", include_in_schema=False) +async def redirect_to_console(): + """ + Redirect root path to the MCP Console. + + This makes it easier for users - if they accidentally visit + the root path, they'll be redirected to the console. + """ + return RedirectResponse(url=CONSOLE_PATH) + +@app.get("/welcome", include_in_schema=False) +async def welcome_page(): + """ + Welcome page that shows all available endpoints. + """ + html = f""" + + + + FastAPI-MCP Console Demo + + + +

FastAPI-MCP Console Demo

+ +
+ Important: '0.0.0.0' is not a valid browser address! +
Use localhost or 127.0.0.1 instead. +
+ +

Welcome to the FastAPI-MCP Console Demo!

+ +

Available Endpoints:

+ +
+ Recommended: + Open MCP Console + - View tools, test executions, see logs +
+ +
+ Swagger UI - Interactive API documentation +
+ +
+ OpenAPI Schema - Raw API schema +
+ +
+ Health Check - Server status +
+ +

Access URLs (for browser):

+ + +

Server Info:

+ + + + """ + return HTMLResponse(content=html) + +@app.get("/users/", response_model=List[User], tags=["users"], operation_id="list_users") +async def list_users( + skip: int = Query(0, ge=0, description="Number of users to skip"), + limit: int = Query(10, ge=1, le=100, description="Maximum number of users to return"), + active_only: bool = Query(False, description="Only return active users"), +): + """List all users with pagination support.""" + results = list(users_db.values()) + if active_only: + results = [u for u in results if u.active] + return results[skip : skip + limit] + +@app.get("/users/{user_id}", response_model=User, tags=["users"], operation_id="get_user") +async def get_user(user_id: int): + """Get a specific user by ID.""" + if user_id not in users_db: + raise HTTPException(status_code=404, detail="User not found") + return users_db[user_id] + +@app.post("/users/", response_model=User, tags=["users"], operation_id="create_user") +async def create_user(user: CreateUserRequest): + """Create a new user.""" + new_id = max(users_db.keys(), default=0) + 1 + new_user = User( + id=new_id, + name=user.name, + email=user.email, + age=user.age, + active=True, + ) + users_db[new_id] = new_user + return new_user + +@app.put("/users/{user_id}", response_model=User, tags=["users"], operation_id="update_user") +async def update_user(user_id: int, user: UpdateUserRequest): + """Update an existing user.""" + if user_id not in users_db: + raise HTTPException(status_code=404, detail="User not found") + existing = users_db[user_id] + if user.name is not None: + existing.name = user.name + if user.email is not None: + existing.email = user.email + if user.age is not None: + existing.age = user.age + if user.active is not None: + existing.active = user.active + return existing + +@app.delete("/users/{user_id}", tags=["users"], operation_id="delete_user") +async def delete_user(user_id: int): + """Delete a user.""" + if user_id not in users_db: + raise HTTPException(status_code=404, detail="User not found") + del users_db[user_id] + return {"message": "User deleted successfully", "user_id": user_id} + +@app.get("/users/search/", response_model=List[User], tags=["search"], operation_id="search_users") +async def search_users( + q: Optional[str] = Query(None, description="Search query for name or email"), + min_age: Optional[int] = Query(None, description="Minimum age filter"), + max_age: Optional[int] = Query(None, description="Maximum age filter"), + active: Optional[bool] = Query(None, description="Filter by active status"), +): + """Search users with various filter options.""" + results = list(users_db.values()) + if q: + q = q.lower() + results = [u for u in results if q in u.name.lower() or q in u.email.lower()] + if min_age is not None: + results = [u for u in results if u.age is not None and u.age >= min_age] + if max_age is not None: + results = [u for u in results if u.age is not None and u.age <= max_age] + if active is not None: + results = [u for u in results if u.active == active] + return results + +@app.get("/health", tags=["system"], operation_id="health_check") +async def health_check(): + """Health check endpoint.""" + return { + "status": "healthy", + "users_count": len(users_db), + } + +mcp = FastApiMCP(app) +mcp.mount_http(mount_path="/mcp") +mcp.mount_console( + mount_path=CONSOLE_PATH, + log_max_size=1000, +) + +print() +print("=" * 70) +print("Server is Ready!") +print("=" * 70) +print() +print("IMPORTANT: Browser Access URLs") +print(" - DO NOT use: http://0.0.0.0:8000") +print(" - USE THIS: http://localhost:8000" + CONSOLE_PATH) +print(" - OR: http://127.0.0.1:8000" + CONSOLE_PATH) +print() +print("Available Endpoints (localhost):") +print(" - MCP Console: http://localhost:8000" + CONSOLE_PATH) +print(" - Welcome Page: http://localhost:8000/welcome") +print(" - Swagger UI: http://localhost:8000/docs") +print(" - MCP HTTP: http://localhost:8000/mcp") +print() +print("Features:") +print(" - Root path '/' redirects to console") +print(" - View all registered MCP tools") +print(" - Test tool executions with custom parameters") +print(" - View real-time call logs with timings") +print() +print("=" * 70) +print("Press Ctrl+C to stop the server") +print("=" * 70) + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/fastapi_mcp/__init__.py b/fastapi_mcp/__init__.py index f748712..9a1f2a8 100644 --- a/fastapi_mcp/__init__.py +++ b/fastapi_mcp/__init__.py @@ -14,10 +14,16 @@ from .server import FastApiMCP from .types import AuthConfig, OAuthMetadata - +from .console.logs import LogStore, LogEntry, LogStatus, get_global_log_store +from .console.router import ConsoleConfig __all__ = [ "FastApiMCP", "AuthConfig", "OAuthMetadata", + "LogStore", + "LogEntry", + "LogStatus", + "get_global_log_store", + "ConsoleConfig", ] diff --git a/fastapi_mcp/console/__init__.py b/fastapi_mcp/console/__init__.py new file mode 100644 index 0000000..b7caa53 --- /dev/null +++ b/fastapi_mcp/console/__init__.py @@ -0,0 +1,24 @@ +""" +FastAPI-MCP Web Console Module. + +Provides a lightweight web management console for visualizing and debugging MCP tools. +""" + +from .logs import ( + ToolCallLog, + LogStore, + LogEntry, + LogStatus, + get_global_log_store, +) +from .router import ConsoleConfig, get_console_router + +__all__ = [ + "ToolCallLog", + "LogStore", + "LogEntry", + "LogStatus", + "get_global_log_store", + "ConsoleConfig", + "get_console_router", +] diff --git a/fastapi_mcp/console/logs.py b/fastapi_mcp/console/logs.py new file mode 100644 index 0000000..bf28dfc --- /dev/null +++ b/fastapi_mcp/console/logs.py @@ -0,0 +1,203 @@ +""" +Log storage and types for MCP tool call logging. + +Provides in-memory storage for tool call logs with configurable max size. +""" + +import json +import time +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from typing import Any, Dict, List, Optional + + +class LogStatus(str, Enum): + """Status of a tool call log entry.""" + + PENDING = "pending" + SUCCESS = "success" + ERROR = "error" + + +@dataclass +class LogEntry: + """A single log entry for an MCP tool call.""" + + id: str + tool_name: str + arguments: Dict[str, Any] + status: LogStatus = LogStatus.PENDING + result: Optional[str] = None + error: Optional[str] = None + start_time: float = field(default_factory=time.time) + end_time: Optional[float] = None + duration_ms: Optional[float] = None + method: Optional[str] = None + path: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert log entry to a dictionary for JSON serialization.""" + start_datetime = datetime.fromtimestamp(self.start_time).isoformat() + end_datetime = datetime.fromtimestamp(self.end_time).isoformat() if self.end_time else None + + return { + "id": self.id, + "tool_name": self.tool_name, + "arguments": self.arguments, + "status": self.status.value, + "result": self.result, + "error": self.error, + "start_time": self.start_time, + "start_datetime": start_datetime, + "end_time": self.end_time, + "end_datetime": end_datetime, + "duration_ms": self.duration_ms, + "method": self.method, + "path": self.path, + } + + def complete(self, result: Any, error: Optional[str] = None) -> None: + """Mark the log entry as completed.""" + self.end_time = time.time() + self.duration_ms = (self.end_time - self.start_time) * 1000 + + if error: + self.status = LogStatus.ERROR + self.error = error + else: + self.status = LogStatus.SUCCESS + try: + self.result = json.dumps(result, indent=2, ensure_ascii=False, default=str) + except Exception: + self.result = str(result) + + +ToolCallLog = LogEntry + + +class LogStore: + """In-memory storage for tool call logs.""" + + def __init__(self, max_size: int = 1000): + """Initialize the log store. + + Args: + max_size: Maximum number of log entries to keep. Oldest entries are removed when limit is reached. + """ + self._logs: List[LogEntry] = [] + self._max_size = max_size + self._counter: int = 0 + + def _generate_id(self) -> str: + """Generate a unique log entry ID.""" + self._counter += 1 + return f"log_{self._counter}_{int(time.time() * 1000)}" + + def create_entry( + self, + tool_name: str, + arguments: Dict[str, Any], + method: Optional[str] = None, + path: Optional[str] = None, + ) -> LogEntry: + """Create a new log entry for a pending tool call. + + Args: + tool_name: Name of the tool being called + arguments: Arguments passed to the tool + method: HTTP method (if applicable) + path: API path (if applicable) + + Returns: + The created LogEntry + """ + entry = LogEntry( + id=self._generate_id(), + tool_name=tool_name, + arguments=arguments, + method=method, + path=path, + ) + + self._logs.append(entry) + + if len(self._logs) > self._max_size: + self._logs = self._logs[-self._max_size :] + + return entry + + def get_all(self, limit: Optional[int] = None) -> List[LogEntry]: + """Get all log entries, newest first. + + Args: + limit: Maximum number of entries to return. If None, returns all. + + Returns: + List of log entries, newest first + """ + entries = list(reversed(self._logs)) + if limit is not None: + entries = entries[:limit] + return entries + + def get_by_id(self, log_id: str) -> Optional[LogEntry]: + """Get a log entry by its ID. + + Args: + log_id: The log entry ID + + Returns: + The LogEntry if found, None otherwise + """ + for entry in self._logs: + if entry.id == log_id: + return entry + return None + + def clear(self) -> int: + """Clear all log entries. + + Returns: + Number of entries cleared + """ + count = len(self._logs) + self._logs.clear() + return count + + def get_stats(self) -> Dict[str, Any]: + """Get statistics about the logs. + + Returns: + Dictionary with statistics + """ + total = len(self._logs) + success = sum(1 for e in self._logs if e.status == LogStatus.SUCCESS) + error = sum(1 for e in self._logs if e.status == LogStatus.ERROR) + pending = sum(1 for e in self._logs if e.status == LogStatus.PENDING) + + durations = [e.duration_ms for e in self._logs if e.duration_ms is not None] + avg_duration = sum(durations) / len(durations) if durations else 0 + + return { + "total": total, + "success": success, + "error": error, + "pending": pending, + "avg_duration_ms": round(avg_duration, 2), + } + + +_global_log_store: Optional[LogStore] = None + + +def get_global_log_store() -> LogStore: + """Get or create the global log store instance. + + Returns: + The global LogStore instance + """ + global _global_log_store + if _global_log_store is None: + _global_log_store = LogStore() + return _global_log_store diff --git a/fastapi_mcp/console/router.py b/fastapi_mcp/console/router.py new file mode 100644 index 0000000..432cf66 --- /dev/null +++ b/fastapi_mcp/console/router.py @@ -0,0 +1,1407 @@ +""" +Console API Router for FastAPI-MCP. + +Provides API endpoints for the web management console and serves the frontend page. +""" + +import json +from typing import Any, Callable, Dict, List, Optional, Sequence + +from fastapi import APIRouter, Depends, HTTPException, Request, params +from fastapi.responses import HTMLResponse, JSONResponse +from pydantic import BaseModel + +from fastapi_mcp.console.logs import LogStore, get_global_log_store +from fastapi_mcp.types import AuthConfig + + +class ConsoleConfig(BaseModel): + """Configuration for the MCP Console.""" + + enabled: bool = True + mount_path: str = "/mcp-console" + log_max_size: int = 1000 + dependencies: Optional[Sequence[params.Depends]] = None + + +class ToolExecuteRequest(BaseModel): + """Request model for executing a tool.""" + + arguments: Dict[str, Any] = {} + + +CONSOLE_HTML = """ + + + + + MCP Console - FastAPI-MCP + + + +
+
+ + +
+
+ +
+
+
0
+
已注册工具
+
+
+
0
+
总调用次数
+
+
+
0
+
成功调用
+
+
+
0
+
失败调用
+
+
+ +
+ +
+
+ 已注册的 MCP 工具 + +
+
+
+ + +
+
+ 工具调用日志 +
+ + +
+
+
+
+
+ + + + +""" + + +def get_console_router( + log_store: LogStore, + tools_getter: Callable[[], List[Any]], + operation_map_getter: Callable[[], Dict[str, Any]], + execute_tool_callback: Callable[[str, Dict[str, Any]], Any], + config: ConsoleConfig, +) -> APIRouter: + """Create the console API router. + + Args: + log_store: The log store instance + tools_getter: Function that returns the list of tools + operation_map_getter: Function that returns the operation map + execute_tool_callback: Callback to execute a tool + config: Console configuration + + Returns: + Configured APIRouter + """ + router = APIRouter() + dependencies = config.dependencies or [] + + @router.get("/", response_class=HTMLResponse, include_in_schema=False) + async def get_console_page() -> HTMLResponse: + """Serve the console HTML page.""" + return HTMLResponse(content=CONSOLE_HTML) + + @router.get("/api/tools", include_in_schema=False, dependencies=dependencies) + async def get_tools() -> Dict[str, Any]: + """Get all registered MCP tools. + + Returns: + Dictionary with tools and operation_map + """ + tools = tools_getter() + operation_map = operation_map_getter() + + serializable_tools = [] + for tool in tools: + if hasattr(tool, "model_dump"): + serializable_tools.append(tool.model_dump()) + elif hasattr(tool, "__dict__"): + serializable_tools.append(tool.__dict__) + else: + serializable_tools.append(dict(tool)) + + return {"tools": serializable_tools, "operation_map": operation_map} + + @router.get("/api/tools/{tool_name}", include_in_schema=False, dependencies=dependencies) + async def get_tool(tool_name: str) -> Dict[str, Any]: + """Get a specific tool by name. + + Args: + tool_name: Name of the tool + + Returns: + Tool information + """ + tools = tools_getter() + operation_map = operation_map_getter() + + tool = next((t for t in tools if t.name == tool_name), None) + if tool is None: + raise HTTPException(status_code=404, detail=f"Tool not found: {tool_name}") + + tool_dict = tool.model_dump() if hasattr(tool, "model_dump") else dict(tool) + op_info = operation_map.get(tool_name, {}) + + return {"tool": tool_dict, "operation": op_info} + + @router.post("/api/tools/{tool_name}/execute", include_in_schema=False, dependencies=dependencies) + async def execute_tool(tool_name: str, request: Request, body: ToolExecuteRequest) -> Any: + """Execute an MCP tool. + + Args: + tool_name: Name of the tool to execute + request: FastAPI request object + body: Request body containing arguments + + Returns: + Tool execution result + """ + tools = tools_getter() + operation_map = operation_map_getter() + + tool = next((t for t in tools if t.name == tool_name), None) + if tool is None: + raise HTTPException(status_code=404, detail=f"Tool not found: {tool_name}") + + op_info = operation_map.get(tool_name, {}) + + log_entry = log_store.create_entry( + tool_name=tool_name, + arguments=body.arguments, + method=op_info.get("method"), + path=op_info.get("path"), + ) + + try: + result = await execute_tool_callback(tool_name, body.arguments) + log_entry.complete(result=result) + return result + except Exception as e: + log_entry.complete(result=None, error=str(e)) + raise HTTPException(status_code=500, detail=str(e)) + + @router.get("/api/logs", include_in_schema=False, dependencies=dependencies) + async def get_logs(limit: Optional[int] = 100) -> Dict[str, Any]: + """Get tool call logs. + + Args: + limit: Maximum number of logs to return + + Returns: + List of log entries + """ + entries = log_store.get_all(limit=limit) + return {"logs": [entry.to_dict() for entry in entries]} + + @router.get("/api/logs/{log_id}", include_in_schema=False, dependencies=dependencies) + async def get_log(log_id: str) -> Dict[str, Any]: + """Get a specific log entry by ID. + + Args: + log_id: The log entry ID + + Returns: + Log entry + """ + entry = log_store.get_by_id(log_id) + if entry is None: + raise HTTPException(status_code=404, detail=f"Log not found: {log_id}") + return entry.to_dict() + + @router.delete("/api/logs", include_in_schema=False, dependencies=dependencies) + async def clear_logs() -> Dict[str, Any]: + """Clear all log entries. + + Returns: + Count of cleared entries + """ + count = log_store.clear() + return {"cleared": count} + + @router.get("/api/stats", include_in_schema=False, dependencies=dependencies) + async def get_stats() -> Dict[str, Any]: + """Get statistics about tool calls. + + Returns: + Statistics dictionary + """ + return log_store.get_stats() + + return router diff --git a/fastapi_mcp/server.py b/fastapi_mcp/server.py index bb75106..4b859e1 100644 --- a/fastapi_mcp/server.py +++ b/fastapi_mcp/server.py @@ -12,6 +12,8 @@ from fastapi_mcp.transport.sse import FastApiSseTransport from fastapi_mcp.transport.http import FastApiHttpSessionManager from fastapi_mcp.types import HTTPRequestInfo, AuthConfig +from fastapi_mcp.console.logs import LogStore, get_global_log_store, LogEntry +from fastapi_mcp.console.router import ConsoleConfig, get_console_router import logging @@ -120,6 +122,8 @@ def __init__( self._forward_headers = {h.lower() for h in headers} self._http_transport: FastApiHttpSessionManager | None = None # Store reference to HTTP transport for cleanup + self._log_store: LogStore | None = None + self._console_mounted: bool = False self.setup_server() @@ -481,6 +485,192 @@ def mount( f"Unsupported transport: {transport}. Use mount_sse() or mount_http() instead." ) + def mount_console( + self, + router: Annotated[ + Optional[FastAPI | APIRouter], + Doc( + """ + The FastAPI app or APIRouter to mount the console to. If not provided, the console + will be mounted to the FastAPI app. + """ + ), + ] = None, + mount_path: Annotated[ + str, + Doc( + """ + Path where the MCP console will be mounted. + Defaults to '/mcp-console'. + """ + ), + ] = "/mcp-console", + log_max_size: Annotated[ + int, + Doc( + """ + Maximum number of log entries to keep. + Defaults to 1000. + """ + ), + ] = 1000, + dependencies: Annotated[ + Optional[Sequence[params.Depends]], + Doc( + """ + FastAPI dependencies to apply to all console endpoints. + Can be used for authentication and authorization. + """ + ), + ] = None, + ) -> None: + """ + Mount the MCP Web Console to **any** FastAPI app or APIRouter. + + The console provides: + - A web UI for viewing all registered MCP tools + - Online debugging capability to test tool executions + - Real-time logging of tool calls with arguments, responses, and execution times + + Args: + router: The FastAPI app or APIRouter to mount the console to + mount_path: Path where the console will be accessible + log_max_size: Maximum number of log entries to keep in memory + dependencies: FastAPI dependencies for authentication/authorization + + Example: + ```python + from fastapi import FastAPI + from fastapi_mcp import FastApiMCP + + app = FastAPI() + + # ... define your routes ... + + mcp = FastApiMCP(app) + mcp.mount_http() + mcp.mount_console() # Access at http://localhost:8000/mcp-console + + if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) + ``` + """ + if not mount_path.startswith("/"): + mount_path = f"/{mount_path}" + if mount_path.endswith("/"): + mount_path = mount_path[:-1] + + if not router: + router = self.fastapi + + assert isinstance(router, (FastAPI, APIRouter)), f"Invalid router type: {type(router)}" + + self._log_store = LogStore(max_size=log_max_size) + + config = ConsoleConfig( + enabled=True, + mount_path=mount_path, + log_max_size=log_max_size, + dependencies=dependencies, + ) + + def tools_getter() -> List[Any]: + return self.tools + + def operation_map_getter() -> Dict[str, Any]: + return self.operation_map + + async def execute_callback(tool_name: str, arguments: Dict[str, Any]) -> Any: + return await self.execute_tool_direct(tool_name, arguments) + + console_router = get_console_router( + log_store=self._log_store, + tools_getter=tools_getter, + operation_map_getter=operation_map_getter, + execute_tool_callback=execute_callback, + config=config, + ) + + if isinstance(router, FastAPI): + router.include_router(console_router, prefix=mount_path) + else: + base_prefix = router.prefix if router.prefix else "" + full_prefix = base_prefix + mount_path + self.fastapi.include_router(console_router, prefix=full_prefix) + + self._console_mounted = True + logger.info(f"MCP Console mounted at {mount_path}") + + async def execute_tool_direct( + self, + tool_name: str, + arguments: Dict[str, Any], + ) -> Any: + """ + Execute a tool directly without going through the MCP protocol. + + This is used by the web console for direct tool execution. + + Args: + tool_name: Name of the tool to execute + arguments: Arguments for the tool + + Returns: + The tool execution result (parsed JSON if possible) + """ + if tool_name not in self.operation_map: + raise ValueError(f"Unknown tool: {tool_name}") + + operation = self.operation_map[tool_name] + path: str = operation["path"] + method: str = operation["method"] + parameters: List[Dict[str, Any]] = operation.get("parameters", []) + arguments = arguments.copy() if arguments else {} + + for param in parameters: + if param.get("in") == "path" and param.get("name") in arguments: + param_name = param.get("name", None) + if param_name is None: + raise ValueError(f"Parameter name is None for parameter: {param}") + path = path.replace(f"{{{param_name}}}", str(arguments.pop(param_name))) + + query = {} + for param in parameters: + if param.get("in") == "query" and param.get("name") in arguments: + param_name = param.get("name", None) + if param_name is None: + raise ValueError(f"Parameter name is None for parameter: {param}") + query[param_name] = arguments.pop(param_name) + + headers = {} + for param in parameters: + if param.get("in") == "header" and param.get("name") in arguments: + param_name = param.get("name", None) + if param_name is None: + raise ValueError(f"Parameter name is None for parameter: {param}") + headers[param_name] = arguments.pop(param_name) + + body = arguments if arguments else None + + logger.debug(f"Making {method.upper()} request to {path}") + response = await self._request(self._http_client, method, path, query, headers, body) + + try: + result = response.json() + except json.JSONDecodeError: + if hasattr(response, "text"): + result = response.text + else: + result = response.content + + if 400 <= response.status_code < 600: + raise ValueError( + f"Error calling {tool_name}. Status code: {response.status_code}. Response: {response.text}" + ) + + return result + async def _execute_api_tool( self, client: Annotated[httpx.AsyncClient, Doc("httpx client to use in API calls")], diff --git a/verify_console.py b/verify_console.py new file mode 100644 index 0000000..c5f3fcf --- /dev/null +++ b/verify_console.py @@ -0,0 +1,115 @@ +""" +Simple test script to verify the console feature is correctly implemented. +This doesn't require running the full server, just verifies the code structure. +""" + +import sys +import site +from pathlib import Path + +user_site = site.getusersitepackages() +if user_site and user_site not in sys.path: + sys.path.insert(0, user_site) + +project_root = Path(__file__).parent +if str(project_root) not in sys.path: + sys.path.insert(0, str(project_root)) + +print("=" * 60) +print("Verifying fastapi_mcp console implementation") +print("=" * 60) +print() + +print("1. Checking if console module exists...") +console_path = project_root / 'fastapi_mcp' / 'console' +if console_path.exists(): + print(f" [OK] Console directory exists: {console_path}") + + init_file = console_path / '__init__.py' + logs_file = console_path / 'logs.py' + router_file = console_path / 'router.py' + + for f in [init_file, logs_file, router_file]: + if f.exists(): + print(f" [OK] {f.name} exists") + else: + print(f" [FAIL] {f.name} MISSING!") +else: + print(f" [FAIL] Console directory NOT FOUND: {console_path}") + sys.exit(1) + +print() +print("2. Checking server.py for mount_console method...") +server_file = project_root / 'fastapi_mcp' / 'server.py' + +if not server_file.exists(): + print(f" [FAIL] server.py NOT FOUND: {server_file}") + sys.exit(1) + +with open(server_file, 'r', encoding='utf-8') as f: + server_content = f.read() + +checks = [ + ('mount_console', 'mount_console method'), + ('execute_tool_direct', 'execute_tool_direct method'), + ('from fastapi_mcp.console', 'console module import'), + ('LogStore', 'LogStore usage'), + ('ConsoleConfig', 'ConsoleConfig usage'), + ('from fastapi_mcp.console.logs import', 'logs module import'), + ('from fastapi_mcp.console.router import', 'router module import'), + ('_log_store', '_log_store attribute'), + ('_console_mounted', '_console_mounted flag'), +] + +all_pass = True +for keyword, description in checks: + if keyword in server_content: + print(f" [OK] {description} found") + else: + print(f" [FAIL] {description} NOT FOUND!") + all_pass = False + +print() +print("3. Checking __init__.py for new exports...") +init_file = project_root / 'fastapi_mcp' / '__init__.py' + +if not init_file.exists(): + print(f" [FAIL] __init__.py NOT FOUND") + sys.exit(1) + +with open(init_file, 'r', encoding='utf-8') as f: + init_content = f.read() + +export_checks = [ + ('LogStore', 'LogStore export'), + ('LogEntry', 'LogEntry export'), + ('LogStatus', 'LogStatus export'), + ('ConsoleConfig', 'ConsoleConfig export'), + ('get_global_log_store', 'get_global_log_store export'), +] + +for keyword, description in export_checks: + if keyword in init_content: + print(f" [OK] {description} found") + else: + print(f" [FAIL] {description} NOT FOUND!") + all_pass = False + +print() +print("=" * 60) +if all_pass: + print("[OK] All checks passed! The console feature is correctly implemented.") + print() + print("To run the example, you need to fix the pywin32 DLL issue.") + print() + print("Try running from project root with Python module mode:") + print(" cd e:\\test\\fastapi_mcp") + print(" python -c \"import sys; import site; sys.path.insert(0, '.'); sys.path.insert(0, site.getusersitepackages()); from fastapi_mcp.server import FastApiMCP; print('mount_console:', hasattr(FastApiMCP, 'mount_console'))\"") + print() + print("Or try reinstalling pywin32:") + print(" pip uninstall pywin32 -y") + print(" pip install pywin32 --user") +else: + print("[FAIL] Some checks failed. Please review the errors above.") + sys.exit(1) +print("=" * 60)