diff --git a/src/praisonai-agents/praisonaiagents/__init__.py b/src/praisonai-agents/praisonaiagents/__init__.py index 2205fbfd6..d4e43c96a 100644 --- a/src/praisonai-agents/praisonaiagents/__init__.py +++ b/src/praisonai-agents/praisonaiagents/__init__.py @@ -232,6 +232,7 @@ def _get_lazy_cache(): # Agent classes 'Agent': ('praisonaiagents.agent.agent', 'Agent'), + 'RetryBackoffConfig': ('praisonaiagents.agent.retry_utils', 'RetryBackoffConfig'), 'BudgetExceededError': ('praisonaiagents.errors', 'BudgetExceededError'), # Error hierarchy - structured exception handling @@ -783,6 +784,7 @@ def warmup(include_litellm: bool = False, include_openai: bool = True) -> dict: # Core classes - the essentials 'Agent', + 'RetryBackoffConfig', 'AgentTeam', # Primary class for multi-agent coordination (v1.0+) 'AgentManager', # Silent alias for AgentTeam 'Agents', # Deprecated alias for AgentTeam (emits warning) diff --git a/src/praisonai-agents/praisonaiagents/agent/agent.py b/src/praisonai-agents/praisonaiagents/agent/agent.py index f7a8a6ce0..4dfff40d6 100644 --- a/src/praisonai-agents/praisonaiagents/agent/agent.py +++ b/src/praisonai-agents/praisonaiagents/agent/agent.py @@ -259,6 +259,9 @@ def _get_default_server_registry() -> ServerRegistry: # Import structured error from central errors module from ..errors import BudgetExceededError +# Import retry configuration +from .retry_utils import RetryBackoffConfig + class Agent(SteeringMixin, SandboxMixin, UnifiedExecutionMixin, ToolExecutionMixin, ChatHandlerMixin, SessionManagerMixin, ChatMixin, ExecutionMixin, MemoryMixin, AsyncMemoryMixin): # Class-level counter for generating unique display names for nameless agents _agent_counter = 0 @@ -595,6 +598,7 @@ def __init__( tool_search: Optional[Union[bool, str, Dict[str, Any], 'ToolSearchConfig']] = False, # Progressive tool disclosure message_steering: Optional[Union[bool, 'MessageSteeringProtocol']] = False, # Real-time message steering during execution sandbox: Optional[Union[bool, 'SandboxConfig']] = None, # Sandbox for safe code execution + retry: Optional[Union[bool, Dict[str, Any], 'RetryBackoffConfig']] = None, # Retry configuration with exponential backoff ): """Initialize an Agent instance. @@ -792,6 +796,8 @@ def __init__( if autonomy is None: # AutonomyConfig is in agent/autonomy.py - use dict for config defaults autonomy = apply_config_defaults("autonomy", autonomy, None) + if retry is None: + retry = apply_config_defaults("retry", retry, RetryBackoffConfig) # ============================================================ # DEPRECATION WARNINGS for params consolidated into configs @@ -1978,6 +1984,16 @@ def __init__( # Store tool retry policy for tool execution with exponential backoff self._tool_retry_policy = _tool_config.retry_policy if _tool_config else None + # Retry configuration with jittered exponential backoff + if isinstance(retry, RetryBackoffConfig): + self._retry_config = retry + elif isinstance(retry, dict): + self._retry_config = RetryBackoffConfig(**retry) + elif retry is True: + self._retry_config = RetryBackoffConfig() # Use defaults + else: + self._retry_config = None # No retry configuration + # Cache for system prompts and formatted tools with eager thread-safe lock # Use OrderedDict for LRU behavior self._system_prompt_cache = OrderedDict() @@ -2201,6 +2217,9 @@ def clone_for_channel(self) -> "Agent": # Tool configuration - use consolidated config when available 'tool_config': getattr(self, '_tool_config', None), + # Retry configuration + 'retry': getattr(self, '_retry_config', None), + # CLI backend 'cli_backend': getattr(self, '_cli_backend', None), diff --git a/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py b/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py index 8e5497bee..db149d524 100644 --- a/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py +++ b/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py @@ -1039,7 +1039,7 @@ def _chat_completion(self, messages, temperature=1.0, tools=None, stream=None, r try: # First attempt: try with streaming enabled for better user experience stream_callback = self.stream_emitter.emit if hasattr(self, 'stream_emitter') else None - final_response = self._execute_unified_chat_completion( + final_response = self._chat_completion_with_retry( messages=messages, temperature=temperature, tools=formatted_tools, @@ -1060,7 +1060,11 @@ def _chat_completion(self, messages, temperature=1.0, tools=None, stream=None, r stream = False # Set for the main execution below else: raise # Re-raise if it's a different ValueError - except Exception: + except Exception as e: + from ..errors import LLMError + # Don't retry if it's an LLMError that has exhausted retries + if isinstance(e, LLMError): + raise # Re-raise LLMErrors immediately to avoid double retry # For any other exception, fall back to non-streaming logging.debug(f"{self.name}: Streaming attempt failed, falling back to non-streaming") stream = False # Set for the main execution below @@ -1071,7 +1075,7 @@ def _chat_completion(self, messages, temperature=1.0, tools=None, stream=None, r # UNIFIED: Single protocol-driven dispatch path (fixes DRY violation) # All LLM providers now go through unified dispatcher for consistency and maintainability stream_callback = self.stream_emitter.emit if hasattr(self, 'stream_emitter') else None - final_response = self._execute_unified_chat_completion( + final_response = self._chat_completion_with_retry( messages=messages, temperature=temperature, tools=formatted_tools, @@ -3003,7 +3007,7 @@ async def _achat_impl(self, prompt, temperature, tools, output_json, output_pyda response_format = self._build_response_format(schema_model) # Use composition instead of runtime class mutation for safety - response = await self._execute_unified_achat_completion( + response = await self._achat_completion_with_retry( messages=messages, temperature=temperature, tools=formatted_tools, @@ -3175,7 +3179,7 @@ async def _achat_impl(self, prompt, temperature, tools, output_json, output_pyda {"role": "user", "content": "Now regenerate your response using the reflection you made"} ] - new_response = await self._execute_unified_achat_completion( + new_response = await self._achat_completion_with_retry( messages=regenerate_messages, temperature=temperature, tools=formatted_tools, @@ -4058,3 +4062,156 @@ async def _apply_context_compaction_async(self, messages, hook_event_class): raise logging.debug(f"[compaction] skipped (non-fatal): {_ce}") return False + + def _chat_completion_with_retry(self, messages, temperature=1.0, tools=None, stream=None, reasoning_steps=False, task_name=None, task_description=None, task_id=None, response_format=None, stream_callback=None, emit_events=True): + """ + Wrapper for _execute_unified_chat_completion that adds jittered exponential backoff retry logic. + + This method wraps the unified chat completion call and adds retry capability for + transient failures like rate limits, network errors, and service outages. + """ + retry_config = getattr(self, '_retry_config', None) + if not retry_config: + return self._execute_unified_chat_completion(messages, temperature, tools, stream, reasoning_steps, + task_name, task_description, task_id, response_format, + stream_callback=stream_callback, emit_events=emit_events) + + from .retry_utils import jittered_backoff + from ..hooks import HookEvent, OnRetryInput + import time + + max_attempts = retry_config.max_retries + 1 + + for attempt in range(max_attempts): + try: + # Call the underlying unified chat completion directly to avoid infinite recursion + return self._execute_unified_chat_completion(messages, temperature, tools, stream, reasoning_steps, + task_name, task_description, task_id, response_format, + stream_callback=stream_callback, emit_events=emit_events) + + except Exception as e: + from ..errors import LLMError + + # Only retry LLMErrors that are marked as retryable + if not isinstance(e, LLMError) or not e.is_retryable: + raise # Re-raise non-retryable errors immediately + + # If this is the last attempt, re-raise the error + if attempt >= max_attempts - 1: + raise + + # Calculate delay for this retry attempt + delay = jittered_backoff( + attempt, + base_delay=retry_config.base_delay, + max_delay=retry_config.max_delay, + jitter_ratio=retry_config.jitter_ratio, + ) + + # Fire OnRetry hook with delay information + retry_input = OnRetryInput( + session_id=getattr(self, '_session_id', 'default'), + cwd=os.getcwd(), + event_name=HookEvent.ON_RETRY, + timestamp=str(time.time()), + agent_name=self.name, + retry_count=attempt + 1, + max_retries=retry_config.max_retries, + error_message=str(e), + operation="llm_request", + delay_seconds=delay, + attempt=attempt + ) + self._hook_runner.execute_sync(HookEvent.ON_RETRY, retry_input) + + # Log retry attempt (buffered to avoid spam during transient failures) + logger.debug(f"[{self.name}] Retry {attempt + 1}/{max_attempts} after {delay:.1f}s: {str(e)[:100]}") + + # Sleep with interrupt awareness - make interruption terminal + interrupt_fn = getattr(self, '_is_interrupted', lambda: False) + sleep_start = time.time() + while time.time() - sleep_start < delay: + if interrupt_fn(): + # Interruption is terminal - stop retrying + raise RuntimeError("Agent interrupted during retry backoff") + time.sleep(min(0.2, delay - (time.time() - sleep_start))) + + # This should never be reached, but just in case + raise RuntimeError("Retry loop completed without returning or raising an exception") + + async def _achat_completion_with_retry(self, messages, temperature=1.0, tools=None, stream=None, reasoning_steps=False, task_name=None, task_description=None, task_id=None, response_format=None, stream_callback=None, emit_events=True): + """ + Async wrapper for _execute_unified_achat_completion that adds jittered exponential backoff retry logic. + + This method wraps the async chat completion call and adds retry capability for + transient failures like rate limits, network errors, and service outages. + """ + retry_config = getattr(self, '_retry_config', None) + if not retry_config: + return await self._execute_unified_achat_completion( + messages, temperature, tools, stream, reasoning_steps, + task_name, task_description, task_id, response_format, + stream_callback=stream_callback, emit_events=emit_events + ) + + from .retry_utils import jittered_backoff, interruptible_sleep + from ..hooks import HookEvent, OnRetryInput + import time + import asyncio + + max_attempts = retry_config.max_retries + 1 + + for attempt in range(max_attempts): + try: + # Call the underlying unified chat completion directly to avoid infinite recursion + return await self._execute_unified_achat_completion( + messages, temperature, tools, stream, reasoning_steps, + task_name, task_description, task_id, response_format, + stream_callback=stream_callback, emit_events=emit_events + ) + + except Exception as e: + from ..errors import LLMError + + # Only retry LLMErrors that are marked as retryable + if not isinstance(e, LLMError) or not e.is_retryable: + raise # Re-raise non-retryable errors immediately + + # If this is the last attempt, re-raise the error + if attempt >= max_attempts - 1: + raise + + # Calculate delay for this retry attempt + delay = jittered_backoff( + attempt, + base_delay=retry_config.base_delay, + max_delay=retry_config.max_delay, + jitter_ratio=retry_config.jitter_ratio, + ) + + # Fire OnRetry hook with delay information + retry_input = OnRetryInput( + session_id=getattr(self, '_session_id', 'default'), + cwd=os.getcwd(), + event_name=HookEvent.ON_RETRY, + timestamp=str(time.time()), + agent_name=self.name, + retry_count=attempt + 1, + max_retries=retry_config.max_retries, + error_message=str(e), + operation="async_llm_request", + delay_seconds=delay, + attempt=attempt + ) + await self._hook_runner.execute_async(HookEvent.ON_RETRY, retry_input) + + # Log retry attempt + logger.debug(f"[{self.name}] Async retry {attempt + 1}/{max_attempts} after {delay:.1f}s: {str(e)[:100]}") + + # Async sleep with interrupt awareness using the helper + interrupt_fn = getattr(self, '_is_interrupted', lambda: False) + if not await interruptible_sleep(delay, interrupt_fn=interrupt_fn): + raise RuntimeError("Agent interrupted during retry backoff") + + # This should never be reached, but just in case + raise RuntimeError("Async retry loop completed without returning or raising an exception") diff --git a/src/praisonai-agents/praisonaiagents/agent/retry_utils.py b/src/praisonai-agents/praisonaiagents/agent/retry_utils.py new file mode 100644 index 000000000..e54ef7383 --- /dev/null +++ b/src/praisonai-agents/praisonaiagents/agent/retry_utils.py @@ -0,0 +1,101 @@ +""" +Retry utilities with jittered exponential backoff for the PraisonAI SDK. + +Provides interrupt-aware retry mechanisms for LLM API calls and tool execution +with configurable backoff strategies, jitter, and buffered status reporting. +""" + +import asyncio +import random +import time +from dataclasses import dataclass +from typing import Callable, Optional + + +@dataclass +class RetryBackoffConfig: + """Configuration for jittered exponential backoff retry behavior.""" + base_delay: float = 5.0 # Base delay in seconds + max_delay: float = 120.0 # Maximum delay in seconds + jitter_ratio: float = 0.5 # Jitter as fraction of delay added on top of base (0.0-1.0) + max_retries: int = 3 # Maximum retry attempts + + def __post_init__(self): + """Validate configuration parameters.""" + if self.base_delay <= 0: + raise ValueError("base_delay must be > 0") + if self.max_delay < self.base_delay: + raise ValueError("max_delay must be >= base_delay") + if not (0 <= self.jitter_ratio <= 1): + raise ValueError("jitter_ratio must be between 0 and 1") + if self.max_retries < 0: + raise ValueError("max_retries must be >= 0") + + +def jittered_backoff( + attempt: int, + *, + base_delay: float = 5.0, + max_delay: float = 120.0, + jitter_ratio: float = 0.5, +) -> float: + """ + Calculate delay for jittered exponential backoff. + + Args: + attempt: Current attempt number (0-based) + base_delay: Base delay in seconds + max_delay: Maximum delay cap in seconds + jitter_ratio: Jitter ratio (0.0-1.0), fraction of delay added as positive jitter + + Returns: + Delay in seconds with jitter applied + + Example: + >>> # Attempt 0: ~5s, Attempt 1: ~10s, Attempt 2: ~20s + >>> delay = jittered_backoff(1, base_delay=5.0, max_delay=120.0, jitter_ratio=0.5) + """ + # Exponential backoff: base * 2^attempt + delay = min(base_delay * (2 ** max(0, attempt)), max_delay) + + # Apply jitter: delay + uniform(0, jitter_ratio * delay) for additive positive jitter + if jitter_ratio > 0: + jitter_range = delay * jitter_ratio + jitter = random.uniform(0, jitter_range) # Positive additive jitter only + delay = max(0.1, min(delay + jitter, max_delay)) # Clamp again after jitter + + return delay + + +async def interruptible_sleep( + seconds: float, + check_interval: float = 0.2, + interrupt_fn: Optional[Callable[[], bool]] = None, +) -> bool: + """ + Sleep with periodic interruption checks. + + Args: + seconds: Total sleep duration in seconds + check_interval: How often to check for interruption (seconds) + interrupt_fn: Function that returns True if sleep should be interrupted + + Returns: + True if completed full sleep, False if interrupted + + Example: + >>> interrupted = await interruptible_sleep(30.0, interrupt_fn=lambda: agent.is_stopped()) + """ + if interrupt_fn is None: + interrupt_fn = lambda: False + + elapsed = 0.0 + while elapsed < seconds: + if interrupt_fn(): + return False # Interrupted + + sleep_time = min(check_interval, seconds - elapsed) + await asyncio.sleep(sleep_time) + elapsed += sleep_time + + return True # Completed full sleep \ No newline at end of file diff --git a/src/praisonai-agents/praisonaiagents/hooks/events.py b/src/praisonai-agents/praisonaiagents/hooks/events.py index ec538a695..da06b619e 100644 --- a/src/praisonai-agents/praisonaiagents/hooks/events.py +++ b/src/praisonai-agents/praisonaiagents/hooks/events.py @@ -190,6 +190,8 @@ class OnRetryInput(HookInput): max_retries: int = 3 error_message: str = "" operation: str = "" # tool_call, llm_request, etc. + delay_seconds: float = 0.0 # Delay before retry + attempt: int = 0 # Current attempt number (0-based) def to_dict(self) -> Dict[str, Any]: base = super().to_dict() @@ -204,7 +206,9 @@ def to_dict(self) -> Dict[str, Any]: "retry_count": self.retry_count, "max_retries": self.max_retries, "error_message": self.error_message, - "operation": self.operation + "operation": self.operation, + "delay_seconds": self.delay_seconds, + "attempt": self.attempt }) return base