Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/praisonai-agents/praisonaiagents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ def _get_lazy_cache():

# Agent classes
'Agent': ('praisonaiagents.agent.agent', 'Agent'),
'RetryBackoffConfig': ('praisonaiagents.agent.retry_utils', 'RetryBackoffConfig'),
'BudgetExceededError': ('praisonaiagents.errors', 'BudgetExceededError'),

# Error hierarchy - structured exception handling
Expand Down Expand Up @@ -783,6 +784,7 @@ def warmup(include_litellm: bool = False, include_openai: bool = True) -> dict:

# Core classes - the essentials
'Agent',
'RetryBackoffConfig',
'AgentTeam', # Primary class for multi-agent coordination (v1.0+)
'AgentManager', # Silent alias for AgentTeam
'Agents', # Deprecated alias for AgentTeam (emits warning)
Expand Down
19 changes: 19 additions & 0 deletions src/praisonai-agents/praisonaiagents/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ def _get_default_server_registry() -> ServerRegistry:
# Import structured error from central errors module
from ..errors import BudgetExceededError

# Import retry configuration
from .retry_utils import RetryBackoffConfig

class Agent(SteeringMixin, SandboxMixin, UnifiedExecutionMixin, ToolExecutionMixin, ChatHandlerMixin, SessionManagerMixin, ChatMixin, ExecutionMixin, MemoryMixin, AsyncMemoryMixin):
# Class-level counter for generating unique display names for nameless agents
_agent_counter = 0
Expand Down Expand Up @@ -595,6 +598,7 @@ def __init__(
tool_search: Optional[Union[bool, str, Dict[str, Any], 'ToolSearchConfig']] = False, # Progressive tool disclosure
message_steering: Optional[Union[bool, 'MessageSteeringProtocol']] = False, # Real-time message steering during execution
sandbox: Optional[Union[bool, 'SandboxConfig']] = None, # Sandbox for safe code execution
retry: Optional[Union[bool, Dict[str, Any], 'RetryBackoffConfig']] = None, # Retry configuration with exponential backoff
):
"""Initialize an Agent instance.

Expand Down Expand Up @@ -792,6 +796,8 @@ def __init__(
if autonomy is None:
# AutonomyConfig is in agent/autonomy.py - use dict for config defaults
autonomy = apply_config_defaults("autonomy", autonomy, None)
if retry is None:
retry = apply_config_defaults("retry", retry, RetryBackoffConfig)

# ============================================================
# DEPRECATION WARNINGS for params consolidated into configs
Expand Down Expand Up @@ -1978,6 +1984,16 @@ def __init__(
# Store tool retry policy for tool execution with exponential backoff
self._tool_retry_policy = _tool_config.retry_policy if _tool_config else None

# Retry configuration with jittered exponential backoff
if isinstance(retry, RetryBackoffConfig):
self._retry_config = retry
elif isinstance(retry, dict):
self._retry_config = RetryBackoffConfig(**retry)
elif retry is True:
self._retry_config = RetryBackoffConfig() # Use defaults
else:
self._retry_config = None # No retry configuration

# Cache for system prompts and formatted tools with eager thread-safe lock
# Use OrderedDict for LRU behavior
self._system_prompt_cache = OrderedDict()
Expand Down Expand Up @@ -2201,6 +2217,9 @@ def clone_for_channel(self) -> "Agent":
# Tool configuration - use consolidated config when available
'tool_config': getattr(self, '_tool_config', None),

# Retry configuration
'retry': getattr(self, '_retry_config', None),

# CLI backend
'cli_backend': getattr(self, '_cli_backend', None),

Expand Down
167 changes: 162 additions & 5 deletions src/praisonai-agents/praisonaiagents/agent/chat_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,7 @@ def _chat_completion(self, messages, temperature=1.0, tools=None, stream=None, r
try:
# First attempt: try with streaming enabled for better user experience
stream_callback = self.stream_emitter.emit if hasattr(self, 'stream_emitter') else None
final_response = self._execute_unified_chat_completion(
final_response = self._chat_completion_with_retry(
messages=messages,
temperature=temperature,
tools=formatted_tools,
Expand All @@ -1060,7 +1060,11 @@ def _chat_completion(self, messages, temperature=1.0, tools=None, stream=None, r
stream = False # Set for the main execution below
else:
raise # Re-raise if it's a different ValueError
except Exception:
except Exception as e:
from ..errors import LLMError
# Don't retry if it's an LLMError that has exhausted retries
if isinstance(e, LLMError):
raise # Re-raise LLMErrors immediately to avoid double retry
# For any other exception, fall back to non-streaming
logging.debug(f"{self.name}: Streaming attempt failed, falling back to non-streaming")
stream = False # Set for the main execution below
Expand All @@ -1071,7 +1075,7 @@ def _chat_completion(self, messages, temperature=1.0, tools=None, stream=None, r
# UNIFIED: Single protocol-driven dispatch path (fixes DRY violation)
# All LLM providers now go through unified dispatcher for consistency and maintainability
stream_callback = self.stream_emitter.emit if hasattr(self, 'stream_emitter') else None
final_response = self._execute_unified_chat_completion(
final_response = self._chat_completion_with_retry(
messages=messages,
temperature=temperature,
tools=formatted_tools,
Expand Down Expand Up @@ -3003,7 +3007,7 @@ async def _achat_impl(self, prompt, temperature, tools, output_json, output_pyda
response_format = self._build_response_format(schema_model)

# Use composition instead of runtime class mutation for safety
response = await self._execute_unified_achat_completion(
response = await self._achat_completion_with_retry(
messages=messages,
temperature=temperature,
tools=formatted_tools,
Expand Down Expand Up @@ -3175,7 +3179,7 @@ async def _achat_impl(self, prompt, temperature, tools, output_json, output_pyda
{"role": "user", "content": "Now regenerate your response using the reflection you made"}
]

new_response = await self._execute_unified_achat_completion(
new_response = await self._achat_completion_with_retry(
messages=regenerate_messages,
temperature=temperature,
tools=formatted_tools,
Expand Down Expand Up @@ -4058,3 +4062,156 @@ async def _apply_context_compaction_async(self, messages, hook_event_class):
raise
logging.debug(f"[compaction] skipped (non-fatal): {_ce}")
return False

def _chat_completion_with_retry(self, messages, temperature=1.0, tools=None, stream=None, reasoning_steps=False, task_name=None, task_description=None, task_id=None, response_format=None, stream_callback=None, emit_events=True):
"""
Wrapper for _execute_unified_chat_completion that adds jittered exponential backoff retry logic.

This method wraps the unified chat completion call and adds retry capability for
transient failures like rate limits, network errors, and service outages.
"""
retry_config = getattr(self, '_retry_config', None)
if not retry_config:
return self._execute_unified_chat_completion(messages, temperature, tools, stream, reasoning_steps,
task_name, task_description, task_id, response_format,
stream_callback=stream_callback, emit_events=emit_events)

from .retry_utils import jittered_backoff
from ..hooks import HookEvent, OnRetryInput
import time

max_attempts = retry_config.max_retries + 1

for attempt in range(max_attempts):
try:
# Call the underlying unified chat completion directly to avoid infinite recursion
return self._execute_unified_chat_completion(messages, temperature, tools, stream, reasoning_steps,
task_name, task_description, task_id, response_format,
stream_callback=stream_callback, emit_events=emit_events)

except Exception as e:
from ..errors import LLMError

# Only retry LLMErrors that are marked as retryable
if not isinstance(e, LLMError) or not e.is_retryable:
raise # Re-raise non-retryable errors immediately
Comment on lines +4066 to +4097

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Sync retry loop is dead code — raw exceptions bypass the LLMError gate

_execute_unified_chat_completion (line 1607-1609) re-raises every dispatcher error as a raw exception (openai.RateLimitError, httpx.RemoteProtocolError, etc.) — never as LLMError. The guard on line 4096 (not isinstance(e, LLMError)) therefore always matches, so _chat_completion_with_retry immediately re-raises without retrying. The exception then propagates into _chat_completion's own except Exception handler (line 1162), which converts it to LLMError and applies its own _retry_depth ≤ 2 mechanism — completely ignoring the user's RetryBackoffConfig.

LLMError is created in _chat_completion's handler after the exception has already escaped _chat_completion_with_retry; by that point the retry loop is unreachable. To make the new config effective, the inner call in the retry loop needs to be _execute_unified_chat_completion wrapped in the same error-classification logic that _chat_completion uses (i.e. catch raw exceptions, classify, wrap in LLMError, then let the retry loop decide).


# If this is the last attempt, re-raise the error
if attempt >= max_attempts - 1:
raise

# Calculate delay for this retry attempt
delay = jittered_backoff(
attempt,
base_delay=retry_config.base_delay,
max_delay=retry_config.max_delay,
jitter_ratio=retry_config.jitter_ratio,
)

# Fire OnRetry hook with delay information
retry_input = OnRetryInput(
session_id=getattr(self, '_session_id', 'default'),
cwd=os.getcwd(),
event_name=HookEvent.ON_RETRY,
timestamp=str(time.time()),
agent_name=self.name,
retry_count=attempt + 1,
max_retries=retry_config.max_retries,
error_message=str(e),
operation="llm_request",
delay_seconds=delay,
attempt=attempt
)
self._hook_runner.execute_sync(HookEvent.ON_RETRY, retry_input)

# Log retry attempt (buffered to avoid spam during transient failures)
logger.debug(f"[{self.name}] Retry {attempt + 1}/{max_attempts} after {delay:.1f}s: {str(e)[:100]}")

# Sleep with interrupt awareness - make interruption terminal
interrupt_fn = getattr(self, '_is_interrupted', lambda: False)
sleep_start = time.time()
while time.time() - sleep_start < delay:
if interrupt_fn():
# Interruption is terminal - stop retrying
raise RuntimeError("Agent interrupted during retry backoff")
time.sleep(min(0.2, delay - (time.time() - sleep_start)))

# This should never be reached, but just in case
raise RuntimeError("Retry loop completed without returning or raising an exception")

async def _achat_completion_with_retry(self, messages, temperature=1.0, tools=None, stream=None, reasoning_steps=False, task_name=None, task_description=None, task_id=None, response_format=None, stream_callback=None, emit_events=True):
"""
Async wrapper for _execute_unified_achat_completion that adds jittered exponential backoff retry logic.

This method wraps the async chat completion call and adds retry capability for
transient failures like rate limits, network errors, and service outages.
"""
retry_config = getattr(self, '_retry_config', None)
if not retry_config:
return await self._execute_unified_achat_completion(
messages, temperature, tools, stream, reasoning_steps,
task_name, task_description, task_id, response_format,
stream_callback=stream_callback, emit_events=emit_events
)

from .retry_utils import jittered_backoff, interruptible_sleep
from ..hooks import HookEvent, OnRetryInput
import time
import asyncio

max_attempts = retry_config.max_retries + 1

for attempt in range(max_attempts):
try:
# Call the underlying unified chat completion directly to avoid infinite recursion
return await self._execute_unified_achat_completion(
messages, temperature, tools, stream, reasoning_steps,
task_name, task_description, task_id, response_format,
stream_callback=stream_callback, emit_events=emit_events
)

except Exception as e:
from ..errors import LLMError

# Only retry LLMErrors that are marked as retryable
if not isinstance(e, LLMError) or not e.is_retryable:
raise # Re-raise non-retryable errors immediately
Comment on lines +4164 to +4178

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Async path causes double-retry: _handle_async_llm_error already retries before _achat_completion_with_retry sees LLMError

_execute_unified_achat_completion (lines 1690-1696) catches every raw exception and hands it to _handle_async_llm_error, which already retries up to _retry_depth=2 times (sleeping with its own backoff). Only after all internal retries fail does it raise LLMError with is_retryable=True. At that point _achat_completion_with_retry's except block catches that LLMError, passes the isinstance check, and starts another full retry loop.

With the defaults (max_retries=3), a rate-limit error triggers: 3 attempts per _achat_completion_with_retry iteration × 4 outer iterations = up to 12 total LLM calls, not the expected 4. Backoff delays compound from both layers, so the total wall-clock penalty can be 3–4× what the user configured.


# If this is the last attempt, re-raise the error
if attempt >= max_attempts - 1:
raise

# Calculate delay for this retry attempt
delay = jittered_backoff(
attempt,
base_delay=retry_config.base_delay,
max_delay=retry_config.max_delay,
jitter_ratio=retry_config.jitter_ratio,
)

# Fire OnRetry hook with delay information
retry_input = OnRetryInput(
session_id=getattr(self, '_session_id', 'default'),
cwd=os.getcwd(),
event_name=HookEvent.ON_RETRY,
timestamp=str(time.time()),
agent_name=self.name,
retry_count=attempt + 1,
max_retries=retry_config.max_retries,
error_message=str(e),
operation="async_llm_request",
delay_seconds=delay,
attempt=attempt
)
await self._hook_runner.execute_async(HookEvent.ON_RETRY, retry_input)

# Log retry attempt
logger.debug(f"[{self.name}] Async retry {attempt + 1}/{max_attempts} after {delay:.1f}s: {str(e)[:100]}")

# Async sleep with interrupt awareness using the helper
interrupt_fn = getattr(self, '_is_interrupted', lambda: False)
if not await interruptible_sleep(delay, interrupt_fn=interrupt_fn):
raise RuntimeError("Agent interrupted during retry backoff")

# This should never be reached, but just in case
raise RuntimeError("Async retry loop completed without returning or raising an exception")
101 changes: 101 additions & 0 deletions src/praisonai-agents/praisonaiagents/agent/retry_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""
Retry utilities with jittered exponential backoff for the PraisonAI SDK.

Provides interrupt-aware retry mechanisms for LLM API calls and tool execution
with configurable backoff strategies, jitter, and buffered status reporting.
"""

import asyncio
import random
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class RetryBackoffConfig:
"""Configuration for jittered exponential backoff retry behavior."""
base_delay: float = 5.0 # Base delay in seconds
max_delay: float = 120.0 # Maximum delay in seconds
jitter_ratio: float = 0.5 # Jitter as fraction of delay added on top of base (0.0-1.0)
max_retries: int = 3 # Maximum retry attempts

def __post_init__(self):
"""Validate configuration parameters."""
if self.base_delay <= 0:
raise ValueError("base_delay must be > 0")
if self.max_delay < self.base_delay:
raise ValueError("max_delay must be >= base_delay")
if not (0 <= self.jitter_ratio <= 1):
raise ValueError("jitter_ratio must be between 0 and 1")
if self.max_retries < 0:
raise ValueError("max_retries must be >= 0")


def jittered_backoff(
attempt: int,
*,
base_delay: float = 5.0,
max_delay: float = 120.0,
jitter_ratio: float = 0.5,
) -> float:
"""
Calculate delay for jittered exponential backoff.

Args:
attempt: Current attempt number (0-based)
base_delay: Base delay in seconds
max_delay: Maximum delay cap in seconds
jitter_ratio: Jitter ratio (0.0-1.0), fraction of delay added as positive jitter

Returns:
Delay in seconds with jitter applied

Example:
>>> # Attempt 0: ~5s, Attempt 1: ~10s, Attempt 2: ~20s
>>> delay = jittered_backoff(1, base_delay=5.0, max_delay=120.0, jitter_ratio=0.5)
"""
# Exponential backoff: base * 2^attempt
delay = min(base_delay * (2 ** max(0, attempt)), max_delay)

# Apply jitter: delay + uniform(0, jitter_ratio * delay) for additive positive jitter
if jitter_ratio > 0:
jitter_range = delay * jitter_ratio
jitter = random.uniform(0, jitter_range) # Positive additive jitter only
delay = max(0.1, min(delay + jitter, max_delay)) # Clamp again after jitter

return delay


async def interruptible_sleep(
seconds: float,
check_interval: float = 0.2,
interrupt_fn: Optional[Callable[[], bool]] = None,
) -> bool:
"""
Sleep with periodic interruption checks.

Args:
seconds: Total sleep duration in seconds
check_interval: How often to check for interruption (seconds)
interrupt_fn: Function that returns True if sleep should be interrupted

Returns:
True if completed full sleep, False if interrupted

Example:
>>> interrupted = await interruptible_sleep(30.0, interrupt_fn=lambda: agent.is_stopped())
"""
if interrupt_fn is None:
interrupt_fn = lambda: False

elapsed = 0.0
while elapsed < seconds:
if interrupt_fn():
return False # Interrupted

sleep_time = min(check_interval, seconds - elapsed)
await asyncio.sleep(sleep_time)
elapsed += sleep_time

return True # Completed full sleep
Comment thread
greptile-apps[bot] marked this conversation as resolved.
6 changes: 5 additions & 1 deletion src/praisonai-agents/praisonaiagents/hooks/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ class OnRetryInput(HookInput):
max_retries: int = 3
error_message: str = ""
operation: str = "" # tool_call, llm_request, etc.
delay_seconds: float = 0.0 # Delay before retry
attempt: int = 0 # Current attempt number (0-based)

def to_dict(self) -> Dict[str, Any]:
base = super().to_dict()
Expand All @@ -204,7 +206,9 @@ def to_dict(self) -> Dict[str, Any]:
"retry_count": self.retry_count,
"max_retries": self.max_retries,
"error_message": self.error_message,
"operation": self.operation
"operation": self.operation,
"delay_seconds": self.delay_seconds,
"attempt": self.attempt
})
return base

Expand Down
Loading