Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions src/praisonai-agents/praisonaiagents/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import contextlib
import threading
import concurrent.futures
import random
from typing import List, Optional, Any, Dict, Union, Literal, TYPE_CHECKING, Callable, Generator
from collections import OrderedDict
import inspect
Expand All @@ -16,7 +17,7 @@
from .execution_mixin import ExecutionMixin
from .memory_mixin import MemoryMixin
from .async_memory_mixin import AsyncMemoryMixin
from .tool_execution import ToolExecutionMixin
from .tool_execution import ToolExecutionMixin, BackoffPolicy
from .chat_handler import ChatHandlerMixin
from .session_manager import SessionManagerMixin
from .async_safety import AsyncSafeState
Expand Down Expand Up @@ -1024,7 +1025,7 @@ def __init__(
_on_budget_exceeded = 'stop'
# Default to False when no ExecutionConfig provided
parallel_tool_calls = False
# (already set from parameter, no need to override)
_exec_config = ExecutionConfig() # Default config for non-execution case

# ─────────────────────────────────────────────────────────────────────
# Resolve TEMPLATES param - FAST PATH
Expand Down Expand Up @@ -1763,6 +1764,8 @@ def __init__(
self.allow_code_execution = allow_code_execution
self.max_retry_limit = max_retry_limit
self.code_execution_mode = code_execution_mode
# Store execution config for guardrail retry backoff
self._execution_config = _exec_config
self.embedder_config = embedder_config
self.knowledge = knowledge
self.use_system_prompt = use_system_prompt
Expand Down Expand Up @@ -4973,6 +4976,22 @@ def _apply_guardrail_with_retry(self, response_text, prompt, temperature=1.0, to
retry_count += 1
logging.warning(f"Agent {self.name}: Guardrail validation failed (retry {retry_count}/{self.max_guardrail_retries}): {error}")

# Add exponential backoff delay to avoid hammering the LLM
execution_config = getattr(self, '_execution_config', None)
if execution_config is not None:
total_delay = BackoffPolicy.delay(
retry_count,
execution_config.retry_initial_delay,
execution_config.retry_backoff_factor,
execution_config.retry_jitter
)
else:
# Fall back to simple backoff if no execution config
total_delay = 1.0 * (2.0 ** (retry_count - 1))

logging.info(f"Agent {self.name}: Waiting {total_delay:.2f}s before guardrail retry")
time.sleep(total_delay)

# Regenerate response for retry
try:
retry_prompt = f"{prompt}\n\nNote: Previous response failed validation due to: {error}. Please provide an improved response."
Expand Down Expand Up @@ -5010,6 +5029,22 @@ async def _aapply_guardrail_with_retry(self, response_text, prompt, temperature=
retry_count += 1
logging.warning(f"Agent {self.name}: Guardrail validation failed (retry {retry_count}/{self.max_guardrail_retries}): {error}")

# Add exponential backoff delay to avoid hammering the LLM
execution_config = getattr(self, '_execution_config', None)
if execution_config is not None:
total_delay = BackoffPolicy.delay(
retry_count,
execution_config.retry_initial_delay,
execution_config.retry_backoff_factor,
execution_config.retry_jitter
)
else:
# Fall back to simple backoff if no execution config
total_delay = 1.0 * (2.0 ** (retry_count - 1))

logging.info(f"Agent {self.name}: Waiting {total_delay:.2f}s before guardrail retry")
await asyncio.sleep(total_delay)

# Regenerate response for retry (async version)
try:
retry_prompt = f"{prompt}\n\nNote: Previous response failed validation due to: {error}. Please provide an improved response."
Expand Down
185 changes: 147 additions & 38 deletions src/praisonai-agents/praisonaiagents/agent/tool_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import inspect
import contextvars
import concurrent.futures
import random
from typing import List, Optional, Any, Dict, Union, TYPE_CHECKING
from ..errors import ToolExecutionError
from ..tools.trust import wrap_if_external
Expand All @@ -24,6 +25,30 @@
pass


class BackoffPolicy:
"""Exponential backoff policy for tool retries."""

@staticmethod
def delay(attempt: int, initial_delay: float, backoff_factor: float, jitter: float, max_delay: float = 60.0) -> float:
"""Calculate delay for a retry attempt.

Args:
attempt: Attempt number (1-based)
initial_delay: Initial delay in seconds
backoff_factor: Exponential backoff multiplier
jitter: Fraction of base delay to add as random jitter
max_delay: Maximum delay to cap exponential growth

Returns:
Delay in seconds
"""
base = initial_delay * (backoff_factor ** (attempt - 1))
# Cap the base delay to prevent excessively long waits
base = min(base, max_delay)
jitter_amount = random.uniform(0, jitter * base)
return base + jitter_amount


class ToolExecutionMixin:
"""Mixin providing toolexecution methods for the Agent class."""

Expand Down Expand Up @@ -273,32 +298,118 @@ def _execute_tool_with_context(self, function_name, arguments, state, tool_call_
if blocked_result is not None:
result = blocked_result
else:
# P8/G11: Apply tool timeout if configured
tool_timeout = getattr(self, '_tool_timeout', None)
if tool_timeout and tool_timeout > 0:
# Use copy_context to preserve injection context in executor thread
ctx = contextvars.copy_context()

def execute_with_context():
with with_injection_context(state):
return self._execute_tool_with_circuit_breaker(function_name, arguments)

# Use reusable executor to prevent resource leaks
if not hasattr(self, '_tool_executor'):
self._tool_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=2, thread_name_prefix=f"tool-{self.name}"
)

future = self._tool_executor.submit(ctx.run, execute_with_context)
try:
result = future.result(timeout=tool_timeout)
except concurrent.futures.TimeoutError:
future.cancel()
logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
# Apply tool retry logic with exponential backoff
execution_config = getattr(self, '_execution_config', None)
if execution_config is None:
# Fall back to reading individual config attributes for backward compatibility
max_retry_limit = getattr(self, 'max_retry_limit', 2)
retry_initial_delay = 1.0
retry_backoff_factor = 2.0
retry_jitter = 0.1
else:
with with_injection_context(state):
result = self._execute_tool_with_circuit_breaker(function_name, arguments)
max_retry_limit = execution_config.max_retry_limit
retry_initial_delay = execution_config.retry_initial_delay
retry_backoff_factor = execution_config.retry_backoff_factor
retry_jitter = execution_config.retry_jitter

result = None
last_exception = None

# max_retry_limit is the number of retries (not total attempts)
# So total attempts = 1 (initial) + max_retry_limit (retries)
for attempt in range(1, max_retry_limit + 2):
try:
# P8/G11: Apply tool timeout if configured
tool_timeout = getattr(self, '_tool_timeout', None)
if tool_timeout and tool_timeout > 0:
# Use copy_context to preserve injection context in executor thread
ctx = contextvars.copy_context()

def execute_with_context():
with with_injection_context(state):
return self._execute_tool_with_circuit_breaker(function_name, arguments)

# Use reusable executor to prevent resource leaks
if not hasattr(self, '_tool_executor'):
self._tool_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=2, thread_name_prefix=f"tool-{self.name}"
)

future = self._tool_executor.submit(ctx.run, execute_with_context)
try:
result = future.result(timeout=tool_timeout)
except concurrent.futures.TimeoutError:
future.cancel()
logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
else:
with with_injection_context(state):
result = self._execute_tool_with_circuit_breaker(function_name, arguments)

# Check if the result indicates a retryable error
if isinstance(result, dict) and result.get("error"):
# Check if this is a circuit breaker error (always retryable)
if result.get("circuit_open"):
raise ToolExecutionError(
result["error"],
tool_name=function_name,
agent_id=self.name,
is_retryable=True,
)
# Check if this is a timeout error (retryable)
elif result.get("timeout"):
raise ToolExecutionError(
result["error"],
tool_name=function_name,
agent_id=self.name,
is_retryable=True,
)
# For other error dicts, treat as non-retryable unless specified
else:
# Success path - return the result
break
else:
# Success path - return the result
break

except ToolExecutionError as e:
last_exception = e
# Only retry if the error is marked as retryable and we have retries left
# attempt starts at 1, so (attempt - 1) gives us the retry count
if not e.is_retryable or (attempt - 1) >= max_retry_limit:
raise e

# Calculate delay for exponential backoff
delay = BackoffPolicy.delay(attempt, retry_initial_delay, retry_backoff_factor, retry_jitter)
logging.warning(
f"Tool {function_name} failed (attempt {attempt}/{max_retry_limit + 1}): {e}. "
f"Retrying in {delay:.2f}s..."
)
time.sleep(delay)

except Exception as e:
# Wrap unexpected exceptions in ToolExecutionError
# Most tool errors are considered retryable unless they're programming errors
is_retryable = not isinstance(e, (ValueError, TypeError, AttributeError))
tool_error = ToolExecutionError(
f"Tool '{function_name}' failed: {e}",
tool_name=function_name,
agent_id=self.name,
is_retryable=is_retryable,
)
last_exception = tool_error

# attempt starts at 1, so (attempt - 1) gives us the retry count
if not is_retryable or (attempt - 1) >= max_retry_limit:
raise tool_error from e

# Calculate delay for exponential backoff
delay = BackoffPolicy.delay(attempt, retry_initial_delay, retry_backoff_factor, retry_jitter)
logging.warning(
f"Tool {function_name} failed (attempt {attempt}/{max_retry_limit + 1}): {e}. "
f"Retrying in {delay:.2f}s..."
)
time.sleep(delay)

# Apply prompt injection protection for external tools
# Zero-cost for trusted tools, wraps external content in security markers
Expand Down Expand Up @@ -418,22 +529,20 @@ def execute_with_context():

return result
except Exception as e:
# Emit tool call end with error
# Emit tool call end with error for exceptions that escape the retry loop
_duration_ms = (_time.time() - _tool_start_time) * 1000
_trace_emitter.tool_call_end(self.name, function_name, None, _duration_ms, str(e))

# Preserve existing ToolExecutionError unchanged to maintain loop guard HALT behavior
if isinstance(e, ToolExecutionError):
raise

# Gap 3a fix: Wrap exceptions in ToolExecutionError for better observability
is_retryable = not isinstance(e, (ValueError, TypeError, AttributeError))
raise ToolExecutionError(
f"Tool '{function_name}' failed: {e}",
tool_name=function_name,
agent_id=self.name,
is_retryable=is_retryable,
) from e
# Wrap the exception if it's not already a ToolExecutionError
if not isinstance(e, ToolExecutionError):
is_retryable = not isinstance(e, (ValueError, TypeError, AttributeError))
raise ToolExecutionError(
f"Tool '{function_name}' failed: {e}",
tool_name=function_name,
agent_id=self.name,
is_retryable=is_retryable,
) from e
raise # Re-raise if already wrapped

def _trigger_after_agent_hook(self, prompt, response, start_time, tools_used=None):
"""Trigger AFTER_AGENT hook and return response."""
Expand Down
16 changes: 15 additions & 1 deletion src/praisonai-agents/praisonaiagents/config/feature_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,9 @@ class ExecutionConfig:

# Retry settings
max_retry_limit: int = 2
retry_initial_delay: float = 1.0 # seconds
retry_backoff_factor: float = 2.0
retry_jitter: float = 0.1 # fraction of computed delay

# Tool call limits (loop protection)
max_tool_calls_per_turn: int = 10
Expand Down Expand Up @@ -766,7 +769,7 @@ class ExecutionConfig:
parallel_tool_calls: bool = False

def __post_init__(self) -> None:
"""Post-initialization processing with deprecation warnings."""
"""Post-initialization processing with deprecation warnings and validation."""
# Handle context_compaction serialization round-trip
if isinstance(self.context_compaction, dict):
Comment on lines 771 to 774

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Retry validation is silently bypassed on every normal instantiation. __post_init__ returns early (lines 784, 786, 801) whenever context_compaction is False (the default) and the caller is internal library code — which is the case for every ExecutionConfig() created during agent startup. The three raise ValueError guards at lines 815–820 are therefore never reached in practice, so callers can pass retry_initial_delay=-1.0 or retry_backoff_factor=0.5 without error. Moving the validation before the context_compaction guard block ensures it always runs.

Suggested change
def __post_init__(self) -> None:
"""Post-initialization processing with deprecation warnings."""
"""Post-initialization processing with deprecation warnings and validation."""
# Handle context_compaction serialization round-trip
if isinstance(self.context_compaction, dict):
def __post_init__(self) -> None:
"""Post-initialization processing with deprecation warnings and validation."""
# Validate retry configuration parameters (must run before any early returns)
if self.retry_initial_delay <= 0:
raise ValueError("ExecutionConfig.retry_initial_delay must be positive.")
if self.retry_backoff_factor < 1.0:
raise ValueError("ExecutionConfig.retry_backoff_factor must be >= 1.0.")
if self.retry_jitter < 0:
raise ValueError("ExecutionConfig.retry_jitter must be non-negative.")
# Handle context_compaction serialization round-trip
if isinstance(self.context_compaction, dict):

from ..context.policy import ContextCompactionPolicy
Expand Down Expand Up @@ -807,6 +810,14 @@ def __post_init__(self) -> None:
ExecutionConfig._context_compaction_warned = True
finally:
del frame

# Validate retry configuration parameters
if self.retry_initial_delay <= 0:
raise ValueError("ExecutionConfig.retry_initial_delay must be positive.")
if self.retry_backoff_factor < 1.0:
raise ValueError("ExecutionConfig.retry_backoff_factor must be >= 1.0.")
if self.retry_jitter < 0:
raise ValueError("ExecutionConfig.retry_jitter must be non-negative.")
Comment on lines +771 to +820

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Missing max_retry_limit lower-bound guard in __post_init__

The new validation checks the three delay fields but not max_retry_limit itself. A caller passing max_retry_limit=-1 would produce range(1, 1) — an empty loop — so result stays None and is silently returned to the LLM with no error and no retry. Adding if self.max_retry_limit < 0: raise ValueError(...) makes the contract explicit.

Suggested change
def __post_init__(self) -> None:
"""Validate retry configuration parameters."""
if self.retry_initial_delay <= 0:
raise ValueError("ExecutionConfig.retry_initial_delay must be positive.")
if self.retry_backoff_factor < 1.0:
raise ValueError("ExecutionConfig.retry_backoff_factor must be >= 1.0.")
if self.retry_jitter < 0:
raise ValueError("ExecutionConfig.retry_jitter must be non-negative.")
def __post_init__(self) -> None:
"""Validate retry configuration parameters."""
if self.retry_initial_delay <= 0:
raise ValueError("ExecutionConfig.retry_initial_delay must be positive.")
if self.retry_backoff_factor < 1.0:
raise ValueError("ExecutionConfig.retry_backoff_factor must be >= 1.0.")
if self.retry_jitter < 0:
raise ValueError("ExecutionConfig.retry_jitter must be non-negative.")
if self.max_retry_limit < 0:
raise ValueError("ExecutionConfig.max_retry_limit must be non-negative.")


def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
Expand All @@ -815,6 +826,9 @@ def to_dict(self) -> Dict[str, Any]:
"max_rpm": self.max_rpm,
"max_execution_time": self.max_execution_time,
"max_retry_limit": self.max_retry_limit,
"retry_initial_delay": self.retry_initial_delay,
"retry_backoff_factor": self.retry_backoff_factor,
"retry_jitter": self.retry_jitter,
"code_execution": self.code_execution,
"code_mode": self.code_mode,
"code_sandbox_mode": self.code_sandbox_mode,
Expand Down
Loading