diff --git a/packages/optimization/src/ldai_optimizer/client.py b/packages/optimization/src/ldai_optimizer/client.py index bccd32d4..b924edd7 100644 --- a/packages/optimization/src/ldai_optimizer/client.py +++ b/packages/optimization/src/ldai_optimizer/client.py @@ -22,7 +22,7 @@ import random import time import uuid -from typing import Any, Dict, List, Literal, Optional, Union +from typing import Any, Dict, List, Literal, Optional, Tuple, Union from ldai import AIAgentConfig, AIJudgeConfig, AIJudgeConfigDefault, LDAIClient from ldai.models import LDMessage, ModelConfig @@ -49,6 +49,7 @@ LDApiClient, ) from ldai_optimizer.prompts import ( + _acceptance_criteria_implies_cost_optimization, _acceptance_criteria_implies_duration_optimization, build_message_history_text, build_new_variation_prompt, @@ -57,6 +58,7 @@ from ldai_optimizer.util import ( RedactionFilter, await_if_needed, + estimate_cost, extract_json_from_response, generate_slug, interpolate_variables, @@ -89,18 +91,40 @@ def _find_model_config( return global_match if global_match is not None else matching[0] +# Known provider prefixes used by the LD API (e.g. "Anthropic.claude-3"). +# Only strip the first segment when it is one of these known values so that +# model IDs whose first dotted segment is NOT a provider — such as Bedrock +# cross-region inference IDs like "us.amazon.nova-pro-v1:0" — are left intact. +_KNOWN_PROVIDER_PREFIXES: frozenset = frozenset({ + "Anthropic", + "Bedrock", + "Cohere", + "Google", + "Groq", + "Meta", + "Mistral", + "OpenAI", + "Perplexity", +}) + + def _strip_provider_prefix(model: str) -> str: """Strip the provider prefix from a model identifier returned by the LD API. API model keys are formatted as "Provider.model-name" (e.g. "OpenAI.gpt-5", - "Anthropic.claude-opus-4.6"). Only the part after the first period is needed - by the underlying LLM clients. If no period is present the string is returned - unchanged. + "Anthropic.claude-opus-4.6"). Only the segment before the first period is + stripped, and only when it is a recognised provider name. This prevents + incorrectly stripping region prefixes from Bedrock cross-region inference + IDs such as "us.amazon.nova-pro-v1:0". :param model: Raw model string from the API. - :return: Model name with provider prefix removed. + :return: Model name with provider prefix removed, or the original string if + the first segment is not a known provider. """ - return model.split(".", 1)[-1] + prefix, _, rest = model.partition(".") + if prefix in _KNOWN_PROVIDER_PREFIXES and rest: + return rest + return model def _compute_validation_count(pool_size: int) -> int: @@ -128,6 +152,33 @@ def _compute_validation_count(pool_size: int) -> int: # under 80% of the baseline — i.e. at least 20% improvement. _DURATION_TOLERANCE = 0.80 +# Cost gate: a candidate must cost at most this fraction of the baseline +# (history[0].estimated_cost_usd) to pass when acceptance criteria imply a +# cost reduction goal. 0.90 means at least 10% cheaper than the baseline. +_COST_TOLERANCE = 0.90 + +# Maximum number of history items retained in the standard (non-GT) optimizer. +# Since user inputs are randomly selected there is no "full pass" concept, so +# a small fixed window is sufficient context for variation generation. +_MAX_STANDARD_HISTORY_LENGTH = 5 + + +def _trim_history( + history: List["OptimizationContext"], max_len: int +) -> List["OptimizationContext"]: + """Trim history to at most max_len of the most recent items. 
+ + The duration/cost baselines are captured explicitly in ``_baseline_duration_ms`` + and ``_baseline_cost_usd`` so the oldest entry no longer needs to be preserved. + + :param history: Current accumulated history list. + :param max_len: Maximum number of items to retain (must be >= 1). + :return: Trimmed history list, or the original list if already within limit. + """ + if len(history) <= max_len: + return history + return history[-max_len:] + # Maps SDK status strings to the API status/activity values expected by # agent_optimization_result records. Defined at module level to avoid # allocating the dict on every on_status_update invocation. @@ -160,6 +211,8 @@ def __init__(self, ldClient: LDAIClient) -> None: self._last_optimization_result_id: Optional[str] = None self._initial_tool_keys: List[str] = [] self._total_token_usage: int = 0 + self._model_configs: List[Dict[str, Any]] = [] + self._last_batch_size: int = 1 if os.environ.get("LAUNCHDARKLY_API_KEY"): self._has_api_key = True @@ -187,6 +240,57 @@ def _initialize_class_members_from_config( agent_config.model.name if agent_config.model else None ) self._history: List[OptimizationContext] = [] + # Explicit baseline captured from the first iteration ever appended to history. + # Stored separately so that history truncation can be a pure slice without + # having to preserve history[0] as an anchor. + self._baseline_duration_ms: Optional[float] = None + self._baseline_cost_usd: Optional[float] = None + + def _record_baseline(self, ctx: OptimizationContext) -> None: + """Capture duration/cost baseline from a single context. + + Used by the standard (non-GT) optimization loop where each iteration + produces one result. Called once per run (subsequent calls are no-ops + once both values are set). Storing these explicitly lets + ``_trim_history`` use a simple tail slice without needing to preserve + ``history[0]`` as an anchor. + """ + if self._baseline_duration_ms is None and ctx.duration_ms is not None: + self._baseline_duration_ms = ctx.duration_ms + if self._baseline_cost_usd is None and ctx.estimated_cost_usd is not None: + self._baseline_cost_usd = ctx.estimated_cost_usd + + def _record_baseline_from_batch(self, attempt_results: List[OptimizationContext]) -> None: + """Capture duration/cost baseline as the average across a GT batch. + + Used by the GT optimization loop. The first attempt's N samples form + the baseline; averaging them gives a more stable reference than a + single sample and ensures comparisons in subsequent attempts reflect + the typical performance of the original configuration rather than an + outlier measurement. + + Called once per run (subsequent calls are no-ops once both values are + set). + + :param attempt_results: All completed sample contexts from the first + GT attempt. 
+ """ + if not attempt_results: + return + if self._baseline_duration_ms is None: + durations = [ + ctx.duration_ms for ctx in attempt_results if ctx.duration_ms is not None + ] + if durations: + self._baseline_duration_ms = sum(durations) / len(durations) + if self._baseline_cost_usd is None: + costs = [ + ctx.estimated_cost_usd + for ctx in attempt_results + if ctx.estimated_cost_usd is not None + ] + if costs: + self._baseline_cost_usd = sum(costs) / len(costs) def _build_agent_config_for_context( self, ctx: OptimizationContext, skip_interpolation: bool = False @@ -252,6 +356,7 @@ def _create_optimization_context( user_input=user_input, history=tuple(flat_history), iteration=iteration, + accumulated_token_usage=self._total_token_usage if self._total_token_usage > 0 else None, ) @property @@ -392,6 +497,7 @@ async def _call_judges( agent_tools: Optional[List[ToolDefinition]] = None, expected_response: Optional[str] = None, agent_duration_ms: Optional[float] = None, + agent_usage: Optional[Any] = None, ) -> Dict[str, JudgeResult]: """ Call all judges in parallel (auto-path). @@ -411,6 +517,8 @@ async def _call_judges( :param agent_duration_ms: Wall-clock duration of the agent call in milliseconds. Forwarded to acceptance judges whose statement implies a latency goal so they can mention the duration change in their rationale. + :param agent_usage: Token usage from the agent call. Forwarded to acceptance judges + whose statement implies a cost goal so they can mention token usage in their rationale. :return: Dictionary of judge results (score and rationale) """ if not self._options.judges: @@ -464,6 +572,7 @@ async def _call_judges( agent_tools=resolved_agent_tools, expected_response=expected_response, agent_duration_ms=agent_duration_ms, + agent_usage=agent_usage, ) judge_results[judge_key] = result @@ -682,6 +791,7 @@ async def _evaluate_acceptance_judge( agent_tools: Optional[List[ToolDefinition]] = None, expected_response: Optional[str] = None, agent_duration_ms: Optional[float] = None, + agent_usage: Optional[Any] = None, ) -> JudgeResult: """ Evaluate using an acceptance statement judge. @@ -699,6 +809,8 @@ async def _evaluate_acceptance_judge( :param agent_duration_ms: Wall-clock duration of the agent call in milliseconds. When the acceptance statement implies a latency goal, the judge is instructed to mention the duration change in its rationale. + :param agent_usage: Token usage from the agent call. When the acceptance statement + implies a cost goal, the judge is instructed to mention token usage and cost. :return: The judge result with score and rationale """ if not optimization_judge.acceptance_statement: @@ -740,11 +852,7 @@ async def _evaluate_acceptance_judge( {judge_key: optimization_judge} ) ): - baseline_ms = ( - self._history[0].duration_ms - if self._history and self._history[0].duration_ms is not None - else None - ) + baseline_ms = self._baseline_duration_ms instructions += ( f"\n\nThe acceptance criteria for this judge includes a latency/duration goal. " f"The agent's response took {agent_duration_ms:.0f}ms to generate. " @@ -757,8 +865,46 @@ async def _evaluate_acceptance_judge( f"This response was {abs(delta_ms):.0f}ms {direction} than the baseline. " ) instructions += ( - "Please mention the duration and any change from baseline in your rationale." + "In your rationale, state the duration and any change from baseline. 
" + "If the latency goal is not yet met, include specific, actionable suggestions " + "for how the agent's instructions or model choice could be changed to reduce " + "response time — for example: switching to a faster model, shortening the " + "system prompt, or removing instructions that cause multi-step reasoning. " + "These suggestions will be used directly to generate the next variation." + ) + + if _acceptance_criteria_implies_cost_optimization({judge_key: optimization_judge}): + current_cost = estimate_cost( + agent_usage, + _find_model_config(self._current_model or "", self._model_configs), ) + baseline_cost = self._baseline_cost_usd + if current_cost is not None: + instructions += ( + f"\n\nThe acceptance criteria for this judge includes a cost/token-usage goal. " + ) + if agent_usage is not None: + instructions += ( + f"The agent's response used {agent_usage.input} input tokens " + f"and {agent_usage.output} output tokens " + f"(estimated cost: ${current_cost:.6f}). " + ) + if baseline_cost is not None: + delta = current_cost - baseline_cost + direction = "less" if delta < 0 else "more" + instructions += ( + f"The baseline cost (first iteration) was ${baseline_cost:.6f}. " + f"This response cost ${abs(delta):.6f} {direction} than the baseline. " + ) + instructions += ( + "In your rationale, state the token usage and cost, and any change from baseline. " + "If the cost goal is not yet met, include specific, actionable suggestions " + "for how the agent's instructions or model choice could be changed to reduce " + "cost — for example: switching to a cheaper model, shortening the system prompt " + "to reduce input tokens, removing unnecessary output instructions, or tightening " + "response length constraints. " + "These suggestions will be used directly to generate the next variation." + ) if resolved_variables: instructions += f"\n\nThe following variables were available to the agent: {json.dumps(resolved_variables)}" @@ -978,6 +1124,7 @@ async def _run_ground_truth_optimization( self._last_succeeded_context = None self._last_optimization_result_id = None self._total_token_usage = 0 + self._last_batch_size = 1 self._initialize_class_members_from_config(agent_config) # Seed from the first model choice on the first iteration @@ -1043,27 +1190,13 @@ async def _run_ground_truth_optimization( expected_response=sample.expected_response, ) self._accumulate_tokens(optimize_context) - if self._is_token_limit_exceeded(): - logger.error( - "[GT Attempt %d] -> Token limit exceeded on sample %d (total=%d)", - attempt, - i + 1, - self._total_token_usage, - ) - attempt_results.append(optimize_context) - self._last_run_succeeded = False - self._last_succeeded_context = None - self._safe_status_update("failure", optimize_context, linear_iter) - if self._options.on_failing_result: - try: - self._options.on_failing_result(optimize_context) - except Exception: - logger.exception( - "[GT Attempt %d] -> on_failing_result callback failed", attempt - ) - return attempt_results + optimize_context = dataclasses.replace( + optimize_context, accumulated_token_usage=self._total_token_usage + ) - # Per-sample pass/fail check + # Per-sample pass/fail check — evaluated before the token limit gate so + # that a sample which passed is not incorrectly stamped as FAILED simply + # because the budget was exhausted after its scores were computed. 
if self._options.on_turn is not None: try: sample_passed = self._options.on_turn(optimize_context) @@ -1077,10 +1210,14 @@ async def _run_ground_truth_optimization( else: sample_passed = self._evaluate_response(optimize_context) - if sample_passed and _acceptance_criteria_implies_duration_optimization( - self._options.judges - ): - sample_passed = self._evaluate_duration(optimize_context) + sample_passed, optimize_context = self._apply_duration_gate(sample_passed, optimize_context) + sample_passed, optimize_context = self._apply_cost_gate(sample_passed, optimize_context) + + # Flush gate scores to the API for this sample. Without this, + # the next sample's "generating" event closes out this record + # with a status-only PATCH before gate scores are sent, so only + # the last sample would ever show latency/cost gate entries. + self._safe_status_update("evaluating", optimize_context, linear_iter) if not sample_passed: logger.info( @@ -1101,6 +1238,10 @@ async def _run_ground_truth_optimization( attempt_results.append(optimize_context) + # Persist the completed sample so every API record gets its scores, + # generation tokens, and accumulated_total — not just the final one. + self._safe_status_update("turn completed", optimize_context, linear_iter) + if gt_options.on_sample_result is not None: try: gt_options.on_sample_result(optimize_context) @@ -1111,6 +1252,42 @@ async def _run_ground_truth_optimization( i + 1, ) + # Token limit check after pass/fail so the terminal status reflects + # whether the samples actually passed, not just that the budget was hit. + # Mark success only when every sample in this attempt was processed and + # all passed; stopping mid-batch is always a failure even if the + # partially-processed samples looked good. + if self._is_token_limit_exceeded(): + logger.error( + "[GT Attempt %d] -> Token limit exceeded on sample %d (total=%d)", + attempt, + i + 1, + self._total_token_usage, + ) + if all_passed and i == n - 1: + self._last_run_succeeded = True + self._last_succeeded_context = optimize_context + self._safe_status_update("success", optimize_context, linear_iter) + if self._options.on_passing_result: + try: + self._options.on_passing_result(optimize_context) + except Exception: + logger.exception( + "[GT Attempt %d] -> on_passing_result callback failed", attempt + ) + else: + self._last_run_succeeded = False + self._last_succeeded_context = None + self._safe_status_update("failure", optimize_context, linear_iter) + if self._options.on_failing_result: + try: + self._options.on_failing_result(optimize_context) + except Exception: + logger.exception( + "[GT Attempt %d] -> on_failing_result callback failed", attempt + ) + return attempt_results + last_ctx = attempt_results[-1] if all_passed: @@ -1150,8 +1327,15 @@ async def _run_ground_truth_optimization( return attempt_results # Append all N results to history so the variation generator has full context - # from all of the previous samples + # from all of the previous samples, then trim to one full attempt's worth so + # judge prompts don't grow unboundedly across many failed attempts. + if attempt_results: + self._record_baseline_from_batch(attempt_results) self._history.extend(attempt_results) + self._history = _trim_history(self._history, n) + # Track batch size so _all_judges_passing checks every sample in this + # attempt, not just the last one. 
+ self._last_batch_size = n logger.info( "[GT Attempt %d] -> %d/%d samples failed — generating new variation", @@ -1227,12 +1411,19 @@ def _apply_new_variation_response( # This is a deterministic safety net for when the LLM ignores the prompt # instructions and hardcodes a concrete value (e.g. "user-123") instead # of the placeholder ("{{user_id}}"). + # Only check the variables that were actually used for this invocation so + # we don't spuriously replace values that happen to appear in other choices. + active_variables = ( + [variation_ctx.current_variables] + if variation_ctx.current_variables + else self._options.variable_choices + ) self._current_instructions, placeholder_warnings = restore_variable_placeholders( self._current_instructions, - self._options.variable_choices, + active_variables, ) for msg in placeholder_warnings: - logger.warning("[Iteration %d] -> %s", iteration, msg) + logger.debug("[Iteration %d] -> %s", iteration, msg) self._current_parameters = response_data["current_parameters"] @@ -1321,6 +1512,10 @@ async def _generate_new_variation( optimize_for_duration = _acceptance_criteria_implies_duration_optimization( self._options.judges ) + optimize_for_cost = _acceptance_criteria_implies_cost_optimization( + self._options.judges + ) + quality_already_passing = self._all_judges_passing() instructions = build_new_variation_prompt( self._history, self._options.judges, @@ -1331,6 +1526,8 @@ async def _generate_new_variation( self._options.variable_choices, self._initial_instructions, optimize_for_duration=optimize_for_duration, + optimize_for_cost=optimize_for_cost, + quality_already_passing=quality_already_passing, ) # Create a flat history list (without nested history) to avoid exponential growth @@ -1367,6 +1564,8 @@ async def _generate_new_variation( False, ) variation_response: OptimizationResponse = await await_if_needed(result) + if variation_response.usage is not None: + self._total_token_usage += variation_response.usage.total or 0 response_str = variation_response.output try: response_data = extract_json_from_response(response_str) @@ -1424,6 +1623,7 @@ async def optimize_from_config( model_configs = api_client.get_model_configs(options.project_key) except Exception as exc: logger.debug("Could not pre-fetch model configs: %s", exc) + self._model_configs = model_configs context = random.choice(options.context_choices) # _get_agent_config calls _initialize_class_members_from_config internally; @@ -1633,11 +1833,18 @@ def _persist_and_forward( if snapshot.duration_ms is not None: patch["generationLatency"] = int(snapshot.duration_ms) if snapshot.usage is not None: - patch["generationTokens"] = { + gen_tokens: Dict[str, Any] = { "total": snapshot.usage.total, "input": snapshot.usage.input, "output": snapshot.usage.output, } + if snapshot.accumulated_token_usage is not None: + gen_tokens["accumulated_total"] = snapshot.accumulated_token_usage + patch["generationTokens"] = gen_tokens + elif snapshot.accumulated_token_usage is not None: + patch["generationTokens"] = { + "accumulated_total": snapshot.accumulated_token_usage + } eval_latencies = { k: v.duration_ms for k, v in snapshot.scores.items() @@ -1793,18 +2000,24 @@ async def _execute_agent_turn( agent_tools=agent_tools, expected_response=expected_response, agent_duration_ms=agent_duration_ms, + agent_usage=agent_response.usage, ) # Build the fully-populated result context before firing the evaluating event so # the PATCH includes scores, generationLatency, and completionResponse. 
This is # particularly important for non-final GT samples which receive no further status # events — without this, those fields would never be written to their API records. + agent_cost = estimate_cost( + agent_response.usage, + _find_model_config(self._current_model or "", self._model_configs), + ) result_ctx = dataclasses.replace( optimize_context, completion_response=completion_response, scores=scores, duration_ms=agent_duration_ms, usage=agent_response.usage, + estimated_cost_usd=agent_cost, ) if self._options.judges: @@ -1829,13 +2042,13 @@ def _accumulate_tokens(self, optimize_context: OptimizationContext) -> None: def _is_token_limit_exceeded(self) -> bool: """Return True if the accumulated token usage has met or exceeded the configured limit. - Returns False when no token limit is set so callers can use this as a - simple guard without needing to check for ``None`` themselves. + Returns False when no token limit is set, or when the limit is 0 (which is + treated as "no limit" — a sentinel value meaning the field was left unset). - :return: True if token limit is set and ``_total_token_usage >= token_limit``. + :return: True if a positive token limit is set and ``_total_token_usage >= token_limit``. """ limit: Optional[int] = getattr(self._options, "token_limit", None) - return limit is not None and self._total_token_usage >= limit + return bool(limit) and self._total_token_usage >= limit def _evaluate_response(self, optimize_context: OptimizationContext) -> bool: """ @@ -1868,22 +2081,22 @@ def _evaluate_duration(self, optimize_context: OptimizationContext) -> bool: """ Check whether the candidate's duration meets the improvement target vs. the baseline. - The baseline is history[0].duration_ms — the very first completed iteration, - representing the original unoptimized configuration's latency. The candidate - must be at least _DURATION_TOLERANCE faster (default: 20% improvement). + The baseline is the duration_ms from the very first iteration appended to history, + captured in ``_baseline_duration_ms``. The candidate must be at least + _DURATION_TOLERANCE faster (default: 20% improvement). - Returns True without blocking when no baseline is available (empty history or - history[0].duration_ms is None), or when the candidate's duration_ms was not - captured. This avoids penalising configurations when timing data is missing. + Returns True without blocking when no baseline is available or when the candidate's + duration_ms was not captured. This avoids penalising configurations when timing data + is missing. :param optimize_context: The completed turn context containing duration_ms :return: True if the duration requirement is met or cannot be checked """ - if not self._history or self._history[0].duration_ms is None: + if self._baseline_duration_ms is None: return True if optimize_context.duration_ms is None: return True - baseline = self._history[0].duration_ms + baseline = self._baseline_duration_ms passed = optimize_context.duration_ms < baseline * _DURATION_TOLERANCE if not passed: logger.warning( @@ -1896,6 +2109,181 @@ def _evaluate_duration(self, optimize_context: OptimizationContext) -> bool: ) return passed + def _evaluate_cost(self, optimize_context: OptimizationContext) -> bool: + """ + Check whether the candidate's estimated cost meets the improvement target vs. the baseline. + + The baseline is the estimated_cost_usd from the very first iteration appended to + history, captured in ``_baseline_cost_usd``. 
The candidate must be at least + _COST_TOLERANCE cheaper (default: 10% improvement). + + The cost value is in USD when model pricing data is available, or raw total token + count as a proxy when pricing is absent. Both are comparable relative to their + own baselines. + + Returns True without blocking when no baseline is available or when the candidate's + cost was not captured. This avoids penalising configurations when cost data is missing. + + :param optimize_context: The completed turn context containing estimated_cost_usd + :return: True if the cost requirement is met or cannot be checked + """ + if self._baseline_cost_usd is None: + return True + if optimize_context.estimated_cost_usd is None: + return True + baseline = self._baseline_cost_usd + passed = optimize_context.estimated_cost_usd < baseline * _COST_TOLERANCE + if not passed: + logger.warning( + "[Iteration %d] -> Cost check failed: %.6f >= baseline %.6f * %.0f%% (%.6f)", + optimize_context.iteration, + optimize_context.estimated_cost_usd, + baseline, + _COST_TOLERANCE * 100, + baseline * _COST_TOLERANCE, + ) + return passed + + def _all_judges_passing(self) -> bool: + """Return True if every user-configured judge passed in every sample of the most recent batch. + + In ground-truth mode the last ``_last_batch_size`` entries in ``_history`` + correspond to the samples from the latest attempt. All of them must pass; + checking only the last entry would incorrectly return True when a middle sample + failed but the final sample passed. + + In single-sample (non-GT) mode ``_last_batch_size`` is 1, so only the most + recent entry is inspected (original behaviour). + + Synthetic gate entries (keys beginning with ``_``) are skipped. + Returns False when history is empty or any judge score does not meet its threshold. + + This is used to decide whether variation generation should preserve the current + behaviour and only optimise for cost, rather than trying to improve quality further. + """ + if not self._history or not self._options.judges: + return False + batch = self._history[-self._last_batch_size:] + for ctx in batch: + if not ctx.scores: + return False + for key, judge in self._options.judges.items(): + result = ctx.scores.get(key) + if result is None: + return False + threshold = judge.threshold if judge.threshold is not None else 1.0 + if not judge_passed(result.score, threshold, judge.is_inverted): + return False + return True + + def _apply_duration_gate( + self, passed_so_far: bool, ctx: OptimizationContext + ) -> Tuple[bool, OptimizationContext]: + """Apply the latency improvement gate and record its result in ctx.scores. + + When the gate is active (any acceptance statement implies latency optimization), + evaluates whether the candidate's duration improved by at least + _DURATION_TOLERANCE vs the baseline. A synthetic ``_latency_gate`` entry is + always added to scores with score=1.0 on pass or score=0.0 on fail so the + outcome is visible in the API result and UI for every iteration. + + The gate score is recorded even when ``passed_so_far`` is False (quality + judges already failed) so that latency telemetry is visible on all + iterations, not just passing ones. In that case it is informational only + and cannot block the iteration further. + + The gate is skipped entirely (no score entry added) only when no acceptance + statement implies latency optimization. + + :param passed_so_far: Whether all prior checks for this sample passed. + :param ctx: Current optimization context. 
+ :return: (passed, updated_ctx) where passed reflects gate outcome. + """ + if not _acceptance_criteria_implies_duration_optimization(self._options.judges): + return passed_so_far, ctx + passed = self._evaluate_duration(ctx) + if passed: + if self._baseline_duration_ms is not None and ctx.duration_ms is not None: + rationale = ( + f"Latency improvement gate passed: {ctx.duration_ms:.0f}ms is at least " + f"{int((1 - _DURATION_TOLERANCE) * 100)}% faster than baseline " + f"{self._baseline_duration_ms:.0f}ms." + ) + else: + rationale = "Latency gate passed (no baseline)." + score = 1.0 + else: + rationale = ( + f"Latency improvement gate failed: {ctx.duration_ms:.0f}ms did not improve " + f"by {int((1 - _DURATION_TOLERANCE) * 100)}% vs baseline " + f"{self._baseline_duration_ms:.0f}ms " + f"(required < {self._baseline_duration_ms * _DURATION_TOLERANCE:.0f}ms)." + ) + score = 0.0 + ctx = dataclasses.replace( + ctx, + scores={**ctx.scores, "_latency_gate": JudgeResult( + score=score, + rationale=rationale, + duration_ms=ctx.duration_ms, + )}, + ) + return passed_so_far and passed, ctx + + def _apply_cost_gate( + self, passed_so_far: bool, ctx: OptimizationContext + ) -> Tuple[bool, OptimizationContext]: + """Apply the cost improvement gate and record its result in ctx.scores. + + When the gate is active (any acceptance statement implies cost optimization), + evaluates whether the candidate's estimated cost improved by at least + _COST_TOLERANCE vs the baseline. A synthetic ``_cost_gate`` entry is always + added to scores with score=1.0 on pass or score=0.0 on fail. + + The gate score is recorded even when ``passed_so_far`` is False (quality + judges already failed) so that cost telemetry is visible on all iterations, + not just passing ones. In that case it is informational only and cannot + block the iteration further. + + The gate is skipped entirely (no score entry added) only when no acceptance + statement implies cost optimization. + + :param passed_so_far: Whether all prior checks for this sample passed. + :param ctx: Current optimization context. + :return: (passed, updated_ctx) where passed reflects gate outcome. + """ + if not _acceptance_criteria_implies_cost_optimization(self._options.judges): + return passed_so_far, ctx + passed = self._evaluate_cost(ctx) + if passed: + if self._baseline_cost_usd is not None and ctx.estimated_cost_usd is not None: + rationale = ( + f"Cost improvement gate passed: {ctx.estimated_cost_usd:.6f} is at least " + f"{int((1 - _COST_TOLERANCE) * 100)}% cheaper than baseline " + f"{self._baseline_cost_usd:.6f}." + ) + else: + rationale = "Cost gate passed (no baseline)." + score = 1.0 + else: + rationale = ( + f"Cost improvement gate failed: {ctx.estimated_cost_usd:.6f} did not improve " + f"by {int((1 - _COST_TOLERANCE) * 100)}% vs baseline " + f"{self._baseline_cost_usd:.6f} " + f"(required < {self._baseline_cost_usd * _COST_TOLERANCE:.6f})." 
+ ) + score = 0.0 + ctx = dataclasses.replace( + ctx, + scores={**ctx.scores, "_cost_gate": JudgeResult( + score=score, + rationale=rationale, + duration_ms=ctx.duration_ms, + estimated_cost_usd=ctx.estimated_cost_usd, + )}, + ) + return passed_so_far and passed, ctx + def _handle_success( self, optimize_context: OptimizationContext, iteration: int ) -> Any: @@ -2149,15 +2537,12 @@ async def _run_validation_phase( self._safe_status_update("generating", val_ctx, val_iter) val_ctx = await self._execute_agent_turn(val_ctx, val_iter) self._accumulate_tokens(val_ctx) - if self._is_token_limit_exceeded(): - logger.error( - "[Validation %d/%d] -> Token limit exceeded (total=%d)", - i + 1, - validation_count, - self._total_token_usage, - ) - return False, val_ctx + val_ctx = dataclasses.replace( + val_ctx, accumulated_token_usage=self._total_token_usage + ) + # Evaluate pass/fail before the token limit check so a passing validation + # sample is not incorrectly treated as a failure due to budget exhaustion. if options.on_turn is not None: try: sample_passed = options.on_turn(val_ctx) @@ -2169,10 +2554,17 @@ async def _run_validation_phase( else: sample_passed = self._evaluate_response(val_ctx) - if sample_passed and _acceptance_criteria_implies_duration_optimization( - self._options.judges - ): - sample_passed = self._evaluate_duration(val_ctx) + sample_passed, val_ctx = self._apply_duration_gate(sample_passed, val_ctx) + sample_passed, val_ctx = self._apply_cost_gate(sample_passed, val_ctx) + + if self._is_token_limit_exceeded(): + logger.error( + "[Validation %d/%d] -> Token limit exceeded (total=%d)", + i + 1, + validation_count, + self._total_token_usage, + ) + return False, val_ctx last_ctx = val_ctx @@ -2214,6 +2606,7 @@ async def _run_optimization( self._last_succeeded_context = None self._last_optimization_result_id = None self._total_token_usage = 0 + self._last_batch_size = 1 self._initialize_class_members_from_config(agent_config) # If the LD flag doesn't carry a model name, seed from the first model choice @@ -2260,13 +2653,9 @@ async def _run_optimization( optimize_context, iteration ) self._accumulate_tokens(optimize_context) - if self._is_token_limit_exceeded(): - logger.error( - "[Iteration %d] -> Token limit exceeded (total=%d)", - iteration, - self._total_token_usage, - ) - return self._handle_failure(optimize_context, iteration) + optimize_context = dataclasses.replace( + optimize_context, accumulated_token_usage=self._total_token_usage + ) # Manual path: on_turn callback gives caller full control over pass/fail if self._options.on_turn is not None: @@ -2293,10 +2682,18 @@ async def _run_optimization( iteration, ) - if initial_passed and _acceptance_criteria_implies_duration_optimization( - self._options.judges - ): - initial_passed = self._evaluate_duration(optimize_context) + initial_passed, optimize_context = self._apply_duration_gate(initial_passed, optimize_context) + initial_passed, optimize_context = self._apply_cost_gate(initial_passed, optimize_context) + + # Token limit check after pass/fail evaluation so the persisted record + # correctly reflects whether the iteration passed before stopping the run. 
+ if self._is_token_limit_exceeded(): + logger.error( + "[Iteration %d] -> Token limit exceeded (total=%d)", + iteration, + self._total_token_usage, + ) + return self._handle_failure(optimize_context, iteration) if initial_passed: all_valid, last_ctx = await self._run_validation_phase( @@ -2319,7 +2716,9 @@ async def _run_optimization( ) if iteration >= self._options.max_attempts: return self._handle_failure(optimize_context, iteration) + self._record_baseline(last_ctx) self._history.append(last_ctx) + self._history = _trim_history(self._history, _MAX_STANDARD_HISTORY_LENGTH) try: await self._generate_new_variation( iteration, last_ctx.current_variables @@ -2349,7 +2748,9 @@ async def _run_optimization( ) if iteration >= self._options.max_attempts: return self._handle_failure(optimize_context, iteration) + self._record_baseline(optimize_context) self._history.append(optimize_context) + self._history = _trim_history(self._history, _MAX_STANDARD_HISTORY_LENGTH) try: await self._generate_new_variation( iteration, optimize_context.current_variables diff --git a/packages/optimization/src/ldai_optimizer/dataclasses.py b/packages/optimization/src/ldai_optimizer/dataclasses.py index fab3ed72..6e308f67 100644 --- a/packages/optimization/src/ldai_optimizer/dataclasses.py +++ b/packages/optimization/src/ldai_optimizer/dataclasses.py @@ -43,6 +43,7 @@ class JudgeResult: rationale: Optional[str] = None duration_ms: Optional[float] = None usage: Optional[TokenUsage] = None + estimated_cost_usd: Optional[float] = None def to_json(self) -> Dict[str, Any]: """ @@ -61,6 +62,8 @@ def to_json(self) -> Dict[str, Any]: "input": self.usage.input, "output": self.usage.output, } + if self.estimated_cost_usd is not None: + result["estimated_cost_usd"] = self.estimated_cost_usd return result @@ -217,6 +220,8 @@ class OptimizationContext: iteration: int = 0 # current iteration number duration_ms: Optional[float] = None # wall-clock time for the agent call in milliseconds usage: Optional[TokenUsage] = None # token usage reported by the agent for this iteration + estimated_cost_usd: Optional[float] = None # estimated cost; USD when pricing available, else total tokens + accumulated_token_usage: Optional[int] = None # single running total across ALL calls in this run (generation + judges + variation) def copy_without_history(self) -> OptimizationContext: """ @@ -236,6 +241,8 @@ def copy_without_history(self) -> OptimizationContext: iteration=self.iteration, duration_ms=self.duration_ms, usage=self.usage, + estimated_cost_usd=self.estimated_cost_usd, + accumulated_token_usage=self.accumulated_token_usage, ) def to_json(self) -> Dict[str, Any]: @@ -261,6 +268,8 @@ def to_json(self) -> Dict[str, Any]: "history": history_list, "iteration": self.iteration, "duration_ms": self.duration_ms, + "estimated_cost_usd": self.estimated_cost_usd, + "accumulated_token_usage": self.accumulated_token_usage, } if self.usage is not None: result["usage"] = { diff --git a/packages/optimization/src/ldai_optimizer/ld_api_client.py b/packages/optimization/src/ldai_optimizer/ld_api_client.py index 3efa725d..37f6549e 100644 --- a/packages/optimization/src/ldai_optimizer/ld_api_client.py +++ b/packages/optimization/src/ldai_optimizer/ld_api_client.py @@ -118,7 +118,7 @@ class AgentOptimizationResultPatch(TypedDict, total=False): completionResponse: str scores: Dict[str, Any] generationLatency: int - generationTokens: Dict[str, int] + generationTokens: Dict[str, Any] evaluationLatencies: Dict[str, float] evaluationTokens: Dict[str, Dict[str, int]] 
variation: Dict[str, Any] diff --git a/packages/optimization/src/ldai_optimizer/prompts.py b/packages/optimization/src/ldai_optimizer/prompts.py index 4aae5fee..0cc8a53d 100644 --- a/packages/optimization/src/ldai_optimizer/prompts.py +++ b/packages/optimization/src/ldai_optimizer/prompts.py @@ -16,6 +16,13 @@ re.IGNORECASE, ) +_COST_KEYWORDS = re.compile( + r"\b(cheap|cheaper|cheapest|costs?|costly|expensive|budget|affordable|" + r"spend|spending|economical|cost-effective|frugal|" + r"price|pricing|bill|billing)\b", + re.IGNORECASE, +) + def _acceptance_criteria_implies_duration_optimization( judges: Optional[Dict[str, OptimizationJudge]], @@ -39,6 +46,28 @@ def _acceptance_criteria_implies_duration_optimization( return False +def _acceptance_criteria_implies_cost_optimization( + judges: Optional[Dict[str, OptimizationJudge]], +) -> bool: + """Return True if any judge acceptance statement implies a cost reduction goal. + + Scans each judge's acceptance_statement for cost-related keywords. The + check is case-insensitive. Returns False when judges is None or no judge + carries an acceptance statement. + + :param judges: Judge configuration dict from OptimizationOptions, or None. + :return: True if cost optimization should be applied. + """ + if not judges: + return False + for judge in judges.values(): + if judge.acceptance_statement and _COST_KEYWORDS.search( + judge.acceptance_statement + ): + return True + return False + + def build_message_history_text( history: List[OptimizationContext], input_text: str, @@ -114,6 +143,8 @@ def build_new_variation_prompt( variable_choices: List[Dict[str, Any]], initial_instructions: str, optimize_for_duration: bool = False, + optimize_for_cost: bool = False, + quality_already_passing: bool = False, ) -> str: """ Build the LLM prompt for generating an improved agent configuration. @@ -133,6 +164,11 @@ def build_new_variation_prompt( :param initial_instructions: The original unmodified instructions template :param optimize_for_duration: When True, appends a duration optimization section instructing the LLM to prefer faster models and simpler instructions. + :param optimize_for_cost: When True, appends a cost optimization section + instructing the LLM to prefer cheaper models and reduce token usage. + :param quality_already_passing: When True, signals that all judge criteria are + currently passing and the cost optimization section should instruct the LLM + to preserve existing behavior while only reducing cost. 
:return: The assembled prompt string """ sections = [ @@ -147,6 +183,7 @@ def build_new_variation_prompt( history, model_choices, variable_choices, initial_instructions ), variation_prompt_duration_optimization(model_choices) if optimize_for_duration else "", + variation_prompt_cost_optimization(model_choices, quality_already_passing=quality_already_passing) if optimize_for_cost else "", ] return "\n\n".join(s for s in sections if s) @@ -248,6 +285,8 @@ def variation_prompt_configuration( lines.append(f"Agent response: {previous_ctx.completion_response}") if previous_ctx.duration_ms is not None: lines.append(f"Agent duration: {previous_ctx.duration_ms:.0f}ms") + if previous_ctx.estimated_cost_usd is not None: + lines.append(f"Estimated agent cost: ${previous_ctx.estimated_cost_usd:.6f}") return "\n".join(lines) else: return "\n".join( @@ -301,6 +340,8 @@ def variation_prompt_feedback( lines.append(feedback_line) if ctx.duration_ms is not None: lines.append(f"Agent duration: {ctx.duration_ms:.0f}ms") + if ctx.estimated_cost_usd is not None: + lines.append(f"Estimated agent cost: ${ctx.estimated_cost_usd:.6f}") return "\n".join(lines) @@ -556,3 +597,76 @@ def variation_prompt_duration_optimization(model_choices: List[str]) -> str: "Quality criteria remain the primary objective — do not sacrifice passing scores to achieve lower latency.", ] ) + + +def variation_prompt_cost_optimization( + model_choices: List[str], + quality_already_passing: bool = False, +) -> str: + """ + Cost optimization section of the variation prompt. + + Included when acceptance criteria imply a cost reduction goal. Instructs + the LLM to treat token usage as a secondary objective — quality criteria + must still be met first — and provides concrete guidance on how to reduce + cost through model selection and instruction simplification. + + When ``quality_already_passing`` is True, the framing shifts: since all + judge criteria are already satisfied, the LLM is instructed to preserve + the existing behavior exactly and only apply changes that reduce cost + without affecting output quality. + + :param model_choices: List of model IDs the LLM may select from, so it can + apply its own knowledge of which models tend to be cheaper. + :param quality_already_passing: When True, signals that all judge criteria + are currently passing. The section will direct the LLM to preserve + output quality and focus exclusively on cost reduction strategies. + :return: The cost optimization prompt block. + """ + if quality_already_passing: + intent_lines = [ + "## Cost Optimization:", + "The acceptance criteria for this optimization implies that token usage / cost should be reduced.", + "*** IMPORTANT: All quality acceptance criteria are currently passing. 
***", + "The goal of this variation is to reduce cost WITHOUT changing the behavior or quality of the agent's responses.", + "Do NOT alter the instructions in ways that would change what the agent says or how it reasons.", + "Only apply changes that reduce token usage or switch to a cheaper model while preserving the same output quality.", + "If you cannot reduce cost without risking quality, keep the instructions unchanged and only consider a cheaper model.", + "", + ] + else: + intent_lines = [ + "## Cost Optimization:", + "The acceptance criteria for this optimization implies that token usage / cost should be reduced.", + "In addition to improving quality, generate a variation that aims to reduce the agent's cost.", + "", + ] + + shared_lines = [ + "Cost is driven by two factors: (1) the number of tokens processed, and (2) the per-token price of the model.", + "Target both factors with the strategies below.", + "", + "### Reducing token usage (input tokens):", + "- Remove redundant, verbose, or repeated phrasing from the instructions.", + "- Collapse multi-sentence explanations into a single concise directive.", + "- Remove examples or few-shot demonstrations unless they are essential for accuracy.", + "- Eliminate instructional scaffolding that the model does not need (e.g. 'You are a helpful assistant that...').", + "- Use bullet points instead of prose where possible — they are more token-efficient.", + "", + "### Reducing token usage (output tokens):", + "- Instruct the agent to be concise and avoid unnecessary elaboration.", + "- Specify the exact format and length of the expected response (e.g. 'Respond in one sentence.').", + "- Set or reduce max_tokens if the current value allows longer responses than needed.", + "- Avoid instructions that encourage the agent to 'explain its reasoning' unless required by the acceptance criteria.", + "", + "### Reducing per-token cost via model selection:", + "- Consider switching to a cheaper model from the available choices if quality requirements can still be met.", + f" Available models: {model_choices}", + " Use your knowledge of relative model pricing to prefer lower-cost options.", + " Only switch models if the cheaper model is capable of satisfying the acceptance criteria.", + "", + "Quality criteria remain the primary objective — do not sacrifice passing scores to achieve lower cost.", + "Apply cost-reduction changes incrementally: prefer the smallest change that measurably reduces cost.", + ] + + return "\n".join(intent_lines + shared_lines) diff --git a/packages/optimization/src/ldai_optimizer/util.py b/packages/optimization/src/ldai_optimizer/util.py index 46429e50..b725f227 100644 --- a/packages/optimization/src/ldai_optimizer/util.py +++ b/packages/optimization/src/ldai_optimizer/util.py @@ -5,7 +5,10 @@ import logging import random import re -from typing import Any, Awaitable, Dict, List, Optional, Tuple, TypeVar, Union +from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple, TypeVar, Union + +if TYPE_CHECKING: + from ldai.tracker import TokenUsage from ldai_optimizer._slug_words import _ADJECTIVES, _NOUNS @@ -313,3 +316,44 @@ def judge_passed(score: float, threshold: float, is_inverted: bool) -> bool: the score must stay at or below the threshold: ``score <= threshold``. 
""" return score <= threshold if is_inverted else score >= threshold + + +def estimate_cost( + usage: Optional["TokenUsage"], + model_config: Optional[Dict[str, Any]], +) -> Optional[float]: + """Estimate the monetary cost of a single agent call in USD. + + Uses ``costPerInputToken`` and ``costPerOutputToken`` from the model config. + Returns ``None`` when either ``usage`` is ``None`` or no pricing fields are + present on the model config — ensuring the return value is always in USD or + absent, never a raw token count. This prevents unit-mismatch bugs when + comparing costs across iterations where the model (and its pricing + availability) may differ. + + ``costPerCachedInputToken`` is intentionally ignored — the estimate uses + input/output tokens only. + + :param usage: Token usage from the agent call. When ``None``, returns ``None``. + :param model_config: Model config dict from ``get_model_configs()``, or ``None``. + :return: Estimated cost in USD, or ``None`` if usage or pricing data is absent, or if + both ``usage.input`` and ``usage.output`` are ``None`` (no token counts available). + """ + if usage is None: + return None + + input_price = model_config.get("costPerInputToken") if model_config else None + output_price = model_config.get("costPerOutputToken") if model_config else None + + if input_price is None and output_price is None: + return None + + cost = 0.0 + computed = False + if input_price is not None and usage.input is not None: + cost += usage.input * input_price + computed = True + if output_price is not None and usage.output is not None: + cost += usage.output * output_price + computed = True + return cost if computed else None diff --git a/packages/optimization/tests/test_client.py b/packages/optimization/tests/test_client.py index 46f2d876..6d94d3df 100644 --- a/packages/optimization/tests/test_client.py +++ b/packages/optimization/tests/test_client.py @@ -1,5 +1,6 @@ """Tests for OptimizationClient.""" +import dataclasses import json from typing import Any, Dict from unittest.mock import AsyncMock, MagicMock, patch @@ -10,7 +11,14 @@ from ldai.tracker import TokenUsage from ldclient import Context -from ldai_optimizer.client import OptimizationClient, _compute_validation_count, _find_model_config +from ldai_optimizer.client import ( + OptimizationClient, + _MAX_STANDARD_HISTORY_LENGTH, + _compute_validation_count, + _find_model_config, + _strip_provider_prefix, + _trim_history, +) from ldai_optimizer.util import judge_passed from ldai_optimizer.dataclasses import ( AIJudgeCallConfig, @@ -26,15 +34,17 @@ ToolDefinition, ) from ldai_optimizer.prompts import ( + _acceptance_criteria_implies_cost_optimization, _acceptance_criteria_implies_duration_optimization, build_new_variation_prompt, variation_prompt_acceptance_criteria, + variation_prompt_cost_optimization, variation_prompt_feedback, variation_prompt_improvement_instructions, variation_prompt_overfit_warning, variation_prompt_preamble, ) -from ldai_optimizer.util import interpolate_variables +from ldai_optimizer.util import estimate_cost, interpolate_variables from ldai_optimizer.util import ( restore_variable_placeholders, ) @@ -120,6 +130,40 @@ def _make_client(ldai: MagicMock | None = None) -> OptimizationClient: # --------------------------------------------------------------------------- +# --------------------------------------------------------------------------- +# _strip_provider_prefix +# --------------------------------------------------------------------------- + + +class TestStripProviderPrefix: + def 
test_strips_known_anthropic_prefix(self): + assert _strip_provider_prefix("Anthropic.claude-opus-4-5") == "claude-opus-4-5" + + def test_strips_known_openai_prefix(self): + assert _strip_provider_prefix("OpenAI.gpt-4o") == "gpt-4o" + + def test_strips_known_bedrock_prefix(self): + # "Bedrock.us.amazon.nova-pro-v1:0" → region prefix is preserved + assert _strip_provider_prefix("Bedrock.us.amazon.nova-pro-v1:0") == "us.amazon.nova-pro-v1:0" + + def test_does_not_strip_bedrock_region_prefix(self): + # Raw Bedrock cross-region ID has no provider prefix — must be unchanged + assert _strip_provider_prefix("us.amazon.nova-pro-v1:0") == "us.amazon.nova-pro-v1:0" + + def test_does_not_strip_eu_region_prefix(self): + assert _strip_provider_prefix("eu.anthropic.claude-3-5-sonnet-20241022-v2:0") == "eu.anthropic.claude-3-5-sonnet-20241022-v2:0" + + def test_no_period_returns_unchanged(self): + assert _strip_provider_prefix("gpt-4o") == "gpt-4o" + + def test_empty_string_returns_unchanged(self): + assert _strip_provider_prefix("") == "" + + def test_preserves_dots_in_model_name_after_stripping(self): + # Multiple dots after the provider prefix are preserved + assert _strip_provider_prefix("Anthropic.claude-3.5-sonnet") == "claude-3.5-sonnet" + + # --------------------------------------------------------------------------- # _find_model_config # --------------------------------------------------------------------------- @@ -235,6 +279,43 @@ def test_converts_object_with_to_dict(self): # --------------------------------------------------------------------------- +class TestIsTokenLimitExceeded: + def _client_with_limit(self, limit): + client = _make_client() + client._options = _make_options(token_limit=limit) + return client + + def test_no_limit_returns_false(self): + client = self._client_with_limit(None) + client._total_token_usage = 9999 + assert client._is_token_limit_exceeded() is False + + def test_zero_limit_treated_as_no_limit(self): + client = self._client_with_limit(0) + client._total_token_usage = 0 + assert client._is_token_limit_exceeded() is False + + def test_zero_limit_with_high_usage_returns_false(self): + client = self._client_with_limit(0) + client._total_token_usage = 100_000 + assert client._is_token_limit_exceeded() is False + + def test_positive_limit_not_yet_reached(self): + client = self._client_with_limit(1000) + client._total_token_usage = 999 + assert client._is_token_limit_exceeded() is False + + def test_positive_limit_exactly_reached(self): + client = self._client_with_limit(1000) + client._total_token_usage = 1000 + assert client._is_token_limit_exceeded() is True + + def test_positive_limit_exceeded(self): + client = self._client_with_limit(1000) + client._total_token_usage = 1001 + assert client._is_token_limit_exceeded() is True + + class TestEvaluateResponse: def setup_method(self): self.client = _make_client() @@ -496,10 +577,10 @@ async def test_duration_context_added_to_instructions_when_latency_keyword_prese ) _, config, _, _ = self.handle_judge_call.call_args.args assert "1500ms" in config.instructions - assert "mention the duration" in config.instructions + assert "state the duration" in config.instructions async def test_duration_context_includes_baseline_comparison_when_history_present(self): - """When history[0] has a duration, the judge instructions include a baseline comparison.""" + """When a baseline duration is captured, the judge instructions include a baseline comparison.""" self.client._history = [ OptimizationContext( scores={}, @@ -511,6 +592,7 @@ 
async def test_duration_context_includes_baseline_comparison_when_history_presen duration_ms=2000.0, ) ] + self.client._baseline_duration_ms = 2000.0 judge = OptimizationJudge( threshold=0.8, acceptance_statement="Responses should have low latency.", @@ -542,6 +624,7 @@ async def test_duration_context_says_slower_when_candidate_is_slower(self): duration_ms=1000.0, ) ] + self.client._baseline_duration_ms = 1000.0 judge = OptimizationJudge( threshold=0.8, acceptance_statement="The response must be fast.", @@ -1803,11 +1886,11 @@ async def test_apply_variation_response_calls_restore_and_logs_warning(self): with patch("ldai_optimizer.client.logger") as mock_logger: await client._generate_new_variation(iteration=1, variables={}) - warning_calls = [ - call for call in mock_logger.warning.call_args_list + debug_calls = [ + call for call in mock_logger.debug.call_args_list if "user-123" in str(call) or "business" in str(call) ] - assert len(warning_calls) >= 1 + assert len(debug_calls) >= 1 assert "{{user_id}}" in client._current_instructions assert "user-123" not in client._current_instructions @@ -2647,7 +2730,12 @@ async def test_gt_stops_when_token_limit_exceeded_on_first_sample(self): assert len(results) == 1 async def test_gt_stops_mid_batch_when_limit_exceeded_on_second_sample(self): - """Token limit exceeded on the second of two samples stops after that sample.""" + """Token limit exceeded on the final sample of a batch — all passed — marks run successful. + + With 2 samples and the budget exhausted after sample 2 (the last one), and + both samples passing their judges, the run should be marked succeeded rather + than failed, because the token limit was hit on the final successful iteration. + """ agent_responses = [ OptimizationResponse(output="Answer 1.", usage=TokenUsage(total=100, input=60, output=40)), OptimizationResponse(output="Answer 2.", usage=TokenUsage(total=200, input=120, output=80)), @@ -2660,13 +2748,14 @@ async def test_gt_stops_mid_batch_when_limit_exceeded_on_second_sample(self): opts = _make_gt_options( handle_agent_call=handle_agent_call, handle_judge_call=handle_judge_call, - token_limit=250, # 100 + 200 = 300 ≥ 250 → trip on second sample + token_limit=250, # 100 + 200 = 300 ≥ 250 → trip on second (final) sample max_attempts=5, ) results = await client.optimize_from_ground_truth_options("test-agent", opts) - assert client._last_run_succeeded is False + # All samples passed; token limit hit only after the final sample → success + assert client._last_run_succeeded is True assert handle_agent_call.call_count == 2 - # Both samples processed so far are in the results + # Both samples are in the results assert len(results) == 2 async def test_gt_on_failing_result_called_on_token_limit(self): @@ -2773,6 +2862,300 @@ async def test_total_token_usage_resets_between_runs(self): assert client._last_run_succeeded is True +# --------------------------------------------------------------------------- +# History truncation +# --------------------------------------------------------------------------- + + +class TestHistoryTruncation: + """Tests that _trim_history and the optimizer correctly cap _history growth.""" + + def test_trim_history_no_op_when_under_limit(self): + ctx = OptimizationContext( + scores={}, completion_response="r", current_instructions="i", + current_parameters={}, current_variables={}, current_model="m", iteration=1, + ) + history = [ctx, ctx, ctx] + assert _trim_history(history, 5) is history + + def test_trim_history_keeps_most_recent_items(self): + 
"""_trim_history is a pure tail slice — it keeps the most recent max_len items.""" + ctxs = [ + OptimizationContext( + scores={}, completion_response=str(i), current_instructions="i", + current_parameters={}, current_variables={}, current_model="m", iteration=i, + ) + for i in range(10) + ] + result = _trim_history(ctxs, 4) + assert len(result) == 4 + # The four MOST RECENT items are retained; the oldest are discarded. + assert result[0] is ctxs[6] + assert result[-1] is ctxs[-1] + + def test_trim_history_max_len_1(self): + ctxs = [ + OptimizationContext( + scores={}, completion_response=str(i), current_instructions="i", + current_parameters={}, current_variables={}, current_model="m", iteration=i, + ) + for i in range(5) + ] + result = _trim_history(ctxs, 1) + assert len(result) == 1 + assert result[0] is ctxs[-1] + + @pytest.mark.asyncio + async def test_gt_history_trimmed_to_n_after_each_attempt(self): + """After each GT attempt _history must not exceed n (sample count).""" + mock_ldai = _make_ldai_client() + client = _make_client(mock_ldai) + + # Three attempts, 2 samples each (n=2); judge fails on every attempt so + # we cycle through all max_attempts and history never exceeds 2 items. + opts = _make_gt_options( + handle_judge_call=AsyncMock(return_value=OptimizationResponse(output=JUDGE_FAIL_RESPONSE)), + max_attempts=3, + ) + + history_lengths: list[int] = [] + original_extend = list.extend + + def spy_extend(self_list, iterable): + original_extend(self_list, iterable) + + with patch.object(client, "_generate_new_variation", new_callable=AsyncMock): + results = await client.optimize_from_ground_truth_options("test-agent", opts) + + # _history should be capped at n=2 (the number of samples) + assert len(client._history) <= 2 + + @pytest.mark.asyncio + async def test_standard_history_trimmed_to_max_standard_length(self): + """After each standard optimizer iteration _history must not exceed _MAX_STANDARD_HISTORY_LENGTH.""" + mock_ldai = _make_ldai_client() + client = _make_client(mock_ldai) + + # Enough failing attempts to push history beyond the cap. + attempts = _MAX_STANDARD_HISTORY_LENGTH + 3 + opts = _make_options( + handle_judge_call=AsyncMock(return_value=OptimizationResponse(output=JUDGE_FAIL_RESPONSE)), + max_attempts=attempts, + ) + + with patch.object(client, "_generate_new_variation", new_callable=AsyncMock): + await client.optimize_from_options("test-agent", opts) + + assert len(client._history) <= _MAX_STANDARD_HISTORY_LENGTH + + +# --------------------------------------------------------------------------- +# Accumulated token usage +# --------------------------------------------------------------------------- + + +class TestAccumulatedTokenUsage: + """Tests that total token usage is tracked across agent, judge, and variation calls.""" + + def setup_method(self): + self.mock_ldai = _make_ldai_client() + + @pytest.mark.asyncio + async def test_variation_tokens_counted_in_total(self): + """Tokens consumed by variation generation are rolled into _total_token_usage.""" + agent_usage = TokenUsage(total=100, input=60, output=40) + variation_usage = TokenUsage(total=50, input=30, output=20) + + # Build a minimal GT run (no validation phase) so we can control exactly + # which handle_agent_call responses map to agent turns vs variation. 
+        # GT flow for one failed attempt then success:
+        #   Attempt 1: 2 samples → 2 agent calls → judge fails on sample 1 → all_passed=False
+        #   Variation: 1 handle_agent_call with variation_usage
+        #   Attempt 2: 2 samples → 2 agent calls → all pass → success
+        agent_responses = [
+            OptimizationResponse(output="Answer 1.", usage=agent_usage),  # attempt 1, sample 1
+            OptimizationResponse(output="Answer 2.", usage=agent_usage),  # attempt 1, sample 2
+            OptimizationResponse(output=VARIATION_RESPONSE, usage=variation_usage),  # variation
+            OptimizationResponse(output="Answer 1.", usage=agent_usage),  # attempt 2, sample 1
+            OptimizationResponse(output="Answer 2.", usage=agent_usage),  # attempt 2, sample 2
+        ]
+        handle_agent_call = AsyncMock(side_effect=agent_responses)
+        judge_responses = [
+            OptimizationResponse(output=JUDGE_FAIL_RESPONSE),  # attempt 1, sample 1 — fail
+            OptimizationResponse(output=JUDGE_PASS_RESPONSE),  # attempt 1, sample 2 — pass
+            OptimizationResponse(output=JUDGE_PASS_RESPONSE),  # attempt 2, sample 1 — pass
+            OptimizationResponse(output=JUDGE_PASS_RESPONSE),  # attempt 2, sample 2 — pass
+        ]
+        handle_judge_call = AsyncMock(side_effect=judge_responses)
+
+        client = _make_client(self.mock_ldai)
+        opts = _make_gt_options(
+            handle_agent_call=handle_agent_call,
+            handle_judge_call=handle_judge_call,
+            max_attempts=3,
+        )
+        await client.optimize_from_ground_truth_options("test-agent", opts)
+
+        # _total_token_usage must include tokens from all 4 agent calls + 1 variation call
+        expected_min = agent_usage.total * 4 + variation_usage.total
+        assert client._total_token_usage >= expected_min
+
+    @pytest.mark.asyncio
+    async def test_accumulated_token_usage_stamped_on_context(self):
+        """accumulated_token_usage on the returned context equals _total_token_usage."""
+        agent_usage = TokenUsage(total=200, input=120, output=80)
+        handle_agent_call = AsyncMock(
+            return_value=OptimizationResponse(output="The answer is 4.", usage=agent_usage)
+        )
+        handle_judge_call = AsyncMock(
+            return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE)
+        )
+
+        client = _make_client(self.mock_ldai)
+        opts = _make_options(
+            handle_agent_call=handle_agent_call,
+            handle_judge_call=handle_judge_call,
+        )
+        results = await client.optimize_from_options("test-agent", opts)
+
+        assert results is not None
+        last_ctx = results if isinstance(results, OptimizationContext) else None
+        assert last_ctx is not None
+        # The value stamped on the returned context must match the internal total,
+        # which in turn includes at least the agent tokens.
+        assert last_ctx.accumulated_token_usage == client._total_token_usage
+        assert client._total_token_usage >= agent_usage.total
+
+    @pytest.mark.asyncio
+    async def test_gt_accumulated_token_usage_on_final_success(self):
+        """On a passing GT run the last result's accumulated_token_usage == _total_token_usage."""
+        agent_usage = TokenUsage(total=100, input=60, output=40)
+        handle_agent_call = AsyncMock(
+            return_value=OptimizationResponse(output="correct", usage=agent_usage)
+        )
+        handle_judge_call = AsyncMock(
+            return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE)
+        )
+
+        client = _make_client(self.mock_ldai)
+        opts = _make_gt_options(
+            handle_agent_call=handle_agent_call,
+            handle_judge_call=handle_judge_call,
+        )
+        results = await client.optimize_from_ground_truth_options("test-agent", opts)
+
+        assert isinstance(results, list) and len(results) > 0
+        final_ctx = results[-1]
+        assert final_ctx.accumulated_token_usage is not None
+        assert final_ctx.accumulated_token_usage == client._total_token_usage
+
+
+# ---------------------------------------------------------------------------
+# GT token budget exhausted on final successful iteration
+# --------------------------------------------------------------------------- + + +class TestGTTokenBudgetOnFinalIteration: + """Token budget exhausted on the very last GT sample should mark the run successful + when all samples passed, and failed when mid-batch or a sample failed.""" + + def setup_method(self): + self.mock_ldai = _make_ldai_client() + + @pytest.mark.asyncio + async def test_token_limit_on_last_sample_all_passed_marks_success(self): + """Exhausting the budget on the final sample of a batch where all passed → success.""" + agent_responses = [ + OptimizationResponse(output="Answer 1.", usage=TokenUsage(total=100, input=60, output=40)), + OptimizationResponse(output="Answer 2.", usage=TokenUsage(total=200, input=120, output=80)), + ] + handle_agent_call = AsyncMock(side_effect=agent_responses) + handle_judge_call = AsyncMock( + return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE) + ) + client = _make_client(self.mock_ldai) + opts = _make_gt_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + token_limit=250, # 100+200=300 ≥ 250 → budget hit on final (2nd) sample + max_attempts=5, + ) + results = await client.optimize_from_ground_truth_options("test-agent", opts) + + assert client._last_run_succeeded is True + assert len(results) == 2 + + @pytest.mark.asyncio + async def test_token_limit_on_last_sample_one_failed_marks_failure(self): + """Budget hit on the final sample but a prior sample failed → failure.""" + agent_responses = [ + # Sample 1 fails judge + OptimizationResponse(output="Answer 1.", usage=TokenUsage(total=100, input=60, output=40)), + # Sample 2 passes judge but pushes over limit + OptimizationResponse(output="Answer 2.", usage=TokenUsage(total=200, input=120, output=80)), + ] + handle_agent_call = AsyncMock(side_effect=agent_responses) + judge_responses = [ + OptimizationResponse(output=JUDGE_FAIL_RESPONSE), # sample 1 fails + OptimizationResponse(output=JUDGE_PASS_RESPONSE), # sample 2 passes + ] + handle_judge_call = AsyncMock(side_effect=judge_responses) + client = _make_client(self.mock_ldai) + opts = _make_gt_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + token_limit=250, + max_attempts=5, + ) + results = await client.optimize_from_ground_truth_options("test-agent", opts) + + assert client._last_run_succeeded is False + + @pytest.mark.asyncio + async def test_token_limit_mid_batch_marks_failure(self): + """Budget hit mid-batch (not on the final sample) → always failure.""" + handle_agent_call = AsyncMock( + return_value=OptimizationResponse( + output="Answer.", usage=TokenUsage(total=600, input=400, output=200) + ) + ) + handle_judge_call = AsyncMock( + return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE) + ) + client = _make_client(self.mock_ldai) + opts = _make_gt_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + token_limit=500, # 600 > 500 → trip on first of two samples + max_attempts=5, + ) + results = await client.optimize_from_ground_truth_options("test-agent", opts) + + assert client._last_run_succeeded is False + assert len(results) == 1 + + @pytest.mark.asyncio + async def test_on_passing_result_called_on_budget_exhaustion_success(self): + """on_passing_result fires when token budget exhausted but all GT samples passed.""" + on_passing = MagicMock() + agent_responses = [ + OptimizationResponse(output="A1.", usage=TokenUsage(total=100, input=60, output=40)), + OptimizationResponse(output="A2.", 
usage=TokenUsage(total=200, input=120, output=80)), + ] + handle_agent_call = AsyncMock(side_effect=agent_responses) + handle_judge_call = AsyncMock( + return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE) + ) + client = _make_client(self.mock_ldai) + opts = _make_gt_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + token_limit=250, + max_attempts=5, + on_passing_result=on_passing, + ) + await client.optimize_from_ground_truth_options("test-agent", opts) + + on_passing.assert_called_once() + + # --------------------------------------------------------------------------- # optimize_from_config # --------------------------------------------------------------------------- @@ -3479,43 +3862,43 @@ def test_returns_true_when_baseline_duration_is_none(self): assert self.client._evaluate_duration(self._ctx(5000, iteration=2)) is True def test_returns_true_when_candidate_duration_is_none(self): - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(None, iteration=2)) is True def test_passes_when_candidate_is_more_than_20_percent_faster(self): # baseline=2000ms, threshold=1600ms, candidate=1500ms → 1500 < 1600 → pass - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(1500, iteration=2)) is True def test_fails_when_candidate_is_exactly_at_threshold(self): # baseline=2000ms, threshold=1600ms, candidate=1600ms → not strictly less → fail - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(1600, iteration=2)) is False def test_fails_when_improvement_is_less_than_20_percent(self): # baseline=2000ms, threshold=1600ms, candidate=1800ms → 1800 >= 1600 → fail - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(1800, iteration=2)) is False def test_fails_when_candidate_matches_baseline(self): - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(2000, iteration=2)) is False def test_fails_when_candidate_is_slower_than_baseline(self): - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(2500, iteration=2)) is False - def test_uses_history_index_zero_as_baseline_not_last(self): - # history[0] is 2000ms (baseline), history[-1] is 500ms (fast, but not the baseline) - first = self._ctx(2000, iteration=1) - later = self._ctx(500, iteration=2) - self.client._history = [first, later] - # candidate=1500ms < 2000 * 0.80 = 1600ms → pass (uses history[0], not history[-1]) + def test_uses_explicit_baseline_not_most_recent_history(self): + # _baseline_duration_ms is 2000ms even though the most recent history item has 500ms. + # The explicit baseline is what must be used, not any history item. 
+ self.client._baseline_duration_ms = 2000.0 + self.client._history = [self._ctx(2000, iteration=1), self._ctx(500, iteration=2)] + # candidate=1500ms < 2000 * 0.80 = 1600ms → pass (uses explicit baseline, not history[-1]) assert self.client._evaluate_duration(self._ctx(1500, iteration=3)) is True def test_pass_boundary_just_below_threshold(self): # baseline=1000ms, threshold=800ms, candidate=799ms → pass - self.client._history = [self._ctx(1000, iteration=1)] + self.client._baseline_duration_ms = 1000.0 assert self.client._evaluate_duration(self._ctx(799, iteration=2)) is True @@ -3626,8 +4009,8 @@ async def test_no_duration_gate_when_acceptance_criteria_has_no_latency_keywords with patch.object(client, "_execute_agent_turn", new_callable=AsyncMock) as mock_execute: mock_execute.side_effect = execute_side_effects - # Manually seed history so _evaluate_duration would fire if incorrectly triggered - client._history = [self._ctx_with(duration_ms=2000, iteration=0)] + # Manually seed baseline so _evaluate_duration would fire if incorrectly triggered + client._baseline_duration_ms = 2000.0 result = await client.optimize_from_options("test-agent", opts) assert result is not None @@ -4656,3 +5039,882 @@ def test_mixed_judges_feedback_reflects_correct_pass_fail(self): # Both should be PASSED — relevance high enough, toxicity low enough assert result.count("PASSED") == 2 assert "FAILED" not in result + + +# --------------------------------------------------------------------------- +# estimate_cost helper +# --------------------------------------------------------------------------- + + +class TestEstimateCost: + def _usage(self, total=100, inp=60, out=40) -> TokenUsage: + return TokenUsage(total=total, input=inp, output=out) + + def test_returns_none_when_usage_is_none(self): + assert estimate_cost(None, {"costPerInputToken": 0.001}) is None + + def test_uses_pricing_when_available(self): + usage = self._usage(total=100, inp=60, out=40) + model_config = {"costPerInputToken": 0.001, "costPerOutputToken": 0.002} + cost = estimate_cost(usage, model_config) + assert cost == pytest.approx(60 * 0.001 + 40 * 0.002) + + def test_uses_only_input_price_when_output_absent(self): + usage = self._usage(total=100, inp=60, out=40) + model_config = {"costPerInputToken": 0.001} + cost = estimate_cost(usage, model_config) + assert cost == pytest.approx(60 * 0.001) + + def test_uses_only_output_price_when_input_absent(self): + usage = self._usage(total=100, inp=60, out=40) + model_config = {"costPerOutputToken": 0.002} + cost = estimate_cost(usage, model_config) + assert cost == pytest.approx(40 * 0.002) + + def test_returns_none_when_no_pricing_in_config(self): + usage = self._usage(total=100) + assert estimate_cost(usage, {}) is None + + def test_returns_none_when_model_config_none(self): + usage = self._usage(total=250) + assert estimate_cost(usage, None) is None + + def test_ignores_cached_input_token_price(self): + usage = self._usage(total=100, inp=60, out=40) + model_config = { + "costPerInputToken": 0.001, + "costPerOutputToken": 0.002, + "costPerCachedInputToken": 0.0005, + } + cost = estimate_cost(usage, model_config) + assert cost == pytest.approx(60 * 0.001 + 40 * 0.002) + + def test_zero_usage_with_pricing_returns_zero(self): + usage = TokenUsage(total=0, input=0, output=0) + model_config = {"costPerInputToken": 0.001, "costPerOutputToken": 0.002} + assert estimate_cost(usage, model_config) == pytest.approx(0.0) + + def test_returns_none_when_both_token_counts_are_none(self): + # Pricing exists but 
both input and output are None — no token counts to + # compute from, so we must return None rather than 0.0 to avoid + # cost-gate treating unknown cost as zero cost. + usage = TokenUsage(total=None, input=None, output=None) + model_config = {"costPerInputToken": 0.001, "costPerOutputToken": 0.002} + assert estimate_cost(usage, model_config) is None + + def test_returns_partial_cost_when_only_input_count_is_none(self): + # Only output count available — should still compute a partial cost. + usage = TokenUsage(total=40, input=None, output=40) + model_config = {"costPerInputToken": 0.001, "costPerOutputToken": 0.002} + assert estimate_cost(usage, model_config) == pytest.approx(40 * 0.002) + + def test_returns_partial_cost_when_only_output_count_is_none(self): + # Only input count available — should still compute a partial cost. + usage = TokenUsage(total=60, input=60, output=None) + model_config = {"costPerInputToken": 0.001, "costPerOutputToken": 0.002} + assert estimate_cost(usage, model_config) == pytest.approx(60 * 0.001) + + +# --------------------------------------------------------------------------- +# _acceptance_criteria_implies_cost_optimization +# --------------------------------------------------------------------------- + + +class TestAcceptanceCriteriaImpliesCostOptimization: + def _judge(self, statement: str) -> Dict[str, OptimizationJudge]: + return {"j": OptimizationJudge(threshold=0.9, acceptance_statement=statement)} + + def test_returns_false_when_judges_none(self): + assert _acceptance_criteria_implies_cost_optimization(None) is False + + def test_returns_false_when_no_acceptance_statements(self): + judges = {"j": OptimizationJudge(threshold=0.9, judge_key="some-judge")} + assert _acceptance_criteria_implies_cost_optimization(judges) is False + + def test_detects_cheap(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("Keep it cheap.")) + + def test_detects_cost(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("Reduce overall cost.")) + + def test_detects_costs_plural(self): + assert _acceptance_criteria_implies_cost_optimization( + self._judge("Keep the costs stable or lower them.") + ) + + def test_detects_budget(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("Stay within budget.")) + + def test_does_not_detect_token_to_avoid_false_positives(self): + assert not _acceptance_criteria_implies_cost_optimization(self._judge("Generate a valid authentication token.")) + + def test_detects_billing(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("Minimize billing.")) + + def test_detects_spend(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("Reduce spend on API calls.")) + + def test_case_insensitive(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("BUDGET FRIENDLY response")) + + def test_no_match_on_unrelated_statement(self): + assert not _acceptance_criteria_implies_cost_optimization( + self._judge("Respond accurately and concisely.") + ) + + def test_multiple_judges_one_matches(self): + judges = { + "j1": OptimizationJudge(threshold=0.9, acceptance_statement="Be accurate."), + "j2": OptimizationJudge(threshold=0.9, acceptance_statement="Keep costs low."), + } + assert _acceptance_criteria_implies_cost_optimization(judges) + + +# --------------------------------------------------------------------------- +# _evaluate_cost +# --------------------------------------------------------------------------- + + +class 
TestEvaluateCost: + def setup_method(self): + self.client = _make_client() + self.client._agent_key = "test-agent" + self.client._initialize_class_members_from_config(_make_agent_config()) + self.client._options = _make_options() + + def _ctx(self, cost: float, iteration: int = 2) -> OptimizationContext: + return OptimizationContext( + scores={}, + completion_response="ok", + current_instructions="inst", + current_parameters={}, + current_variables={}, + iteration=iteration, + estimated_cost_usd=cost, + ) + + def _seed_history(self, baseline_cost: float): + self.client._history = [self._ctx(baseline_cost, iteration=1)] + self.client._baseline_cost_usd = baseline_cost + + def test_passes_when_cost_improved_beyond_tolerance(self): + self._seed_history(0.010) + assert self.client._evaluate_cost(self._ctx(0.007)) is True + + def test_fails_when_cost_not_improved_enough(self): + self._seed_history(0.010) + assert self.client._evaluate_cost(self._ctx(0.0095)) is False + + def test_passes_at_exact_tolerance_boundary(self): + self._seed_history(0.010) + # 0.010 * 0.90 ≈ 0.009; must be strictly less than the threshold + assert self.client._evaluate_cost(self._ctx(0.0089)) is True + # 0.0091 is above the threshold → fail + assert self.client._evaluate_cost(self._ctx(0.0091)) is False + + def test_skips_gracefully_when_history_empty(self): + self.client._history = [] + assert self.client._evaluate_cost(self._ctx(0.005)) is True + + def test_skips_gracefully_when_baseline_cost_none(self): + self.client._history = [self._ctx(None)] # type: ignore[arg-type] + assert self.client._evaluate_cost(self._ctx(0.005)) is True + + def test_skips_gracefully_when_candidate_cost_none(self): + self._seed_history(0.010) + ctx = self._ctx(None) # type: ignore[arg-type] + assert self.client._evaluate_cost(ctx) is True + + def test_skips_gracefully_when_units_differ_across_model_switch(self): + # If baseline was captured with pricing (USD) but candidate has no pricing, + # candidate cost is None and the gate skips rather than comparing incompatible units. 
+ self._seed_history(0.010) + assert self.client._evaluate_cost(self._ctx(None)) is True + + +# --------------------------------------------------------------------------- +# _record_baseline_from_batch +# --------------------------------------------------------------------------- + + +class TestRecordBaselineFromBatch: + def setup_method(self): + self.client = _make_client() + self.client._initialize_class_members_from_config(_make_agent_config()) + + def _ctx(self, duration_ms=None, cost=None): + ctx = self.client._create_optimization_context(iteration=1, variables={}) + return dataclasses.replace(ctx, duration_ms=duration_ms, estimated_cost_usd=cost) + + def test_averages_duration_across_batch(self): + results = [self._ctx(duration_ms=1000), self._ctx(duration_ms=2000), self._ctx(duration_ms=3000)] + self.client._record_baseline_from_batch(results) + assert self.client._baseline_duration_ms == 2000.0 + + def test_averages_cost_across_batch(self): + results = [self._ctx(cost=0.01), self._ctx(cost=0.02), self._ctx(cost=0.03)] + self.client._record_baseline_from_batch(results) + assert abs(self.client._baseline_cost_usd - 0.02) < 1e-9 + + def test_skips_none_values_in_average(self): + results = [self._ctx(duration_ms=1000), self._ctx(duration_ms=None), self._ctx(duration_ms=3000)] + self.client._record_baseline_from_batch(results) + assert self.client._baseline_duration_ms == 2000.0 + + def test_noop_when_already_set(self): + self.client._baseline_duration_ms = 999.0 + results = [self._ctx(duration_ms=1000), self._ctx(duration_ms=2000)] + self.client._record_baseline_from_batch(results) + assert self.client._baseline_duration_ms == 999.0 + + def test_noop_on_empty_list(self): + self.client._record_baseline_from_batch([]) + assert self.client._baseline_duration_ms is None + assert self.client._baseline_cost_usd is None + + def test_noop_when_all_values_none(self): + results = [self._ctx(duration_ms=None), self._ctx(duration_ms=None)] + self.client._record_baseline_from_batch(results) + assert self.client._baseline_duration_ms is None + + +# --------------------------------------------------------------------------- +# _apply_duration_gate +# --------------------------------------------------------------------------- + + +class TestApplyDurationGate: + """Unit tests for the _apply_duration_gate wrapper method.""" + + def _make_judges_with_latency(self): + return { + "latency": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be faster and reduce latency.", + ) + } + + def _make_judges_no_latency(self): + return { + "accuracy": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be accurate.", + ) + } + + def _ctx(self, duration_ms=None, iteration=2): + return OptimizationContext( + scores={}, + completion_response="response", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=iteration, + duration_ms=duration_ms, + ) + + def setup_method(self): + self.client = _make_client() + self.client._options = _make_options(judges=self._make_judges_with_latency()) + self.client._initialize_class_members_from_config(_make_agent_config()) + self.client._baseline_duration_ms = 2000.0 + + def test_no_entry_added_when_gate_not_active(self): + self.client._options = _make_options(judges=self._make_judges_no_latency()) + ctx = self._ctx(1000) + passed, updated = self.client._apply_duration_gate(True, ctx) + assert passed is True + assert "_latency_gate" not in updated.scores + + def 
test_gate_recorded_even_when_already_failed(self): + # Gate score is always written for telemetry; it cannot block an + # iteration that was already failing (passed_so_far=False). + ctx = self._ctx(1000) + passed, updated = self.client._apply_duration_gate(False, ctx) + assert passed is False + assert "_latency_gate" in updated.scores + + def test_gate_pass_adds_score_1(self): + # baseline=2000ms, threshold=1600ms, candidate=1500ms → pass + ctx = self._ctx(1500) + passed, updated = self.client._apply_duration_gate(True, ctx) + assert passed is True + assert "_latency_gate" in updated.scores + assert updated.scores["_latency_gate"].score == 1.0 + assert "passed" in updated.scores["_latency_gate"].rationale.lower() + assert "1500" in updated.scores["_latency_gate"].rationale + assert "2000" in updated.scores["_latency_gate"].rationale + + def test_gate_fail_adds_score_0(self): + # baseline=2000ms, threshold=1600ms, candidate=1800ms → fail + ctx = self._ctx(1800) + passed, updated = self.client._apply_duration_gate(True, ctx) + assert passed is False + assert "_latency_gate" in updated.scores + assert updated.scores["_latency_gate"].score == 0.0 + assert "failed" in updated.scores["_latency_gate"].rationale.lower() + assert "1800" in updated.scores["_latency_gate"].rationale + + def test_gate_pass_no_baseline_gives_fallback_rationale(self): + self.client._baseline_duration_ms = None + ctx = self._ctx(None) + passed, updated = self.client._apply_duration_gate(True, ctx) + assert "_latency_gate" in updated.scores + assert "no baseline" in updated.scores["_latency_gate"].rationale.lower() + + def test_existing_scores_are_preserved(self): + ctx = OptimizationContext( + scores={"accuracy": JudgeResult(score=1.0, rationale="ok")}, + completion_response="response", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=2, + duration_ms=1500, + ) + _, updated = self.client._apply_duration_gate(True, ctx) + assert "accuracy" in updated.scores + assert "_latency_gate" in updated.scores + + def test_no_threshold_field_on_judge_result(self): + ctx = self._ctx(1500) + _, updated = self.client._apply_duration_gate(True, ctx) + gate_result = updated.scores["_latency_gate"] + assert not hasattr(gate_result, "threshold") or gate_result.threshold is None # type: ignore[union-attr] + + +# --------------------------------------------------------------------------- +# _apply_cost_gate +# --------------------------------------------------------------------------- + + +class TestApplyCostGate: + """Unit tests for the _apply_cost_gate wrapper method.""" + + def _make_judges_with_cost(self): + return { + "cost": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be cheaper and reduce cost.", + ) + } + + def _make_judges_no_cost(self): + return { + "accuracy": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be accurate.", + ) + } + + def _ctx(self, cost=None, iteration=2): + return OptimizationContext( + scores={}, + completion_response="response", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=iteration, + estimated_cost_usd=cost, + ) + + def setup_method(self): + self.client = _make_client() + self.client._options = _make_options(judges=self._make_judges_with_cost()) + self.client._initialize_class_members_from_config(_make_agent_config()) + self.client._baseline_cost_usd = 0.010 + + def test_no_entry_added_when_gate_not_active(self): + self.client._options = 
_make_options(judges=self._make_judges_no_cost()) + ctx = self._ctx(0.005) + passed, updated = self.client._apply_cost_gate(True, ctx) + assert passed is True + assert "_cost_gate" not in updated.scores + + def test_gate_recorded_even_when_already_failed(self): + # Gate score is always written for telemetry; it cannot block an + # iteration that was already failing (passed_so_far=False). + ctx = self._ctx(0.005) + passed, updated = self.client._apply_cost_gate(False, ctx) + assert passed is False + assert "_cost_gate" in updated.scores + + def test_gate_pass_adds_score_1(self): + # baseline=0.010, threshold=0.009, candidate=0.007 → pass + ctx = self._ctx(0.007) + passed, updated = self.client._apply_cost_gate(True, ctx) + assert passed is True + assert "_cost_gate" in updated.scores + assert updated.scores["_cost_gate"].score == 1.0 + assert "passed" in updated.scores["_cost_gate"].rationale.lower() + + def test_gate_fail_adds_score_0(self): + # baseline=0.010, threshold=0.009, candidate=0.0095 → fail + ctx = self._ctx(0.0095) + passed, updated = self.client._apply_cost_gate(True, ctx) + assert passed is False + assert "_cost_gate" in updated.scores + assert updated.scores["_cost_gate"].score == 0.0 + assert "failed" in updated.scores["_cost_gate"].rationale.lower() + + def test_gate_pass_no_baseline_gives_fallback_rationale(self): + self.client._baseline_cost_usd = None + ctx = self._ctx(None) + passed, updated = self.client._apply_cost_gate(True, ctx) + assert "_cost_gate" in updated.scores + assert "no baseline" in updated.scores["_cost_gate"].rationale.lower() + + def test_existing_scores_are_preserved(self): + ctx = OptimizationContext( + scores={"accuracy": JudgeResult(score=1.0, rationale="ok")}, + completion_response="response", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=2, + estimated_cost_usd=0.007, + ) + _, updated = self.client._apply_cost_gate(True, ctx) + assert "accuracy" in updated.scores + assert "_cost_gate" in updated.scores + + def test_both_gates_active_compose_cleanly(self): + """Duration + cost gate can both fire on the same context.""" + self.client._options = _make_options( + judges={ + "perf": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be faster, reduce latency, and cheaper cost.", + ) + } + ) + self.client._baseline_duration_ms = 2000.0 + self.client._baseline_cost_usd = 0.010 + ctx = OptimizationContext( + scores={}, + completion_response="response", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=2, + duration_ms=1500, + estimated_cost_usd=0.007, + ) + passed, ctx = self.client._apply_duration_gate(True, ctx) + passed, ctx = self.client._apply_cost_gate(passed, ctx) + assert passed is True + assert "_latency_gate" in ctx.scores + assert "_cost_gate" in ctx.scores + assert ctx.scores["_latency_gate"].score == 1.0 + assert ctx.scores["_cost_gate"].score == 1.0 + + +# --------------------------------------------------------------------------- +# variation_prompt_cost_optimization +# --------------------------------------------------------------------------- + + +class TestVariationPromptCostOptimization: + def test_section_header_present(self): + result = variation_prompt_cost_optimization(["gpt-4o", "gpt-4o-mini"]) + assert "## Cost Optimization:" in result + + def test_mentions_available_models(self): + result = variation_prompt_cost_optimization(["gpt-4o", "gpt-4o-mini"]) + assert "gpt-4o" in result + + def 
test_mentions_quality_primary(self): + result = variation_prompt_cost_optimization(["gpt-4o"]) + assert "primary objective" in result.lower() + + def test_mentions_token_reduction(self): + result = variation_prompt_cost_optimization(["gpt-4o"]) + assert "token" in result.lower() + + # quality_already_passing=False (default) — standard framing + def test_default_framing_says_improve_quality_too(self): + result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=False) + assert "In addition to improving quality" in result + + def test_default_framing_does_not_mention_quality_passing(self): + result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=False) + assert "currently passing" not in result + + # quality_already_passing=True — preserve-behavior framing + def test_passing_framing_says_criteria_already_passing(self): + result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=True) + assert "currently passing" in result + + def test_passing_framing_says_do_not_change_behavior(self): + result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=True) + assert "preserve" in result.lower() or "do not" in result.lower() or "NOT" in result + + def test_passing_framing_still_includes_token_guidance(self): + result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=True) + assert "token" in result.lower() + + def test_passing_framing_still_includes_model_choices(self): + result = variation_prompt_cost_optimization(["gpt-4o", "gpt-4o-mini"], quality_already_passing=True) + assert "gpt-4o" in result + + def test_passing_framing_still_warns_quality_is_primary(self): + result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=True) + assert "primary objective" in result.lower() or "do not sacrifice" in result.lower() + + +# --------------------------------------------------------------------------- +# _all_judges_passing +# --------------------------------------------------------------------------- + + +class TestAllJudgesPassing: + def _ctx_with_scores(self, scores, iteration=1): + return OptimizationContext( + scores=scores, + completion_response="ok", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=iteration, + ) + + def setup_method(self): + self.client = _make_client() + self.client._initialize_class_members_from_config(_make_agent_config()) + + def test_returns_false_when_history_empty(self): + self.client._history = [] + self.client._options = _make_options() + assert self.client._all_judges_passing() is False + + def test_returns_false_when_no_judges(self): + self.client._options = _make_options(judges={}) + self.client._history = [self._ctx_with_scores({"accuracy": JudgeResult(score=1.0, rationale="ok")})] + assert self.client._all_judges_passing() is False + + def test_returns_false_when_judge_score_missing(self): + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._history = [self._ctx_with_scores({})] + assert self.client._all_judges_passing() is False + + def test_returns_true_when_all_judges_pass(self): + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._history = [self._ctx_with_scores({"accuracy": JudgeResult(score=0.9, rationale="ok")})] + assert self.client._all_judges_passing() is True + + def 
test_returns_false_when_one_judge_below_threshold(self): + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._history = [self._ctx_with_scores({"accuracy": JudgeResult(score=0.7, rationale="bad")})] + assert self.client._all_judges_passing() is False + + def test_returns_false_when_one_of_multiple_judges_fails(self): + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + "tone": OptimizationJudge(threshold=0.9, acceptance_statement="friendly tone"), + }) + self.client._history = [self._ctx_with_scores({ + "accuracy": JudgeResult(score=1.0, rationale="ok"), + "tone": JudgeResult(score=0.5, rationale="failed"), + })] + assert self.client._all_judges_passing() is False + + def test_returns_true_when_all_of_multiple_judges_pass(self): + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + "tone": OptimizationJudge(threshold=0.9, acceptance_statement="friendly tone"), + }) + self.client._history = [self._ctx_with_scores({ + "accuracy": JudgeResult(score=1.0, rationale="ok"), + "tone": JudgeResult(score=0.95, rationale="ok"), + })] + assert self.client._all_judges_passing() is True + + def test_gate_scores_do_not_affect_result(self): + """Synthetic _latency_gate / _cost_gate keys must not count as judge failures.""" + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._history = [self._ctx_with_scores({ + "accuracy": JudgeResult(score=1.0, rationale="ok"), + "_latency_gate": JudgeResult(score=0.0, rationale="gate failed"), + "_cost_gate": JudgeResult(score=0.0, rationale="gate failed"), + })] + # Gate failures should not prevent _all_judges_passing from returning True + assert self.client._all_judges_passing() is True + + def test_uses_most_recent_history_entry(self): + """In non-GT mode (_last_batch_size=1) only the last history entry is inspected.""" + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._last_batch_size = 1 + self.client._history = [ + self._ctx_with_scores({"accuracy": JudgeResult(score=0.5, rationale="early fail")}, iteration=1), + self._ctx_with_scores({"accuracy": JudgeResult(score=1.0, rationale="later pass")}, iteration=2), + ] + assert self.client._all_judges_passing() is True + + def test_inverted_judge_passes_when_score_below_threshold(self): + self.client._options = _make_options(judges={ + "toxicity": OptimizationJudge(threshold=0.2, acceptance_statement="low toxicity", is_inverted=True), + }) + self.client._history = [self._ctx_with_scores({"toxicity": JudgeResult(score=0.1, rationale="clean")})] + assert self.client._all_judges_passing() is True + + def test_inverted_judge_fails_when_score_above_threshold(self): + self.client._options = _make_options(judges={ + "toxicity": OptimizationJudge(threshold=0.2, acceptance_statement="low toxicity", is_inverted=True), + }) + self.client._history = [self._ctx_with_scores({"toxicity": JudgeResult(score=0.5, rationale="toxic")})] + assert self.client._all_judges_passing() is False + + # --- GT batch tests --- + + def test_gt_batch_last_sample_passes_but_earlier_fails_returns_false(self): + """Core GT bug: if any sample in the batch failed, must return False even if the last passed.""" + 
self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._last_batch_size = 3 + self.client._history = [ + self._ctx_with_scores({"accuracy": JudgeResult(score=0.3, rationale="fail")}, iteration=1), # FAILS + self._ctx_with_scores({"accuracy": JudgeResult(score=0.9, rationale="ok")}, iteration=2), + self._ctx_with_scores({"accuracy": JudgeResult(score=0.95, rationale="ok")}, iteration=3), + ] + assert self.client._all_judges_passing() is False + + def test_gt_batch_all_samples_pass_returns_true(self): + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._last_batch_size = 3 + self.client._history = [ + self._ctx_with_scores({"accuracy": JudgeResult(score=0.85, rationale="ok")}, iteration=1), + self._ctx_with_scores({"accuracy": JudgeResult(score=0.90, rationale="ok")}, iteration=2), + self._ctx_with_scores({"accuracy": JudgeResult(score=0.95, rationale="ok")}, iteration=3), + ] + assert self.client._all_judges_passing() is True + + def test_gt_batch_middle_sample_fails_returns_false(self): + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._last_batch_size = 3 + self.client._history = [ + self._ctx_with_scores({"accuracy": JudgeResult(score=0.95, rationale="ok")}, iteration=1), + self._ctx_with_scores({"accuracy": JudgeResult(score=0.20, rationale="fail")}, iteration=2), # FAILS + self._ctx_with_scores({"accuracy": JudgeResult(score=0.95, rationale="ok")}, iteration=3), + ] + assert self.client._all_judges_passing() is False + + def test_gt_batch_size_respected_ignores_older_batches(self): + """Entries outside the current batch window should not influence the result.""" + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._last_batch_size = 2 + # 4 entries; batch covers last 2; first 2 are stale (from a previous attempt) + self.client._history = [ + self._ctx_with_scores({"accuracy": JudgeResult(score=0.1, rationale="old fail")}, iteration=1), + self._ctx_with_scores({"accuracy": JudgeResult(score=0.1, rationale="old fail")}, iteration=2), + self._ctx_with_scores({"accuracy": JudgeResult(score=0.9, rationale="ok")}, iteration=3), + self._ctx_with_scores({"accuracy": JudgeResult(score=0.9, rationale="ok")}, iteration=4), + ] + assert self.client._all_judges_passing() is True + + +class TestBuildNewVariationPromptCost: + def _make_history(self) -> list: + return [ + OptimizationContext( + scores={}, + completion_response="response", + current_instructions="instructions", + current_parameters={}, + current_variables={}, + iteration=1, + ) + ] + + def test_cost_section_absent_by_default(self): + result = build_new_variation_prompt( + self._make_history(), None, "gpt-4o", "inst", {}, ["gpt-4o"], [{}], "inst" + ) + assert "Cost Optimization" not in result + + def test_cost_section_included_when_flag_set(self): + result = build_new_variation_prompt( + self._make_history(), None, "gpt-4o", "inst", {}, ["gpt-4o"], [{}], "inst", + optimize_for_cost=True, + ) + assert "Cost Optimization" in result + + def test_duration_and_cost_sections_both_present(self): + result = build_new_variation_prompt( + self._make_history(), None, "gpt-4o", "inst", {}, ["gpt-4o"], [{}], "inst", + optimize_for_duration=True, + optimize_for_cost=True, + ) + 
assert "Duration Optimization" in result + assert "Cost Optimization" in result + + +# --------------------------------------------------------------------------- +# variation_prompt_feedback shows estimated_cost_usd +# --------------------------------------------------------------------------- + + +class TestVariationPromptFeedbackCost: + def _make_ctx(self, cost: float | None, iteration: int = 1) -> OptimizationContext: + return OptimizationContext( + scores={"judge": JudgeResult(score=0.9)}, + completion_response="ok", + current_instructions="inst", + current_parameters={}, + current_variables={}, + iteration=iteration, + estimated_cost_usd=cost, + ) + + def test_cost_shown_when_present(self): + ctx = self._make_ctx(0.001234) + judges = {"judge": OptimizationJudge(threshold=0.8, acceptance_statement="Be good.")} + result = variation_prompt_feedback([ctx], judges) + assert "Estimated agent cost: $0.001234" in result + + def test_cost_omitted_when_none(self): + ctx = self._make_ctx(None) + judges = {"judge": OptimizationJudge(threshold=0.8, acceptance_statement="Be good.")} + result = variation_prompt_feedback([ctx], judges) + assert "Estimated agent cost" not in result + + def test_cost_shown_per_iteration(self): + ctx1 = self._make_ctx(0.001, iteration=1) + ctx2 = self._make_ctx(0.0007, iteration=2) + judges = {"judge": OptimizationJudge(threshold=0.8, acceptance_statement="Be good.")} + result = variation_prompt_feedback([ctx1, ctx2], judges) + assert "$0.001000" in result + assert "$0.000700" in result + + +# --------------------------------------------------------------------------- +# _evaluate_acceptance_judge cost augmentation +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestEvaluateAcceptanceJudgeCostAugmentation: + def setup_method(self): + self.mock_ldai = _make_ldai_client() + self.client = _make_client(self.mock_ldai) + agent_config = _make_agent_config() + self.client._agent_key = "test-agent" + self.client._agent_config = agent_config + self.client._initialize_class_members_from_config(agent_config) + handle_judge_call = AsyncMock(return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE)) + self.client._options = _make_options(handle_judge_call=handle_judge_call) + self.client._model_configs = [] + + def _cost_judge(self) -> OptimizationJudge: + return OptimizationJudge( + threshold=0.9, + acceptance_statement="Keep costs low and stay within budget.", + ) + + def _set_pricing(self): + """Give the client a model config with pricing so estimate_cost returns USD.""" + self.client._current_model = "gpt-4o" + self.client._model_configs = [ + {"id": "gpt-4o", "costPerInputToken": 0.000005, "costPerOutputToken": 0.000015} + ] + + async def test_cost_context_injected_into_instructions(self): + self._set_pricing() + usage = TokenUsage(total=100, input=60, output=40) + captured: list = [] + + async def _capture_judge_call(judge_key, judge_config, ctx, is_judge): + captured.append(judge_config.instructions) + return OptimizationResponse(output=JUDGE_PASS_RESPONSE) + + self.client._options = _make_options(handle_judge_call=_capture_judge_call) + await self.client._evaluate_acceptance_judge( + judge_key="cost-judge", + optimization_judge=self._cost_judge(), + completion_response="response", + iteration=1, + reasoning_history="", + user_input="question", + agent_usage=usage, + ) + assert captured, "handle_judge_call was not called" + instructions = captured[0] + assert "60 input tokens" in instructions + assert "40 
output tokens" in instructions + + async def test_cost_context_not_injected_for_non_cost_judge(self): + usage = TokenUsage(total=100, input=60, output=40) + captured: list = [] + + async def _capture_judge_call(judge_key, judge_config, ctx, is_judge): + captured.append(judge_config.instructions) + return OptimizationResponse(output=JUDGE_PASS_RESPONSE) + + self.client._options = _make_options(handle_judge_call=_capture_judge_call) + non_cost_judge = OptimizationJudge( + threshold=0.9, + acceptance_statement="Be accurate and concise.", + ) + await self.client._evaluate_acceptance_judge( + judge_key="quality-judge", + optimization_judge=non_cost_judge, + completion_response="response", + iteration=1, + reasoning_history="", + user_input="question", + agent_usage=usage, + ) + assert captured + instructions = captured[0] + # The cost-specific augmentation phrase should not appear + assert "cost/token-usage goal" not in instructions + + async def test_baseline_cost_shown_when_history_present(self): + self._set_pricing() + usage = TokenUsage(total=100, input=60, output=40) + captured: list = [] + + async def _capture_judge_call(judge_key, judge_config, ctx, is_judge): + captured.append(judge_config.instructions) + return OptimizationResponse(output=JUDGE_PASS_RESPONSE) + + baseline_ctx = OptimizationContext( + scores={}, + completion_response="", + current_instructions="", + current_parameters={}, + current_variables={}, + iteration=1, + estimated_cost_usd=500.0, + ) + self.client._history = [baseline_ctx] + self.client._baseline_cost_usd = 500.0 + self.client._options = _make_options(handle_judge_call=_capture_judge_call) + await self.client._evaluate_acceptance_judge( + judge_key="cost-judge", + optimization_judge=self._cost_judge(), + completion_response="response", + iteration=2, + reasoning_history="", + user_input="question", + agent_usage=usage, + ) + assert captured + instructions = captured[0] + assert "baseline" in instructions.lower()