diff --git a/src/praisonai/praisonai/agents_generator.py b/src/praisonai/praisonai/agents_generator.py index 47a152170..fea515e4a 100644 --- a/src/praisonai/praisonai/agents_generator.py +++ b/src/praisonai/praisonai/agents_generator.py @@ -493,96 +493,45 @@ def _validate_cli_backend_compatibility(self, config, framework): def _validate_agents_config(self, config): """ - Validate agent configuration for typos in field names and provide suggestions. + Validate agent configuration with fail-fast validation and aggregated errors. Args: config (dict): The parsed YAML configuration + + Raises: + ValueError: If configuration has validation errors """ - known_fields = { - 'role', 'goal', 'instructions', 'backstory', 'tools', 'toolsets', 'tasks', 'llm', - 'function_calling_llm', 'allow_delegation', 'max_iter', 'max_rpm', - 'max_execution_time', 'verbose', 'cache', 'system_template', - 'prompt_template', 'response_template', 'tool_timeout', 'tool_retry_policy', - 'planning_tools', 'planning', 'autonomy', 'guardrails', 'streaming', 'stream', - 'approval', 'skills', 'cli_backend', 'runtime', 'reflection', 'handoff', 'web', 'web_fetch' - } + # Use the new comprehensive validator + from .config.validator import ConfigValidator - # Collect YAML-local model aliases (valid even if not in global catalogue) - local_model_aliases = set() - models_cfg = config.get("models", {}) - if isinstance(models_cfg, dict): - local_model_aliases = {k for k in models_cfg.keys() if isinstance(k, str)} + # Use existing tool resolver if available + validator = ConfigValidator(tool_resolver=self.tool_resolver) - # Try to load model catalogue for validation - model_catalogue = None - try: - from .llm.catalogue import ModelCatalogue - model_catalogue = ModelCatalogue() - except ImportError: - pass # Catalogue not available - - for section_name in ('agents', 'roles'): - section = config.get(section_name, {}) - if not isinstance(section, dict): - continue - - entity_name = 'agent' if section_name == 'agents' else 'role' - for name, section_config in section.items(): - if not isinstance(section_config, dict): - continue - - for field_name in section_config: - if field_name in known_fields: - continue - - close_matches = difflib.get_close_matches( - field_name, - known_fields, - n=1, - cutoff=0.6 - ) - suggestion = f" Did you mean '{close_matches[0]}'?" if close_matches else "" - self.logger.warning( - f"Unknown field '{field_name}' in {entity_name} '{name}'.{suggestion}" - ) - - # Validate model/llm values if catalogue available - if model_catalogue: - for model_field in ('llm', 'function_calling_llm'): - model_value = section_config.get(model_field) - if model_value and isinstance(model_value, str): - # Skip validation for local model aliases defined in YAML - if model_value in local_model_aliases: - continue - try: - model_catalogue.validate_model(model_value) - except ValueError as e: - self.logger.warning( - f"Invalid model '{model_value}' in {entity_name} '{name}' field '{model_field}': {e}" - ) - - # Check for tools configured with non-tool-calling models - if section_config.get('tools'): - llm_value = section_config.get('llm') - if llm_value: - model_info = model_catalogue.describe_model(llm_value) - if model_info and not model_info.get('supports_tools'): - self.logger.warning( - f"{entity_name.capitalize()} '{name}' has tools configured but model '{llm_value}' does not support tool calling" - ) - - # Also validate top-level llm/model config - if model_catalogue: - for model_field in ('llm', 'model'): - model_value = config.get(model_field) - if model_value and isinstance(model_value, str): - # Skip validation for local model aliases defined in YAML - if model_value in local_model_aliases: - continue - try: - model_catalogue.validate_model(model_value) - except ValueError as e: - self.logger.warning(f"Invalid model '{model_value}' in top-level '{model_field}': {e}") + # Check for strict mode from environment or config + import os + strict_mode = os.getenv('PRAISONAI_VALIDATE_STRICT', 'false').lower() == 'true' + + # Validate configuration + result = validator.validate_config(config, strict=strict_mode) + + # Log warnings + for warning in result.warnings: + self.logger.warning(warning) + + # If there are errors, fail fast with aggregated error message + if not result.valid: + error_msg = f"Configuration validation failed with {len(result.errors)} error(s):\n" + for i, error in enumerate(result.errors, 1): + error_msg += f" {i}. {error}\n" + + # Include warnings if any + if result.warnings: + error_msg += f"\nAdditionally, there are {len(result.warnings)} warning(s):\n" + for i, warning in enumerate(result.warnings, 1): + error_msg += f" {i}. {warning}\n" + + self.logger.error(error_msg) + raise ValueError(error_msg) diff --git a/src/praisonai/praisonai/cli/app.py b/src/praisonai/praisonai/cli/app.py index 0ba32b6d0..b21eb0679 100644 --- a/src/praisonai/praisonai/cli/app.py +++ b/src/praisonai/praisonai/cli/app.py @@ -127,6 +127,7 @@ class OutputFormat(str, Enum): "setup": (".commands.setup", "app", "Interactive onboarding / configuration wizard"), "onboard": (".commands.onboard", "app", "Messaging bot onboarding wizard"), "obs": (".commands.obs", "app", "Observability diagnostics and management"), + "validate": (".commands.validate", "app", "Validate YAML configuration files"), "acp": (".commands.acp", "app", "Agent Client Protocol server"), "mcp": (".commands.mcp", "app", "MCP server management"), "serve": (".commands.serve", "app", "API server management"), diff --git a/src/praisonai/praisonai/cli/commands/validate.py b/src/praisonai/praisonai/cli/commands/validate.py new file mode 100644 index 000000000..e1f1d21aa --- /dev/null +++ b/src/praisonai/praisonai/cli/commands/validate.py @@ -0,0 +1,291 @@ +""" +YAML configuration validation command. + +Validates agents/tasks/workflow YAML configurations with comprehensive error checking. +""" + +import sys +from pathlib import Path +from typing import Optional + +import typer +from rich.console import Console +from rich.table import Table + +app = typer.Typer( + help="Validate YAML configuration files", + no_args_is_help=True +) + +console = Console() + + +@app.command() +def validate( + file: str = typer.Argument( + ..., + help="Path to YAML configuration file to validate" + ), + strict: bool = typer.Option( + False, + "--strict", + help="Treat warnings as errors (strict validation mode)" + ), + quiet: bool = typer.Option( + False, + "--quiet", "-q", + help="Only show errors, suppress success messages" + ), + json_output: bool = typer.Option( + False, + "--json", + help="Output validation results as JSON" + ), +): + """ + Validate a YAML configuration file for agents, tasks, and workflows. + + This command performs comprehensive validation including: + - Schema validation for required and optional fields + - Type checking for all configuration values + - Cross-reference validation (tasks -> agents, tools -> definitions) + - Detection of unknown fields with suggestions + + Examples: + praisonai validate agents.yaml + praisonai validate agents.yaml --strict + praisonai validate my-config.yaml --json + """ + from ...config.validator import ConfigValidator + import json as json_module + + file_path = Path(file) + + # Handle JSON output for missing file + if not file_path.exists(): + if json_output: + output = { + "file": str(file_path), + "valid": False, + "errors": [f"File not found: {file}"], + "warnings": [], + "strict_mode": strict + } + sys.stdout.write(json_module.dumps(output, indent=2) + "\n") + else: + console.print(f"[red]✗ File not found:[/red] {file}", style="bold") + sys.exit(1) + + # Initialize validator + validator = ConfigValidator() + + # Validate the file + result = validator.validate_yaml_file(str(file_path), strict=strict) + + # Output JSON if requested + if json_output: + output = { + "file": str(file_path), + "valid": result.valid, + "errors": result.errors, + "warnings": result.warnings, + "strict_mode": strict + } + sys.stdout.write(json_module.dumps(output, indent=2) + "\n") + sys.exit(0 if result.valid else 1) + + # Display results + if result.valid: + if not quiet: + console.print(f"[green]✓ Configuration is valid[/green]: {file_path}", style="bold") + + if result.warnings: + console.print(f"\n[yellow]Warnings ({len(result.warnings)}):[/yellow]") + for i, warning in enumerate(result.warnings, 1): + console.print(f" {i}. {warning}") + sys.exit(0) + else: + console.print(f"[red]✗ Configuration validation failed[/red]: {file_path}", style="bold") + + if result.errors: + console.print(f"\n[red]Errors ({len(result.errors)}):[/red]") + for i, error in enumerate(result.errors, 1): + console.print(f" {i}. {error}") + + if result.warnings and not strict: + console.print(f"\n[yellow]Warnings ({len(result.warnings)}):[/yellow]") + for i, warning in enumerate(result.warnings, 1): + console.print(f" {i}. {warning}") + + if not quiet: + console.print(f"\n[dim]Fix the errors above and run validation again.[/dim]") + if result.warnings and not strict: + console.print("[dim]Use --strict to treat warnings as errors.[/dim]") + + sys.exit(1) + + +@app.command() +def check( + directory: str = typer.Argument( + ".", + help="Directory to search for YAML files" + ), + pattern: str = typer.Option( + "*.yaml", + "--pattern", "-p", + help="Glob pattern for YAML files" + ), + strict: bool = typer.Option( + False, + "--strict", + help="Treat warnings as errors" + ), + stop_on_error: bool = typer.Option( + False, + "--stop-on-error", + help="Stop checking files after first error" + ), +): + """ + Check all YAML configuration files in a directory. + + Examples: + praisonai validate check . + praisonai validate check ./configs --pattern "*.yml" + praisonai validate check . --strict --stop-on-error + """ + from ...config.validator import ConfigValidator + import glob + + # Find all matching files + dir_path = Path(directory) + if not dir_path.exists(): + console.print(f"[red]✗ Directory not found:[/red] {directory}", style="bold") + sys.exit(1) + + # Find files matching pattern + yaml_files = list(dir_path.glob(pattern)) + if pattern == "*.yaml": + yaml_files.extend(dir_path.glob("*.yml")) + + if not yaml_files: + console.print(f"[yellow]No YAML files found matching pattern:[/yellow] {pattern}") + sys.exit(0) + + # Initialize validator + validator = ConfigValidator() + + # Create results table + table = Table(title=f"Validation Results for {len(yaml_files)} file(s)") + table.add_column("File", style="cyan") + table.add_column("Status", style="bold") + table.add_column("Errors", justify="center") + table.add_column("Warnings", justify="center") + + total_valid = 0 + total_invalid = 0 + total_errors = 0 + total_warnings = 0 + + for yaml_file in sorted(yaml_files): + result = validator.validate_yaml_file(str(yaml_file), strict=strict) + + if result.valid: + status = "[green]✓ Valid[/green]" + total_valid += 1 + else: + status = "[red]✗ Invalid[/red]" + total_invalid += 1 + + if stop_on_error: + console.print(f"\n[red]Validation failed for:[/red] {yaml_file}") + console.print(result.format_message()) + sys.exit(1) + + total_errors += len(result.errors) + total_warnings += len(result.warnings) + + table.add_row( + str(yaml_file.relative_to(dir_path)), + status, + str(len(result.errors)) if result.errors else "-", + str(len(result.warnings)) if result.warnings else "-" + ) + + # Display results + console.print(table) + + # Summary + console.print(f"\nSummary:") + console.print(f" Valid files: [green]{total_valid}[/green]") + console.print(f" Invalid files: [red]{total_invalid}[/red]") + console.print(f" Total errors: {total_errors}") + console.print(f" Total warnings: {total_warnings}") + + sys.exit(0 if total_invalid == 0 else 1) + + +@app.command() +def schema(): + """ + Display the YAML configuration schema documentation. + + Shows all available fields, their types, and descriptions for: + - Agent/Role configuration + - Task configuration + - Workflow configuration + - Global settings + """ + from ...config.schema import YAMLConfig, AgentConfig, TaskConfig + import json + + console.print("[bold cyan]PraisonAI YAML Configuration Schema[/bold cyan]\n") + + # Display main sections + console.print("[bold]Main Configuration Sections:[/bold]") + console.print(" • roles/agents - Define agent roles and capabilities") + console.print(" • tasks - Define tasks and their assignments") + console.print(" • workflow - Configure workflow execution") + console.print(" • tools/toolsets - Global tool configuration") + console.print(" • config - Global settings (acp, lsp)") + console.print() + + # Display agent fields + console.print("[bold]Agent/Role Fields:[/bold]") + agent_schema = AgentConfig.model_json_schema() + required_fields = agent_schema.get('required', []) + + for field_name, field_info in agent_schema.get('properties', {}).items(): + required = " [red](required)[/red]" if field_name in required_fields else "" + description = field_info.get('description', 'No description') + field_type = field_info.get('type', 'any') + + # Handle complex types + if 'anyOf' in field_info: + types = [t.get('type', 'object') for t in field_info['anyOf'] if 'type' in t] + field_type = ' | '.join(types) if types else 'mixed' + elif '$ref' in field_info: + field_type = field_info['$ref'].split('/')[-1] + + console.print(f" • {field_name}{required} ({field_type}): {description}") + + console.print() + + # Display task fields + console.print("[bold]Task Fields:[/bold]") + task_schema = TaskConfig.model_json_schema() + required_fields = task_schema.get('required', []) + + for field_name, field_info in task_schema.get('properties', {}).items(): + required = " [red](required)[/red]" if field_name in required_fields else "" + description = field_info.get('description', 'No description') + field_type = field_info.get('type', 'any') + console.print(f" • {field_name}{required} ({field_type}): {description}") + + console.print() + console.print("[dim]Use 'praisonai validate ' to validate your configuration.[/dim]") + + +if __name__ == "__main__": + app() \ No newline at end of file diff --git a/src/praisonai/praisonai/config/__init__.py b/src/praisonai/praisonai/config/__init__.py new file mode 100644 index 000000000..f75488f56 --- /dev/null +++ b/src/praisonai/praisonai/config/__init__.py @@ -0,0 +1,40 @@ +""" +Configuration validation and schema management. +""" + +from .schema import ( + YAMLConfig, + AgentConfig, + TaskConfig, + WorkflowConfig, + WorkflowStep, + ValidationResult, + ProcessType, + HandoffPolicy, + ToolRetryPolicy, + HandoffConfig, + ApprovalConfig, + RuntimeConfig, + CliBackendConfig, + GlobalConfig, +) + +from .validator import ConfigValidator + +__all__ = [ + 'YAMLConfig', + 'AgentConfig', + 'TaskConfig', + 'WorkflowConfig', + 'WorkflowStep', + 'ValidationResult', + 'ProcessType', + 'HandoffPolicy', + 'ToolRetryPolicy', + 'HandoffConfig', + 'ApprovalConfig', + 'RuntimeConfig', + 'CliBackendConfig', + 'GlobalConfig', + 'ConfigValidator', +] \ No newline at end of file diff --git a/src/praisonai/praisonai/config/schema.py b/src/praisonai/praisonai/config/schema.py new file mode 100644 index 000000000..bc65b3891 --- /dev/null +++ b/src/praisonai/praisonai/config/schema.py @@ -0,0 +1,402 @@ +""" +YAML configuration schema validation using Pydantic models. + +This module provides schema validation for agents/tasks/workflow YAML configurations +with fail-fast validation, aggregated errors, and cross-reference checking. +""" + +from enum import Enum +from typing import Optional, Dict, List, Any, Union +from pydantic import BaseModel, Field, field_validator, model_validator +import re + + +class ProcessType(str, Enum): + """Process type for task execution.""" + SEQUENTIAL = "sequential" + HIERARCHICAL = "hierarchical" + CONSENSUAL = "consensual" + WORKFLOW = "workflow" + + +class HandoffPolicy(str, Enum): + """Handoff policy for agent delegation.""" + ANY = "any" + ALL = "all" + ROUND_ROBIN = "round_robin" + LEAST_BUSY = "least_busy" + + +class ToolRetryPolicy(BaseModel): + """Configuration for tool retry behavior.""" + max_attempts: int = Field(default=3, ge=1, description="Maximum retry attempts") + delay: float = Field(default=1.0, ge=0, description="Delay between retries in seconds") + backoff_factor: float = Field(default=2.0, ge=1, description="Exponential backoff factor") + max_delay: float = Field(default=60.0, ge=0, description="Maximum delay between retries") + + +class HandoffConfig(BaseModel): + """Configuration for agent handoff behavior.""" + to: List[str] = Field(default_factory=list, description="List of agent roles to handoff to") + policy: Optional[HandoffPolicy] = Field(default=HandoffPolicy.ANY, description="Handoff policy") + timeout: Optional[float] = Field(default=300.0, ge=0, description="Handoff timeout in seconds") + max_depth: Optional[int] = Field(default=5, ge=1, description="Maximum handoff depth") + max_concurrent: Optional[int] = Field(default=3, ge=1, description="Maximum concurrent handoffs") + detect_cycles: Optional[bool] = Field(default=True, description="Detect handoff cycles") + + +class ApprovalConfig(BaseModel): + """Configuration for agent approval requirements.""" + enabled: bool = Field(default=False, description="Enable approval mode") + timeout: Optional[float] = Field(default=300.0, ge=0, description="Approval timeout in seconds") + level: Optional[str] = Field(default="tool", description="Approval level (tool/step/all)") + auto_approve: List[str] = Field(default_factory=list, description="Auto-approved tools") + + +class RuntimeConfig(BaseModel): + """Configuration for agent runtime environment.""" + type: str = Field(..., description="Runtime type (docker/sandbox/local)") + image: Optional[str] = Field(default=None, description="Runtime image") + env: Dict[str, str] = Field(default_factory=dict, description="Environment variables") + + +class CliBackendConfig(BaseModel): + """Configuration for CLI backend.""" + type: str = Field(..., description="CLI backend type") + config: Optional[Dict[str, Any]] = Field(default=None, description="Backend-specific config") + + +class AgentConfig(BaseModel): + """Configuration for a single agent/role.""" + # Required fields + role: str = Field(..., description="Agent role") + goal: str = Field(..., description="Agent goal") + backstory: str = Field(..., description="Agent backstory") + + # Optional fields + instructions: Optional[str] = Field(default=None, description="Additional instructions (alias for backstory)") + tools: Optional[List[str]] = Field(default=None, description="List of tools the agent can use") + toolsets: Optional[List[str]] = Field(default=None, description="List of toolsets the agent can use") + llm: Optional[Union[str, Dict[str, Any]]] = Field(default=None, description="LLM model to use (string or dict with 'model' key)") + function_calling_llm: Optional[Union[str, Dict[str, Any]]] = Field(default=None, description="LLM for function calling (string or dict with 'model' key)") + tasks: Optional[Dict[str, Union[Dict[str, Any], 'TaskConfig']]] = Field(default=None, description="Tasks assigned to this agent") + + # Behavior configuration + allow_delegation: Optional[bool] = Field(default=True, description="Allow delegation to other agents") + max_iter: Optional[int] = Field(default=10, ge=1, description="Maximum iterations") + max_rpm: Optional[int] = Field(default=60, ge=1, description="Maximum requests per minute") + max_execution_time: Optional[float] = Field(default=None, ge=0, description="Maximum execution time") + verbose: Optional[bool] = Field(default=False, description="Verbose output") + cache: Optional[bool] = Field(default=True, description="Enable caching") + streaming: Optional[bool] = Field(default=False, description="Enable streaming") + stream: Optional[bool] = Field(default=None, description="Alias for streaming") + + # Advanced configuration + tool_timeout: Optional[float] = Field(default=None, ge=0, description="Tool execution timeout") + tool_retry_policy: Optional[Union[Dict[str, Any], ToolRetryPolicy]] = Field(default=None, description="Tool retry policy") + planning_tools: Optional[List[str]] = Field(default=None, description="Planning tools") + planning: Optional[bool] = Field(default=False, description="Enable planning mode") + autonomy: Optional[int] = Field(default=0, ge=0, le=10, description="Autonomy level (0-10)") + guardrails: Optional[List[str]] = Field(default=None, description="Guardrails to apply") + approval: Optional[Union[bool, Dict[str, Any], ApprovalConfig]] = Field(default=None, description="Approval configuration") + skills: Optional[List[str]] = Field(default=None, description="Skills the agent has") + reflection: Optional[bool] = Field(default=False, description="Enable reflection") + handoff: Optional[Union[Dict[str, Any], HandoffConfig]] = Field(default=None, description="Handoff configuration") + web: Optional[bool] = Field(default=False, description="Enable web access") + web_fetch: Optional[bool] = Field(default=False, description="Enable web fetching") + + # Runtime configuration + cli_backend: Optional[Union[str, Dict[str, Any], CliBackendConfig]] = Field(default=None, description="CLI backend config") + runtime: Optional[Union[str, Dict[str, Any], RuntimeConfig]] = Field(default=None, description="Runtime configuration") + + # Templates + system_template: Optional[str] = Field(default=None, description="System prompt template") + prompt_template: Optional[str] = Field(default=None, description="Prompt template") + response_template: Optional[str] = Field(default=None, description="Response template") + + @model_validator(mode='before') + @classmethod + def normalize_stream_alias(cls, data): + """Map legacy 'stream' into canonical 'streaming'.""" + if isinstance(data, dict) and 'streaming' not in data and 'stream' in data: + data['streaming'] = data['stream'] + return data + + @model_validator(mode='after') + def normalize_config_objects(self): + """Convert dict configs to proper model objects.""" + # Convert tasks dict to TaskConfig objects + if isinstance(self.tasks, dict): + normalized_tasks = {} + for task_name, task_config in self.tasks.items(): + if isinstance(task_config, dict): + # Add the agent field if not present (use self.role) + if 'agent' not in task_config: + task_config['agent'] = self.role + normalized_tasks[task_name] = TaskConfig(**task_config) + else: + normalized_tasks[task_name] = task_config + self.tasks = normalized_tasks + + # Convert tool_retry_policy dict to ToolRetryPolicy + if isinstance(self.tool_retry_policy, dict): + self.tool_retry_policy = ToolRetryPolicy(**self.tool_retry_policy) + + # Convert approval dict/bool to ApprovalConfig + if isinstance(self.approval, bool): + self.approval = ApprovalConfig(enabled=self.approval) + elif isinstance(self.approval, dict): + self.approval = ApprovalConfig(**self.approval) + + # Convert handoff dict to HandoffConfig + if isinstance(self.handoff, dict): + self.handoff = HandoffConfig(**self.handoff) + + # Convert cli_backend to CliBackendConfig + if isinstance(self.cli_backend, str): + self.cli_backend = CliBackendConfig(type=self.cli_backend) + elif isinstance(self.cli_backend, dict): + self.cli_backend = CliBackendConfig(**self.cli_backend) + + # Convert runtime to RuntimeConfig + if isinstance(self.runtime, str): + self.runtime = RuntimeConfig(type=self.runtime) + elif isinstance(self.runtime, dict): + self.runtime = RuntimeConfig(**self.runtime) + + return self + + +class TaskConfig(BaseModel): + """Configuration for a single task.""" + description: str = Field(..., description="Task description") + agent: str = Field(..., description="Agent to execute the task") + + # Optional fields + expected_output: Optional[str] = Field(default=None, description="Expected output format") + tools: Optional[List[str]] = Field(default=None, description="Tools to use for this task") + context: Optional[List[str]] = Field(default=None, description="Context from other tasks") + output_file: Optional[str] = Field(default=None, description="Output file path") + async_execution: Optional[bool] = Field(default=False, description="Execute asynchronously") + condition: Optional[str] = Field(default=None, description="Condition for task execution") + + @field_validator('agent') + @classmethod + def validate_agent_name(cls, v): + """Validate agent name format.""" + if not v or not v.strip(): + raise ValueError("Agent name cannot be empty") + # Allow alphanumeric, underscore, hyphen + if not re.match(r'^[a-zA-Z0-9_-]+$', v): + raise ValueError(f"Invalid agent name: {v}. Use only letters, numbers, underscore, and hyphen.") + return v + + +class WorkflowStep(BaseModel): + """Configuration for a workflow step.""" + name: str = Field(..., description="Step name") + type: Optional[str] = Field(default="task", description="Step type (task/route/parallel/loop)") + agent: Optional[str] = Field(default=None, description="Agent for task steps") + task: Optional[str] = Field(default=None, description="Task description") + steps: Optional[List['WorkflowStep']] = Field(default=None, description="Sub-steps for complex types") + condition: Optional[str] = Field(default=None, description="Condition for step execution") + routes: Optional[Dict[str, List['WorkflowStep']]] = Field(default=None, description="Routes for routing steps") + count: Optional[int] = Field(default=None, ge=1, description="Loop count") + + @model_validator(mode='after') + def validate_step_type(self): + """Validate step configuration based on type.""" + allowed = {'task', 'parallel', 'loop', 'route'} + if self.type not in allowed: + raise ValueError( + f"Step '{self.name}' has invalid type '{self.type}'. " + f"Allowed values: {', '.join(sorted(allowed))}" + ) + + if self.type == 'task': + if not self.agent or not self.task: + raise ValueError(f"Task step '{self.name}' requires both 'agent' and 'task' fields") + elif self.type in ('parallel', 'loop'): + if not self.steps: + raise ValueError(f"{self.type.capitalize()} step '{self.name}' requires 'steps' field") + if self.type == 'loop' and self.count is None: + raise ValueError(f"Loop step '{self.name}' requires 'count' field") + elif self.type == 'route': + if not self.routes: + raise ValueError(f"Route step '{self.name}' requires 'routes' field") + + return self + + +# Enable forward references for recursive models +WorkflowStep.model_rebuild() + + +class WorkflowConfig(BaseModel): + """Configuration for workflow execution.""" + default_llm: Optional[str] = Field(default=None, description="Default LLM for workflow") + timeout: Optional[float] = Field(default=None, ge=0, description="Workflow timeout") + max_parallel: Optional[int] = Field(default=3, ge=1, description="Maximum parallel executions") + error_handling: Optional[str] = Field(default="stop", description="Error handling strategy") + + +class GlobalConfig(BaseModel): + """Global configuration settings.""" + acp: Optional[bool] = Field(default=False, description="Enable ACP mode") + lsp: Optional[bool] = Field(default=False, description="Enable LSP mode") + + +class YAMLConfig(BaseModel): + """Complete YAML configuration schema.""" + # Metadata + name: Optional[str] = Field(default=None, description="Configuration name") + description: Optional[str] = Field(default=None, description="Configuration description") + framework: Optional[str] = Field(default="praisonai", description="Framework to use") + process: Optional[ProcessType] = Field(default=ProcessType.SEQUENTIAL, description="Process type") + + # Core sections (at least one required) + roles: Optional[Dict[str, AgentConfig]] = Field(default=None, description="Agent roles (canonical)") + agents: Optional[Dict[str, AgentConfig]] = Field(default=None, description="Agents (backward compat)") + tasks: Optional[List[TaskConfig]] = Field(default=None, description="Task definitions") + workflow: Optional[WorkflowConfig] = Field(default=None, description="Workflow configuration") + steps: Optional[List[WorkflowStep]] = Field(default=None, description="Workflow steps") + + # Input/topic + input: Optional[str] = Field(default=None, description="Input/topic (canonical)") + topic: Optional[str] = Field(default=None, description="Topic (backward compat)") + + # Tools + tools: Optional[List[str]] = Field(default=None, description="Global tools") + toolsets: Optional[List[str]] = Field(default=None, description="Global toolsets") + + # Global config + config: Optional[GlobalConfig] = Field(default=None, description="Global configuration") + + # LLM config + llm: Optional[str] = Field(default=None, description="Default LLM") + models: Optional[Dict[str, Any]] = Field(default=None, description="Model configurations") + providers: Optional[Dict[str, Any]] = Field(default=None, description="Provider configurations") + + @model_validator(mode='after') + def validate_config_structure(self): + """Validate overall configuration structure.""" + # Ensure at least one of roles/agents is present + if not self.roles and not self.agents: + raise ValueError("Configuration must define either 'roles' or 'agents' section") + + # Normalize agents -> roles + if self.agents and not self.roles: + self.roles = self.agents + + # Validate workflow mode requirements + if self.process == ProcessType.WORKFLOW: + if not self.steps and not self.workflow: + raise ValueError("Workflow process requires 'steps' or 'workflow' section") + + # Normalize input/topic + if self.topic and not self.input: + self.input = self.topic + + return self + + def validate_cross_references(self) -> List[str]: + """Validate cross-references between agents, tasks, and tools. + + Returns: + List of validation error messages (empty if valid) + """ + errors = [] + + # Get all defined agent names + agent_names = set() + if self.roles: + agent_names.update(self.roles.keys()) + if self.agents: + agent_names.update(self.agents.keys()) + + # Validate task agent references + if self.tasks: + for i, task in enumerate(self.tasks): + if task.agent not in agent_names: + errors.append( + f"Task {i+1} references undefined agent '{task.agent}'. " + f"Available agents: {', '.join(sorted(agent_names))}" + ) + + # Validate workflow step agent references + def validate_steps(steps: List[WorkflowStep], path: str = ""): + for i, step in enumerate(steps or []): + step_path = f"{path}step[{i+1}]({step.name})" + + if step.type == 'task' and step.agent: + if step.agent not in agent_names: + errors.append( + f"Workflow {step_path} references undefined agent '{step.agent}'. " + f"Available agents: {', '.join(sorted(agent_names))}" + ) + + # Recursively check sub-steps + if step.steps: + validate_steps(step.steps, f"{step_path}/") + + # Check routes + if step.routes: + for route_name, route_steps in step.routes.items(): + validate_steps(route_steps, f"{step_path}/route[{route_name}]/") + + if self.steps: + validate_steps(self.steps) + + # Validate handoff references + all_roles = set() + all_agent_configs = [] + for agents_dict in [self.roles, self.agents]: + if agents_dict: + all_agent_configs.extend(agents_dict.values()) + for agent_config in agents_dict.values(): + all_roles.add(agent_config.role) + + for agent_config in all_agent_configs: + if agent_config.handoff and isinstance(agent_config.handoff, HandoffConfig): + for target in agent_config.handoff.to: + if target not in all_roles: + errors.append( + f"Agent '{agent_config.role}' handoff references undefined role '{target}'. " + f"Available roles: {', '.join(sorted(all_roles))}" + ) + + return errors + + +class ValidationResult(BaseModel): + """Result of YAML validation.""" + valid: bool = Field(..., description="Whether configuration is valid") + errors: List[str] = Field(default_factory=list, description="List of validation errors") + warnings: List[str] = Field(default_factory=list, description="List of validation warnings") + + def format_message(self) -> str: + """Format validation result as a readable message.""" + if self.valid: + msg = "✓ Configuration is valid" + if self.warnings: + msg += f"\n\nWarnings ({len(self.warnings)}):\n" + for i, warning in enumerate(self.warnings, 1): + msg += f" {i}. {warning}\n" + return msg + + msg = f"✗ Configuration validation failed with {len(self.errors)} error(s)" + if self.errors: + msg += "\n\nErrors:\n" + for i, error in enumerate(self.errors, 1): + msg += f" {i}. {error}\n" + + if self.warnings: + msg += f"\nWarnings ({len(self.warnings)}):\n" + for i, warning in enumerate(self.warnings, 1): + msg += f" {i}. {warning}\n" + + return msg + + +# Resolve forward references for TaskConfig in AgentConfig +AgentConfig.model_rebuild() \ No newline at end of file diff --git a/src/praisonai/praisonai/config/validator.py b/src/praisonai/praisonai/config/validator.py new file mode 100644 index 000000000..756707b92 --- /dev/null +++ b/src/praisonai/praisonai/config/validator.py @@ -0,0 +1,310 @@ +""" +Configuration validator with fail-fast validation and aggregated error reporting. +""" + +import yaml +from pathlib import Path +from typing import Dict, Any, List, Optional +from pydantic import ValidationError + +from .schema import YAMLConfig, ValidationResult +from ..tool_resolver import ToolResolver +from ..tool_registry import ToolRegistry + + +class ConfigValidator: + """Validator for YAML configuration with schema and cross-reference validation.""" + + def __init__(self, tool_resolver: Optional[ToolResolver] = None): + """Initialize validator with optional tool resolver. + + Args: + tool_resolver: Tool resolver for validating tool references + """ + self.tool_resolver = tool_resolver + if not self.tool_resolver: + # Create a default tool resolver + registry = ToolRegistry() + registry.register_builtin_autogen_adapters(_suppress_deprecation_warning=True) + self.tool_resolver = ToolResolver(registry=registry) + + def validate_yaml_string(self, yaml_content: str, strict: bool = False) -> ValidationResult: + """Validate YAML configuration from string. + + Args: + yaml_content: YAML configuration as string + strict: If True, treat warnings as errors + + Returns: + ValidationResult with errors and warnings + """ + try: + config_dict = yaml.safe_load(yaml_content) + except yaml.YAMLError as e: + return ValidationResult( + valid=False, + errors=[f"YAML syntax error: {e}"] + ) + + return self.validate_config(config_dict, strict=strict) + + def validate_yaml_file(self, file_path: str, strict: bool = False) -> ValidationResult: + """Validate YAML configuration from file. + + Args: + file_path: Path to YAML file + strict: If True, treat warnings as errors + + Returns: + ValidationResult with errors and warnings + """ + path = Path(file_path) + + if not path.exists(): + return ValidationResult( + valid=False, + errors=[f"File not found: {file_path}"] + ) + + try: + with open(path, 'r') as f: + config_dict = yaml.safe_load(f) + except yaml.YAMLError as e: + return ValidationResult( + valid=False, + errors=[f"YAML syntax error in {file_path}: {e}"] + ) + except Exception as e: + return ValidationResult( + valid=False, + errors=[f"Error reading file {file_path}: {e}"] + ) + + return self.validate_config(config_dict, strict=strict, file_path=file_path) + + def validate_config(self, config: Dict[str, Any], strict: bool = False, file_path: Optional[str] = None) -> ValidationResult: + """Validate configuration dictionary. + + Args: + config: Configuration dictionary + strict: If True, treat warnings as errors + file_path: Optional file path for better error messages + + Returns: + ValidationResult with errors and warnings + """ + errors = [] + warnings = [] + file_prefix = f"{file_path}: " if file_path else "" + + if config is None: + return ValidationResult( + valid=False, + errors=[f"{file_prefix}Configuration is empty. Expected a YAML mapping at the root."] + ) + if not isinstance(config, dict): + return ValidationResult( + valid=False, + errors=[f"{file_prefix}Invalid root type '{type(config).__name__}'. Expected a YAML mapping/object."] + ) + + # Validate against schema + try: + yaml_config = YAMLConfig(**config) + except ValidationError as e: + # Parse Pydantic errors into readable messages + for error in e.errors(): + field_path = '.'.join(str(x) for x in error['loc']) + msg = f"{file_prefix}{field_path}: {error['msg']}" + + # Add context for common errors + if error['type'] == 'missing': + msg += f" (required field)" + elif error['type'] == 'type_error': + msg += f" (expected {error.get('ctx', {}).get('expected_type', 'different type')})" + elif error['type'] == 'value_error': + if 'ctx' in error and 'error' in error['ctx']: + msg = f"{file_prefix}{field_path}: {error['ctx']['error']}" + + errors.append(msg) + + return ValidationResult(valid=False, errors=errors, warnings=warnings) + + # Validate cross-references + cross_ref_errors = yaml_config.validate_cross_references() + for error in cross_ref_errors: + errors.append(f"{file_prefix}{error}") + + # Validate tool references + tool_errors, tool_warnings = self._validate_tools(yaml_config, file_prefix) + errors.extend(tool_errors) + warnings.extend(tool_warnings) + + # Check for unknown fields (as warnings) + unknown_warnings = self._check_unknown_fields(config, file_prefix) + warnings.extend(unknown_warnings) + + # In strict mode, treat warnings as errors + if strict and warnings: + errors.extend([f"[strict mode] {w}" for w in warnings]) + warnings = [] + + return ValidationResult( + valid=len(errors) == 0, + errors=errors, + warnings=warnings + ) + + def _validate_tools(self, config: YAMLConfig, file_prefix: str = "") -> tuple[List[str], List[str]]: + """Validate tool references in configuration. + + Args: + config: Validated YAML configuration + file_prefix: File path prefix for error messages + + Returns: + Tuple of (errors, warnings) + """ + errors = [] + warnings = [] + + # Collect all tool references + tool_refs = set() + + # Global tools + if config.tools: + tool_refs.update(config.tools) + if config.toolsets: + tool_refs.update(config.toolsets) + + # Agent tools + for agents_dict in [config.roles, config.agents]: + if agents_dict: + for agent_name, agent in agents_dict.items(): + if agent.tools: + tool_refs.update(agent.tools) + if agent.toolsets: + tool_refs.update(agent.toolsets) + if agent.planning_tools: + tool_refs.update(agent.planning_tools) + + # Task tools + if config.tasks: + for task in config.tasks: + if task.tools: + tool_refs.update(task.tools) + + # Validate each tool reference + for tool_name in tool_refs: + try: + # Try to resolve the tool + resolved = self.tool_resolver.resolve_tool(tool_name) + if resolved is None: + # Check if it might be a valid tool that's not installed + if self._is_known_optional_tool(tool_name): + warnings.append( + f"{file_prefix}Tool '{tool_name}' requires additional dependencies. " + f"Install with: pip install 'praisonai[tools]' or the specific tool package." + ) + else: + errors.append( + f"{file_prefix}Unknown tool '{tool_name}'. " + f"Ensure it's properly installed or defined in your configuration." + ) + except Exception as e: + # Tool resolution failed + warnings.append( + f"{file_prefix}Could not validate tool '{tool_name}': {e}" + ) + + return errors, warnings + + def _is_known_optional_tool(self, tool_name: str) -> bool: + """Check if a tool name is a known optional tool. + + Args: + tool_name: Tool name to check + + Returns: + True if it's a known optional tool + """ + # List of known optional tools that require extra dependencies + known_optional = { + # Database tools + 'PostgreSQLTool', 'MySQLTool', 'SQLiteTool', 'MongoDBTool', 'RedisTool', + 'SurrealDBTool', 'CassandraTool', 'ElasticsearchTool', + + # Web/API tools + 'SlackTool', 'DiscordTool', 'TelegramTool', 'EmailTool', + 'TwitterTool', 'LinkedInTool', 'GitHubTool', + + # Cloud tools + 'AWSTool', 'AzureTool', 'GCPTool', 'S3Tool', + + # AI/ML tools + 'HuggingFaceTool', 'OpenAITool', 'AnthropicTool', + + # Data tools + 'PandasTool', 'NumpyTool', 'ScipyTool', + + # Other + 'BrowserTool', 'SeleniumTool', 'PlaywrightTool', + 'KubernetesTool', 'DockerTool', 'TerraformTool', + } + + return tool_name in known_optional + + def _check_unknown_fields(self, config: Dict[str, Any], file_prefix: str = "") -> List[str]: + """Check for unknown fields in configuration. + + Args: + config: Raw configuration dictionary + file_prefix: File path prefix for messages + + Returns: + List of warnings about unknown fields + """ + warnings = [] + + # Known top-level fields + known_top_level = { + 'name', 'description', 'framework', 'process', 'type', + 'roles', 'agents', 'tasks', 'workflow', 'steps', + 'input', 'topic', 'tools', 'toolsets', + 'config', 'llm', 'models', 'providers', + 'deploy', # Deployment configuration + 'dependencies', # Task dependency declarations + } + + # Known agent/role fields + known_agent_fields = { + 'role', 'goal', 'instructions', 'backstory', 'tools', 'toolsets', 'tasks', 'llm', + 'function_calling_llm', 'allow_delegation', 'max_iter', 'max_rpm', + 'max_execution_time', 'verbose', 'cache', 'system_template', + 'prompt_template', 'response_template', 'tool_timeout', 'tool_retry_policy', + 'planning_tools', 'planning', 'autonomy', 'guardrails', 'streaming', 'stream', + 'approval', 'skills', 'cli_backend', 'runtime', 'reflection', 'handoff', + 'web', 'web_fetch', 'name', # Sometimes used as alias + } + + # Check top-level unknown fields + for field in config: + if field not in known_top_level: + warnings.append( + f"{file_prefix}Unknown top-level field '{field}'. " + f"This field will be ignored." + ) + + # Check agent/role fields + for section in ['agents', 'roles']: + if section in config and isinstance(config[section], dict): + for agent_name, agent_config in config[section].items(): + if isinstance(agent_config, dict): + for field in agent_config: + if field not in known_agent_fields: + warnings.append( + f"{file_prefix}{section}.{agent_name}: Unknown field '{field}'. " + f"This field will be ignored." + ) + + return warnings \ No newline at end of file diff --git a/src/praisonai/pyproject.toml b/src/praisonai/pyproject.toml index af05134a0..98ef1c3c4 100644 --- a/src/praisonai/pyproject.toml +++ b/src/praisonai/pyproject.toml @@ -19,6 +19,7 @@ dependencies = [ "mcp>=1.20.0", "typer>=0.9.0", "textual>=0.47.0", + "pydantic>=2.0.0", ] [project.scripts]