diff --git a/src/praisonai/praisonai/agents_generator.py b/src/praisonai/praisonai/agents_generator.py index 070896dcc..f21644e12 100644 --- a/src/praisonai/praisonai/agents_generator.py +++ b/src/praisonai/praisonai/agents_generator.py @@ -20,7 +20,6 @@ # Import new architecture components from .framework_adapters.base import FrameworkAdapter from .framework_adapters.registry import FrameworkAdapterRegistry, get_default_registry -from .tool_registry import ToolRegistry # Import availability flags # Compatibility imports - now handled by centralized detection @@ -299,13 +298,9 @@ def __init__(self, agent_file, framework, config_list, log_level=None, agent_cal elif os.environ.get('LOGLEVEL'): self.logger.setLevel(getattr(logging, os.environ.get('LOGLEVEL', 'INFO').upper(), logging.INFO)) - # Keep tool registry for backward compatibility with autogen adapters - self.tool_registry = ToolRegistry() - self.tool_registry.register_builtin_autogen_adapters(_suppress_deprecation_warning=True) - - # Initialize tool resolver with the registry wired in (single source of truth for tool resolution) + # Initialize tool resolver (single source of truth for tool resolution) from .tool_resolver import ToolResolver - self.tool_resolver = ToolResolver(registry=self.tool_registry) + self.tool_resolver = ToolResolver() # DI-friendly: tests/multi-tenant runtimes pass their own registry; # CLI users get the process default. @@ -472,57 +467,6 @@ def is_function_or_decorated(self, obj): """ return inspect.isfunction(obj) or hasattr(obj, '__call__') - def load_tools_from_module(self, module_path): - """ - Load function tools from a user-supplied module (gated by PRAISONAI_ALLOW_LOCAL_TOOLS). - - Parameters: - module_path (str): The path to the module containing the tools. - - Returns: - dict: A dictionary containing the names of the tools as keys and the corresponding functions or objects as values. - Returns an empty dict if the module cannot be loaded (path missing, loading blocked by PRAISONAI_ALLOW_LOCAL_TOOLS, or any other load error). - """ - from ._safe_loader import load_user_module - module = load_user_module(module_path, name="tools_module") - if module is None: - return {} - return {name: obj for name, obj in inspect.getmembers(module, self.is_function_or_decorated)} - - def load_tools_from_module_class(self, module_path): - """ - Load BaseTool / langchain tool classes from a user-supplied module (gated by PRAISONAI_ALLOW_LOCAL_TOOLS). - """ - from ._safe_loader import load_user_module - module = load_user_module(module_path, name="tools_module") - if module is None: - return {} - return self.tool_resolver._extract_tool_classes(module) - - def load_tools_from_package(self, package_path): - """ - Loads tools from a specified package path containing modules with functions or classes. - - Parameters: - package_path (str): The path to the package containing the tools. - - Returns: - dict: A dictionary containing the names of the tools as keys and the corresponding initialized instances of the classes as values. - - Raises: - FileNotFoundError: If the specified package path does not exist. - - This function iterates through all the .py files in the specified package path, excluding those that start with "__". For each file, it imports the corresponding module and checks if it contains any functions or classes that can be loaded as tools. The function then returns a dictionary containing the names of the tools as keys and the corresponding initialized instances of the classes as values. - """ - tools_dict = {} - for module_file in os.listdir(package_path): - if module_file.endswith('.py') and not module_file.startswith('__'): - module_name = f"{package_path.name}.{module_file[:-3]}" # Remove .py for import - module = importlib.import_module(module_name) - for name, obj in inspect.getmembers(module, self.is_function_or_decorated): - tools_dict[name] = obj - return tools_dict - def generate_crew_and_kickoff(self): """ diff --git a/src/praisonai/praisonai/framework_adapters/autogen_adapter.py b/src/praisonai/praisonai/framework_adapters/autogen_adapter.py index 02df11193..3f7f5ba89 100644 --- a/src/praisonai/praisonai/framework_adapters/autogen_adapter.py +++ b/src/praisonai/praisonai/framework_adapters/autogen_adapter.py @@ -527,4 +527,4 @@ def tool_fn(**kwargs): if not result_content: result_content = "Task completed." - return f"### AG2 Output ###\n{result_content}" + return f"### AG2 Output ###\n{result_content}" \ No newline at end of file diff --git a/src/praisonai/praisonai/framework_adapters/crewai_adapter.py b/src/praisonai/praisonai/framework_adapters/crewai_adapter.py index fd4d90d0d..0d381664d 100644 --- a/src/praisonai/praisonai/framework_adapters/crewai_adapter.py +++ b/src/praisonai/praisonai/framework_adapters/crewai_adapter.py @@ -195,5 +195,4 @@ def run( except Exception as e: # noqa: BLE001 -- agentops errors must not crash the caller logger.warning(f"agentops.end_session failed: {e}") - return result - + return result \ No newline at end of file diff --git a/src/praisonai/praisonai/framework_adapters/praisonai_adapter.py b/src/praisonai/praisonai/framework_adapters/praisonai_adapter.py index 32ab342a5..0a2548da6 100644 --- a/src/praisonai/praisonai/framework_adapters/praisonai_adapter.py +++ b/src/praisonai/praisonai/framework_adapters/praisonai_adapter.py @@ -210,5 +210,4 @@ def run( from praisonai._async_bridge import run_sync run_sync(interactive_runtime.stop()) except Exception as e: - logger.error(f"Error stopping InteractiveRuntime: {e}") - + logger.error(f"Error stopping InteractiveRuntime: {e}") \ No newline at end of file diff --git a/src/praisonai/praisonai/storage/__init__.py b/src/praisonai/praisonai/storage/__init__.py deleted file mode 100644 index 1efb5b4cc..000000000 --- a/src/praisonai/praisonai/storage/__init__.py +++ /dev/null @@ -1,75 +0,0 @@ -""" -Storage adapter implementations for PraisonAI. - -Heavy implementations that follow StorageBackendProtocol for: -- Redis (praisonai[redis]) -- MongoDB (praisonai[mongodb]) -- PostgreSQL (praisonai[postgresql]) -- DynamoDB (praisonai[dynamodb]) -- Valkey (praisonai[valkey]) - -These implementations are kept in the wrapper to avoid bloating the core SDK. -""" - -# Lazy imports - only import when needed -__all__ = [ - "RedisStorageAdapter", - "MongoDBStorageAdapter", - "PostgreSQLStorageAdapter", - "DynamoDBStorageAdapter", - "ValkeyStorageAdapter", - "ValkeySearchBackend", - "ValkeyBackend", -] - -def __getattr__(name: str): - """Lazy import storage adapters.""" - if name == "RedisStorageAdapter": - from .redis_adapter import RedisStorageAdapter - return RedisStorageAdapter - elif name == "MongoDBStorageAdapter": - from .mongodb_adapter import MongoDBStorageAdapter - return MongoDBStorageAdapter - elif name == "PostgreSQLStorageAdapter": - from .postgresql_adapter import PostgreSQLStorageAdapter - return PostgreSQLStorageAdapter - elif name == "DynamoDBStorageAdapter": - from .dynamodb_adapter import DynamoDBStorageAdapter - return DynamoDBStorageAdapter - elif name == "ValkeyStorageAdapter": - from .valkey_adapter import ValkeyStorageAdapter - return ValkeyStorageAdapter - elif name == "ValkeySearchBackend": - from .valkey_adapter import ValkeySearchBackend - return ValkeySearchBackend - elif name == "ValkeyBackend": - cls = _make_valkey_backend_class() - globals()["ValkeyBackend"] = cls - return cls - else: - raise AttributeError(f"module '{__name__}' has no attribute '{name}'") - - -def _make_valkey_backend_class(): - """Return ValkeyBackend — ValkeyStorageAdapter pre-configured from env vars.""" - import os - from .valkey_adapter import ValkeyStorageAdapter - - class ValkeyBackend(ValkeyStorageAdapter): - def __init__( - self, - host: str = None, - port: int = None, - prefix: str = None, - ttl=None, - password: str = None, - ): - super().__init__( - host=host if host is not None else os.environ.get("VALKEY_HOST", "localhost"), - port=int(port if port is not None else os.environ.get("VALKEY_PORT", 6379)), - prefix=prefix if prefix is not None else os.environ.get("VALKEY_PREFIX", "praisonai:"), - ttl=ttl, - password=password if password is not None else os.environ.get("VALKEY_PASSWORD"), - ) - - return ValkeyBackend diff --git a/src/praisonai/praisonai/storage/dynamodb_adapter.py b/src/praisonai/praisonai/storage/dynamodb_adapter.py deleted file mode 100644 index b02221e00..000000000 --- a/src/praisonai/praisonai/storage/dynamodb_adapter.py +++ /dev/null @@ -1,287 +0,0 @@ -""" -DynamoDB Storage Adapter for PraisonAI. - -Implements StorageBackendProtocol using AWS DynamoDB for NoSQL storage. -This is the wrapper implementation that contains the heavy AWS SDK dependency. -""" - -import json -import time -from typing import Dict, Any, List, Optional - - -class DynamoDBStorageAdapter: - """ - DynamoDB-based storage backend adapter. - - Uses AWS DynamoDB for scalable NoSQL data storage. - Implements StorageBackendProtocol from praisonaiagents.storage.protocols. - - Example: - ```python - from praisonai.storage import DynamoDBStorageAdapter - - adapter = DynamoDBStorageAdapter( - table_name="praisonai-storage", - region_name="us-east-1" - ) - adapter.save("session_123", {"messages": []}) - data = adapter.load("session_123") - ``` - """ - - def __init__( - self, - table_name: str = "praisonai-storage", - region_name: str = "us-east-1", - aws_access_key_id: Optional[str] = None, - aws_secret_access_key: Optional[str] = None, - aws_session_token: Optional[str] = None, - endpoint_url: Optional[str] = None, - read_capacity_units: int = 5, - write_capacity_units: int = 5, - auto_create_table: bool = True, - ): - """ - Initialize the DynamoDB storage adapter. - - Args: - table_name: DynamoDB table name - region_name: AWS region name - aws_access_key_id: AWS access key ID (optional, can use env/IAM) - aws_secret_access_key: AWS secret access key (optional, can use env/IAM) - aws_session_token: AWS session token (optional) - endpoint_url: Custom endpoint URL (for local DynamoDB) - read_capacity_units: Read capacity units for table creation - write_capacity_units: Write capacity units for table creation - auto_create_table: Whether to auto-create table if it doesn't exist - """ - self.table_name = table_name - self.region_name = region_name - self.aws_access_key_id = aws_access_key_id - self.aws_secret_access_key = aws_secret_access_key - self.aws_session_token = aws_session_token - self.endpoint_url = endpoint_url - self.read_capacity_units = read_capacity_units - self.write_capacity_units = write_capacity_units - self.auto_create_table = auto_create_table - self._dynamodb = None - self._table = None - - def _get_table(self): - """Lazy initialize DynamoDB client and table.""" - if self._table is None: - try: - import boto3 - from botocore.exceptions import ClientError - except ImportError: - raise ImportError( - "DynamoDB storage adapter requires the 'boto3' package. " - "Install with: pip install 'praisonai[dynamodb]'" - ) - - # Build session/client parameters - session_kwargs = {} - if self.aws_access_key_id: - session_kwargs["aws_access_key_id"] = self.aws_access_key_id - if self.aws_secret_access_key: - session_kwargs["aws_secret_access_key"] = self.aws_secret_access_key - if self.aws_session_token: - session_kwargs["aws_session_token"] = self.aws_session_token - if self.region_name: - session_kwargs["region_name"] = self.region_name - - # Create session and client - session = boto3.Session(**session_kwargs) - client_kwargs = {} - if self.endpoint_url: - client_kwargs["endpoint_url"] = self.endpoint_url - - self._dynamodb = session.resource("dynamodb", **client_kwargs) - self._table = self._dynamodb.Table(self.table_name) - - # Check if table exists and create if needed - if self.auto_create_table: - try: - self._table.load() - except ClientError as e: - if e.response["Error"]["Code"] == "ResourceNotFoundException": - self._create_table() - else: - raise RuntimeError(f"Failed to access DynamoDB table: {e}") from e - - return self._table - - def _create_table(self) -> None: - """Create DynamoDB table if it doesn't exist.""" - try: - table = self._dynamodb.create_table( - TableName=self.table_name, - KeySchema=[ - {"AttributeName": "key", "KeyType": "HASH"}, # Partition key - ], - AttributeDefinitions=[ - {"AttributeName": "key", "AttributeType": "S"}, - ], - BillingMode="PROVISIONED", - ProvisionedThroughput={ - "ReadCapacityUnits": self.read_capacity_units, - "WriteCapacityUnits": self.write_capacity_units, - }, - ) - - # Wait for table to be created - table.wait_until_exists() - self._table = table - - except Exception as e: - raise RuntimeError(f"Failed to create DynamoDB table: {e}") from e - - def save(self, key: str, data: Dict[str, Any]) -> None: - """Save data with the given key.""" - table = self._get_table() - - try: - # Convert data to JSON string for storage - json_data = json.dumps(data, default=str, ensure_ascii=False) - timestamp = int(time.time()) - - table.put_item( - Item={ - "key": key, - "data": json_data, - "updated_at": timestamp, - "created_at": timestamp, - } - ) - - except Exception as e: - raise RuntimeError(f"Failed to save data to DynamoDB: {e}") from e - - def load(self, key: str) -> Optional[Dict[str, Any]]: - """Load data by key.""" - table = self._get_table() - - try: - response = table.get_item(Key={"key": key}) - - if "Item" in response: - item = response["Item"] - if "data" in item: - try: - return json.loads(item["data"]) - except json.JSONDecodeError as e: - raise ValueError(f"Invalid JSON data for key '{key}': {e}") from e - return None - - except Exception as e: - raise RuntimeError(f"Failed to load data from DynamoDB: {e}") from e - - def delete(self, key: str) -> bool: - """Delete data by key.""" - table = self._get_table() - - try: - response = table.delete_item( - Key={"key": key}, - ReturnValues="ALL_OLD" - ) - - # Return True if item was actually deleted - return "Attributes" in response - - except Exception as e: - raise RuntimeError(f"Failed to delete data from DynamoDB: {e}") from e - - def list_keys(self, prefix: str = "") -> List[str]: - """List all keys, optionally filtered by prefix.""" - table = self._get_table() - - try: - from boto3.dynamodb.conditions import Attr - - keys = [] - scan_kwargs: Dict[str, Any] = { - "ProjectionExpression": "#k", - "ExpressionAttributeNames": {"#k": "key"}, - } - - if prefix: - scan_kwargs["FilterExpression"] = Attr("key").begins_with(prefix) - - # Scan table (note: can be expensive for large tables) - response = table.scan(**scan_kwargs) - keys.extend([item["key"] for item in response["Items"]]) - - # Handle pagination - while "LastEvaluatedKey" in response: - scan_kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"] - response = table.scan(**scan_kwargs) - keys.extend([item["key"] for item in response["Items"]]) - - return sorted(keys) - - except Exception as e: - raise RuntimeError(f"Failed to list keys from DynamoDB: {e}") from e - - def exists(self, key: str) -> bool: - """Check if a key exists.""" - table = self._get_table() - - try: - response = table.get_item( - Key={"key": key}, - ProjectionExpression="#k", - ExpressionAttributeNames={"#k": "key"} - ) - - return "Item" in response - - except Exception as e: - raise RuntimeError(f"Failed to check key existence in DynamoDB: {e}") from e - - def clear(self) -> int: - """Clear all data. Returns number of items deleted.""" - table = self._get_table() - - try: - # First scan to get all keys - scan_response = table.scan(ProjectionExpression="#k", ExpressionAttributeNames={"#k": "key"}) - items_to_delete = scan_response["Items"] - - # Handle pagination - while "LastEvaluatedKey" in scan_response: - scan_response = table.scan( - ProjectionExpression="#k", - ExpressionAttributeNames={"#k": "key"}, - ExclusiveStartKey=scan_response["LastEvaluatedKey"] - ) - items_to_delete.extend(scan_response["Items"]) - - count = len(items_to_delete) - - # Delete items in batches (DynamoDB batch_writer handles batching) - with table.batch_writer() as batch: - for item in items_to_delete: - batch.delete_item(Key={"key": item["key"]}) - - return count - - except Exception as e: - raise RuntimeError(f"Failed to clear data from DynamoDB: {e}") from e - - def ping(self) -> bool: - """Test connection to DynamoDB.""" - try: - table = self._get_table() - # Try to describe the table - table.load() - return True - except Exception: - return False - - def close(self) -> None: - """Close DynamoDB resources.""" - # DynamoDB client connections are managed by boto3, no explicit close needed - self._table = None - self._dynamodb = None \ No newline at end of file diff --git a/src/praisonai/praisonai/storage/mongodb_adapter.py b/src/praisonai/praisonai/storage/mongodb_adapter.py deleted file mode 100644 index 161a975a8..000000000 --- a/src/praisonai/praisonai/storage/mongodb_adapter.py +++ /dev/null @@ -1,217 +0,0 @@ -""" -MongoDB Storage Adapter for PraisonAI. - -Implements StorageBackendProtocol using MongoDB for document storage. -This is the wrapper implementation that contains the heavy MongoDB dependency. -""" - -import json -import re -import time -from typing import Dict, Any, List, Optional - - -class MongoDBStorageAdapter: - """ - MongoDB-based storage backend adapter. - - Uses MongoDB for document-oriented data storage. - Implements StorageBackendProtocol from praisonaiagents.storage.protocols. - - Example: - ```python - from praisonai.storage import MongoDBStorageAdapter - - adapter = MongoDBStorageAdapter( - url="mongodb://localhost:27017/", - database="praisonai" - ) - adapter.save("session_123", {"messages": []}) - data = adapter.load("session_123") - ``` - """ - - def __init__( - self, - url: str = "mongodb://localhost:27017/", - database: str = "praisonai", - collection: str = "storage", - max_pool_size: int = 50, - min_pool_size: int = 5, - max_idle_time_ms: int = 30000, - server_selection_timeout_ms: int = 5000, - username: Optional[str] = None, - password: Optional[str] = None, - ): - """ - Initialize the MongoDB storage adapter. - - Args: - url: MongoDB connection URL - database: Database name - collection: Collection name for storing data - max_pool_size: Maximum connection pool size - min_pool_size: Minimum connection pool size - max_idle_time_ms: Maximum idle time for connections - server_selection_timeout_ms: Server selection timeout - username: Optional username for authentication - password: Optional password for authentication - """ - self.url = url - self.database = database - self.collection_name = collection - self.max_pool_size = max_pool_size - self.min_pool_size = min_pool_size - self.max_idle_time_ms = max_idle_time_ms - self.server_selection_timeout_ms = server_selection_timeout_ms - self.username = username - self.password = password - self._client = None - self._collection = None - - def _get_collection(self): - """Lazy initialize MongoDB client and collection.""" - if self._collection is None: - try: - import pymongo - from pymongo import MongoClient - except ImportError: - raise ImportError( - "MongoDB storage adapter requires the 'pymongo' package. " - "Install with: pip install 'praisonai[mongodb]'" - ) - - # Build connection parameters - client_kwargs = { - "maxPoolSize": self.max_pool_size, - "minPoolSize": self.min_pool_size, - "maxIdleTimeMS": self.max_idle_time_ms, - "serverSelectionTimeoutMS": self.server_selection_timeout_ms, - "retryWrites": True, - "retryReads": True, - } - - if self.username and self.password: - client_kwargs["username"] = self.username - client_kwargs["password"] = self.password - - self._client = MongoClient(self.url, **client_kwargs) - db = self._client[self.database] - self._collection = db[self.collection_name] - - # Create index for better performance - try: - self._collection.create_index("_id", unique=True) - self._collection.create_index("updated_at") - except Exception: - # Index creation can fail if it already exists, which is fine - pass - - return self._collection - - def save(self, key: str, data: Dict[str, Any]) -> None: - """Save data with the given key (upsert).""" - collection = self._get_collection() - - try: - # Store as JSON string for consistency - json_data = json.dumps(data, default=str, ensure_ascii=False) - now = time.time() - - collection.update_one( - {"_id": key}, - { - "$set": { - "data": json_data, - "updated_at": now, - }, - "$setOnInsert": { - "created_at": now, - }, - }, - upsert=True, - ) - except Exception as e: - raise RuntimeError(f"Failed to save data to MongoDB: {e}") from e - - def load(self, key: str) -> Optional[Dict[str, Any]]: - """Load data by key.""" - collection = self._get_collection() - - try: - doc = collection.find_one({"_id": key}) - if doc and "data" in doc: - try: - return json.loads(doc["data"]) - except json.JSONDecodeError as e: - raise ValueError(f"Invalid JSON data for key '{key}': {e}") from e - return None - except Exception as e: - raise RuntimeError(f"Failed to load data from MongoDB: {e}") from e - - def delete(self, key: str) -> bool: - """Delete data by key.""" - collection = self._get_collection() - - try: - result = collection.delete_one({"_id": key}) - return result.deleted_count > 0 - except Exception as e: - raise RuntimeError(f"Failed to delete data from MongoDB: {e}") from e - - def list_keys(self, prefix: str = "") -> List[str]: - """List all keys, optionally filtered by prefix.""" - collection = self._get_collection() - - try: - if prefix: - # Escape regex metacharacters to prevent ReDoS / unexpected matches - escaped = re.escape(prefix) - cursor = collection.find( - {"_id": {"$regex": f"^{escaped}"}}, - {"_id": 1} - ).sort("_id", 1) - else: - cursor = collection.find({}, {"_id": 1}).sort("_id", 1) - - return [doc["_id"] for doc in cursor] - except Exception as e: - raise RuntimeError(f"Failed to list keys from MongoDB: {e}") from e - - def exists(self, key: str) -> bool: - """Check if a key exists.""" - collection = self._get_collection() - - try: - return collection.count_documents({"_id": key}, limit=1) > 0 - except Exception as e: - raise RuntimeError(f"Failed to check key existence in MongoDB: {e}") from e - - def clear(self) -> int: - """Clear all data. Returns number of items deleted.""" - collection = self._get_collection() - - try: - result = collection.delete_many({}) - return result.deleted_count - except Exception as e: - raise RuntimeError(f"Failed to clear data from MongoDB: {e}") from e - - def ping(self) -> bool: - """Test connection to MongoDB.""" - try: - collection = self._get_collection() - # Try to run a simple command - collection.database.client.admin.command('ping') - return True - except Exception: - return False - - def close(self) -> None: - """Close the MongoDB connection.""" - if self._client: - try: - self._client.close() - finally: - self._client = None - self._collection = None \ No newline at end of file diff --git a/src/praisonai/praisonai/storage/postgresql_adapter.py b/src/praisonai/praisonai/storage/postgresql_adapter.py deleted file mode 100644 index cd4e31cc3..000000000 --- a/src/praisonai/praisonai/storage/postgresql_adapter.py +++ /dev/null @@ -1,353 +0,0 @@ -""" -PostgreSQL Storage Adapter for PraisonAI. - -Implements StorageBackendProtocol using PostgreSQL for relational storage. -This is the wrapper implementation that contains the heavy PostgreSQL dependency. -""" - -import json -import re -import time -import threading -from typing import Dict, Any, List, Optional - -_SAFE_IDENTIFIER_RE = re.compile(r'^[A-Za-z_][A-Za-z0-9_]{0,62}$') - - -def _validate_identifier(name: str, kind: str = "identifier") -> str: - """Validate that a SQL identifier is safe (alphanumeric + underscore, no spaces).""" - if not _SAFE_IDENTIFIER_RE.match(name): - raise ValueError( - f"Invalid SQL {kind} {name!r}. " - "Must start with a letter or underscore and contain only letters, digits, or underscores." - ) - return name - - -class PostgreSQLStorageAdapter: - """ - PostgreSQL-based storage backend adapter. - - Uses PostgreSQL for robust, ACID-compliant data storage. - Implements StorageBackendProtocol from praisonaiagents.storage.protocols. - - Example: - ```python - from praisonai.storage import PostgreSQLStorageAdapter - - adapter = PostgreSQLStorageAdapter( - host="localhost", - database="praisonai", - user="postgres", - password="password" - ) - adapter.save("session_123", {"messages": []}) - data = adapter.load("session_123") - ``` - """ - - def __init__( - self, - host: str = "localhost", - port: int = 5432, - database: str = "praisonai", - user: str = "postgres", - password: str = "", - table: str = "praisonai_storage", - sslmode: str = "prefer", - connect_timeout: int = 10, - command_timeout: int = 30, - max_connections: int = 20, - ): - """ - Initialize the PostgreSQL storage adapter. - - Args: - host: PostgreSQL server host - port: PostgreSQL server port - database: Database name - user: Database user - password: Database password - table: Table name for storing data - sslmode: SSL mode (disable, allow, prefer, require, verify-ca, verify-full) - connect_timeout: Connection timeout in seconds - command_timeout: Command timeout in seconds - max_connections: Maximum number of connections in pool - """ - self.host = host - self.port = port - self.database = database - self.user = user - self.password = password - # Validate table name to prevent SQL injection - self.table = _validate_identifier(table, "table name") - self.sslmode = sslmode - self.connect_timeout = connect_timeout - self.command_timeout = command_timeout - self.max_connections = max_connections - self._pool = None - self._lock = threading.Lock() - - def _get_pool(self): - """Lazy initialize connection pool.""" - if self._pool is None: - with self._lock: - if self._pool is None: # Double-check locking - try: - import psycopg2 - from psycopg2 import pool - except ImportError: - raise ImportError( - "PostgreSQL storage adapter requires the 'psycopg2' package. " - "Install with: pip install 'praisonai[postgresql]'" - ) - - try: - self._pool = pool.ThreadedConnectionPool( - minconn=1, - maxconn=self.max_connections, - host=self.host, - port=self.port, - database=self.database, - user=self.user, - password=self.password, - sslmode=self.sslmode, - connect_timeout=self.connect_timeout, - ) - - # Create table if it doesn't exist - self._create_table() - - except Exception as e: - raise RuntimeError(f"Failed to create PostgreSQL connection pool: {e}") from e - - return self._pool - - def _create_table(self) -> None: - """Create storage table if it doesn't exist.""" - pool = self._pool - conn = None - - try: - conn = pool.getconn() - with conn.cursor() as cur: - # Create table with proper indexing - cur.execute(f""" - CREATE TABLE IF NOT EXISTS {self.table} ( - key VARCHAR(255) PRIMARY KEY, - data JSONB NOT NULL, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ); - """) - - # Create indexes for better performance - cur.execute(f""" - CREATE INDEX IF NOT EXISTS idx_{self.table}_updated_at - ON {self.table}(updated_at); - """) - - # Create a function and trigger for updated_at - cur.execute(f""" - CREATE OR REPLACE FUNCTION update_updated_at_column() - RETURNS TRIGGER AS $$ - BEGIN - NEW.updated_at = CURRENT_TIMESTAMP; - RETURN NEW; - END; - $$ language 'plpgsql'; - """) - - cur.execute(f""" - DROP TRIGGER IF EXISTS update_{self.table}_updated_at ON {self.table}; - CREATE TRIGGER update_{self.table}_updated_at - BEFORE UPDATE ON {self.table} - FOR EACH ROW EXECUTE PROCEDURE update_updated_at_column(); - """) - - conn.commit() - - except Exception as e: - if conn: - conn.rollback() - raise RuntimeError(f"Failed to create PostgreSQL table: {e}") from e - finally: - if conn: - pool.putconn(conn) - - def save(self, key: str, data: Dict[str, Any]) -> None: - """Save data with the given key (upsert).""" - pool = self._get_pool() - conn = None - - try: - conn = pool.getconn() - with conn.cursor() as cur: - # Convert to JSON for storage - json_data = json.dumps(data, default=str, ensure_ascii=False) - - # Use ON CONFLICT for upsert behavior - cur.execute(f""" - INSERT INTO {self.table} (key, data, created_at, updated_at) - VALUES (%s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (key) DO UPDATE SET - data = EXCLUDED.data, - updated_at = CURRENT_TIMESTAMP; - """, (key, json_data)) - - conn.commit() - - except Exception as e: - if conn: - conn.rollback() - raise RuntimeError(f"Failed to save data to PostgreSQL: {e}") from e - finally: - if conn: - pool.putconn(conn) - - def load(self, key: str) -> Optional[Dict[str, Any]]: - """Load data by key.""" - pool = self._get_pool() - conn = None - - try: - conn = pool.getconn() - with conn.cursor() as cur: - cur.execute(f""" - SELECT data FROM {self.table} WHERE key = %s; - """, (key,)) - - row = cur.fetchone() - if row: - try: - # Data is stored as JSON string - return json.loads(row[0]) - except json.JSONDecodeError as e: - raise ValueError(f"Invalid JSON data for key '{key}': {e}") from e - return None - - except Exception as e: - raise RuntimeError(f"Failed to load data from PostgreSQL: {e}") from e - finally: - if conn: - pool.putconn(conn) - - def delete(self, key: str) -> bool: - """Delete data by key.""" - pool = self._get_pool() - conn = None - - try: - conn = pool.getconn() - with conn.cursor() as cur: - cur.execute(f""" - DELETE FROM {self.table} WHERE key = %s; - """, (key,)) - - deleted = cur.rowcount > 0 - conn.commit() - return deleted - - except Exception as e: - if conn: - conn.rollback() - raise RuntimeError(f"Failed to delete data from PostgreSQL: {e}") from e - finally: - if conn: - pool.putconn(conn) - - def list_keys(self, prefix: str = "") -> List[str]: - """List all keys, optionally filtered by prefix.""" - pool = self._get_pool() - conn = None - - try: - conn = pool.getconn() - with conn.cursor() as cur: - if prefix: - cur.execute(f""" - SELECT key FROM {self.table} - WHERE key LIKE %s - ORDER BY key; - """, (f"{prefix}%",)) - else: - cur.execute(f""" - SELECT key FROM {self.table} - ORDER BY key; - """) - - return [row[0] for row in cur.fetchall()] - - except Exception as e: - raise RuntimeError(f"Failed to list keys from PostgreSQL: {e}") from e - finally: - if conn: - pool.putconn(conn) - - def exists(self, key: str) -> bool: - """Check if a key exists.""" - pool = self._get_pool() - conn = None - - try: - conn = pool.getconn() - with conn.cursor() as cur: - cur.execute(f""" - SELECT 1 FROM {self.table} WHERE key = %s LIMIT 1; - """, (key,)) - - return cur.fetchone() is not None - - except Exception as e: - raise RuntimeError(f"Failed to check key existence in PostgreSQL: {e}") from e - finally: - if conn: - pool.putconn(conn) - - def clear(self) -> int: - """Clear all data. Returns number of items deleted.""" - pool = self._get_pool() - conn = None - - try: - conn = pool.getconn() - with conn.cursor() as cur: - # Count first - cur.execute(f"SELECT COUNT(*) FROM {self.table};") - count = cur.fetchone()[0] - - # Then delete all - cur.execute(f"DELETE FROM {self.table};") - conn.commit() - - return count - - except Exception as e: - if conn: - conn.rollback() - raise RuntimeError(f"Failed to clear data from PostgreSQL: {e}") from e - finally: - if conn: - pool.putconn(conn) - - def ping(self) -> bool: - """Test connection to PostgreSQL.""" - try: - pool = self._get_pool() - conn = pool.getconn() - try: - with conn.cursor() as cur: - cur.execute("SELECT 1;") - return cur.fetchone() is not None - finally: - pool.putconn(conn) - except Exception: - return False - - def close(self) -> None: - """Close all connections in the pool.""" - if self._pool: - try: - self._pool.closeall() - finally: - self._pool = None \ No newline at end of file diff --git a/src/praisonai/praisonai/storage/redis_adapter.py b/src/praisonai/praisonai/storage/redis_adapter.py deleted file mode 100644 index 90f93184f..000000000 --- a/src/praisonai/praisonai/storage/redis_adapter.py +++ /dev/null @@ -1,197 +0,0 @@ -""" -Redis Storage Adapter for PraisonAI. - -Implements StorageBackendProtocol using Redis for high-speed storage. -This is the wrapper implementation that contains the heavy Redis dependency. -""" - -import json -from typing import Dict, Any, List, Optional - - -class RedisStorageAdapter: - """ - Redis-based storage backend adapter. - - Uses Redis for high-speed caching and ephemeral data storage. - Implements StorageBackendProtocol from praisonaiagents.storage.protocols. - - Example: - ```python - from praisonai.storage import RedisStorageAdapter - - adapter = RedisStorageAdapter(url="redis://localhost:6379") - adapter.save("session_123", {"messages": []}) - data = adapter.load("session_123") - ``` - """ - - def __init__( - self, - url: str = "redis://localhost:6379", - prefix: str = "praisonai:", - ttl: Optional[int] = None, - db: int = 0, - password: Optional[str] = None, - socket_timeout: float = 5.0, - socket_connect_timeout: float = 5.0, - retry_on_timeout: bool = True, - ): - """ - Initialize the Redis storage adapter. - - Args: - url: Redis connection URL - prefix: Key prefix for all stored data - ttl: Optional TTL in seconds for all keys - db: Redis database number - password: Optional Redis password - socket_timeout: Socket timeout in seconds - socket_connect_timeout: Socket connection timeout in seconds - retry_on_timeout: Whether to retry on timeout - """ - self.url = url - self.prefix = prefix - self.ttl = ttl - self.db = db - self.password = password - self.socket_timeout = socket_timeout - self.socket_connect_timeout = socket_connect_timeout - self.retry_on_timeout = retry_on_timeout - self._client = None - - def _get_client(self): - """Lazy initialize Redis client.""" - if self._client is None: - try: - import redis - except ImportError: - raise ImportError( - "Redis storage adapter requires the 'redis' package. " - "Install with: pip install 'praisonai[redis]'" - ) - - # Parse URL and add additional parameters - self._client = redis.from_url( - self.url, - db=self.db, - password=self.password, - socket_timeout=self.socket_timeout, - socket_connect_timeout=self.socket_connect_timeout, - retry_on_timeout=self.retry_on_timeout, - decode_responses=False, # We handle encoding/decoding manually - ) - return self._client - - def _make_key(self, key: str) -> str: - """Create prefixed key.""" - return f"{self.prefix}{key}" - - def save(self, key: str, data: Dict[str, Any]) -> None: - """Save data with the given key.""" - client = self._get_client() - full_key = self._make_key(key) - json_data = json.dumps(data, default=str, ensure_ascii=False).encode('utf-8') - - try: - if self.ttl: - client.setex(full_key, self.ttl, json_data) - else: - client.set(full_key, json_data) - except Exception as e: - raise RuntimeError(f"Failed to save data to Redis: {e}") from e - - def load(self, key: str) -> Optional[Dict[str, Any]]: - """Load data by key.""" - client = self._get_client() - full_key = self._make_key(key) - - try: - value = client.get(full_key) - if value: - try: - # Handle both bytes and string values - if isinstance(value, bytes): - value = value.decode('utf-8') - return json.loads(value) - except json.JSONDecodeError as e: - raise ValueError(f"Invalid JSON data for key '{key}': {e}") from e - return None - except Exception as e: - raise RuntimeError(f"Failed to load data from Redis: {e}") from e - - def delete(self, key: str) -> bool: - """Delete data by key.""" - client = self._get_client() - full_key = self._make_key(key) - - try: - return client.delete(full_key) > 0 - except Exception as e: - raise RuntimeError(f"Failed to delete data from Redis: {e}") from e - - def list_keys(self, prefix: str = "") -> List[str]: - """List all keys, optionally filtered by prefix.""" - client = self._get_client() - pattern = self._make_key(f"{prefix}*") - - try: - keys = [] - for key in client.keys(pattern): - # Remove the prefix to return clean keys - key_str = key.decode('utf-8') if isinstance(key, bytes) else key - clean_key = key_str[len(self.prefix):] - keys.append(clean_key) - - return sorted(keys) - except Exception as e: - raise RuntimeError(f"Failed to list keys from Redis: {e}") from e - - def exists(self, key: str) -> bool: - """Check if a key exists.""" - client = self._get_client() - full_key = self._make_key(key) - - try: - return client.exists(full_key) > 0 - except Exception as e: - raise RuntimeError(f"Failed to check key existence in Redis: {e}") from e - - def clear(self) -> int: - """Clear all data with our prefix. Returns number of items deleted.""" - client = self._get_client() - pattern = self._make_key("*") - - try: - keys = list(client.keys(pattern)) - if keys: - return client.delete(*keys) - return 0 - except Exception as e: - raise RuntimeError(f"Failed to clear data from Redis: {e}") from e - - def set_ttl(self, key: str, ttl: int) -> bool: - """Set TTL on a specific key.""" - client = self._get_client() - full_key = self._make_key(key) - - try: - return client.expire(full_key, ttl) - except Exception as e: - raise RuntimeError(f"Failed to set TTL in Redis: {e}") from e - - def ping(self) -> bool: - """Test connection to Redis.""" - try: - client = self._get_client() - return client.ping() - except Exception: - return False - - def close(self) -> None: - """Close the Redis connection.""" - if self._client: - try: - self._client.close() - finally: - self._client = None \ No newline at end of file diff --git a/src/praisonai/praisonai/storage/valkey_adapter.py b/src/praisonai/praisonai/storage/valkey_adapter.py index d39843aee..1f0001632 100644 --- a/src/praisonai/praisonai/storage/valkey_adapter.py +++ b/src/praisonai/praisonai/storage/valkey_adapter.py @@ -40,7 +40,7 @@ class ValkeyStorageAdapter: Example: ```python - from praisonai.storage import ValkeyStorageAdapter + from praisonai.storage.valkey_adapter import ValkeyStorageAdapter adapter = ValkeyStorageAdapter(host="localhost", port=6379) adapter.save("session_123", {"messages": []}) @@ -219,7 +219,7 @@ class ValkeySearchBackend: Example: ```python - from praisonai.storage import ValkeySearchBackend + from praisonai.storage.valkey_adapter import ValkeySearchBackend backend = ValkeySearchBackend(host="localhost", port=6379, vector_dim=1536) backend.create_index() diff --git a/src/praisonai/tests/unit/test_agents_generator_safe_loader.py b/src/praisonai/tests/unit/test_agents_generator_safe_loader.py deleted file mode 100644 index be910e6bb..000000000 --- a/src/praisonai/tests/unit/test_agents_generator_safe_loader.py +++ /dev/null @@ -1,17 +0,0 @@ -"""load_tools_from_module must use gated safe loader.""" - -from __future__ import annotations - -from unittest.mock import patch - -import pytest - - -def test_load_tools_from_module_returns_empty_when_blocked(): - from praisonai.agents_generator import AgentsGenerator - - gen = object.__new__(AgentsGenerator) - with patch( - "praisonai._safe_loader.load_user_module", return_value=None - ): - assert gen.load_tools_from_module("/tmp/evil_tools.py") == {}