-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
fix: Agent runtime lacks jittered exponential backoff for API and tool retries #2082
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e9de0b1
550964a
a956338
d078e40
9a8ff3e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1039,7 +1039,7 @@ def _chat_completion(self, messages, temperature=1.0, tools=None, stream=None, r | |
| try: | ||
| # First attempt: try with streaming enabled for better user experience | ||
| stream_callback = self.stream_emitter.emit if hasattr(self, 'stream_emitter') else None | ||
| final_response = self._execute_unified_chat_completion( | ||
| final_response = self._chat_completion_with_retry( | ||
| messages=messages, | ||
| temperature=temperature, | ||
| tools=formatted_tools, | ||
|
|
@@ -1060,7 +1060,11 @@ def _chat_completion(self, messages, temperature=1.0, tools=None, stream=None, r | |
| stream = False # Set for the main execution below | ||
| else: | ||
| raise # Re-raise if it's a different ValueError | ||
| except Exception: | ||
| except Exception as e: | ||
| from ..errors import LLMError | ||
| # Don't retry if it's an LLMError that has exhausted retries | ||
| if isinstance(e, LLMError): | ||
| raise # Re-raise LLMErrors immediately to avoid double retry | ||
| # For any other exception, fall back to non-streaming | ||
| logging.debug(f"{self.name}: Streaming attempt failed, falling back to non-streaming") | ||
| stream = False # Set for the main execution below | ||
|
|
@@ -1071,7 +1075,7 @@ def _chat_completion(self, messages, temperature=1.0, tools=None, stream=None, r | |
| # UNIFIED: Single protocol-driven dispatch path (fixes DRY violation) | ||
| # All LLM providers now go through unified dispatcher for consistency and maintainability | ||
| stream_callback = self.stream_emitter.emit if hasattr(self, 'stream_emitter') else None | ||
| final_response = self._execute_unified_chat_completion( | ||
| final_response = self._chat_completion_with_retry( | ||
| messages=messages, | ||
| temperature=temperature, | ||
| tools=formatted_tools, | ||
|
|
@@ -3003,7 +3007,7 @@ async def _achat_impl(self, prompt, temperature, tools, output_json, output_pyda | |
| response_format = self._build_response_format(schema_model) | ||
|
|
||
| # Use composition instead of runtime class mutation for safety | ||
| response = await self._execute_unified_achat_completion( | ||
| response = await self._achat_completion_with_retry( | ||
| messages=messages, | ||
| temperature=temperature, | ||
| tools=formatted_tools, | ||
|
|
@@ -3175,7 +3179,7 @@ async def _achat_impl(self, prompt, temperature, tools, output_json, output_pyda | |
| {"role": "user", "content": "Now regenerate your response using the reflection you made"} | ||
| ] | ||
|
|
||
| new_response = await self._execute_unified_achat_completion( | ||
| new_response = await self._achat_completion_with_retry( | ||
| messages=regenerate_messages, | ||
| temperature=temperature, | ||
| tools=formatted_tools, | ||
|
|
@@ -4058,3 +4062,156 @@ async def _apply_context_compaction_async(self, messages, hook_event_class): | |
| raise | ||
| logging.debug(f"[compaction] skipped (non-fatal): {_ce}") | ||
| return False | ||
|
|
||
| def _chat_completion_with_retry(self, messages, temperature=1.0, tools=None, stream=None, reasoning_steps=False, task_name=None, task_description=None, task_id=None, response_format=None, stream_callback=None, emit_events=True): | ||
| """ | ||
| Wrapper for _execute_unified_chat_completion that adds jittered exponential backoff retry logic. | ||
|
|
||
| This method wraps the unified chat completion call and adds retry capability for | ||
| transient failures like rate limits, network errors, and service outages. | ||
| """ | ||
| retry_config = getattr(self, '_retry_config', None) | ||
| if not retry_config: | ||
| return self._execute_unified_chat_completion(messages, temperature, tools, stream, reasoning_steps, | ||
| task_name, task_description, task_id, response_format, | ||
| stream_callback=stream_callback, emit_events=emit_events) | ||
|
|
||
| from .retry_utils import jittered_backoff | ||
| from ..hooks import HookEvent, OnRetryInput | ||
| import time | ||
|
|
||
| max_attempts = retry_config.max_retries + 1 | ||
|
|
||
| for attempt in range(max_attempts): | ||
| try: | ||
| # Call the underlying unified chat completion directly to avoid infinite recursion | ||
| return self._execute_unified_chat_completion(messages, temperature, tools, stream, reasoning_steps, | ||
| task_name, task_description, task_id, response_format, | ||
| stream_callback=stream_callback, emit_events=emit_events) | ||
|
|
||
| except Exception as e: | ||
| from ..errors import LLMError | ||
|
|
||
| # Only retry LLMErrors that are marked as retryable | ||
| if not isinstance(e, LLMError) or not e.is_retryable: | ||
| raise # Re-raise non-retryable errors immediately | ||
|
|
||
| # If this is the last attempt, re-raise the error | ||
| if attempt >= max_attempts - 1: | ||
| raise | ||
|
|
||
| # Calculate delay for this retry attempt | ||
| delay = jittered_backoff( | ||
| attempt, | ||
| base_delay=retry_config.base_delay, | ||
| max_delay=retry_config.max_delay, | ||
| jitter_ratio=retry_config.jitter_ratio, | ||
| ) | ||
|
|
||
| # Fire OnRetry hook with delay information | ||
| retry_input = OnRetryInput( | ||
| session_id=getattr(self, '_session_id', 'default'), | ||
| cwd=os.getcwd(), | ||
| event_name=HookEvent.ON_RETRY, | ||
| timestamp=str(time.time()), | ||
| agent_name=self.name, | ||
| retry_count=attempt + 1, | ||
| max_retries=retry_config.max_retries, | ||
| error_message=str(e), | ||
| operation="llm_request", | ||
| delay_seconds=delay, | ||
| attempt=attempt | ||
| ) | ||
| self._hook_runner.execute_sync(HookEvent.ON_RETRY, retry_input) | ||
|
|
||
| # Log retry attempt (buffered to avoid spam during transient failures) | ||
| logger.debug(f"[{self.name}] Retry {attempt + 1}/{max_attempts} after {delay:.1f}s: {str(e)[:100]}") | ||
|
|
||
| # Sleep with interrupt awareness - make interruption terminal | ||
| interrupt_fn = getattr(self, '_is_interrupted', lambda: False) | ||
| sleep_start = time.time() | ||
| while time.time() - sleep_start < delay: | ||
| if interrupt_fn(): | ||
| # Interruption is terminal - stop retrying | ||
| raise RuntimeError("Agent interrupted during retry backoff") | ||
| time.sleep(min(0.2, delay - (time.time() - sleep_start))) | ||
|
|
||
| # This should never be reached, but just in case | ||
| raise RuntimeError("Retry loop completed without returning or raising an exception") | ||
|
|
||
| async def _achat_completion_with_retry(self, messages, temperature=1.0, tools=None, stream=None, reasoning_steps=False, task_name=None, task_description=None, task_id=None, response_format=None, stream_callback=None, emit_events=True): | ||
| """ | ||
| Async wrapper for _execute_unified_achat_completion that adds jittered exponential backoff retry logic. | ||
|
|
||
| This method wraps the async chat completion call and adds retry capability for | ||
| transient failures like rate limits, network errors, and service outages. | ||
| """ | ||
| retry_config = getattr(self, '_retry_config', None) | ||
| if not retry_config: | ||
| return await self._execute_unified_achat_completion( | ||
| messages, temperature, tools, stream, reasoning_steps, | ||
| task_name, task_description, task_id, response_format, | ||
| stream_callback=stream_callback, emit_events=emit_events | ||
| ) | ||
|
|
||
| from .retry_utils import jittered_backoff, interruptible_sleep | ||
| from ..hooks import HookEvent, OnRetryInput | ||
| import time | ||
| import asyncio | ||
|
|
||
| max_attempts = retry_config.max_retries + 1 | ||
|
|
||
| for attempt in range(max_attempts): | ||
| try: | ||
| # Call the underlying unified chat completion directly to avoid infinite recursion | ||
| return await self._execute_unified_achat_completion( | ||
| messages, temperature, tools, stream, reasoning_steps, | ||
| task_name, task_description, task_id, response_format, | ||
| stream_callback=stream_callback, emit_events=emit_events | ||
| ) | ||
|
|
||
| except Exception as e: | ||
| from ..errors import LLMError | ||
|
|
||
| # Only retry LLMErrors that are marked as retryable | ||
| if not isinstance(e, LLMError) or not e.is_retryable: | ||
| raise # Re-raise non-retryable errors immediately | ||
|
Comment on lines
+4164
to
+4178
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
With the defaults ( |
||
|
|
||
| # If this is the last attempt, re-raise the error | ||
| if attempt >= max_attempts - 1: | ||
| raise | ||
|
|
||
| # Calculate delay for this retry attempt | ||
| delay = jittered_backoff( | ||
| attempt, | ||
| base_delay=retry_config.base_delay, | ||
| max_delay=retry_config.max_delay, | ||
| jitter_ratio=retry_config.jitter_ratio, | ||
| ) | ||
|
|
||
| # Fire OnRetry hook with delay information | ||
| retry_input = OnRetryInput( | ||
| session_id=getattr(self, '_session_id', 'default'), | ||
| cwd=os.getcwd(), | ||
| event_name=HookEvent.ON_RETRY, | ||
| timestamp=str(time.time()), | ||
| agent_name=self.name, | ||
| retry_count=attempt + 1, | ||
| max_retries=retry_config.max_retries, | ||
| error_message=str(e), | ||
| operation="async_llm_request", | ||
| delay_seconds=delay, | ||
| attempt=attempt | ||
| ) | ||
| await self._hook_runner.execute_async(HookEvent.ON_RETRY, retry_input) | ||
|
|
||
| # Log retry attempt | ||
| logger.debug(f"[{self.name}] Async retry {attempt + 1}/{max_attempts} after {delay:.1f}s: {str(e)[:100]}") | ||
|
|
||
| # Async sleep with interrupt awareness using the helper | ||
| interrupt_fn = getattr(self, '_is_interrupted', lambda: False) | ||
| if not await interruptible_sleep(delay, interrupt_fn=interrupt_fn): | ||
| raise RuntimeError("Agent interrupted during retry backoff") | ||
|
|
||
| # This should never be reached, but just in case | ||
| raise RuntimeError("Async retry loop completed without returning or raising an exception") | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,101 @@ | ||
| """ | ||
| Retry utilities with jittered exponential backoff for the PraisonAI SDK. | ||
|
|
||
| Provides interrupt-aware retry mechanisms for LLM API calls and tool execution | ||
| with configurable backoff strategies, jitter, and buffered status reporting. | ||
| """ | ||
|
|
||
| import asyncio | ||
| import random | ||
| import time | ||
| from dataclasses import dataclass | ||
| from typing import Callable, Optional | ||
|
|
||
|
|
||
| @dataclass | ||
| class RetryBackoffConfig: | ||
| """Configuration for jittered exponential backoff retry behavior.""" | ||
| base_delay: float = 5.0 # Base delay in seconds | ||
| max_delay: float = 120.0 # Maximum delay in seconds | ||
| jitter_ratio: float = 0.5 # Jitter as fraction of delay added on top of base (0.0-1.0) | ||
| max_retries: int = 3 # Maximum retry attempts | ||
|
|
||
| def __post_init__(self): | ||
| """Validate configuration parameters.""" | ||
| if self.base_delay <= 0: | ||
| raise ValueError("base_delay must be > 0") | ||
| if self.max_delay < self.base_delay: | ||
| raise ValueError("max_delay must be >= base_delay") | ||
| if not (0 <= self.jitter_ratio <= 1): | ||
| raise ValueError("jitter_ratio must be between 0 and 1") | ||
| if self.max_retries < 0: | ||
| raise ValueError("max_retries must be >= 0") | ||
|
|
||
|
|
||
| def jittered_backoff( | ||
| attempt: int, | ||
| *, | ||
| base_delay: float = 5.0, | ||
| max_delay: float = 120.0, | ||
| jitter_ratio: float = 0.5, | ||
| ) -> float: | ||
| """ | ||
| Calculate delay for jittered exponential backoff. | ||
|
|
||
| Args: | ||
| attempt: Current attempt number (0-based) | ||
| base_delay: Base delay in seconds | ||
| max_delay: Maximum delay cap in seconds | ||
| jitter_ratio: Jitter ratio (0.0-1.0), fraction of delay added as positive jitter | ||
|
|
||
| Returns: | ||
| Delay in seconds with jitter applied | ||
|
|
||
| Example: | ||
| >>> # Attempt 0: ~5s, Attempt 1: ~10s, Attempt 2: ~20s | ||
| >>> delay = jittered_backoff(1, base_delay=5.0, max_delay=120.0, jitter_ratio=0.5) | ||
| """ | ||
| # Exponential backoff: base * 2^attempt | ||
| delay = min(base_delay * (2 ** max(0, attempt)), max_delay) | ||
|
|
||
| # Apply jitter: delay + uniform(0, jitter_ratio * delay) for additive positive jitter | ||
| if jitter_ratio > 0: | ||
| jitter_range = delay * jitter_ratio | ||
| jitter = random.uniform(0, jitter_range) # Positive additive jitter only | ||
| delay = max(0.1, min(delay + jitter, max_delay)) # Clamp again after jitter | ||
|
|
||
| return delay | ||
|
|
||
|
|
||
| async def interruptible_sleep( | ||
| seconds: float, | ||
| check_interval: float = 0.2, | ||
| interrupt_fn: Optional[Callable[[], bool]] = None, | ||
| ) -> bool: | ||
| """ | ||
| Sleep with periodic interruption checks. | ||
|
|
||
| Args: | ||
| seconds: Total sleep duration in seconds | ||
| check_interval: How often to check for interruption (seconds) | ||
| interrupt_fn: Function that returns True if sleep should be interrupted | ||
|
|
||
| Returns: | ||
| True if completed full sleep, False if interrupted | ||
|
|
||
| Example: | ||
| >>> interrupted = await interruptible_sleep(30.0, interrupt_fn=lambda: agent.is_stopped()) | ||
| """ | ||
| if interrupt_fn is None: | ||
| interrupt_fn = lambda: False | ||
|
|
||
| elapsed = 0.0 | ||
| while elapsed < seconds: | ||
| if interrupt_fn(): | ||
| return False # Interrupted | ||
|
|
||
| sleep_time = min(check_interval, seconds - elapsed) | ||
| await asyncio.sleep(sleep_time) | ||
| elapsed += sleep_time | ||
|
|
||
| return True # Completed full sleep | ||
|
greptile-apps[bot] marked this conversation as resolved.
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LLMErrorgate_execute_unified_chat_completion(line 1607-1609) re-raises every dispatcher error as a raw exception (openai.RateLimitError,httpx.RemoteProtocolError, etc.) — never asLLMError. The guard on line 4096 (not isinstance(e, LLMError)) therefore always matches, so_chat_completion_with_retryimmediately re-raises without retrying. The exception then propagates into_chat_completion's ownexcept Exceptionhandler (line 1162), which converts it toLLMErrorand applies its own_retry_depth ≤ 2mechanism — completely ignoring the user'sRetryBackoffConfig.LLMErroris created in_chat_completion's handler after the exception has already escaped_chat_completion_with_retry; by that point the retry loop is unreachable. To make the new config effective, the inner call in the retry loop needs to be_execute_unified_chat_completionwrapped in the same error-classification logic that_chat_completionuses (i.e. catch raw exceptions, classify, wrap inLLMError, then let the retry loop decide).