From 94de596d4391d6ba9f5dd36a87300b0eaafa5e4f Mon Sep 17 00:00:00 2001 From: Andrew Klatzke Date: Wed, 6 May 2026 14:17:07 -0800 Subject: [PATCH 1/7] feat: adds ability to optimize for cost --- .../optimization/src/ldai_optimizer/client.py | 153 ++++++- .../src/ldai_optimizer/dataclasses.py | 3 + .../src/ldai_optimizer/prompts.py | 83 ++++ .../optimization/src/ldai_optimizer/util.py | 45 +- packages/optimization/tests/test_client.py | 416 +++++++++++++++++- 5 files changed, 691 insertions(+), 9 deletions(-) diff --git a/packages/optimization/src/ldai_optimizer/client.py b/packages/optimization/src/ldai_optimizer/client.py index bccd32d4..bdd00611 100644 --- a/packages/optimization/src/ldai_optimizer/client.py +++ b/packages/optimization/src/ldai_optimizer/client.py @@ -49,6 +49,7 @@ LDApiClient, ) from ldai_optimizer.prompts import ( + _acceptance_criteria_implies_cost_optimization, _acceptance_criteria_implies_duration_optimization, build_message_history_text, build_new_variation_prompt, @@ -57,6 +58,7 @@ from ldai_optimizer.util import ( RedactionFilter, await_if_needed, + estimate_cost, extract_json_from_response, generate_slug, interpolate_variables, @@ -128,6 +130,11 @@ def _compute_validation_count(pool_size: int) -> int: # under 80% of the baseline — i.e. at least 20% improvement. _DURATION_TOLERANCE = 0.80 +# Cost gate: a candidate must cost at most this fraction of the baseline +# (history[0].estimated_cost_usd) to pass when acceptance criteria imply a +# cost reduction goal. 0.80 means at least 20% cheaper than the baseline. +_COST_TOLERANCE = 0.80 + # Maps SDK status strings to the API status/activity values expected by # agent_optimization_result records. Defined at module level to avoid # allocating the dict on every on_status_update invocation. @@ -160,6 +167,7 @@ def __init__(self, ldClient: LDAIClient) -> None: self._last_optimization_result_id: Optional[str] = None self._initial_tool_keys: List[str] = [] self._total_token_usage: int = 0 + self._model_configs: List[Dict[str, Any]] = [] if os.environ.get("LAUNCHDARKLY_API_KEY"): self._has_api_key = True @@ -392,6 +400,7 @@ async def _call_judges( agent_tools: Optional[List[ToolDefinition]] = None, expected_response: Optional[str] = None, agent_duration_ms: Optional[float] = None, + agent_usage: Optional[Any] = None, ) -> Dict[str, JudgeResult]: """ Call all judges in parallel (auto-path). @@ -411,6 +420,8 @@ async def _call_judges( :param agent_duration_ms: Wall-clock duration of the agent call in milliseconds. Forwarded to acceptance judges whose statement implies a latency goal so they can mention the duration change in their rationale. + :param agent_usage: Token usage from the agent call. Forwarded to acceptance judges + whose statement implies a cost goal so they can mention token usage in their rationale. :return: Dictionary of judge results (score and rationale) """ if not self._options.judges: @@ -464,6 +475,7 @@ async def _call_judges( agent_tools=resolved_agent_tools, expected_response=expected_response, agent_duration_ms=agent_duration_ms, + agent_usage=agent_usage, ) judge_results[judge_key] = result @@ -682,6 +694,7 @@ async def _evaluate_acceptance_judge( agent_tools: Optional[List[ToolDefinition]] = None, expected_response: Optional[str] = None, agent_duration_ms: Optional[float] = None, + agent_usage: Optional[Any] = None, ) -> JudgeResult: """ Evaluate using an acceptance statement judge. @@ -699,6 +712,8 @@ async def _evaluate_acceptance_judge( :param agent_duration_ms: Wall-clock duration of the agent call in milliseconds. When the acceptance statement implies a latency goal, the judge is instructed to mention the duration change in its rationale. + :param agent_usage: Token usage from the agent call. When the acceptance statement + implies a cost goal, the judge is instructed to mention token usage and cost. :return: The judge result with score and rationale """ if not optimization_judge.acceptance_statement: @@ -757,9 +772,64 @@ async def _evaluate_acceptance_judge( f"This response was {abs(delta_ms):.0f}ms {direction} than the baseline. " ) instructions += ( - "Please mention the duration and any change from baseline in your rationale." + "In your rationale, state the duration and any change from baseline. " + "If the latency goal is not yet met, include specific, actionable suggestions " + "for how the agent's instructions or model choice could be changed to reduce " + "response time — for example: switching to a faster model, shortening the " + "system prompt, or removing instructions that cause multi-step reasoning. " + "These suggestions will be used directly to generate the next variation." ) + if _acceptance_criteria_implies_cost_optimization({judge_key: optimization_judge}): + current_cost = estimate_cost( + agent_usage, + _find_model_config(self._current_model or "", self._model_configs), + ) + baseline_cost = ( + self._history[0].estimated_cost_usd + if self._history and self._history[0].estimated_cost_usd is not None + else None + ) + if current_cost is not None: + has_pricing = ( + _find_model_config(self._current_model or "", self._model_configs) or {} + ).get("costPerInputToken") is not None + if has_pricing: + cost_str = f"${current_cost:.6f}" + else: + cost_str = f"{int(current_cost)} tokens" + instructions += ( + f"\n\nThe acceptance criteria for this judge includes a cost/token-usage goal. " + ) + if agent_usage is not None: + instructions += ( + f"The agent's response used {agent_usage.input} input tokens " + f"and {agent_usage.output} output tokens " + f"(estimated cost: {cost_str}). " + ) + if baseline_cost is not None: + delta = current_cost - baseline_cost + direction = "less" if delta < 0 else "more" + if has_pricing: + baseline_str = f"${baseline_cost:.6f}" + delta_str = f"${abs(delta):.6f}" + else: + baseline_str = f"{int(baseline_cost)} tokens" + delta_str = f"{int(abs(delta))} tokens" + instructions += ( + f"The baseline cost (first iteration) was {baseline_str}. " + f"This response cost {delta_str} {direction} than the baseline. " + ) + instructions += ( + "In your rationale, state the token usage and cost, and any change from baseline. " + "If the cost goal is not yet met, include specific, actionable suggestions " + "for how the agent's instructions or model choice could be changed to reduce " + "cost — for example: switching to a cheaper model, shortening the system prompt " + "to reduce input tokens, removing unnecessary output instructions, or tightening " + "response length constraints. " + "These suggestions will be used directly to generate the next variation." + ) + if resolved_variables: instructions += f"\n\nThe following variables were available to the agent: {json.dumps(resolved_variables)}" @@ -1082,6 +1152,11 @@ async def _run_ground_truth_optimization( ): sample_passed = self._evaluate_duration(optimize_context) + if sample_passed and _acceptance_criteria_implies_cost_optimization( + self._options.judges + ): + sample_passed = self._evaluate_cost(optimize_context) + if not sample_passed: logger.info( "[GT Attempt %d] -> Sample %d/%d FAILED", @@ -1227,12 +1302,19 @@ def _apply_new_variation_response( # This is a deterministic safety net for when the LLM ignores the prompt # instructions and hardcodes a concrete value (e.g. "user-123") instead # of the placeholder ("{{user_id}}"). + # Only check the variables that were actually used for this invocation so + # we don't spuriously replace values that happen to appear in other choices. + active_variables = ( + [variation_ctx.current_variables] + if variation_ctx.current_variables + else self._options.variable_choices + ) self._current_instructions, placeholder_warnings = restore_variable_placeholders( self._current_instructions, - self._options.variable_choices, + active_variables, ) for msg in placeholder_warnings: - logger.warning("[Iteration %d] -> %s", iteration, msg) + logger.debug("[Iteration %d] -> %s", iteration, msg) self._current_parameters = response_data["current_parameters"] @@ -1321,6 +1403,9 @@ async def _generate_new_variation( optimize_for_duration = _acceptance_criteria_implies_duration_optimization( self._options.judges ) + optimize_for_cost = _acceptance_criteria_implies_cost_optimization( + self._options.judges + ) instructions = build_new_variation_prompt( self._history, self._options.judges, @@ -1331,6 +1416,7 @@ async def _generate_new_variation( self._options.variable_choices, self._initial_instructions, optimize_for_duration=optimize_for_duration, + optimize_for_cost=optimize_for_cost, ) # Create a flat history list (without nested history) to avoid exponential growth @@ -1424,6 +1510,7 @@ async def optimize_from_config( model_configs = api_client.get_model_configs(options.project_key) except Exception as exc: logger.debug("Could not pre-fetch model configs: %s", exc) + self._model_configs = model_configs context = random.choice(options.context_choices) # _get_agent_config calls _initialize_class_members_from_config internally; @@ -1793,18 +1880,24 @@ async def _execute_agent_turn( agent_tools=agent_tools, expected_response=expected_response, agent_duration_ms=agent_duration_ms, + agent_usage=agent_response.usage, ) # Build the fully-populated result context before firing the evaluating event so # the PATCH includes scores, generationLatency, and completionResponse. This is # particularly important for non-final GT samples which receive no further status # events — without this, those fields would never be written to their API records. + agent_cost = estimate_cost( + agent_response.usage, + _find_model_config(self._current_model or "", self._model_configs), + ) result_ctx = dataclasses.replace( optimize_context, completion_response=completion_response, scores=scores, duration_ms=agent_duration_ms, usage=agent_response.usage, + estimated_cost_usd=agent_cost, ) if self._options.judges: @@ -1829,13 +1922,13 @@ def _accumulate_tokens(self, optimize_context: OptimizationContext) -> None: def _is_token_limit_exceeded(self) -> bool: """Return True if the accumulated token usage has met or exceeded the configured limit. - Returns False when no token limit is set so callers can use this as a - simple guard without needing to check for ``None`` themselves. + Returns False when no token limit is set, or when the limit is 0 (which is + treated as "no limit" — a sentinel value meaning the field was left unset). - :return: True if token limit is set and ``_total_token_usage >= token_limit``. + :return: True if a positive token limit is set and ``_total_token_usage >= token_limit``. """ limit: Optional[int] = getattr(self._options, "token_limit", None) - return limit is not None and self._total_token_usage >= limit + return bool(limit) and self._total_token_usage >= limit def _evaluate_response(self, optimize_context: OptimizationContext) -> bool: """ @@ -1896,6 +1989,42 @@ def _evaluate_duration(self, optimize_context: OptimizationContext) -> bool: ) return passed + def _evaluate_cost(self, optimize_context: OptimizationContext) -> bool: + """ + Check whether the candidate's estimated cost meets the improvement target vs. the baseline. + + The baseline is history[0].estimated_cost_usd — the very first completed iteration, + representing the original unoptimized configuration's cost. The candidate must be + at least _COST_TOLERANCE cheaper (default: 20% improvement). + + The cost value is in USD when model pricing data is available, or raw total token + count as a proxy when pricing is absent. Both are comparable relative to their + own baselines. + + Returns True without blocking when no baseline is available (empty history or + history[0].estimated_cost_usd is None), or when the candidate's cost was not + captured. This avoids penalising configurations when cost data is missing. + + :param optimize_context: The completed turn context containing estimated_cost_usd + :return: True if the cost requirement is met or cannot be checked + """ + if not self._history or self._history[0].estimated_cost_usd is None: + return True + if optimize_context.estimated_cost_usd is None: + return True + baseline = self._history[0].estimated_cost_usd + passed = optimize_context.estimated_cost_usd < baseline * _COST_TOLERANCE + if not passed: + logger.warning( + "[Iteration %d] -> Cost check failed: %.6f >= baseline %.6f * %.0f%% (%.6f)", + optimize_context.iteration, + optimize_context.estimated_cost_usd, + baseline, + _COST_TOLERANCE * 100, + baseline * _COST_TOLERANCE, + ) + return passed + def _handle_success( self, optimize_context: OptimizationContext, iteration: int ) -> Any: @@ -2174,6 +2303,11 @@ async def _run_validation_phase( ): sample_passed = self._evaluate_duration(val_ctx) + if sample_passed and _acceptance_criteria_implies_cost_optimization( + self._options.judges + ): + sample_passed = self._evaluate_cost(val_ctx) + last_ctx = val_ctx if not sample_passed: @@ -2298,6 +2432,11 @@ async def _run_optimization( ): initial_passed = self._evaluate_duration(optimize_context) + if initial_passed and _acceptance_criteria_implies_cost_optimization( + self._options.judges + ): + initial_passed = self._evaluate_cost(optimize_context) + if initial_passed: all_valid, last_ctx = await self._run_validation_phase( optimize_context, iteration diff --git a/packages/optimization/src/ldai_optimizer/dataclasses.py b/packages/optimization/src/ldai_optimizer/dataclasses.py index fab3ed72..1f5e28c2 100644 --- a/packages/optimization/src/ldai_optimizer/dataclasses.py +++ b/packages/optimization/src/ldai_optimizer/dataclasses.py @@ -217,6 +217,7 @@ class OptimizationContext: iteration: int = 0 # current iteration number duration_ms: Optional[float] = None # wall-clock time for the agent call in milliseconds usage: Optional[TokenUsage] = None # token usage reported by the agent for this iteration + estimated_cost_usd: Optional[float] = None # estimated cost; USD when pricing available, else total tokens def copy_without_history(self) -> OptimizationContext: """ @@ -236,6 +237,7 @@ def copy_without_history(self) -> OptimizationContext: iteration=self.iteration, duration_ms=self.duration_ms, usage=self.usage, + estimated_cost_usd=self.estimated_cost_usd, ) def to_json(self) -> Dict[str, Any]: @@ -261,6 +263,7 @@ def to_json(self) -> Dict[str, Any]: "history": history_list, "iteration": self.iteration, "duration_ms": self.duration_ms, + "estimated_cost_usd": self.estimated_cost_usd, } if self.usage is not None: result["usage"] = { diff --git a/packages/optimization/src/ldai_optimizer/prompts.py b/packages/optimization/src/ldai_optimizer/prompts.py index 4aae5fee..39909f93 100644 --- a/packages/optimization/src/ldai_optimizer/prompts.py +++ b/packages/optimization/src/ldai_optimizer/prompts.py @@ -16,6 +16,13 @@ re.IGNORECASE, ) +_COST_KEYWORDS = re.compile( + r"\b(cheap|cheaper|cheapest|costs?|costly|expensive|budget|affordable|" + r"tokens?|spend|spending|economical|cost-effective|frugal|" + r"price|pricing|bill|billing)\b", + re.IGNORECASE, +) + def _acceptance_criteria_implies_duration_optimization( judges: Optional[Dict[str, OptimizationJudge]], @@ -39,6 +46,28 @@ def _acceptance_criteria_implies_duration_optimization( return False +def _acceptance_criteria_implies_cost_optimization( + judges: Optional[Dict[str, OptimizationJudge]], +) -> bool: + """Return True if any judge acceptance statement implies a cost reduction goal. + + Scans each judge's acceptance_statement for cost-related keywords. The + check is case-insensitive. Returns False when judges is None or no judge + carries an acceptance statement. + + :param judges: Judge configuration dict from OptimizationOptions, or None. + :return: True if cost optimization should be applied. + """ + if not judges: + return False + for judge in judges.values(): + if judge.acceptance_statement and _COST_KEYWORDS.search( + judge.acceptance_statement + ): + return True + return False + + def build_message_history_text( history: List[OptimizationContext], input_text: str, @@ -114,6 +143,7 @@ def build_new_variation_prompt( variable_choices: List[Dict[str, Any]], initial_instructions: str, optimize_for_duration: bool = False, + optimize_for_cost: bool = False, ) -> str: """ Build the LLM prompt for generating an improved agent configuration. @@ -133,6 +163,8 @@ def build_new_variation_prompt( :param initial_instructions: The original unmodified instructions template :param optimize_for_duration: When True, appends a duration optimization section instructing the LLM to prefer faster models and simpler instructions. + :param optimize_for_cost: When True, appends a cost optimization section + instructing the LLM to prefer cheaper models and reduce token usage. :return: The assembled prompt string """ sections = [ @@ -147,6 +179,7 @@ def build_new_variation_prompt( history, model_choices, variable_choices, initial_instructions ), variation_prompt_duration_optimization(model_choices) if optimize_for_duration else "", + variation_prompt_cost_optimization(model_choices) if optimize_for_cost else "", ] return "\n\n".join(s for s in sections if s) @@ -248,6 +281,8 @@ def variation_prompt_configuration( lines.append(f"Agent response: {previous_ctx.completion_response}") if previous_ctx.duration_ms is not None: lines.append(f"Agent duration: {previous_ctx.duration_ms:.0f}ms") + if previous_ctx.estimated_cost_usd is not None: + lines.append(f"Estimated agent cost: ${previous_ctx.estimated_cost_usd:.6f}") return "\n".join(lines) else: return "\n".join( @@ -301,6 +336,8 @@ def variation_prompt_feedback( lines.append(feedback_line) if ctx.duration_ms is not None: lines.append(f"Agent duration: {ctx.duration_ms:.0f}ms") + if ctx.estimated_cost_usd is not None: + lines.append(f"Estimated agent cost: ${ctx.estimated_cost_usd:.6f}") return "\n".join(lines) @@ -556,3 +593,49 @@ def variation_prompt_duration_optimization(model_choices: List[str]) -> str: "Quality criteria remain the primary objective — do not sacrifice passing scores to achieve lower latency.", ] ) + + +def variation_prompt_cost_optimization(model_choices: List[str]) -> str: + """ + Cost optimization section of the variation prompt. + + Included when acceptance criteria imply a cost reduction goal. Instructs + the LLM to treat token usage as a secondary objective — quality criteria + must still be met first — and provides concrete guidance on how to reduce + cost through model selection and instruction simplification. + + :param model_choices: List of model IDs the LLM may select from, so it can + apply its own knowledge of which models tend to be cheaper. + :return: The cost optimization prompt block. + """ + return "\n".join( + [ + "## Cost Optimization:", + "The acceptance criteria for this optimization implies that token usage / cost should be reduced.", + "In addition to improving quality, generate a variation that aims to reduce the agent's cost.", + "Cost is driven by two factors: (1) the number of tokens processed, and (2) the per-token price of the model.", + "Target both factors with the strategies below.", + "", + "### Reducing token usage (input tokens):", + "- Remove redundant, verbose, or repeated phrasing from the instructions.", + "- Collapse multi-sentence explanations into a single concise directive.", + "- Remove examples or few-shot demonstrations unless they are essential for accuracy.", + "- Eliminate instructional scaffolding that the model does not need (e.g. 'You are a helpful assistant that...').", + "- Use bullet points instead of prose where possible — they are more token-efficient.", + "", + "### Reducing token usage (output tokens):", + "- Instruct the agent to be concise and avoid unnecessary elaboration.", + "- Specify the exact format and length of the expected response (e.g. 'Respond in one sentence.').", + "- Set or reduce max_tokens if the current value allows longer responses than needed.", + "- Avoid instructions that encourage the agent to 'explain its reasoning' unless required by the acceptance criteria.", + "", + "### Reducing per-token cost via model selection:", + "- Consider switching to a cheaper model from the available choices if quality requirements can still be met.", + f" Available models: {model_choices}", + " Use your knowledge of relative model pricing to prefer lower-cost options.", + " Only switch models if the cheaper model is capable of satisfying the acceptance criteria.", + "", + "Quality criteria remain the primary objective — do not sacrifice passing scores to achieve lower cost.", + "Apply cost-reduction changes incrementally: prefer the smallest change that measurably reduces cost.", + ] + ) diff --git a/packages/optimization/src/ldai_optimizer/util.py b/packages/optimization/src/ldai_optimizer/util.py index 46429e50..8257434a 100644 --- a/packages/optimization/src/ldai_optimizer/util.py +++ b/packages/optimization/src/ldai_optimizer/util.py @@ -5,7 +5,10 @@ import logging import random import re -from typing import Any, Awaitable, Dict, List, Optional, Tuple, TypeVar, Union +from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple, TypeVar, Union + +if TYPE_CHECKING: + from ldai.tracker import TokenUsage from ldai_optimizer._slug_words import _ADJECTIVES, _NOUNS @@ -313,3 +316,43 @@ def judge_passed(score: float, threshold: float, is_inverted: bool) -> bool: the score must stay at or below the threshold: ``score <= threshold``. """ return score <= threshold if is_inverted else score >= threshold + + +def estimate_cost( + usage: Optional["TokenUsage"], + model_config: Optional[Dict[str, Any]], +) -> Optional[float]: + """Estimate the monetary cost of a single agent call. + + Uses ``costPerInputToken`` and ``costPerOutputToken`` from the model config + when available. If the model config has no pricing fields, falls back to + returning the raw total token count as a dimensionless proxy so the cost + gate can still operate comparatively. Returns ``None`` only when ``usage`` + itself is ``None``. + + ``costPerCachedInputToken`` is intentionally ignored — the estimate uses + input/output tokens only, which is sufficient for relative comparison + across optimization iterations. + + :param usage: Token usage from the agent call. When ``None``, returns ``None``. + :param model_config: Model config dict from ``get_model_configs()``, or ``None``. + :return: Estimated cost in USD, or raw total token count as proxy, or ``None``. + """ + if usage is None: + return None + + input_price = model_config.get("costPerInputToken") if model_config else None + output_price = model_config.get("costPerOutputToken") if model_config else None + + if input_price is not None or output_price is not None: + cost = 0.0 + if input_price is not None and usage.input is not None: + cost += usage.input * input_price + if output_price is not None and usage.output is not None: + cost += usage.output * output_price + return cost + + logger.debug( + "No pricing data on model config for cost estimation; falling back to total token count" + ) + return float(usage.total or 0) diff --git a/packages/optimization/tests/test_client.py b/packages/optimization/tests/test_client.py index 46f2d876..ae492fa3 100644 --- a/packages/optimization/tests/test_client.py +++ b/packages/optimization/tests/test_client.py @@ -26,15 +26,17 @@ ToolDefinition, ) from ldai_optimizer.prompts import ( + _acceptance_criteria_implies_cost_optimization, _acceptance_criteria_implies_duration_optimization, build_new_variation_prompt, variation_prompt_acceptance_criteria, + variation_prompt_cost_optimization, variation_prompt_feedback, variation_prompt_improvement_instructions, variation_prompt_overfit_warning, variation_prompt_preamble, ) -from ldai_optimizer.util import interpolate_variables +from ldai_optimizer.util import estimate_cost, interpolate_variables from ldai_optimizer.util import ( restore_variable_placeholders, ) @@ -235,6 +237,43 @@ def test_converts_object_with_to_dict(self): # --------------------------------------------------------------------------- +class TestIsTokenLimitExceeded: + def _client_with_limit(self, limit): + client = _make_client() + client._options = _make_options(token_limit=limit) + return client + + def test_no_limit_returns_false(self): + client = self._client_with_limit(None) + client._total_token_usage = 9999 + assert client._is_token_limit_exceeded() is False + + def test_zero_limit_treated_as_no_limit(self): + client = self._client_with_limit(0) + client._total_token_usage = 0 + assert client._is_token_limit_exceeded() is False + + def test_zero_limit_with_high_usage_returns_false(self): + client = self._client_with_limit(0) + client._total_token_usage = 100_000 + assert client._is_token_limit_exceeded() is False + + def test_positive_limit_not_yet_reached(self): + client = self._client_with_limit(1000) + client._total_token_usage = 999 + assert client._is_token_limit_exceeded() is False + + def test_positive_limit_exactly_reached(self): + client = self._client_with_limit(1000) + client._total_token_usage = 1000 + assert client._is_token_limit_exceeded() is True + + def test_positive_limit_exceeded(self): + client = self._client_with_limit(1000) + client._total_token_usage = 1001 + assert client._is_token_limit_exceeded() is True + + class TestEvaluateResponse: def setup_method(self): self.client = _make_client() @@ -4656,3 +4695,378 @@ def test_mixed_judges_feedback_reflects_correct_pass_fail(self): # Both should be PASSED — relevance high enough, toxicity low enough assert result.count("PASSED") == 2 assert "FAILED" not in result + + +# --------------------------------------------------------------------------- +# estimate_cost helper +# --------------------------------------------------------------------------- + + +class TestEstimateCost: + def _usage(self, total=100, inp=60, out=40) -> TokenUsage: + return TokenUsage(total=total, input=inp, output=out) + + def test_returns_none_when_usage_is_none(self): + assert estimate_cost(None, {"costPerInputToken": 0.001}) is None + + def test_uses_pricing_when_available(self): + usage = self._usage(total=100, inp=60, out=40) + model_config = {"costPerInputToken": 0.001, "costPerOutputToken": 0.002} + cost = estimate_cost(usage, model_config) + assert cost == pytest.approx(60 * 0.001 + 40 * 0.002) + + def test_uses_only_input_price_when_output_absent(self): + usage = self._usage(total=100, inp=60, out=40) + model_config = {"costPerInputToken": 0.001} + cost = estimate_cost(usage, model_config) + assert cost == pytest.approx(60 * 0.001) + + def test_uses_only_output_price_when_input_absent(self): + usage = self._usage(total=100, inp=60, out=40) + model_config = {"costPerOutputToken": 0.002} + cost = estimate_cost(usage, model_config) + assert cost == pytest.approx(40 * 0.002) + + def test_falls_back_to_total_token_count_when_no_pricing(self): + usage = self._usage(total=100) + cost = estimate_cost(usage, {}) + assert cost == 100.0 + + def test_falls_back_to_total_token_count_when_model_config_none(self): + usage = self._usage(total=250) + cost = estimate_cost(usage, None) + assert cost == 250.0 + + def test_ignores_cached_input_token_price(self): + usage = self._usage(total=100, inp=60, out=40) + model_config = { + "costPerInputToken": 0.001, + "costPerOutputToken": 0.002, + "costPerCachedInputToken": 0.0005, + } + cost = estimate_cost(usage, model_config) + assert cost == pytest.approx(60 * 0.001 + 40 * 0.002) + + def test_zero_usage_with_pricing_returns_zero(self): + usage = TokenUsage(total=0, input=0, output=0) + model_config = {"costPerInputToken": 0.001, "costPerOutputToken": 0.002} + assert estimate_cost(usage, model_config) == pytest.approx(0.0) + + +# --------------------------------------------------------------------------- +# _acceptance_criteria_implies_cost_optimization +# --------------------------------------------------------------------------- + + +class TestAcceptanceCriteriaImpliesCostOptimization: + def _judge(self, statement: str) -> Dict[str, OptimizationJudge]: + return {"j": OptimizationJudge(threshold=0.9, acceptance_statement=statement)} + + def test_returns_false_when_judges_none(self): + assert _acceptance_criteria_implies_cost_optimization(None) is False + + def test_returns_false_when_no_acceptance_statements(self): + judges = {"j": OptimizationJudge(threshold=0.9, judge_key="some-judge")} + assert _acceptance_criteria_implies_cost_optimization(judges) is False + + def test_detects_cheap(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("Keep it cheap.")) + + def test_detects_cost(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("Reduce overall cost.")) + + def test_detects_costs_plural(self): + assert _acceptance_criteria_implies_cost_optimization( + self._judge("Keep the costs stable or lower them.") + ) + + def test_detects_budget(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("Stay within budget.")) + + def test_detects_tokens(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("Use fewer tokens.")) + + def test_detects_billing(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("Minimize billing.")) + + def test_detects_spend(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("Reduce spend on API calls.")) + + def test_case_insensitive(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("BUDGET FRIENDLY response")) + + def test_no_match_on_unrelated_statement(self): + assert not _acceptance_criteria_implies_cost_optimization( + self._judge("Respond accurately and concisely.") + ) + + def test_multiple_judges_one_matches(self): + judges = { + "j1": OptimizationJudge(threshold=0.9, acceptance_statement="Be accurate."), + "j2": OptimizationJudge(threshold=0.9, acceptance_statement="Use fewer tokens."), + } + assert _acceptance_criteria_implies_cost_optimization(judges) + + +# --------------------------------------------------------------------------- +# _evaluate_cost +# --------------------------------------------------------------------------- + + +class TestEvaluateCost: + def setup_method(self): + self.client = _make_client() + self.client._agent_key = "test-agent" + self.client._initialize_class_members_from_config(_make_agent_config()) + self.client._options = _make_options() + + def _ctx(self, cost: float, iteration: int = 2) -> OptimizationContext: + return OptimizationContext( + scores={}, + completion_response="ok", + current_instructions="inst", + current_parameters={}, + current_variables={}, + iteration=iteration, + estimated_cost_usd=cost, + ) + + def _seed_history(self, baseline_cost: float): + self.client._history = [self._ctx(baseline_cost, iteration=1)] + + def test_passes_when_cost_improved_beyond_tolerance(self): + self._seed_history(0.010) + assert self.client._evaluate_cost(self._ctx(0.007)) is True + + def test_fails_when_cost_not_improved_enough(self): + self._seed_history(0.010) + assert self.client._evaluate_cost(self._ctx(0.009)) is False + + def test_passes_at_exact_tolerance_boundary(self): + self._seed_history(0.010) + # 0.010 * 0.80 = 0.008; must be strictly less than 0.008 + assert self.client._evaluate_cost(self._ctx(0.0079)) is True + assert self.client._evaluate_cost(self._ctx(0.008)) is False + + def test_skips_gracefully_when_history_empty(self): + self.client._history = [] + assert self.client._evaluate_cost(self._ctx(0.005)) is True + + def test_skips_gracefully_when_baseline_cost_none(self): + self.client._history = [self._ctx(None)] # type: ignore[arg-type] + assert self.client._evaluate_cost(self._ctx(0.005)) is True + + def test_skips_gracefully_when_candidate_cost_none(self): + self._seed_history(0.010) + ctx = self._ctx(None) # type: ignore[arg-type] + assert self.client._evaluate_cost(ctx) is True + + def test_works_with_token_count_proxy(self): + # When no pricing data, cost is raw token count — gate still compares numerically + self._seed_history(1000.0) + assert self.client._evaluate_cost(self._ctx(750.0)) is True + assert self.client._evaluate_cost(self._ctx(900.0)) is False + + +# --------------------------------------------------------------------------- +# variation_prompt_cost_optimization +# --------------------------------------------------------------------------- + + +class TestVariationPromptCostOptimization: + def test_section_header_present(self): + result = variation_prompt_cost_optimization(["gpt-4o", "gpt-4o-mini"]) + assert "## Cost Optimization:" in result + + def test_mentions_available_models(self): + result = variation_prompt_cost_optimization(["gpt-4o", "gpt-4o-mini"]) + assert "gpt-4o" in result + + def test_mentions_quality_primary(self): + result = variation_prompt_cost_optimization(["gpt-4o"]) + assert "primary objective" in result.lower() + + def test_mentions_token_reduction(self): + result = variation_prompt_cost_optimization(["gpt-4o"]) + assert "token" in result.lower() + + +class TestBuildNewVariationPromptCost: + def _make_history(self) -> list: + return [ + OptimizationContext( + scores={}, + completion_response="response", + current_instructions="instructions", + current_parameters={}, + current_variables={}, + iteration=1, + ) + ] + + def test_cost_section_absent_by_default(self): + result = build_new_variation_prompt( + self._make_history(), None, "gpt-4o", "inst", {}, ["gpt-4o"], [{}], "inst" + ) + assert "Cost Optimization" not in result + + def test_cost_section_included_when_flag_set(self): + result = build_new_variation_prompt( + self._make_history(), None, "gpt-4o", "inst", {}, ["gpt-4o"], [{}], "inst", + optimize_for_cost=True, + ) + assert "Cost Optimization" in result + + def test_duration_and_cost_sections_both_present(self): + result = build_new_variation_prompt( + self._make_history(), None, "gpt-4o", "inst", {}, ["gpt-4o"], [{}], "inst", + optimize_for_duration=True, + optimize_for_cost=True, + ) + assert "Duration Optimization" in result + assert "Cost Optimization" in result + + +# --------------------------------------------------------------------------- +# variation_prompt_feedback shows estimated_cost_usd +# --------------------------------------------------------------------------- + + +class TestVariationPromptFeedbackCost: + def _make_ctx(self, cost: float | None, iteration: int = 1) -> OptimizationContext: + return OptimizationContext( + scores={"judge": JudgeResult(score=0.9)}, + completion_response="ok", + current_instructions="inst", + current_parameters={}, + current_variables={}, + iteration=iteration, + estimated_cost_usd=cost, + ) + + def test_cost_shown_when_present(self): + ctx = self._make_ctx(0.001234) + judges = {"judge": OptimizationJudge(threshold=0.8, acceptance_statement="Be good.")} + result = variation_prompt_feedback([ctx], judges) + assert "Estimated agent cost: $0.001234" in result + + def test_cost_omitted_when_none(self): + ctx = self._make_ctx(None) + judges = {"judge": OptimizationJudge(threshold=0.8, acceptance_statement="Be good.")} + result = variation_prompt_feedback([ctx], judges) + assert "Estimated agent cost" not in result + + def test_cost_shown_per_iteration(self): + ctx1 = self._make_ctx(0.001, iteration=1) + ctx2 = self._make_ctx(0.0007, iteration=2) + judges = {"judge": OptimizationJudge(threshold=0.8, acceptance_statement="Be good.")} + result = variation_prompt_feedback([ctx1, ctx2], judges) + assert "$0.001000" in result + assert "$0.000700" in result + + +# --------------------------------------------------------------------------- +# _evaluate_acceptance_judge cost augmentation +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestEvaluateAcceptanceJudgeCostAugmentation: + def setup_method(self): + self.mock_ldai = _make_ldai_client() + self.client = _make_client(self.mock_ldai) + agent_config = _make_agent_config() + self.client._agent_key = "test-agent" + self.client._agent_config = agent_config + self.client._initialize_class_members_from_config(agent_config) + handle_judge_call = AsyncMock(return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE)) + self.client._options = _make_options(handle_judge_call=handle_judge_call) + self.client._model_configs = [] + + def _cost_judge(self) -> OptimizationJudge: + return OptimizationJudge( + threshold=0.9, + acceptance_statement="Use fewer tokens and keep costs low.", + ) + + async def test_cost_context_injected_into_instructions(self): + usage = TokenUsage(total=100, input=60, output=40) + captured: list = [] + + async def _capture_judge_call(judge_key, judge_config, ctx, is_judge): + captured.append(judge_config.instructions) + return OptimizationResponse(output=JUDGE_PASS_RESPONSE) + + self.client._options = _make_options(handle_judge_call=_capture_judge_call) + await self.client._evaluate_acceptance_judge( + judge_key="cost-judge", + optimization_judge=self._cost_judge(), + completion_response="response", + iteration=1, + reasoning_history="", + user_input="question", + agent_usage=usage, + ) + assert captured, "handle_judge_call was not called" + instructions = captured[0] + assert "60 input tokens" in instructions + assert "40 output tokens" in instructions + + async def test_cost_context_not_injected_for_non_cost_judge(self): + usage = TokenUsage(total=100, input=60, output=40) + captured: list = [] + + async def _capture_judge_call(judge_key, judge_config, ctx, is_judge): + captured.append(judge_config.instructions) + return OptimizationResponse(output=JUDGE_PASS_RESPONSE) + + self.client._options = _make_options(handle_judge_call=_capture_judge_call) + non_cost_judge = OptimizationJudge( + threshold=0.9, + acceptance_statement="Be accurate and concise.", + ) + await self.client._evaluate_acceptance_judge( + judge_key="quality-judge", + optimization_judge=non_cost_judge, + completion_response="response", + iteration=1, + reasoning_history="", + user_input="question", + agent_usage=usage, + ) + assert captured + instructions = captured[0] + # The cost-specific augmentation phrase should not appear + assert "cost/token-usage goal" not in instructions + + async def test_baseline_cost_shown_when_history_present(self): + usage = TokenUsage(total=100, input=60, output=40) + captured: list = [] + + async def _capture_judge_call(judge_key, judge_config, ctx, is_judge): + captured.append(judge_config.instructions) + return OptimizationResponse(output=JUDGE_PASS_RESPONSE) + + baseline_ctx = OptimizationContext( + scores={}, + completion_response="", + current_instructions="", + current_parameters={}, + current_variables={}, + iteration=1, + estimated_cost_usd=500.0, + ) + self.client._history = [baseline_ctx] + self.client._options = _make_options(handle_judge_call=_capture_judge_call) + await self.client._evaluate_acceptance_judge( + judge_key="cost-judge", + optimization_judge=self._cost_judge(), + completion_response="response", + iteration=2, + reasoning_history="", + user_input="question", + agent_usage=usage, + ) + assert captured + instructions = captured[0] + assert "baseline" in instructions.lower() From dc82818640878cdce7c657e27f6d9189f374d000 Mon Sep 17 00:00:00 2001 From: Andrew Klatzke Date: Wed, 6 May 2026 15:23:30 -0800 Subject: [PATCH 2/7] fix: remove unnecessary token path --- .../optimization/src/ldai_optimizer/client.py | 19 ++------ .../src/ldai_optimizer/prompts.py | 2 +- .../optimization/src/ldai_optimizer/util.py | 37 +++++++-------- packages/optimization/tests/test_client.py | 45 +++++++++++-------- 4 files changed, 47 insertions(+), 56 deletions(-) diff --git a/packages/optimization/src/ldai_optimizer/client.py b/packages/optimization/src/ldai_optimizer/client.py index bdd00611..1fb418fd 100644 --- a/packages/optimization/src/ldai_optimizer/client.py +++ b/packages/optimization/src/ldai_optimizer/client.py @@ -791,13 +791,6 @@ async def _evaluate_acceptance_judge( else None ) if current_cost is not None: - has_pricing = ( - _find_model_config(self._current_model or "", self._model_configs) or {} - ).get("costPerInputToken") is not None - if has_pricing: - cost_str = f"${current_cost:.6f}" - else: - cost_str = f"{int(current_cost)} tokens" instructions += ( f"\n\nThe acceptance criteria for this judge includes a cost/token-usage goal. " ) @@ -805,20 +798,14 @@ async def _evaluate_acceptance_judge( instructions += ( f"The agent's response used {agent_usage.input} input tokens " f"and {agent_usage.output} output tokens " - f"(estimated cost: {cost_str}). " + f"(estimated cost: ${current_cost:.6f}). " ) if baseline_cost is not None: delta = current_cost - baseline_cost direction = "less" if delta < 0 else "more" - if has_pricing: - baseline_str = f"${baseline_cost:.6f}" - delta_str = f"${abs(delta):.6f}" - else: - baseline_str = f"{int(baseline_cost)} tokens" - delta_str = f"{int(abs(delta))} tokens" instructions += ( - f"The baseline cost (first iteration) was {baseline_str}. " - f"This response cost {delta_str} {direction} than the baseline. " + f"The baseline cost (first iteration) was ${baseline_cost:.6f}. " + f"This response cost ${abs(delta):.6f} {direction} than the baseline. " ) instructions += ( "In your rationale, state the token usage and cost, and any change from baseline. " diff --git a/packages/optimization/src/ldai_optimizer/prompts.py b/packages/optimization/src/ldai_optimizer/prompts.py index 39909f93..eadf35d7 100644 --- a/packages/optimization/src/ldai_optimizer/prompts.py +++ b/packages/optimization/src/ldai_optimizer/prompts.py @@ -18,7 +18,7 @@ _COST_KEYWORDS = re.compile( r"\b(cheap|cheaper|cheapest|costs?|costly|expensive|budget|affordable|" - r"tokens?|spend|spending|economical|cost-effective|frugal|" + r"spend|spending|economical|cost-effective|frugal|" r"price|pricing|bill|billing)\b", re.IGNORECASE, ) diff --git a/packages/optimization/src/ldai_optimizer/util.py b/packages/optimization/src/ldai_optimizer/util.py index 8257434a..a3671e26 100644 --- a/packages/optimization/src/ldai_optimizer/util.py +++ b/packages/optimization/src/ldai_optimizer/util.py @@ -322,21 +322,21 @@ def estimate_cost( usage: Optional["TokenUsage"], model_config: Optional[Dict[str, Any]], ) -> Optional[float]: - """Estimate the monetary cost of a single agent call. + """Estimate the monetary cost of a single agent call in USD. - Uses ``costPerInputToken`` and ``costPerOutputToken`` from the model config - when available. If the model config has no pricing fields, falls back to - returning the raw total token count as a dimensionless proxy so the cost - gate can still operate comparatively. Returns ``None`` only when ``usage`` - itself is ``None``. + Uses ``costPerInputToken`` and ``costPerOutputToken`` from the model config. + Returns ``None`` when either ``usage`` is ``None`` or no pricing fields are + present on the model config — ensuring the return value is always in USD or + absent, never a raw token count. This prevents unit-mismatch bugs when + comparing costs across iterations where the model (and its pricing + availability) may differ. ``costPerCachedInputToken`` is intentionally ignored — the estimate uses - input/output tokens only, which is sufficient for relative comparison - across optimization iterations. + input/output tokens only. :param usage: Token usage from the agent call. When ``None``, returns ``None``. :param model_config: Model config dict from ``get_model_configs()``, or ``None``. - :return: Estimated cost in USD, or raw total token count as proxy, or ``None``. + :return: Estimated cost in USD, or ``None`` if usage or pricing data is absent. """ if usage is None: return None @@ -344,15 +344,12 @@ def estimate_cost( input_price = model_config.get("costPerInputToken") if model_config else None output_price = model_config.get("costPerOutputToken") if model_config else None - if input_price is not None or output_price is not None: - cost = 0.0 - if input_price is not None and usage.input is not None: - cost += usage.input * input_price - if output_price is not None and usage.output is not None: - cost += usage.output * output_price - return cost + if input_price is None and output_price is None: + return None - logger.debug( - "No pricing data on model config for cost estimation; falling back to total token count" - ) - return float(usage.total or 0) + cost = 0.0 + if input_price is not None and usage.input is not None: + cost += usage.input * input_price + if output_price is not None and usage.output is not None: + cost += usage.output * output_price + return cost diff --git a/packages/optimization/tests/test_client.py b/packages/optimization/tests/test_client.py index ae492fa3..82e10d26 100644 --- a/packages/optimization/tests/test_client.py +++ b/packages/optimization/tests/test_client.py @@ -535,7 +535,7 @@ async def test_duration_context_added_to_instructions_when_latency_keyword_prese ) _, config, _, _ = self.handle_judge_call.call_args.args assert "1500ms" in config.instructions - assert "mention the duration" in config.instructions + assert "state the duration" in config.instructions async def test_duration_context_includes_baseline_comparison_when_history_present(self): """When history[0] has a duration, the judge instructions include a baseline comparison.""" @@ -1842,11 +1842,11 @@ async def test_apply_variation_response_calls_restore_and_logs_warning(self): with patch("ldai_optimizer.client.logger") as mock_logger: await client._generate_new_variation(iteration=1, variables={}) - warning_calls = [ - call for call in mock_logger.warning.call_args_list + debug_calls = [ + call for call in mock_logger.debug.call_args_list if "user-123" in str(call) or "business" in str(call) ] - assert len(warning_calls) >= 1 + assert len(debug_calls) >= 1 assert "{{user_id}}" in client._current_instructions assert "user-123" not in client._current_instructions @@ -4727,15 +4727,13 @@ def test_uses_only_output_price_when_input_absent(self): cost = estimate_cost(usage, model_config) assert cost == pytest.approx(40 * 0.002) - def test_falls_back_to_total_token_count_when_no_pricing(self): + def test_returns_none_when_no_pricing_in_config(self): usage = self._usage(total=100) - cost = estimate_cost(usage, {}) - assert cost == 100.0 + assert estimate_cost(usage, {}) is None - def test_falls_back_to_total_token_count_when_model_config_none(self): + def test_returns_none_when_model_config_none(self): usage = self._usage(total=250) - cost = estimate_cost(usage, None) - assert cost == 250.0 + assert estimate_cost(usage, None) is None def test_ignores_cached_input_token_price(self): usage = self._usage(total=100, inp=60, out=40) @@ -4783,8 +4781,8 @@ def test_detects_costs_plural(self): def test_detects_budget(self): assert _acceptance_criteria_implies_cost_optimization(self._judge("Stay within budget.")) - def test_detects_tokens(self): - assert _acceptance_criteria_implies_cost_optimization(self._judge("Use fewer tokens.")) + def test_does_not_detect_token_to_avoid_false_positives(self): + assert not _acceptance_criteria_implies_cost_optimization(self._judge("Generate a valid authentication token.")) def test_detects_billing(self): assert _acceptance_criteria_implies_cost_optimization(self._judge("Minimize billing.")) @@ -4803,7 +4801,7 @@ def test_no_match_on_unrelated_statement(self): def test_multiple_judges_one_matches(self): judges = { "j1": OptimizationJudge(threshold=0.9, acceptance_statement="Be accurate."), - "j2": OptimizationJudge(threshold=0.9, acceptance_statement="Use fewer tokens."), + "j2": OptimizationJudge(threshold=0.9, acceptance_statement="Keep costs low."), } assert _acceptance_criteria_implies_cost_optimization(judges) @@ -4861,11 +4859,11 @@ def test_skips_gracefully_when_candidate_cost_none(self): ctx = self._ctx(None) # type: ignore[arg-type] assert self.client._evaluate_cost(ctx) is True - def test_works_with_token_count_proxy(self): - # When no pricing data, cost is raw token count — gate still compares numerically - self._seed_history(1000.0) - assert self.client._evaluate_cost(self._ctx(750.0)) is True - assert self.client._evaluate_cost(self._ctx(900.0)) is False + def test_skips_gracefully_when_units_differ_across_model_switch(self): + # If baseline was captured with pricing (USD) but candidate has no pricing, + # candidate cost is None and the gate skips rather than comparing incompatible units. + self._seed_history(0.010) + assert self.client._evaluate_cost(self._ctx(None)) is True # --------------------------------------------------------------------------- @@ -4986,10 +4984,18 @@ def setup_method(self): def _cost_judge(self) -> OptimizationJudge: return OptimizationJudge( threshold=0.9, - acceptance_statement="Use fewer tokens and keep costs low.", + acceptance_statement="Keep costs low and stay within budget.", ) + def _set_pricing(self): + """Give the client a model config with pricing so estimate_cost returns USD.""" + self.client._current_model = "gpt-4o" + self.client._model_configs = [ + {"id": "gpt-4o", "costPerInputToken": 0.000005, "costPerOutputToken": 0.000015} + ] + async def test_cost_context_injected_into_instructions(self): + self._set_pricing() usage = TokenUsage(total=100, input=60, output=40) captured: list = [] @@ -5040,6 +5046,7 @@ async def _capture_judge_call(judge_key, judge_config, ctx, is_judge): assert "cost/token-usage goal" not in instructions async def test_baseline_cost_shown_when_history_present(self): + self._set_pricing() usage = TokenUsage(total=100, input=60, output=40) captured: list = [] From 365fa94cb0b5ba62097c038e5cf70b7265c68e14 Mon Sep 17 00:00:00 2001 From: Andrew Klatzke Date: Thu, 7 May 2026 14:03:14 -0800 Subject: [PATCH 3/7] feat: adds reporting for cost and latency optimization failures --- .../optimization/src/ldai_optimizer/client.py | 375 ++++++--- .../src/ldai_optimizer/dataclasses.py | 3 + .../src/ldai_optimizer/ld_api_client.py | 2 +- .../src/ldai_optimizer/prompts.py | 87 ++- packages/optimization/tests/test_client.py | 730 +++++++++++++++++- 5 files changed, 1052 insertions(+), 145 deletions(-) diff --git a/packages/optimization/src/ldai_optimizer/client.py b/packages/optimization/src/ldai_optimizer/client.py index 1fb418fd..d08bc0f1 100644 --- a/packages/optimization/src/ldai_optimizer/client.py +++ b/packages/optimization/src/ldai_optimizer/client.py @@ -22,7 +22,7 @@ import random import time import uuid -from typing import Any, Dict, List, Literal, Optional, Union +from typing import Any, Dict, List, Literal, Optional, Tuple, Union from ldai import AIAgentConfig, AIJudgeConfig, AIJudgeConfigDefault, LDAIClient from ldai.models import LDMessage, ModelConfig @@ -132,8 +132,30 @@ def _compute_validation_count(pool_size: int) -> int: # Cost gate: a candidate must cost at most this fraction of the baseline # (history[0].estimated_cost_usd) to pass when acceptance criteria imply a -# cost reduction goal. 0.80 means at least 20% cheaper than the baseline. -_COST_TOLERANCE = 0.80 +# cost reduction goal. 0.90 means at least 10% cheaper than the baseline. +_COST_TOLERANCE = 0.90 + +# Maximum number of history items retained in the standard (non-GT) optimizer. +# Since user inputs are randomly selected there is no "full pass" concept, so +# a small fixed window is sufficient context for variation generation. +_MAX_STANDARD_HISTORY_LENGTH = 5 + + +def _trim_history( + history: List["OptimizationContext"], max_len: int +) -> List["OptimizationContext"]: + """Trim history to at most max_len of the most recent items. + + The duration/cost baselines are captured explicitly in ``_baseline_duration_ms`` + and ``_baseline_cost_usd`` so the oldest entry no longer needs to be preserved. + + :param history: Current accumulated history list. + :param max_len: Maximum number of items to retain (must be >= 1). + :return: Trimmed history list, or the original list if already within limit. + """ + if len(history) <= max_len: + return history + return history[-max_len:] # Maps SDK status strings to the API status/activity values expected by # agent_optimization_result records. Defined at module level to avoid @@ -195,6 +217,23 @@ def _initialize_class_members_from_config( agent_config.model.name if agent_config.model else None ) self._history: List[OptimizationContext] = [] + # Explicit baseline captured from the first iteration ever appended to history. + # Stored separately so that history truncation can be a pure slice without + # having to preserve history[0] as an anchor. + self._baseline_duration_ms: Optional[float] = None + self._baseline_cost_usd: Optional[float] = None + + def _record_baseline(self, ctx: OptimizationContext) -> None: + """Capture duration/cost baseline from the first iteration appended to history. + + Called once per run (subsequent calls are no-ops once both values are set). + Storing these explicitly lets ``_trim_history`` use a simple tail slice without + needing to preserve ``history[0]`` as an anchor. + """ + if self._baseline_duration_ms is None and ctx.duration_ms is not None: + self._baseline_duration_ms = ctx.duration_ms + if self._baseline_cost_usd is None and ctx.estimated_cost_usd is not None: + self._baseline_cost_usd = ctx.estimated_cost_usd def _build_agent_config_for_context( self, ctx: OptimizationContext, skip_interpolation: bool = False @@ -260,6 +299,7 @@ def _create_optimization_context( user_input=user_input, history=tuple(flat_history), iteration=iteration, + accumulated_token_usage=self._total_token_usage if self._total_token_usage > 0 else None, ) @property @@ -755,11 +795,7 @@ async def _evaluate_acceptance_judge( {judge_key: optimization_judge} ) ): - baseline_ms = ( - self._history[0].duration_ms - if self._history and self._history[0].duration_ms is not None - else None - ) + baseline_ms = self._baseline_duration_ms instructions += ( f"\n\nThe acceptance criteria for this judge includes a latency/duration goal. " f"The agent's response took {agent_duration_ms:.0f}ms to generate. " @@ -785,11 +821,7 @@ async def _evaluate_acceptance_judge( agent_usage, _find_model_config(self._current_model or "", self._model_configs), ) - baseline_cost = ( - self._history[0].estimated_cost_usd - if self._history and self._history[0].estimated_cost_usd is not None - else None - ) + baseline_cost = self._baseline_cost_usd if current_cost is not None: instructions += ( f"\n\nThe acceptance criteria for this judge includes a cost/token-usage goal. " @@ -1100,27 +1132,13 @@ async def _run_ground_truth_optimization( expected_response=sample.expected_response, ) self._accumulate_tokens(optimize_context) - if self._is_token_limit_exceeded(): - logger.error( - "[GT Attempt %d] -> Token limit exceeded on sample %d (total=%d)", - attempt, - i + 1, - self._total_token_usage, - ) - attempt_results.append(optimize_context) - self._last_run_succeeded = False - self._last_succeeded_context = None - self._safe_status_update("failure", optimize_context, linear_iter) - if self._options.on_failing_result: - try: - self._options.on_failing_result(optimize_context) - except Exception: - logger.exception( - "[GT Attempt %d] -> on_failing_result callback failed", attempt - ) - return attempt_results + optimize_context = dataclasses.replace( + optimize_context, accumulated_token_usage=self._total_token_usage + ) - # Per-sample pass/fail check + # Per-sample pass/fail check — evaluated before the token limit gate so + # that a sample which passed is not incorrectly stamped as FAILED simply + # because the budget was exhausted after its scores were computed. if self._options.on_turn is not None: try: sample_passed = self._options.on_turn(optimize_context) @@ -1134,15 +1152,8 @@ async def _run_ground_truth_optimization( else: sample_passed = self._evaluate_response(optimize_context) - if sample_passed and _acceptance_criteria_implies_duration_optimization( - self._options.judges - ): - sample_passed = self._evaluate_duration(optimize_context) - - if sample_passed and _acceptance_criteria_implies_cost_optimization( - self._options.judges - ): - sample_passed = self._evaluate_cost(optimize_context) + sample_passed, optimize_context = self._apply_duration_gate(sample_passed, optimize_context) + sample_passed, optimize_context = self._apply_cost_gate(sample_passed, optimize_context) if not sample_passed: logger.info( @@ -1163,6 +1174,10 @@ async def _run_ground_truth_optimization( attempt_results.append(optimize_context) + # Persist the completed sample so every API record gets its scores, + # generation tokens, and accumulated_total — not just the final one. + self._safe_status_update("turn completed", optimize_context, linear_iter) + if gt_options.on_sample_result is not None: try: gt_options.on_sample_result(optimize_context) @@ -1173,6 +1188,42 @@ async def _run_ground_truth_optimization( i + 1, ) + # Token limit check after pass/fail so the terminal status reflects + # whether the samples actually passed, not just that the budget was hit. + # Mark success only when every sample in this attempt was processed and + # all passed; stopping mid-batch is always a failure even if the + # partially-processed samples looked good. + if self._is_token_limit_exceeded(): + logger.error( + "[GT Attempt %d] -> Token limit exceeded on sample %d (total=%d)", + attempt, + i + 1, + self._total_token_usage, + ) + if all_passed and i == n - 1: + self._last_run_succeeded = True + self._last_succeeded_context = optimize_context + self._safe_status_update("success", optimize_context, linear_iter) + if self._options.on_passing_result: + try: + self._options.on_passing_result(optimize_context) + except Exception: + logger.exception( + "[GT Attempt %d] -> on_passing_result callback failed", attempt + ) + else: + self._last_run_succeeded = False + self._last_succeeded_context = None + self._safe_status_update("failure", optimize_context, linear_iter) + if self._options.on_failing_result: + try: + self._options.on_failing_result(optimize_context) + except Exception: + logger.exception( + "[GT Attempt %d] -> on_failing_result callback failed", attempt + ) + return attempt_results + last_ctx = attempt_results[-1] if all_passed: @@ -1212,8 +1263,12 @@ async def _run_ground_truth_optimization( return attempt_results # Append all N results to history so the variation generator has full context - # from all of the previous samples + # from all of the previous samples, then trim to one full attempt's worth so + # judge prompts don't grow unboundedly across many failed attempts. + if attempt_results: + self._record_baseline(attempt_results[0]) self._history.extend(attempt_results) + self._history = _trim_history(self._history, n) logger.info( "[GT Attempt %d] -> %d/%d samples failed — generating new variation", @@ -1393,6 +1448,7 @@ async def _generate_new_variation( optimize_for_cost = _acceptance_criteria_implies_cost_optimization( self._options.judges ) + quality_already_passing = self._all_judges_passing() instructions = build_new_variation_prompt( self._history, self._options.judges, @@ -1404,6 +1460,7 @@ async def _generate_new_variation( self._initial_instructions, optimize_for_duration=optimize_for_duration, optimize_for_cost=optimize_for_cost, + quality_already_passing=quality_already_passing, ) # Create a flat history list (without nested history) to avoid exponential growth @@ -1440,6 +1497,8 @@ async def _generate_new_variation( False, ) variation_response: OptimizationResponse = await await_if_needed(result) + if variation_response.usage is not None: + self._total_token_usage += variation_response.usage.total or 0 response_str = variation_response.output try: response_data = extract_json_from_response(response_str) @@ -1707,11 +1766,18 @@ def _persist_and_forward( if snapshot.duration_ms is not None: patch["generationLatency"] = int(snapshot.duration_ms) if snapshot.usage is not None: - patch["generationTokens"] = { + gen_tokens: Dict[str, Any] = { "total": snapshot.usage.total, "input": snapshot.usage.input, "output": snapshot.usage.output, } + if snapshot.accumulated_token_usage is not None: + gen_tokens["accumulated_total"] = snapshot.accumulated_token_usage + patch["generationTokens"] = gen_tokens + elif snapshot.accumulated_token_usage is not None: + patch["generationTokens"] = { + "accumulated_total": snapshot.accumulated_token_usage + } eval_latencies = { k: v.duration_ms for k, v in snapshot.scores.items() @@ -1948,22 +2014,22 @@ def _evaluate_duration(self, optimize_context: OptimizationContext) -> bool: """ Check whether the candidate's duration meets the improvement target vs. the baseline. - The baseline is history[0].duration_ms — the very first completed iteration, - representing the original unoptimized configuration's latency. The candidate - must be at least _DURATION_TOLERANCE faster (default: 20% improvement). + The baseline is the duration_ms from the very first iteration appended to history, + captured in ``_baseline_duration_ms``. The candidate must be at least + _DURATION_TOLERANCE faster (default: 20% improvement). - Returns True without blocking when no baseline is available (empty history or - history[0].duration_ms is None), or when the candidate's duration_ms was not - captured. This avoids penalising configurations when timing data is missing. + Returns True without blocking when no baseline is available or when the candidate's + duration_ms was not captured. This avoids penalising configurations when timing data + is missing. :param optimize_context: The completed turn context containing duration_ms :return: True if the duration requirement is met or cannot be checked """ - if not self._history or self._history[0].duration_ms is None: + if self._baseline_duration_ms is None: return True if optimize_context.duration_ms is None: return True - baseline = self._history[0].duration_ms + baseline = self._baseline_duration_ms passed = optimize_context.duration_ms < baseline * _DURATION_TOLERANCE if not passed: logger.warning( @@ -1980,26 +2046,25 @@ def _evaluate_cost(self, optimize_context: OptimizationContext) -> bool: """ Check whether the candidate's estimated cost meets the improvement target vs. the baseline. - The baseline is history[0].estimated_cost_usd — the very first completed iteration, - representing the original unoptimized configuration's cost. The candidate must be - at least _COST_TOLERANCE cheaper (default: 20% improvement). + The baseline is the estimated_cost_usd from the very first iteration appended to + history, captured in ``_baseline_cost_usd``. The candidate must be at least + _COST_TOLERANCE cheaper (default: 10% improvement). The cost value is in USD when model pricing data is available, or raw total token count as a proxy when pricing is absent. Both are comparable relative to their own baselines. - Returns True without blocking when no baseline is available (empty history or - history[0].estimated_cost_usd is None), or when the candidate's cost was not - captured. This avoids penalising configurations when cost data is missing. + Returns True without blocking when no baseline is available or when the candidate's + cost was not captured. This avoids penalising configurations when cost data is missing. :param optimize_context: The completed turn context containing estimated_cost_usd :return: True if the cost requirement is met or cannot be checked """ - if not self._history or self._history[0].estimated_cost_usd is None: + if self._baseline_cost_usd is None: return True if optimize_context.estimated_cost_usd is None: return True - baseline = self._history[0].estimated_cost_usd + baseline = self._baseline_cost_usd passed = optimize_context.estimated_cost_usd < baseline * _COST_TOLERANCE if not passed: logger.warning( @@ -2012,6 +2077,132 @@ def _evaluate_cost(self, optimize_context: OptimizationContext) -> bool: ) return passed + def _all_judges_passing(self) -> bool: + """Return True if every user-configured judge passed in the most recent history entry. + + Inspects the last context in ``_history`` and checks each score key that + corresponds to a judge defined in ``_options.judges`` (skipping synthetic gate + entries whose keys begin with ``_``). Returns False when history is empty or any + judge score does not meet its threshold. + + This is used to decide whether variation generation should preserve the current + behavior and only optimise for cost, rather than trying to improve quality further. + """ + if not self._history or not self._options.judges: + return False + recent = self._history[-1] + if not recent.scores: + return False + for key, judge in self._options.judges.items(): + result = recent.scores.get(key) + if result is None: + return False + threshold = judge.threshold if judge.threshold is not None else 1.0 + if not judge_passed(result.score, threshold, judge.is_inverted): + return False + return True + + def _apply_duration_gate( + self, passed_so_far: bool, ctx: OptimizationContext + ) -> Tuple[bool, OptimizationContext]: + """Apply the latency improvement gate and record its result in ctx.scores. + + When the gate is active (any acceptance statement implies latency optimization), + evaluates whether the candidate's duration improved by at least + _DURATION_TOLERANCE vs the baseline. A synthetic ``_latency_gate`` entry is + added to scores with score=1.0 on pass or score=0.0 on fail so the outcome + is visible in the API result and UI. + + The gate is skipped (no score entry added) when: + - No acceptance statement implies latency optimization. + - ``passed_so_far`` is already False (a prior check failed the sample). + + :param passed_so_far: Whether all prior checks for this sample passed. + :param ctx: Current optimization context. + :return: (passed, updated_ctx) where passed reflects gate outcome. + """ + if not _acceptance_criteria_implies_duration_optimization(self._options.judges): + return passed_so_far, ctx + if not passed_so_far: + return passed_so_far, ctx + passed = self._evaluate_duration(ctx) + if passed: + if self._baseline_duration_ms is not None and ctx.duration_ms is not None: + rationale = ( + f"Latency improvement gate passed: {ctx.duration_ms:.0f}ms is at least " + f"{int((1 - _DURATION_TOLERANCE) * 100)}% faster than baseline " + f"{self._baseline_duration_ms:.0f}ms." + ) + else: + rationale = "Latency gate passed (no baseline)." + score = 1.0 + else: + if self._baseline_duration_ms is not None and ctx.duration_ms is not None: + rationale = ( + f"Latency improvement gate failed: {ctx.duration_ms:.0f}ms did not improve " + f"by {int((1 - _DURATION_TOLERANCE) * 100)}% vs baseline " + f"{self._baseline_duration_ms:.0f}ms " + f"(required < {self._baseline_duration_ms * _DURATION_TOLERANCE:.0f}ms)." + ) + else: + rationale = "Latency gate failed (no baseline data)." + score = 0.0 + ctx = dataclasses.replace( + ctx, + scores={**ctx.scores, "_latency_gate": JudgeResult(score=score, rationale=rationale)}, + ) + return passed, ctx + + def _apply_cost_gate( + self, passed_so_far: bool, ctx: OptimizationContext + ) -> Tuple[bool, OptimizationContext]: + """Apply the cost improvement gate and record its result in ctx.scores. + + When the gate is active (any acceptance statement implies cost optimization), + evaluates whether the candidate's estimated cost improved by at least + _COST_TOLERANCE vs the baseline. A synthetic ``_cost_gate`` entry is + added to scores with score=1.0 on pass or score=0.0 on fail. + + The gate is skipped (no score entry added) when: + - No acceptance statement implies cost optimization. + - ``passed_so_far`` is already False (a prior check failed the sample). + + :param passed_so_far: Whether all prior checks for this sample passed. + :param ctx: Current optimization context. + :return: (passed, updated_ctx) where passed reflects gate outcome. + """ + if not _acceptance_criteria_implies_cost_optimization(self._options.judges): + return passed_so_far, ctx + if not passed_so_far: + return passed_so_far, ctx + passed = self._evaluate_cost(ctx) + if passed: + if self._baseline_cost_usd is not None and ctx.estimated_cost_usd is not None: + rationale = ( + f"Cost improvement gate passed: {ctx.estimated_cost_usd:.6f} is at least " + f"{int((1 - _COST_TOLERANCE) * 100)}% cheaper than baseline " + f"{self._baseline_cost_usd:.6f}." + ) + else: + rationale = "Cost gate passed (no baseline)." + score = 1.0 + else: + if self._baseline_cost_usd is not None and ctx.estimated_cost_usd is not None: + rationale = ( + f"Cost improvement gate failed: {ctx.estimated_cost_usd:.6f} did not improve " + f"by {int((1 - _COST_TOLERANCE) * 100)}% vs baseline " + f"{self._baseline_cost_usd:.6f} " + f"(required < {self._baseline_cost_usd * _COST_TOLERANCE:.6f})." + ) + else: + rationale = "Cost gate failed (no baseline data)." + score = 0.0 + ctx = dataclasses.replace( + ctx, + scores={**ctx.scores, "_cost_gate": JudgeResult(score=score, rationale=rationale)}, + ) + return passed, ctx + def _handle_success( self, optimize_context: OptimizationContext, iteration: int ) -> Any: @@ -2265,15 +2456,12 @@ async def _run_validation_phase( self._safe_status_update("generating", val_ctx, val_iter) val_ctx = await self._execute_agent_turn(val_ctx, val_iter) self._accumulate_tokens(val_ctx) - if self._is_token_limit_exceeded(): - logger.error( - "[Validation %d/%d] -> Token limit exceeded (total=%d)", - i + 1, - validation_count, - self._total_token_usage, - ) - return False, val_ctx + val_ctx = dataclasses.replace( + val_ctx, accumulated_token_usage=self._total_token_usage + ) + # Evaluate pass/fail before the token limit check so a passing validation + # sample is not incorrectly treated as a failure due to budget exhaustion. if options.on_turn is not None: try: sample_passed = options.on_turn(val_ctx) @@ -2285,15 +2473,17 @@ async def _run_validation_phase( else: sample_passed = self._evaluate_response(val_ctx) - if sample_passed and _acceptance_criteria_implies_duration_optimization( - self._options.judges - ): - sample_passed = self._evaluate_duration(val_ctx) + sample_passed, val_ctx = self._apply_duration_gate(sample_passed, val_ctx) + sample_passed, val_ctx = self._apply_cost_gate(sample_passed, val_ctx) - if sample_passed and _acceptance_criteria_implies_cost_optimization( - self._options.judges - ): - sample_passed = self._evaluate_cost(val_ctx) + if self._is_token_limit_exceeded(): + logger.error( + "[Validation %d/%d] -> Token limit exceeded (total=%d)", + i + 1, + validation_count, + self._total_token_usage, + ) + return False, val_ctx last_ctx = val_ctx @@ -2381,13 +2571,9 @@ async def _run_optimization( optimize_context, iteration ) self._accumulate_tokens(optimize_context) - if self._is_token_limit_exceeded(): - logger.error( - "[Iteration %d] -> Token limit exceeded (total=%d)", - iteration, - self._total_token_usage, - ) - return self._handle_failure(optimize_context, iteration) + optimize_context = dataclasses.replace( + optimize_context, accumulated_token_usage=self._total_token_usage + ) # Manual path: on_turn callback gives caller full control over pass/fail if self._options.on_turn is not None: @@ -2414,15 +2600,18 @@ async def _run_optimization( iteration, ) - if initial_passed and _acceptance_criteria_implies_duration_optimization( - self._options.judges - ): - initial_passed = self._evaluate_duration(optimize_context) + initial_passed, optimize_context = self._apply_duration_gate(initial_passed, optimize_context) + initial_passed, optimize_context = self._apply_cost_gate(initial_passed, optimize_context) - if initial_passed and _acceptance_criteria_implies_cost_optimization( - self._options.judges - ): - initial_passed = self._evaluate_cost(optimize_context) + # Token limit check after pass/fail evaluation so the persisted record + # correctly reflects whether the iteration passed before stopping the run. + if self._is_token_limit_exceeded(): + logger.error( + "[Iteration %d] -> Token limit exceeded (total=%d)", + iteration, + self._total_token_usage, + ) + return self._handle_failure(optimize_context, iteration) if initial_passed: all_valid, last_ctx = await self._run_validation_phase( @@ -2445,7 +2634,9 @@ async def _run_optimization( ) if iteration >= self._options.max_attempts: return self._handle_failure(optimize_context, iteration) + self._record_baseline(last_ctx) self._history.append(last_ctx) + self._history = _trim_history(self._history, _MAX_STANDARD_HISTORY_LENGTH) try: await self._generate_new_variation( iteration, last_ctx.current_variables @@ -2475,7 +2666,9 @@ async def _run_optimization( ) if iteration >= self._options.max_attempts: return self._handle_failure(optimize_context, iteration) + self._record_baseline(optimize_context) self._history.append(optimize_context) + self._history = _trim_history(self._history, _MAX_STANDARD_HISTORY_LENGTH) try: await self._generate_new_variation( iteration, optimize_context.current_variables diff --git a/packages/optimization/src/ldai_optimizer/dataclasses.py b/packages/optimization/src/ldai_optimizer/dataclasses.py index 1f5e28c2..2a9f4542 100644 --- a/packages/optimization/src/ldai_optimizer/dataclasses.py +++ b/packages/optimization/src/ldai_optimizer/dataclasses.py @@ -218,6 +218,7 @@ class OptimizationContext: duration_ms: Optional[float] = None # wall-clock time for the agent call in milliseconds usage: Optional[TokenUsage] = None # token usage reported by the agent for this iteration estimated_cost_usd: Optional[float] = None # estimated cost; USD when pricing available, else total tokens + accumulated_token_usage: Optional[int] = None # single running total across ALL calls in this run (generation + judges + variation) def copy_without_history(self) -> OptimizationContext: """ @@ -238,6 +239,7 @@ def copy_without_history(self) -> OptimizationContext: duration_ms=self.duration_ms, usage=self.usage, estimated_cost_usd=self.estimated_cost_usd, + accumulated_token_usage=self.accumulated_token_usage, ) def to_json(self) -> Dict[str, Any]: @@ -264,6 +266,7 @@ def to_json(self) -> Dict[str, Any]: "iteration": self.iteration, "duration_ms": self.duration_ms, "estimated_cost_usd": self.estimated_cost_usd, + "accumulated_token_usage": self.accumulated_token_usage, } if self.usage is not None: result["usage"] = { diff --git a/packages/optimization/src/ldai_optimizer/ld_api_client.py b/packages/optimization/src/ldai_optimizer/ld_api_client.py index 3efa725d..37f6549e 100644 --- a/packages/optimization/src/ldai_optimizer/ld_api_client.py +++ b/packages/optimization/src/ldai_optimizer/ld_api_client.py @@ -118,7 +118,7 @@ class AgentOptimizationResultPatch(TypedDict, total=False): completionResponse: str scores: Dict[str, Any] generationLatency: int - generationTokens: Dict[str, int] + generationTokens: Dict[str, Any] evaluationLatencies: Dict[str, float] evaluationTokens: Dict[str, Dict[str, int]] variation: Dict[str, Any] diff --git a/packages/optimization/src/ldai_optimizer/prompts.py b/packages/optimization/src/ldai_optimizer/prompts.py index eadf35d7..0cc8a53d 100644 --- a/packages/optimization/src/ldai_optimizer/prompts.py +++ b/packages/optimization/src/ldai_optimizer/prompts.py @@ -144,6 +144,7 @@ def build_new_variation_prompt( initial_instructions: str, optimize_for_duration: bool = False, optimize_for_cost: bool = False, + quality_already_passing: bool = False, ) -> str: """ Build the LLM prompt for generating an improved agent configuration. @@ -165,6 +166,9 @@ def build_new_variation_prompt( instructing the LLM to prefer faster models and simpler instructions. :param optimize_for_cost: When True, appends a cost optimization section instructing the LLM to prefer cheaper models and reduce token usage. + :param quality_already_passing: When True, signals that all judge criteria are + currently passing and the cost optimization section should instruct the LLM + to preserve existing behavior while only reducing cost. :return: The assembled prompt string """ sections = [ @@ -179,7 +183,7 @@ def build_new_variation_prompt( history, model_choices, variable_choices, initial_instructions ), variation_prompt_duration_optimization(model_choices) if optimize_for_duration else "", - variation_prompt_cost_optimization(model_choices) if optimize_for_cost else "", + variation_prompt_cost_optimization(model_choices, quality_already_passing=quality_already_passing) if optimize_for_cost else "", ] return "\n\n".join(s for s in sections if s) @@ -595,7 +599,10 @@ def variation_prompt_duration_optimization(model_choices: List[str]) -> str: ) -def variation_prompt_cost_optimization(model_choices: List[str]) -> str: +def variation_prompt_cost_optimization( + model_choices: List[str], + quality_already_passing: bool = False, +) -> str: """ Cost optimization section of the variation prompt. @@ -604,38 +611,62 @@ def variation_prompt_cost_optimization(model_choices: List[str]) -> str: must still be met first — and provides concrete guidance on how to reduce cost through model selection and instruction simplification. + When ``quality_already_passing`` is True, the framing shifts: since all + judge criteria are already satisfied, the LLM is instructed to preserve + the existing behavior exactly and only apply changes that reduce cost + without affecting output quality. + :param model_choices: List of model IDs the LLM may select from, so it can apply its own knowledge of which models tend to be cheaper. + :param quality_already_passing: When True, signals that all judge criteria + are currently passing. The section will direct the LLM to preserve + output quality and focus exclusively on cost reduction strategies. :return: The cost optimization prompt block. """ - return "\n".join( - [ + if quality_already_passing: + intent_lines = [ "## Cost Optimization:", "The acceptance criteria for this optimization implies that token usage / cost should be reduced.", - "In addition to improving quality, generate a variation that aims to reduce the agent's cost.", - "Cost is driven by two factors: (1) the number of tokens processed, and (2) the per-token price of the model.", - "Target both factors with the strategies below.", + "*** IMPORTANT: All quality acceptance criteria are currently passing. ***", + "The goal of this variation is to reduce cost WITHOUT changing the behavior or quality of the agent's responses.", + "Do NOT alter the instructions in ways that would change what the agent says or how it reasons.", + "Only apply changes that reduce token usage or switch to a cheaper model while preserving the same output quality.", + "If you cannot reduce cost without risking quality, keep the instructions unchanged and only consider a cheaper model.", "", - "### Reducing token usage (input tokens):", - "- Remove redundant, verbose, or repeated phrasing from the instructions.", - "- Collapse multi-sentence explanations into a single concise directive.", - "- Remove examples or few-shot demonstrations unless they are essential for accuracy.", - "- Eliminate instructional scaffolding that the model does not need (e.g. 'You are a helpful assistant that...').", - "- Use bullet points instead of prose where possible — they are more token-efficient.", - "", - "### Reducing token usage (output tokens):", - "- Instruct the agent to be concise and avoid unnecessary elaboration.", - "- Specify the exact format and length of the expected response (e.g. 'Respond in one sentence.').", - "- Set or reduce max_tokens if the current value allows longer responses than needed.", - "- Avoid instructions that encourage the agent to 'explain its reasoning' unless required by the acceptance criteria.", - "", - "### Reducing per-token cost via model selection:", - "- Consider switching to a cheaper model from the available choices if quality requirements can still be met.", - f" Available models: {model_choices}", - " Use your knowledge of relative model pricing to prefer lower-cost options.", - " Only switch models if the cheaper model is capable of satisfying the acceptance criteria.", + ] + else: + intent_lines = [ + "## Cost Optimization:", + "The acceptance criteria for this optimization implies that token usage / cost should be reduced.", + "In addition to improving quality, generate a variation that aims to reduce the agent's cost.", "", - "Quality criteria remain the primary objective — do not sacrifice passing scores to achieve lower cost.", - "Apply cost-reduction changes incrementally: prefer the smallest change that measurably reduces cost.", ] - ) + + shared_lines = [ + "Cost is driven by two factors: (1) the number of tokens processed, and (2) the per-token price of the model.", + "Target both factors with the strategies below.", + "", + "### Reducing token usage (input tokens):", + "- Remove redundant, verbose, or repeated phrasing from the instructions.", + "- Collapse multi-sentence explanations into a single concise directive.", + "- Remove examples or few-shot demonstrations unless they are essential for accuracy.", + "- Eliminate instructional scaffolding that the model does not need (e.g. 'You are a helpful assistant that...').", + "- Use bullet points instead of prose where possible — they are more token-efficient.", + "", + "### Reducing token usage (output tokens):", + "- Instruct the agent to be concise and avoid unnecessary elaboration.", + "- Specify the exact format and length of the expected response (e.g. 'Respond in one sentence.').", + "- Set or reduce max_tokens if the current value allows longer responses than needed.", + "- Avoid instructions that encourage the agent to 'explain its reasoning' unless required by the acceptance criteria.", + "", + "### Reducing per-token cost via model selection:", + "- Consider switching to a cheaper model from the available choices if quality requirements can still be met.", + f" Available models: {model_choices}", + " Use your knowledge of relative model pricing to prefer lower-cost options.", + " Only switch models if the cheaper model is capable of satisfying the acceptance criteria.", + "", + "Quality criteria remain the primary objective — do not sacrifice passing scores to achieve lower cost.", + "Apply cost-reduction changes incrementally: prefer the smallest change that measurably reduces cost.", + ] + + return "\n".join(intent_lines + shared_lines) diff --git a/packages/optimization/tests/test_client.py b/packages/optimization/tests/test_client.py index 82e10d26..8f30c241 100644 --- a/packages/optimization/tests/test_client.py +++ b/packages/optimization/tests/test_client.py @@ -10,7 +10,13 @@ from ldai.tracker import TokenUsage from ldclient import Context -from ldai_optimizer.client import OptimizationClient, _compute_validation_count, _find_model_config +from ldai_optimizer.client import ( + OptimizationClient, + _MAX_STANDARD_HISTORY_LENGTH, + _compute_validation_count, + _find_model_config, + _trim_history, +) from ldai_optimizer.util import judge_passed from ldai_optimizer.dataclasses import ( AIJudgeCallConfig, @@ -538,7 +544,7 @@ async def test_duration_context_added_to_instructions_when_latency_keyword_prese assert "state the duration" in config.instructions async def test_duration_context_includes_baseline_comparison_when_history_present(self): - """When history[0] has a duration, the judge instructions include a baseline comparison.""" + """When a baseline duration is captured, the judge instructions include a baseline comparison.""" self.client._history = [ OptimizationContext( scores={}, @@ -550,6 +556,7 @@ async def test_duration_context_includes_baseline_comparison_when_history_presen duration_ms=2000.0, ) ] + self.client._baseline_duration_ms = 2000.0 judge = OptimizationJudge( threshold=0.8, acceptance_statement="Responses should have low latency.", @@ -581,6 +588,7 @@ async def test_duration_context_says_slower_when_candidate_is_slower(self): duration_ms=1000.0, ) ] + self.client._baseline_duration_ms = 1000.0 judge = OptimizationJudge( threshold=0.8, acceptance_statement="The response must be fast.", @@ -2686,7 +2694,12 @@ async def test_gt_stops_when_token_limit_exceeded_on_first_sample(self): assert len(results) == 1 async def test_gt_stops_mid_batch_when_limit_exceeded_on_second_sample(self): - """Token limit exceeded on the second of two samples stops after that sample.""" + """Token limit exceeded on the final sample of a batch — all passed — marks run successful. + + With 2 samples and the budget exhausted after sample 2 (the last one), and + both samples passing their judges, the run should be marked succeeded rather + than failed, because the token limit was hit on the final successful iteration. + """ agent_responses = [ OptimizationResponse(output="Answer 1.", usage=TokenUsage(total=100, input=60, output=40)), OptimizationResponse(output="Answer 2.", usage=TokenUsage(total=200, input=120, output=80)), @@ -2699,13 +2712,14 @@ async def test_gt_stops_mid_batch_when_limit_exceeded_on_second_sample(self): opts = _make_gt_options( handle_agent_call=handle_agent_call, handle_judge_call=handle_judge_call, - token_limit=250, # 100 + 200 = 300 ≥ 250 → trip on second sample + token_limit=250, # 100 + 200 = 300 ≥ 250 → trip on second (final) sample max_attempts=5, ) results = await client.optimize_from_ground_truth_options("test-agent", opts) - assert client._last_run_succeeded is False + # All samples passed; token limit hit only after the final sample → success + assert client._last_run_succeeded is True assert handle_agent_call.call_count == 2 - # Both samples processed so far are in the results + # Both samples are in the results assert len(results) == 2 async def test_gt_on_failing_result_called_on_token_limit(self): @@ -2812,6 +2826,300 @@ async def test_total_token_usage_resets_between_runs(self): assert client._last_run_succeeded is True +# --------------------------------------------------------------------------- +# History truncation +# --------------------------------------------------------------------------- + + +class TestHistoryTruncation: + """Tests that _trim_history and the optimizer correctly cap _history growth.""" + + def test_trim_history_no_op_when_under_limit(self): + ctx = OptimizationContext( + scores={}, completion_response="r", current_instructions="i", + current_parameters={}, current_variables={}, current_model="m", iteration=1, + ) + history = [ctx, ctx, ctx] + assert _trim_history(history, 5) is history + + def test_trim_history_keeps_most_recent_items(self): + """_trim_history is a pure tail slice — it keeps the most recent max_len items.""" + ctxs = [ + OptimizationContext( + scores={}, completion_response=str(i), current_instructions="i", + current_parameters={}, current_variables={}, current_model="m", iteration=i, + ) + for i in range(10) + ] + result = _trim_history(ctxs, 4) + assert len(result) == 4 + # The four MOST RECENT items are retained; the oldest are discarded. + assert result[0] is ctxs[6] + assert result[-1] is ctxs[-1] + + def test_trim_history_max_len_1(self): + ctxs = [ + OptimizationContext( + scores={}, completion_response=str(i), current_instructions="i", + current_parameters={}, current_variables={}, current_model="m", iteration=i, + ) + for i in range(5) + ] + result = _trim_history(ctxs, 1) + assert len(result) == 1 + assert result[0] is ctxs[-1] + + @pytest.mark.asyncio + async def test_gt_history_trimmed_to_n_after_each_attempt(self): + """After each GT attempt _history must not exceed n (sample count).""" + mock_ldai = _make_ldai_client() + client = _make_client(mock_ldai) + + # Three attempts, 2 samples each (n=2); judge fails on every attempt so + # we cycle through all max_attempts and history never exceeds 2 items. + opts = _make_gt_options( + handle_judge_call=AsyncMock(return_value=OptimizationResponse(output=JUDGE_FAIL_RESPONSE)), + max_attempts=3, + ) + + history_lengths: list[int] = [] + original_extend = list.extend + + def spy_extend(self_list, iterable): + original_extend(self_list, iterable) + + with patch.object(client, "_generate_new_variation", new_callable=AsyncMock): + results = await client.optimize_from_ground_truth_options("test-agent", opts) + + # _history should be capped at n=2 (the number of samples) + assert len(client._history) <= 2 + + @pytest.mark.asyncio + async def test_standard_history_trimmed_to_max_standard_length(self): + """After each standard optimizer iteration _history must not exceed _MAX_STANDARD_HISTORY_LENGTH.""" + mock_ldai = _make_ldai_client() + client = _make_client(mock_ldai) + + # Enough failing attempts to push history beyond the cap. + attempts = _MAX_STANDARD_HISTORY_LENGTH + 3 + opts = _make_options( + handle_judge_call=AsyncMock(return_value=OptimizationResponse(output=JUDGE_FAIL_RESPONSE)), + max_attempts=attempts, + ) + + with patch.object(client, "_generate_new_variation", new_callable=AsyncMock): + await client.optimize_from_options("test-agent", opts) + + assert len(client._history) <= _MAX_STANDARD_HISTORY_LENGTH + + +# --------------------------------------------------------------------------- +# Accumulated token usage +# --------------------------------------------------------------------------- + + +class TestAccumulatedTokenUsage: + """Tests that total token usage is tracked across agent, judge, and variation calls.""" + + def setup_method(self): + self.mock_ldai = _make_ldai_client() + + @pytest.mark.asyncio + async def test_variation_tokens_counted_in_total(self): + """Tokens consumed by variation generation are rolled into _total_token_usage.""" + agent_usage = TokenUsage(total=100, input=60, output=40) + variation_usage = TokenUsage(total=50, input=30, output=20) + + # Build a minimal GT run (no validation phase) so we can control exactly + # which handle_agent_call responses map to agent turns vs variation. + # GT flow for one failed attempt then success: + # Attempt 1: 2 samples → 2 agent calls → judge fails on sample 1 → all_passed=False + # Variation: 1 handle_agent_call with variation_usage + # Attempt 2: 2 samples → 2 agent calls → all pass → success + agent_responses = [ + OptimizationResponse(output="Answer 1.", usage=agent_usage), # attempt 1, sample 1 + OptimizationResponse(output="Answer 2.", usage=agent_usage), # attempt 1, sample 2 + OptimizationResponse(output=VARIATION_RESPONSE, usage=variation_usage), # variation + OptimizationResponse(output="Answer 1.", usage=agent_usage), # attempt 2, sample 1 + OptimizationResponse(output="Answer 2.", usage=agent_usage), # attempt 2, sample 2 + ] + handle_agent_call = AsyncMock(side_effect=agent_responses) + judge_responses = [ + OptimizationResponse(output=JUDGE_FAIL_RESPONSE), # attempt 1, sample 1 — fail + OptimizationResponse(output=JUDGE_PASS_RESPONSE), # attempt 1, sample 2 — pass + OptimizationResponse(output=JUDGE_PASS_RESPONSE), # attempt 2, sample 1 — pass + OptimizationResponse(output=JUDGE_PASS_RESPONSE), # attempt 2, sample 2 — pass + ] + handle_judge_call = AsyncMock(side_effect=judge_responses) + + client = _make_client(self.mock_ldai) + opts = _make_gt_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + max_attempts=3, + ) + await client.optimize_from_ground_truth_options("test-agent", opts) + + # _total_token_usage must include tokens from all 4 agent calls + 1 variation call + expected_min = agent_usage.total * 4 + variation_usage.total + assert client._total_token_usage >= expected_min + + @pytest.mark.asyncio + async def test_accumulated_token_usage_stamped_on_context(self): + """accumulated_token_usage on the returned context equals _total_token_usage.""" + agent_usage = TokenUsage(total=200, input=120, output=80) + handle_agent_call = AsyncMock( + return_value=OptimizationResponse(output="The answer is 4.", usage=agent_usage) + ) + handle_judge_call = AsyncMock( + return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE) + ) + + client = _make_client(self.mock_ldai) + opts = _make_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + ) + results = await client.optimize_from_options("test-agent", opts) + + assert results is not None + last_ctx = results if isinstance(results, OptimizationContext) else None + # Check the internal total includes at least the agent tokens + assert client._total_token_usage >= agent_usage.total + + @pytest.mark.asyncio + async def test_gt_accumulated_token_usage_on_final_success(self): + """On a passing GT run the last result's accumulated_token_usage == _total_token_usage.""" + agent_usage = TokenUsage(total=100, input=60, output=40) + handle_agent_call = AsyncMock( + return_value=OptimizationResponse(output="correct", usage=agent_usage) + ) + handle_judge_call = AsyncMock( + return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE) + ) + + client = _make_client(self.mock_ldai) + opts = _make_gt_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + ) + results = await client.optimize_from_ground_truth_options("test-agent", opts) + + assert isinstance(results, list) and len(results) > 0 + final_ctx = results[-1] + assert final_ctx.accumulated_token_usage is not None + assert final_ctx.accumulated_token_usage == client._total_token_usage + + +# --------------------------------------------------------------------------- +# GT token budget exhausted on final successful iteration +# --------------------------------------------------------------------------- + + +class TestGTTokenBudgetOnFinalIteration: + """Token budget exhausted on the very last GT sample should mark the run successful + when all samples passed, and failed when mid-batch or a sample failed.""" + + def setup_method(self): + self.mock_ldai = _make_ldai_client() + + @pytest.mark.asyncio + async def test_token_limit_on_last_sample_all_passed_marks_success(self): + """Exhausting the budget on the final sample of a batch where all passed → success.""" + agent_responses = [ + OptimizationResponse(output="Answer 1.", usage=TokenUsage(total=100, input=60, output=40)), + OptimizationResponse(output="Answer 2.", usage=TokenUsage(total=200, input=120, output=80)), + ] + handle_agent_call = AsyncMock(side_effect=agent_responses) + handle_judge_call = AsyncMock( + return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE) + ) + client = _make_client(self.mock_ldai) + opts = _make_gt_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + token_limit=250, # 100+200=300 ≥ 250 → budget hit on final (2nd) sample + max_attempts=5, + ) + results = await client.optimize_from_ground_truth_options("test-agent", opts) + + assert client._last_run_succeeded is True + assert len(results) == 2 + + @pytest.mark.asyncio + async def test_token_limit_on_last_sample_one_failed_marks_failure(self): + """Budget hit on the final sample but a prior sample failed → failure.""" + agent_responses = [ + # Sample 1 fails judge + OptimizationResponse(output="Answer 1.", usage=TokenUsage(total=100, input=60, output=40)), + # Sample 2 passes judge but pushes over limit + OptimizationResponse(output="Answer 2.", usage=TokenUsage(total=200, input=120, output=80)), + ] + handle_agent_call = AsyncMock(side_effect=agent_responses) + judge_responses = [ + OptimizationResponse(output=JUDGE_FAIL_RESPONSE), # sample 1 fails + OptimizationResponse(output=JUDGE_PASS_RESPONSE), # sample 2 passes + ] + handle_judge_call = AsyncMock(side_effect=judge_responses) + client = _make_client(self.mock_ldai) + opts = _make_gt_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + token_limit=250, + max_attempts=5, + ) + results = await client.optimize_from_ground_truth_options("test-agent", opts) + + assert client._last_run_succeeded is False + + @pytest.mark.asyncio + async def test_token_limit_mid_batch_marks_failure(self): + """Budget hit mid-batch (not on the final sample) → always failure.""" + handle_agent_call = AsyncMock( + return_value=OptimizationResponse( + output="Answer.", usage=TokenUsage(total=600, input=400, output=200) + ) + ) + handle_judge_call = AsyncMock( + return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE) + ) + client = _make_client(self.mock_ldai) + opts = _make_gt_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + token_limit=500, # 600 > 500 → trip on first of two samples + max_attempts=5, + ) + results = await client.optimize_from_ground_truth_options("test-agent", opts) + + assert client._last_run_succeeded is False + assert len(results) == 1 + + @pytest.mark.asyncio + async def test_on_passing_result_called_on_budget_exhaustion_success(self): + """on_passing_result fires when token budget exhausted but all GT samples passed.""" + on_passing = MagicMock() + agent_responses = [ + OptimizationResponse(output="A1.", usage=TokenUsage(total=100, input=60, output=40)), + OptimizationResponse(output="A2.", usage=TokenUsage(total=200, input=120, output=80)), + ] + handle_agent_call = AsyncMock(side_effect=agent_responses) + handle_judge_call = AsyncMock( + return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE) + ) + client = _make_client(self.mock_ldai) + opts = _make_gt_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + token_limit=250, + max_attempts=5, + on_passing_result=on_passing, + ) + await client.optimize_from_ground_truth_options("test-agent", opts) + + on_passing.assert_called_once() + + # --------------------------------------------------------------------------- # optimize_from_config # --------------------------------------------------------------------------- @@ -3518,43 +3826,43 @@ def test_returns_true_when_baseline_duration_is_none(self): assert self.client._evaluate_duration(self._ctx(5000, iteration=2)) is True def test_returns_true_when_candidate_duration_is_none(self): - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(None, iteration=2)) is True def test_passes_when_candidate_is_more_than_20_percent_faster(self): # baseline=2000ms, threshold=1600ms, candidate=1500ms → 1500 < 1600 → pass - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(1500, iteration=2)) is True def test_fails_when_candidate_is_exactly_at_threshold(self): # baseline=2000ms, threshold=1600ms, candidate=1600ms → not strictly less → fail - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(1600, iteration=2)) is False def test_fails_when_improvement_is_less_than_20_percent(self): # baseline=2000ms, threshold=1600ms, candidate=1800ms → 1800 >= 1600 → fail - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(1800, iteration=2)) is False def test_fails_when_candidate_matches_baseline(self): - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(2000, iteration=2)) is False def test_fails_when_candidate_is_slower_than_baseline(self): - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(2500, iteration=2)) is False - def test_uses_history_index_zero_as_baseline_not_last(self): - # history[0] is 2000ms (baseline), history[-1] is 500ms (fast, but not the baseline) - first = self._ctx(2000, iteration=1) - later = self._ctx(500, iteration=2) - self.client._history = [first, later] - # candidate=1500ms < 2000 * 0.80 = 1600ms → pass (uses history[0], not history[-1]) + def test_uses_explicit_baseline_not_most_recent_history(self): + # _baseline_duration_ms is 2000ms even though the most recent history item has 500ms. + # The explicit baseline is what must be used, not any history item. + self.client._baseline_duration_ms = 2000.0 + self.client._history = [self._ctx(2000, iteration=1), self._ctx(500, iteration=2)] + # candidate=1500ms < 2000 * 0.80 = 1600ms → pass (uses explicit baseline, not history[-1]) assert self.client._evaluate_duration(self._ctx(1500, iteration=3)) is True def test_pass_boundary_just_below_threshold(self): # baseline=1000ms, threshold=800ms, candidate=799ms → pass - self.client._history = [self._ctx(1000, iteration=1)] + self.client._baseline_duration_ms = 1000.0 assert self.client._evaluate_duration(self._ctx(799, iteration=2)) is True @@ -3665,8 +3973,8 @@ async def test_no_duration_gate_when_acceptance_criteria_has_no_latency_keywords with patch.object(client, "_execute_agent_turn", new_callable=AsyncMock) as mock_execute: mock_execute.side_effect = execute_side_effects - # Manually seed history so _evaluate_duration would fire if incorrectly triggered - client._history = [self._ctx_with(duration_ms=2000, iteration=0)] + # Manually seed baseline so _evaluate_duration would fire if incorrectly triggered + client._baseline_duration_ms = 2000.0 result = await client.optimize_from_options("test-agent", opts) assert result is not None @@ -4831,6 +5139,7 @@ def _ctx(self, cost: float, iteration: int = 2) -> OptimizationContext: def _seed_history(self, baseline_cost: float): self.client._history = [self._ctx(baseline_cost, iteration=1)] + self.client._baseline_cost_usd = baseline_cost def test_passes_when_cost_improved_beyond_tolerance(self): self._seed_history(0.010) @@ -4838,13 +5147,14 @@ def test_passes_when_cost_improved_beyond_tolerance(self): def test_fails_when_cost_not_improved_enough(self): self._seed_history(0.010) - assert self.client._evaluate_cost(self._ctx(0.009)) is False + assert self.client._evaluate_cost(self._ctx(0.0095)) is False def test_passes_at_exact_tolerance_boundary(self): self._seed_history(0.010) - # 0.010 * 0.80 = 0.008; must be strictly less than 0.008 - assert self.client._evaluate_cost(self._ctx(0.0079)) is True - assert self.client._evaluate_cost(self._ctx(0.008)) is False + # 0.010 * 0.90 ≈ 0.009; must be strictly less than the threshold + assert self.client._evaluate_cost(self._ctx(0.0089)) is True + # 0.0091 is above the threshold → fail + assert self.client._evaluate_cost(self._ctx(0.0091)) is False def test_skips_gracefully_when_history_empty(self): self.client._history = [] @@ -4866,6 +5176,233 @@ def test_skips_gracefully_when_units_differ_across_model_switch(self): assert self.client._evaluate_cost(self._ctx(None)) is True +# --------------------------------------------------------------------------- +# _apply_duration_gate +# --------------------------------------------------------------------------- + + +class TestApplyDurationGate: + """Unit tests for the _apply_duration_gate wrapper method.""" + + def _make_judges_with_latency(self): + return { + "latency": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be faster and reduce latency.", + ) + } + + def _make_judges_no_latency(self): + return { + "accuracy": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be accurate.", + ) + } + + def _ctx(self, duration_ms=None, iteration=2): + return OptimizationContext( + scores={}, + completion_response="response", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=iteration, + duration_ms=duration_ms, + ) + + def setup_method(self): + self.client = _make_client() + self.client._options = _make_options(judges=self._make_judges_with_latency()) + self.client._initialize_class_members_from_config(_make_agent_config()) + self.client._baseline_duration_ms = 2000.0 + + def test_no_entry_added_when_gate_not_active(self): + self.client._options = _make_options(judges=self._make_judges_no_latency()) + ctx = self._ctx(1000) + passed, updated = self.client._apply_duration_gate(True, ctx) + assert passed is True + assert "_latency_gate" not in updated.scores + + def test_no_entry_added_when_already_failed(self): + ctx = self._ctx(1000) + passed, updated = self.client._apply_duration_gate(False, ctx) + assert passed is False + assert "_latency_gate" not in updated.scores + + def test_gate_pass_adds_score_1(self): + # baseline=2000ms, threshold=1600ms, candidate=1500ms → pass + ctx = self._ctx(1500) + passed, updated = self.client._apply_duration_gate(True, ctx) + assert passed is True + assert "_latency_gate" in updated.scores + assert updated.scores["_latency_gate"].score == 1.0 + assert "passed" in updated.scores["_latency_gate"].rationale.lower() + assert "1500" in updated.scores["_latency_gate"].rationale + assert "2000" in updated.scores["_latency_gate"].rationale + + def test_gate_fail_adds_score_0(self): + # baseline=2000ms, threshold=1600ms, candidate=1800ms → fail + ctx = self._ctx(1800) + passed, updated = self.client._apply_duration_gate(True, ctx) + assert passed is False + assert "_latency_gate" in updated.scores + assert updated.scores["_latency_gate"].score == 0.0 + assert "failed" in updated.scores["_latency_gate"].rationale.lower() + assert "1800" in updated.scores["_latency_gate"].rationale + + def test_gate_pass_no_baseline_gives_fallback_rationale(self): + self.client._baseline_duration_ms = None + ctx = self._ctx(None) + passed, updated = self.client._apply_duration_gate(True, ctx) + assert "_latency_gate" in updated.scores + assert "no baseline" in updated.scores["_latency_gate"].rationale.lower() + + def test_existing_scores_are_preserved(self): + ctx = OptimizationContext( + scores={"accuracy": JudgeResult(score=1.0, rationale="ok")}, + completion_response="response", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=2, + duration_ms=1500, + ) + _, updated = self.client._apply_duration_gate(True, ctx) + assert "accuracy" in updated.scores + assert "_latency_gate" in updated.scores + + def test_no_threshold_field_on_judge_result(self): + ctx = self._ctx(1500) + _, updated = self.client._apply_duration_gate(True, ctx) + gate_result = updated.scores["_latency_gate"] + assert not hasattr(gate_result, "threshold") or gate_result.threshold is None # type: ignore[union-attr] + + +# --------------------------------------------------------------------------- +# _apply_cost_gate +# --------------------------------------------------------------------------- + + +class TestApplyCostGate: + """Unit tests for the _apply_cost_gate wrapper method.""" + + def _make_judges_with_cost(self): + return { + "cost": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be cheaper and reduce cost.", + ) + } + + def _make_judges_no_cost(self): + return { + "accuracy": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be accurate.", + ) + } + + def _ctx(self, cost=None, iteration=2): + return OptimizationContext( + scores={}, + completion_response="response", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=iteration, + estimated_cost_usd=cost, + ) + + def setup_method(self): + self.client = _make_client() + self.client._options = _make_options(judges=self._make_judges_with_cost()) + self.client._initialize_class_members_from_config(_make_agent_config()) + self.client._baseline_cost_usd = 0.010 + + def test_no_entry_added_when_gate_not_active(self): + self.client._options = _make_options(judges=self._make_judges_no_cost()) + ctx = self._ctx(0.005) + passed, updated = self.client._apply_cost_gate(True, ctx) + assert passed is True + assert "_cost_gate" not in updated.scores + + def test_no_entry_added_when_already_failed(self): + ctx = self._ctx(0.005) + passed, updated = self.client._apply_cost_gate(False, ctx) + assert passed is False + assert "_cost_gate" not in updated.scores + + def test_gate_pass_adds_score_1(self): + # baseline=0.010, threshold=0.009, candidate=0.007 → pass + ctx = self._ctx(0.007) + passed, updated = self.client._apply_cost_gate(True, ctx) + assert passed is True + assert "_cost_gate" in updated.scores + assert updated.scores["_cost_gate"].score == 1.0 + assert "passed" in updated.scores["_cost_gate"].rationale.lower() + + def test_gate_fail_adds_score_0(self): + # baseline=0.010, threshold=0.009, candidate=0.0095 → fail + ctx = self._ctx(0.0095) + passed, updated = self.client._apply_cost_gate(True, ctx) + assert passed is False + assert "_cost_gate" in updated.scores + assert updated.scores["_cost_gate"].score == 0.0 + assert "failed" in updated.scores["_cost_gate"].rationale.lower() + + def test_gate_pass_no_baseline_gives_fallback_rationale(self): + self.client._baseline_cost_usd = None + ctx = self._ctx(None) + passed, updated = self.client._apply_cost_gate(True, ctx) + assert "_cost_gate" in updated.scores + assert "no baseline" in updated.scores["_cost_gate"].rationale.lower() + + def test_existing_scores_are_preserved(self): + ctx = OptimizationContext( + scores={"accuracy": JudgeResult(score=1.0, rationale="ok")}, + completion_response="response", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=2, + estimated_cost_usd=0.007, + ) + _, updated = self.client._apply_cost_gate(True, ctx) + assert "accuracy" in updated.scores + assert "_cost_gate" in updated.scores + + def test_both_gates_active_compose_cleanly(self): + """Duration + cost gate can both fire on the same context.""" + self.client._options = _make_options( + judges={ + "perf": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be faster, reduce latency, and cheaper cost.", + ) + } + ) + self.client._baseline_duration_ms = 2000.0 + self.client._baseline_cost_usd = 0.010 + ctx = OptimizationContext( + scores={}, + completion_response="response", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=2, + duration_ms=1500, + estimated_cost_usd=0.007, + ) + passed, ctx = self.client._apply_duration_gate(True, ctx) + passed, ctx = self.client._apply_cost_gate(passed, ctx) + assert passed is True + assert "_latency_gate" in ctx.scores + assert "_cost_gate" in ctx.scores + assert ctx.scores["_latency_gate"].score == 1.0 + assert ctx.scores["_cost_gate"].score == 1.0 + + # --------------------------------------------------------------------------- # variation_prompt_cost_optimization # --------------------------------------------------------------------------- @@ -4888,6 +5425,148 @@ def test_mentions_token_reduction(self): result = variation_prompt_cost_optimization(["gpt-4o"]) assert "token" in result.lower() + # quality_already_passing=False (default) — standard framing + def test_default_framing_says_improve_quality_too(self): + result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=False) + assert "In addition to improving quality" in result + + def test_default_framing_does_not_mention_quality_passing(self): + result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=False) + assert "currently passing" not in result + + # quality_already_passing=True — preserve-behavior framing + def test_passing_framing_says_criteria_already_passing(self): + result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=True) + assert "currently passing" in result + + def test_passing_framing_says_do_not_change_behavior(self): + result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=True) + assert "preserve" in result.lower() or "do not" in result.lower() or "NOT" in result + + def test_passing_framing_still_includes_token_guidance(self): + result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=True) + assert "token" in result.lower() + + def test_passing_framing_still_includes_model_choices(self): + result = variation_prompt_cost_optimization(["gpt-4o", "gpt-4o-mini"], quality_already_passing=True) + assert "gpt-4o" in result + + def test_passing_framing_still_warns_quality_is_primary(self): + result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=True) + assert "primary objective" in result.lower() or "do not sacrifice" in result.lower() + + +# --------------------------------------------------------------------------- +# _all_judges_passing +# --------------------------------------------------------------------------- + + +class TestAllJudgesPassing: + def _ctx_with_scores(self, scores, iteration=1): + return OptimizationContext( + scores=scores, + completion_response="ok", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=iteration, + ) + + def setup_method(self): + self.client = _make_client() + self.client._initialize_class_members_from_config(_make_agent_config()) + + def test_returns_false_when_history_empty(self): + self.client._history = [] + self.client._options = _make_options() + assert self.client._all_judges_passing() is False + + def test_returns_false_when_no_judges(self): + self.client._options = _make_options(judges={}) + self.client._history = [self._ctx_with_scores({"accuracy": JudgeResult(score=1.0, rationale="ok")})] + assert self.client._all_judges_passing() is False + + def test_returns_false_when_judge_score_missing(self): + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._history = [self._ctx_with_scores({})] + assert self.client._all_judges_passing() is False + + def test_returns_true_when_all_judges_pass(self): + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._history = [self._ctx_with_scores({"accuracy": JudgeResult(score=0.9, rationale="ok")})] + assert self.client._all_judges_passing() is True + + def test_returns_false_when_one_judge_below_threshold(self): + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._history = [self._ctx_with_scores({"accuracy": JudgeResult(score=0.7, rationale="bad")})] + assert self.client._all_judges_passing() is False + + def test_returns_false_when_one_of_multiple_judges_fails(self): + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + "tone": OptimizationJudge(threshold=0.9, acceptance_statement="friendly tone"), + }) + self.client._history = [self._ctx_with_scores({ + "accuracy": JudgeResult(score=1.0, rationale="ok"), + "tone": JudgeResult(score=0.5, rationale="failed"), + })] + assert self.client._all_judges_passing() is False + + def test_returns_true_when_all_of_multiple_judges_pass(self): + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + "tone": OptimizationJudge(threshold=0.9, acceptance_statement="friendly tone"), + }) + self.client._history = [self._ctx_with_scores({ + "accuracy": JudgeResult(score=1.0, rationale="ok"), + "tone": JudgeResult(score=0.95, rationale="ok"), + })] + assert self.client._all_judges_passing() is True + + def test_gate_scores_do_not_affect_result(self): + """Synthetic _latency_gate / _cost_gate keys must not count as judge failures.""" + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._history = [self._ctx_with_scores({ + "accuracy": JudgeResult(score=1.0, rationale="ok"), + "_latency_gate": JudgeResult(score=0.0, rationale="gate failed"), + "_cost_gate": JudgeResult(score=0.0, rationale="gate failed"), + })] + # Gate failures should not prevent _all_judges_passing from returning True + assert self.client._all_judges_passing() is True + + def test_uses_most_recent_history_entry(self): + """Only the last history entry is inspected.""" + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._history = [ + self._ctx_with_scores({"accuracy": JudgeResult(score=0.5, rationale="early fail")}, iteration=1), + self._ctx_with_scores({"accuracy": JudgeResult(score=1.0, rationale="later pass")}, iteration=2), + ] + assert self.client._all_judges_passing() is True + + def test_inverted_judge_passes_when_score_below_threshold(self): + self.client._options = _make_options(judges={ + "toxicity": OptimizationJudge(threshold=0.2, acceptance_statement="low toxicity", is_inverted=True), + }) + self.client._history = [self._ctx_with_scores({"toxicity": JudgeResult(score=0.1, rationale="clean")})] + assert self.client._all_judges_passing() is True + + def test_inverted_judge_fails_when_score_above_threshold(self): + self.client._options = _make_options(judges={ + "toxicity": OptimizationJudge(threshold=0.2, acceptance_statement="low toxicity", is_inverted=True), + }) + self.client._history = [self._ctx_with_scores({"toxicity": JudgeResult(score=0.5, rationale="toxic")})] + assert self.client._all_judges_passing() is False + class TestBuildNewVariationPromptCost: def _make_history(self) -> list: @@ -5064,6 +5743,7 @@ async def _capture_judge_call(judge_key, judge_config, ctx, is_judge): estimated_cost_usd=500.0, ) self.client._history = [baseline_ctx] + self.client._baseline_cost_usd = 500.0 self.client._options = _make_options(handle_judge_call=_capture_judge_call) await self.client._evaluate_acceptance_judge( judge_key="cost-judge", From 9bedf9ef91067f3122b904cf5cc78e87b8929151 Mon Sep 17 00:00:00 2001 From: Andrew Klatzke Date: Wed, 13 May 2026 12:05:27 -0800 Subject: [PATCH 4/7] fix: don't only evaluate final input in GT results --- .../optimization/src/ldai_optimizer/client.py | 53 +++++++++++++------ .../src/ldai_optimizer/dataclasses.py | 3 ++ packages/optimization/tests/test_client.py | 12 +++-- 3 files changed, 47 insertions(+), 21 deletions(-) diff --git a/packages/optimization/src/ldai_optimizer/client.py b/packages/optimization/src/ldai_optimizer/client.py index d08bc0f1..9c647e01 100644 --- a/packages/optimization/src/ldai_optimizer/client.py +++ b/packages/optimization/src/ldai_optimizer/client.py @@ -1155,6 +1155,12 @@ async def _run_ground_truth_optimization( sample_passed, optimize_context = self._apply_duration_gate(sample_passed, optimize_context) sample_passed, optimize_context = self._apply_cost_gate(sample_passed, optimize_context) + # Flush gate scores to the API for this sample. Without this, + # the next sample's "generating" event closes out this record + # with a status-only PATCH before gate scores are sent, so only + # the last sample would ever show latency/cost gate entries. + self._safe_status_update("evaluating", optimize_context, linear_iter) + if not sample_passed: logger.info( "[GT Attempt %d] -> Sample %d/%d FAILED", @@ -2110,12 +2116,16 @@ def _apply_duration_gate( When the gate is active (any acceptance statement implies latency optimization), evaluates whether the candidate's duration improved by at least _DURATION_TOLERANCE vs the baseline. A synthetic ``_latency_gate`` entry is - added to scores with score=1.0 on pass or score=0.0 on fail so the outcome - is visible in the API result and UI. + always added to scores with score=1.0 on pass or score=0.0 on fail so the + outcome is visible in the API result and UI for every iteration. + + The gate score is recorded even when ``passed_so_far`` is False (quality + judges already failed) so that latency telemetry is visible on all + iterations, not just passing ones. In that case it is informational only + and cannot block the iteration further. - The gate is skipped (no score entry added) when: - - No acceptance statement implies latency optimization. - - ``passed_so_far`` is already False (a prior check failed the sample). + The gate is skipped entirely (no score entry added) only when no acceptance + statement implies latency optimization. :param passed_so_far: Whether all prior checks for this sample passed. :param ctx: Current optimization context. @@ -2123,8 +2133,6 @@ def _apply_duration_gate( """ if not _acceptance_criteria_implies_duration_optimization(self._options.judges): return passed_so_far, ctx - if not passed_so_far: - return passed_so_far, ctx passed = self._evaluate_duration(ctx) if passed: if self._baseline_duration_ms is not None and ctx.duration_ms is not None: @@ -2149,9 +2157,13 @@ def _apply_duration_gate( score = 0.0 ctx = dataclasses.replace( ctx, - scores={**ctx.scores, "_latency_gate": JudgeResult(score=score, rationale=rationale)}, + scores={**ctx.scores, "_latency_gate": JudgeResult( + score=score, + rationale=rationale, + duration_ms=ctx.duration_ms, + )}, ) - return passed, ctx + return passed_so_far and passed, ctx def _apply_cost_gate( self, passed_so_far: bool, ctx: OptimizationContext @@ -2160,12 +2172,16 @@ def _apply_cost_gate( When the gate is active (any acceptance statement implies cost optimization), evaluates whether the candidate's estimated cost improved by at least - _COST_TOLERANCE vs the baseline. A synthetic ``_cost_gate`` entry is + _COST_TOLERANCE vs the baseline. A synthetic ``_cost_gate`` entry is always added to scores with score=1.0 on pass or score=0.0 on fail. - The gate is skipped (no score entry added) when: - - No acceptance statement implies cost optimization. - - ``passed_so_far`` is already False (a prior check failed the sample). + The gate score is recorded even when ``passed_so_far`` is False (quality + judges already failed) so that cost telemetry is visible on all iterations, + not just passing ones. In that case it is informational only and cannot + block the iteration further. + + The gate is skipped entirely (no score entry added) only when no acceptance + statement implies cost optimization. :param passed_so_far: Whether all prior checks for this sample passed. :param ctx: Current optimization context. @@ -2173,8 +2189,6 @@ def _apply_cost_gate( """ if not _acceptance_criteria_implies_cost_optimization(self._options.judges): return passed_so_far, ctx - if not passed_so_far: - return passed_so_far, ctx passed = self._evaluate_cost(ctx) if passed: if self._baseline_cost_usd is not None and ctx.estimated_cost_usd is not None: @@ -2199,9 +2213,14 @@ def _apply_cost_gate( score = 0.0 ctx = dataclasses.replace( ctx, - scores={**ctx.scores, "_cost_gate": JudgeResult(score=score, rationale=rationale)}, + scores={**ctx.scores, "_cost_gate": JudgeResult( + score=score, + rationale=rationale, + duration_ms=ctx.duration_ms, + estimated_cost_usd=ctx.estimated_cost_usd, + )}, ) - return passed, ctx + return passed_so_far and passed, ctx def _handle_success( self, optimize_context: OptimizationContext, iteration: int diff --git a/packages/optimization/src/ldai_optimizer/dataclasses.py b/packages/optimization/src/ldai_optimizer/dataclasses.py index 2a9f4542..6e308f67 100644 --- a/packages/optimization/src/ldai_optimizer/dataclasses.py +++ b/packages/optimization/src/ldai_optimizer/dataclasses.py @@ -43,6 +43,7 @@ class JudgeResult: rationale: Optional[str] = None duration_ms: Optional[float] = None usage: Optional[TokenUsage] = None + estimated_cost_usd: Optional[float] = None def to_json(self) -> Dict[str, Any]: """ @@ -61,6 +62,8 @@ def to_json(self) -> Dict[str, Any]: "input": self.usage.input, "output": self.usage.output, } + if self.estimated_cost_usd is not None: + result["estimated_cost_usd"] = self.estimated_cost_usd return result diff --git a/packages/optimization/tests/test_client.py b/packages/optimization/tests/test_client.py index 8f30c241..bdf9d19c 100644 --- a/packages/optimization/tests/test_client.py +++ b/packages/optimization/tests/test_client.py @@ -5224,11 +5224,13 @@ def test_no_entry_added_when_gate_not_active(self): assert passed is True assert "_latency_gate" not in updated.scores - def test_no_entry_added_when_already_failed(self): + def test_gate_recorded_even_when_already_failed(self): + # Gate score is always written for telemetry; it cannot block an + # iteration that was already failing (passed_so_far=False). ctx = self._ctx(1000) passed, updated = self.client._apply_duration_gate(False, ctx) assert passed is False - assert "_latency_gate" not in updated.scores + assert "_latency_gate" in updated.scores def test_gate_pass_adds_score_1(self): # baseline=2000ms, threshold=1600ms, candidate=1500ms → pass @@ -5327,11 +5329,13 @@ def test_no_entry_added_when_gate_not_active(self): assert passed is True assert "_cost_gate" not in updated.scores - def test_no_entry_added_when_already_failed(self): + def test_gate_recorded_even_when_already_failed(self): + # Gate score is always written for telemetry; it cannot block an + # iteration that was already failing (passed_so_far=False). ctx = self._ctx(0.005) passed, updated = self.client._apply_cost_gate(False, ctx) assert passed is False - assert "_cost_gate" not in updated.scores + assert "_cost_gate" in updated.scores def test_gate_pass_adds_score_1(self): # baseline=0.010, threshold=0.009, candidate=0.007 → pass From 53f455f3fb1073dcc1caeead4ccc7b51153eabb4 Mon Sep 17 00:00:00 2001 From: Andrew Klatzke Date: Wed, 13 May 2026 12:12:20 -0800 Subject: [PATCH 5/7] fix: don't only evaluate final input in GT results --- .../optimization/src/ldai_optimizer/client.py | 44 +++++++++++++++-- packages/optimization/tests/test_client.py | 47 +++++++++++++++++++ 2 files changed, 86 insertions(+), 5 deletions(-) diff --git a/packages/optimization/src/ldai_optimizer/client.py b/packages/optimization/src/ldai_optimizer/client.py index 9c647e01..a9a788e8 100644 --- a/packages/optimization/src/ldai_optimizer/client.py +++ b/packages/optimization/src/ldai_optimizer/client.py @@ -224,17 +224,51 @@ def _initialize_class_members_from_config( self._baseline_cost_usd: Optional[float] = None def _record_baseline(self, ctx: OptimizationContext) -> None: - """Capture duration/cost baseline from the first iteration appended to history. + """Capture duration/cost baseline from a single context. - Called once per run (subsequent calls are no-ops once both values are set). - Storing these explicitly lets ``_trim_history`` use a simple tail slice without - needing to preserve ``history[0]`` as an anchor. + Used by the standard (non-GT) optimization loop where each iteration + produces one result. Called once per run (subsequent calls are no-ops + once both values are set). Storing these explicitly lets + ``_trim_history`` use a simple tail slice without needing to preserve + ``history[0]`` as an anchor. """ if self._baseline_duration_ms is None and ctx.duration_ms is not None: self._baseline_duration_ms = ctx.duration_ms if self._baseline_cost_usd is None and ctx.estimated_cost_usd is not None: self._baseline_cost_usd = ctx.estimated_cost_usd + def _record_baseline_from_batch(self, attempt_results: List[OptimizationContext]) -> None: + """Capture duration/cost baseline as the average across a GT batch. + + Used by the GT optimization loop. The first attempt's N samples form + the baseline; averaging them gives a more stable reference than a + single sample and ensures comparisons in subsequent attempts reflect + the typical performance of the original configuration rather than an + outlier measurement. + + Called once per run (subsequent calls are no-ops once both values are + set). + + :param attempt_results: All completed sample contexts from the first + GT attempt. + """ + if not attempt_results: + return + if self._baseline_duration_ms is None: + durations = [ + ctx.duration_ms for ctx in attempt_results if ctx.duration_ms is not None + ] + if durations: + self._baseline_duration_ms = sum(durations) / len(durations) + if self._baseline_cost_usd is None: + costs = [ + ctx.estimated_cost_usd + for ctx in attempt_results + if ctx.estimated_cost_usd is not None + ] + if costs: + self._baseline_cost_usd = sum(costs) / len(costs) + def _build_agent_config_for_context( self, ctx: OptimizationContext, skip_interpolation: bool = False ) -> AIAgentConfig: @@ -1272,7 +1306,7 @@ async def _run_ground_truth_optimization( # from all of the previous samples, then trim to one full attempt's worth so # judge prompts don't grow unboundedly across many failed attempts. if attempt_results: - self._record_baseline(attempt_results[0]) + self._record_baseline_from_batch(attempt_results) self._history.extend(attempt_results) self._history = _trim_history(self._history, n) diff --git a/packages/optimization/tests/test_client.py b/packages/optimization/tests/test_client.py index bdf9d19c..ea7ecc3d 100644 --- a/packages/optimization/tests/test_client.py +++ b/packages/optimization/tests/test_client.py @@ -1,5 +1,6 @@ """Tests for OptimizationClient.""" +import dataclasses import json from typing import Any, Dict from unittest.mock import AsyncMock, MagicMock, patch @@ -5176,6 +5177,52 @@ def test_skips_gracefully_when_units_differ_across_model_switch(self): assert self.client._evaluate_cost(self._ctx(None)) is True +# --------------------------------------------------------------------------- +# _record_baseline_from_batch +# --------------------------------------------------------------------------- + + +class TestRecordBaselineFromBatch: + def setup_method(self): + self.client = _make_client() + self.client._initialize_class_members_from_config(_make_agent_config()) + + def _ctx(self, duration_ms=None, cost=None): + ctx = self.client._create_optimization_context(iteration=1, variables={}) + return dataclasses.replace(ctx, duration_ms=duration_ms, estimated_cost_usd=cost) + + def test_averages_duration_across_batch(self): + results = [self._ctx(duration_ms=1000), self._ctx(duration_ms=2000), self._ctx(duration_ms=3000)] + self.client._record_baseline_from_batch(results) + assert self.client._baseline_duration_ms == 2000.0 + + def test_averages_cost_across_batch(self): + results = [self._ctx(cost=0.01), self._ctx(cost=0.02), self._ctx(cost=0.03)] + self.client._record_baseline_from_batch(results) + assert abs(self.client._baseline_cost_usd - 0.02) < 1e-9 + + def test_skips_none_values_in_average(self): + results = [self._ctx(duration_ms=1000), self._ctx(duration_ms=None), self._ctx(duration_ms=3000)] + self.client._record_baseline_from_batch(results) + assert self.client._baseline_duration_ms == 2000.0 + + def test_noop_when_already_set(self): + self.client._baseline_duration_ms = 999.0 + results = [self._ctx(duration_ms=1000), self._ctx(duration_ms=2000)] + self.client._record_baseline_from_batch(results) + assert self.client._baseline_duration_ms == 999.0 + + def test_noop_on_empty_list(self): + self.client._record_baseline_from_batch([]) + assert self.client._baseline_duration_ms is None + assert self.client._baseline_cost_usd is None + + def test_noop_when_all_values_none(self): + results = [self._ctx(duration_ms=None), self._ctx(duration_ms=None)] + self.client._record_baseline_from_batch(results) + assert self.client._baseline_duration_ms is None + + # --------------------------------------------------------------------------- # _apply_duration_gate # --------------------------------------------------------------------------- From 66bc1f0db79f434614359b4e6724bd73e7d41216 Mon Sep 17 00:00:00 2001 From: Andrew Klatzke Date: Wed, 13 May 2026 12:45:47 -0800 Subject: [PATCH 6/7] fix: only strip known provider prefixes --- .../optimization/src/ldai_optimizer/client.py | 32 ++++++++++++++--- packages/optimization/tests/test_client.py | 35 +++++++++++++++++++ 2 files changed, 62 insertions(+), 5 deletions(-) diff --git a/packages/optimization/src/ldai_optimizer/client.py b/packages/optimization/src/ldai_optimizer/client.py index a9a788e8..7c70d8d3 100644 --- a/packages/optimization/src/ldai_optimizer/client.py +++ b/packages/optimization/src/ldai_optimizer/client.py @@ -91,18 +91,40 @@ def _find_model_config( return global_match if global_match is not None else matching[0] +# Known provider prefixes used by the LD API (e.g. "Anthropic.claude-3"). +# Only strip the first segment when it is one of these known values so that +# model IDs whose first dotted segment is NOT a provider — such as Bedrock +# cross-region inference IDs like "us.amazon.nova-pro-v1:0" — are left intact. +_KNOWN_PROVIDER_PREFIXES: frozenset = frozenset({ + "Anthropic", + "Bedrock", + "Cohere", + "Google", + "Groq", + "Meta", + "Mistral", + "OpenAI", + "Perplexity", +}) + + def _strip_provider_prefix(model: str) -> str: """Strip the provider prefix from a model identifier returned by the LD API. API model keys are formatted as "Provider.model-name" (e.g. "OpenAI.gpt-5", - "Anthropic.claude-opus-4.6"). Only the part after the first period is needed - by the underlying LLM clients. If no period is present the string is returned - unchanged. + "Anthropic.claude-opus-4.6"). Only the segment before the first period is + stripped, and only when it is a recognised provider name. This prevents + incorrectly stripping region prefixes from Bedrock cross-region inference + IDs such as "us.amazon.nova-pro-v1:0". :param model: Raw model string from the API. - :return: Model name with provider prefix removed. + :return: Model name with provider prefix removed, or the original string if + the first segment is not a known provider. """ - return model.split(".", 1)[-1] + prefix, _, rest = model.partition(".") + if prefix in _KNOWN_PROVIDER_PREFIXES and rest: + return rest + return model def _compute_validation_count(pool_size: int) -> int: diff --git a/packages/optimization/tests/test_client.py b/packages/optimization/tests/test_client.py index ea7ecc3d..937a24a2 100644 --- a/packages/optimization/tests/test_client.py +++ b/packages/optimization/tests/test_client.py @@ -16,6 +16,7 @@ _MAX_STANDARD_HISTORY_LENGTH, _compute_validation_count, _find_model_config, + _strip_provider_prefix, _trim_history, ) from ldai_optimizer.util import judge_passed @@ -129,6 +130,40 @@ def _make_client(ldai: MagicMock | None = None) -> OptimizationClient: # --------------------------------------------------------------------------- +# --------------------------------------------------------------------------- +# _strip_provider_prefix +# --------------------------------------------------------------------------- + + +class TestStripProviderPrefix: + def test_strips_known_anthropic_prefix(self): + assert _strip_provider_prefix("Anthropic.claude-opus-4-5") == "claude-opus-4-5" + + def test_strips_known_openai_prefix(self): + assert _strip_provider_prefix("OpenAI.gpt-4o") == "gpt-4o" + + def test_strips_known_bedrock_prefix(self): + # "Bedrock.us.amazon.nova-pro-v1:0" → region prefix is preserved + assert _strip_provider_prefix("Bedrock.us.amazon.nova-pro-v1:0") == "us.amazon.nova-pro-v1:0" + + def test_does_not_strip_bedrock_region_prefix(self): + # Raw Bedrock cross-region ID has no provider prefix — must be unchanged + assert _strip_provider_prefix("us.amazon.nova-pro-v1:0") == "us.amazon.nova-pro-v1:0" + + def test_does_not_strip_eu_region_prefix(self): + assert _strip_provider_prefix("eu.anthropic.claude-3-5-sonnet-20241022-v2:0") == "eu.anthropic.claude-3-5-sonnet-20241022-v2:0" + + def test_no_period_returns_unchanged(self): + assert _strip_provider_prefix("gpt-4o") == "gpt-4o" + + def test_empty_string_returns_unchanged(self): + assert _strip_provider_prefix("") == "" + + def test_preserves_dots_in_model_name_after_stripping(self): + # Multiple dots after the provider prefix are preserved + assert _strip_provider_prefix("Anthropic.claude-3.5-sonnet") == "claude-3.5-sonnet" + + # --------------------------------------------------------------------------- # _find_model_config # --------------------------------------------------------------------------- From f2f0894625d763a25006d95bea9ea45780e623a4 Mon Sep 17 00:00:00 2001 From: Andrew Klatzke Date: Thu, 14 May 2026 10:40:55 -0800 Subject: [PATCH 7/7] fix: address cursor feedback --- .../optimization/src/ldai_optimizer/client.py | 73 ++++++++++-------- .../optimization/src/ldai_optimizer/util.py | 8 +- packages/optimization/tests/test_client.py | 77 ++++++++++++++++++- 3 files changed, 122 insertions(+), 36 deletions(-) diff --git a/packages/optimization/src/ldai_optimizer/client.py b/packages/optimization/src/ldai_optimizer/client.py index 7c70d8d3..b924edd7 100644 --- a/packages/optimization/src/ldai_optimizer/client.py +++ b/packages/optimization/src/ldai_optimizer/client.py @@ -212,6 +212,7 @@ def __init__(self, ldClient: LDAIClient) -> None: self._initial_tool_keys: List[str] = [] self._total_token_usage: int = 0 self._model_configs: List[Dict[str, Any]] = [] + self._last_batch_size: int = 1 if os.environ.get("LAUNCHDARKLY_API_KEY"): self._has_api_key = True @@ -1123,6 +1124,7 @@ async def _run_ground_truth_optimization( self._last_succeeded_context = None self._last_optimization_result_id = None self._total_token_usage = 0 + self._last_batch_size = 1 self._initialize_class_members_from_config(agent_config) # Seed from the first model choice on the first iteration @@ -1331,6 +1333,9 @@ async def _run_ground_truth_optimization( self._record_baseline_from_batch(attempt_results) self._history.extend(attempt_results) self._history = _trim_history(self._history, n) + # Track batch size so _all_judges_passing checks every sample in this + # attempt, not just the last one. + self._last_batch_size = n logger.info( "[GT Attempt %d] -> %d/%d samples failed — generating new variation", @@ -2140,28 +2145,35 @@ def _evaluate_cost(self, optimize_context: OptimizationContext) -> bool: return passed def _all_judges_passing(self) -> bool: - """Return True if every user-configured judge passed in the most recent history entry. + """Return True if every user-configured judge passed in every sample of the most recent batch. - Inspects the last context in ``_history`` and checks each score key that - corresponds to a judge defined in ``_options.judges`` (skipping synthetic gate - entries whose keys begin with ``_``). Returns False when history is empty or any - judge score does not meet its threshold. + In ground-truth mode the last ``_last_batch_size`` entries in ``_history`` + correspond to the samples from the latest attempt. All of them must pass; + checking only the last entry would incorrectly return True when a middle sample + failed but the final sample passed. + + In single-sample (non-GT) mode ``_last_batch_size`` is 1, so only the most + recent entry is inspected (original behaviour). + + Synthetic gate entries (keys beginning with ``_``) are skipped. + Returns False when history is empty or any judge score does not meet its threshold. This is used to decide whether variation generation should preserve the current - behavior and only optimise for cost, rather than trying to improve quality further. + behaviour and only optimise for cost, rather than trying to improve quality further. """ if not self._history or not self._options.judges: return False - recent = self._history[-1] - if not recent.scores: - return False - for key, judge in self._options.judges.items(): - result = recent.scores.get(key) - if result is None: - return False - threshold = judge.threshold if judge.threshold is not None else 1.0 - if not judge_passed(result.score, threshold, judge.is_inverted): + batch = self._history[-self._last_batch_size:] + for ctx in batch: + if not ctx.scores: return False + for key, judge in self._options.judges.items(): + result = ctx.scores.get(key) + if result is None: + return False + threshold = judge.threshold if judge.threshold is not None else 1.0 + if not judge_passed(result.score, threshold, judge.is_inverted): + return False return True def _apply_duration_gate( @@ -2201,15 +2213,12 @@ def _apply_duration_gate( rationale = "Latency gate passed (no baseline)." score = 1.0 else: - if self._baseline_duration_ms is not None and ctx.duration_ms is not None: - rationale = ( - f"Latency improvement gate failed: {ctx.duration_ms:.0f}ms did not improve " - f"by {int((1 - _DURATION_TOLERANCE) * 100)}% vs baseline " - f"{self._baseline_duration_ms:.0f}ms " - f"(required < {self._baseline_duration_ms * _DURATION_TOLERANCE:.0f}ms)." - ) - else: - rationale = "Latency gate failed (no baseline data)." + rationale = ( + f"Latency improvement gate failed: {ctx.duration_ms:.0f}ms did not improve " + f"by {int((1 - _DURATION_TOLERANCE) * 100)}% vs baseline " + f"{self._baseline_duration_ms:.0f}ms " + f"(required < {self._baseline_duration_ms * _DURATION_TOLERANCE:.0f}ms)." + ) score = 0.0 ctx = dataclasses.replace( ctx, @@ -2257,15 +2266,12 @@ def _apply_cost_gate( rationale = "Cost gate passed (no baseline)." score = 1.0 else: - if self._baseline_cost_usd is not None and ctx.estimated_cost_usd is not None: - rationale = ( - f"Cost improvement gate failed: {ctx.estimated_cost_usd:.6f} did not improve " - f"by {int((1 - _COST_TOLERANCE) * 100)}% vs baseline " - f"{self._baseline_cost_usd:.6f} " - f"(required < {self._baseline_cost_usd * _COST_TOLERANCE:.6f})." - ) - else: - rationale = "Cost gate failed (no baseline data)." + rationale = ( + f"Cost improvement gate failed: {ctx.estimated_cost_usd:.6f} did not improve " + f"by {int((1 - _COST_TOLERANCE) * 100)}% vs baseline " + f"{self._baseline_cost_usd:.6f} " + f"(required < {self._baseline_cost_usd * _COST_TOLERANCE:.6f})." + ) score = 0.0 ctx = dataclasses.replace( ctx, @@ -2600,6 +2606,7 @@ async def _run_optimization( self._last_succeeded_context = None self._last_optimization_result_id = None self._total_token_usage = 0 + self._last_batch_size = 1 self._initialize_class_members_from_config(agent_config) # If the LD flag doesn't carry a model name, seed from the first model choice diff --git a/packages/optimization/src/ldai_optimizer/util.py b/packages/optimization/src/ldai_optimizer/util.py index a3671e26..b725f227 100644 --- a/packages/optimization/src/ldai_optimizer/util.py +++ b/packages/optimization/src/ldai_optimizer/util.py @@ -336,7 +336,8 @@ def estimate_cost( :param usage: Token usage from the agent call. When ``None``, returns ``None``. :param model_config: Model config dict from ``get_model_configs()``, or ``None``. - :return: Estimated cost in USD, or ``None`` if usage or pricing data is absent. + :return: Estimated cost in USD, or ``None`` if usage or pricing data is absent, or if + both ``usage.input`` and ``usage.output`` are ``None`` (no token counts available). """ if usage is None: return None @@ -348,8 +349,11 @@ def estimate_cost( return None cost = 0.0 + computed = False if input_price is not None and usage.input is not None: cost += usage.input * input_price + computed = True if output_price is not None and usage.output is not None: cost += usage.output * output_price - return cost + computed = True + return cost if computed else None diff --git a/packages/optimization/tests/test_client.py b/packages/optimization/tests/test_client.py index 937a24a2..6d94d3df 100644 --- a/packages/optimization/tests/test_client.py +++ b/packages/optimization/tests/test_client.py @@ -5094,6 +5094,26 @@ def test_zero_usage_with_pricing_returns_zero(self): model_config = {"costPerInputToken": 0.001, "costPerOutputToken": 0.002} assert estimate_cost(usage, model_config) == pytest.approx(0.0) + def test_returns_none_when_both_token_counts_are_none(self): + # Pricing exists but both input and output are None — no token counts to + # compute from, so we must return None rather than 0.0 to avoid + # cost-gate treating unknown cost as zero cost. + usage = TokenUsage(total=None, input=None, output=None) + model_config = {"costPerInputToken": 0.001, "costPerOutputToken": 0.002} + assert estimate_cost(usage, model_config) is None + + def test_returns_partial_cost_when_only_input_count_is_none(self): + # Only output count available — should still compute a partial cost. + usage = TokenUsage(total=40, input=None, output=40) + model_config = {"costPerInputToken": 0.001, "costPerOutputToken": 0.002} + assert estimate_cost(usage, model_config) == pytest.approx(40 * 0.002) + + def test_returns_partial_cost_when_only_output_count_is_none(self): + # Only input count available — should still compute a partial cost. + usage = TokenUsage(total=60, input=60, output=None) + model_config = {"costPerInputToken": 0.001, "costPerOutputToken": 0.002} + assert estimate_cost(usage, model_config) == pytest.approx(60 * 0.001) + # --------------------------------------------------------------------------- # _acceptance_criteria_implies_cost_optimization @@ -5629,10 +5649,11 @@ def test_gate_scores_do_not_affect_result(self): assert self.client._all_judges_passing() is True def test_uses_most_recent_history_entry(self): - """Only the last history entry is inspected.""" + """In non-GT mode (_last_batch_size=1) only the last history entry is inspected.""" self.client._options = _make_options(judges={ "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), }) + self.client._last_batch_size = 1 self.client._history = [ self._ctx_with_scores({"accuracy": JudgeResult(score=0.5, rationale="early fail")}, iteration=1), self._ctx_with_scores({"accuracy": JudgeResult(score=1.0, rationale="later pass")}, iteration=2), @@ -5653,6 +5674,60 @@ def test_inverted_judge_fails_when_score_above_threshold(self): self.client._history = [self._ctx_with_scores({"toxicity": JudgeResult(score=0.5, rationale="toxic")})] assert self.client._all_judges_passing() is False + # --- GT batch tests --- + + def test_gt_batch_last_sample_passes_but_earlier_fails_returns_false(self): + """Core GT bug: if any sample in the batch failed, must return False even if the last passed.""" + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._last_batch_size = 3 + self.client._history = [ + self._ctx_with_scores({"accuracy": JudgeResult(score=0.3, rationale="fail")}, iteration=1), # FAILS + self._ctx_with_scores({"accuracy": JudgeResult(score=0.9, rationale="ok")}, iteration=2), + self._ctx_with_scores({"accuracy": JudgeResult(score=0.95, rationale="ok")}, iteration=3), + ] + assert self.client._all_judges_passing() is False + + def test_gt_batch_all_samples_pass_returns_true(self): + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._last_batch_size = 3 + self.client._history = [ + self._ctx_with_scores({"accuracy": JudgeResult(score=0.85, rationale="ok")}, iteration=1), + self._ctx_with_scores({"accuracy": JudgeResult(score=0.90, rationale="ok")}, iteration=2), + self._ctx_with_scores({"accuracy": JudgeResult(score=0.95, rationale="ok")}, iteration=3), + ] + assert self.client._all_judges_passing() is True + + def test_gt_batch_middle_sample_fails_returns_false(self): + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._last_batch_size = 3 + self.client._history = [ + self._ctx_with_scores({"accuracy": JudgeResult(score=0.95, rationale="ok")}, iteration=1), + self._ctx_with_scores({"accuracy": JudgeResult(score=0.20, rationale="fail")}, iteration=2), # FAILS + self._ctx_with_scores({"accuracy": JudgeResult(score=0.95, rationale="ok")}, iteration=3), + ] + assert self.client._all_judges_passing() is False + + def test_gt_batch_size_respected_ignores_older_batches(self): + """Entries outside the current batch window should not influence the result.""" + self.client._options = _make_options(judges={ + "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"), + }) + self.client._last_batch_size = 2 + # 4 entries; batch covers last 2; first 2 are stale (from a previous attempt) + self.client._history = [ + self._ctx_with_scores({"accuracy": JudgeResult(score=0.1, rationale="old fail")}, iteration=1), + self._ctx_with_scores({"accuracy": JudgeResult(score=0.1, rationale="old fail")}, iteration=2), + self._ctx_with_scores({"accuracy": JudgeResult(score=0.9, rationale="ok")}, iteration=3), + self._ctx_with_scores({"accuracy": JudgeResult(score=0.9, rationale="ok")}, iteration=4), + ] + assert self.client._all_judges_passing() is True + class TestBuildNewVariationPromptCost: def _make_history(self) -> list: