feat: Fleet client#1
Conversation
- FleetTaskEnv wraps FleetEnvClient with task-oriented interface - Accepts task configs from export_training_tasks.py - Creates versioned environments on reset - Injects task prompt into observations - Executes verifier for reward computation on episode completion - Supports both sync and async step methods - Factory functions: make_fleet_task_env, from_json_file - Tests: 20 unit tests for init, specs, verifiers, factories 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
The MCP images don't exist for all environment versions, causing FleetVersionNotFoundError when trying to create environments. Changing the default to None allows the Fleet SDK to use standard images which are available for all versions. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
FleetEnvClient.from_fleet() was not accepting data_key/data_version parameters, causing them to be passed through **kwargs to HTTPEnvClient which doesn't accept them. - Add data_key and data_version as explicit parameters - Pass them to fleet.make() - Update task_env.py to pass them separately 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Fleet SDK expects data_key in "key:version" format, not as separate parameters. Updated from_fleet() to combine them before calling fleet.make(). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
HTTPEnvClient.reset() doesn't support seed parameter yet. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Increases default timeout from 15s to 60s for Fleet API calls. This prevents timeouts during environment initialization. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Previously reset() did partial work and reset_async() added tool fetching. Now reset_async() does all the work (including fetching tools) and reset() is just a sync wrapper that calls it via run_until_complete(). This ensures both methods return identical results including tools. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
MCP's call_tool() returns a CallToolResult Pydantic object, not plain text. This was causing ugly repr strings to be passed to agents like: "meta=None content=[TextContent(type='text', text='...')] ..." Now properly extracts: - Text content from result.content[].text - Tries JSON parsing for structured results - Falls back to structuredContent if available - Handles isError cases 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Tests for: - FleetMCPClient._extract_tool_result(): - Single text content extraction - JSON parsing from text - Multiple text contents - Error result handling - Structured content fallback - Empty result handling - FleetTaskEnv reset: - reset_async() returns tools - reset() calls reset_async() (sync wrapper) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Move fleet.make() and list_tools() into FleetTaskEnv.__init__() - Tools are now fetched at env creation, not during reset - reset_async() calls _orch.reset() with error handling, returns cached tools - Use asyncio.run() for Python 3.13 compatibility - Update tests for new initialization pattern 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Log task_key and verifier code preview when verifier fails - Catch syntax errors separately with clear message - Show which functions were found if 'verify' is missing Helps debug issues like "Verifier code must define a 'verify' function" 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Replace custom _execute_verifier_local() with Fleet SDK's Task.verify_detailed() which properly sets up the verifier namespace with: - Environment type annotation - Helper functions (normalized_contains, etc.) - Proper function discovery (not just "verify" function) This fixes "name 'Environment' is not defined" errors during verifier execution. Changes: - _compute_reward: Create Fleet SDK Task and call verify_detailed() - Support both 'verifier_code' and 'verifier_func' field names - Add comprehensive logging for debugging - Remove broken _execute_verifier_local method Tests: - Update all verifier tests to mock Fleet SDK Task.verify_detailed() - Add tests for various edge cases (no verifier, no orch, exceptions) - Fix fixture to avoid asyncio.run() conflicts with pytest-asyncio 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add retry with exponential backoff (3 attempts, 1s/2s/4s delays) - Log errors instead of silently swallowing exceptions - Log warning when some clients fail but others succeed - Log error after all retries exhausted This fixes silent failures when MCP connections are flaky, which caused 'no tools found' errors in SkyRL training.
call_tool now retries with exponential backoff (3 attempts, 1s/2s/4s) on connection errors, similar to list_tools. ValueError (tool not found) is not retried. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Adds exponential backoff retry (3 attempts, 2s base delay) around fleet.make() to handle transient Fleet API errors like health check failures that can occur during instance provisioning. Only retries on transient errors (health check, timeout, connection). Permanent errors are raised immediately. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add Toolathlon-style context management tools for long trajectories: - check_context: Check visible/total turn counts - manage_context: Drop old turns to free up context space - search_history: Search all history (including dropped) - search_tool_output: Search truncated tool output - view_tool_output: Paginate through truncated output The ContextManager class can be used by any training framework that maintains chat_history. It tracks full history and handles truncated tool outputs. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Computer-use tasks require MCP-enabled container images (e.g., famazon:mcp0.0.7) which have scrot installed for screenshots and the MCP server with 'computer' tool for mouse/keyboard control.
When `partial_reward=True`, failed verifier runs compute a fractional score from the error/success accumulators instead of binary 0/1. Passing tasks are unaffected. Off by default. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feat: partial reward support
| ttl_seconds: Optional[int] = 3600, | ||
| env_variables: Optional[Dict[str, Any]] = None, | ||
| **kwargs: Any, | ||
| ) -> Tuple["FleetEnvClient", FleetMCPTools]: |
There was a problem hiding this comment.
Required parameters missing defaults break example and tests
High Severity
from_fleet requires data_key, data_version, and image_type as mandatory positional parameters with no defaults. However, the example in fleet_env_example.py calls FleetEnvClient.from_fleet(api_key=..., env_key=..., ttl_seconds=600) without providing these three arguments, and the tests similarly call from_fleet(api_key="k", env_key="e"). This will raise a TypeError at runtime. These parameters likely need default values (e.g., data_key: Optional[str] = None, data_version: Optional[str] = None, image_type: str = "standard"), since the method body already handles falsy data_key gracefully.
Additional Locations (1)
|
|
||
| elapsed = time.time() - start | ||
| instance_id = getattr(env, "instance_id", "unknown") | ||
| _logger.info(f"Fleet instance ready in {elapsed:.1f}s: {instance_id}") |
There was a problem hiding this comment.
Sync from_fleet missing provisioning telemetry event
Medium Severity
The sync from_fleet method logs provisioning completion only via _logger.info but does not emit the fleet_provisioning_completed telemetry event. The async counterpart from_fleet_async correctly calls fleet_info("fleet_provisioning_completed", ...) at the equivalent point. This asymmetry means sync-provisioned instances are invisible in Logfire dashboards for provisioning latency tracking.
| assert env._tools_cache == [] | ||
|
|
||
| # Should have logged warning | ||
| assert "computer_use modality but no 'computer' tool found" in caplog.text |
There was a problem hiding this comment.
Test expects success but code raises RuntimeError
Medium Severity
test_computer_use_clears_tools_when_no_computer_tool expects reset_async() to return normally and asserts env._tools_cache == []. However, the production code in task_env.py raises RuntimeError("computer_use modality but no 'computer' tool found...") when no computer tool is found for computer_use modality. The test assertion at line 447 is unreachable — the test either passes vacuously due to mock setup issues or will fail once mocks are corrected.
SkyRL can end trajectories early (context overflow, its own max_turns) without OpenEnv knowing. Previously the model got 0 reward even if the task was completed. Now close_async()/close() run the verifier when step_async() never computed a reward, storing the result in self.final_reward for SkyRL to read. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feat: Run verifier at close() for orphaned rollouts
| obs = await env.reset_async() | ||
|
|
||
| # Should have all 3 tools | ||
| assert len(env._tools_cache) == 3 |
There was a problem hiding this comment.
Test asserts wrong tool count after filtering
Medium Severity
test_tool_use_does_not_filter asserts len(env._tools_cache) == 3, expecting all tools to remain for tool_use modality. But the production code at task_env.py lines 340-346 explicitly filters out the "computer" tool for tool_use modality. With mock tools including {"name": "computer", ...}, the filter t.get("name") != "computer" removes it, leaving only 2 tools. The test name and assertion contradict the intended code behavior.
| - Reset inconsistencies across some env keys (better errors + compatibility notes) | ||
| - Tool-name collision policy across endpoints | ||
| - Retries/backoff and clearer “endpoint down” failure modes | ||
|
|
There was a problem hiding this comment.
Carlisle tasks (354 total, 8 in eval) require models to call submit_final_answer to submit results, but this tool is a harness-level synthetic injected by the orchestrator's SessionWorkflow, not an MCP tool. OpenEnv connects directly to MCP servers, so the tool was missing — causing 0% scores across all carlisle tasks in training. Changes: - Inject submit_final_answer into tool list when prompt references it - Intercept calls locally (not routed to MCP), store the answer - Pass final_answer to verifier via Fleet SDK's verify_detailed() - Run verifier in close()/close_async() for orphaned rollouts - Add unit tests for the synthetic tool Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feat: Add submit_final_answer synthetic tool for carlisle tasks
Runs k × m rollouts of generated tasks on Fleet environments. Given (prompt, verifier_code, env_key), creates FleetTaskEnv instances, runs agent loops with model inference, and returns structured results for reward computation (learnability variance + model separation). Used as the inner loop of the task-scaling RL pipeline in SkyRL. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Instead of calling Anthropic directly and running a local agent loop, the evaluator now: 1. Imports the generated task via fleet.import_task() 2. Creates a harness job via fleet.create_job() 3. Polls for completion 4. Extracts per-session verifier scores from job sessions Uses real Fleet model IDs (claude-sonnet-4.5, claude-opus-4.5) instead of the broken weak/strong mapping that required ANTHROPIC_API_KEY. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Fleet harness POST /v1/jobs requires model IDs in 'provider/model' format (e.g., 'anthropic/claude-sonnet-4.5'), not just the model name. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The sync time.sleep() in _poll_job blocked the asyncio event loop, preventing trajectory timeouts from cancelling evaluations. Using asyncio.sleep() allows the event loop to properly handle cancellations. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Fleet returns session model IDs without provider prefix (e.g., "claude-sonnet-4.5")
while we configure them with prefix ("anthropic/claude-sonnet-4.5"). Added
_match_model_id() to normalize and match by bare model name, so scores land
in the correct results_per_model bucket.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feat: Add TaskEvaluator for task generation inner loop
| # Wait for MCP services to initialize (matches harness initial_wait=8) | ||
| if self.initial_wait > 0: | ||
| logger.info(f"Waiting {self.initial_wait:.0f}s for MCP services to initialize...") | ||
| await asyncio.sleep(self.initial_wait) |
There was a problem hiding this comment.
Unconditional 8-second sleep on every list_tools call
Medium Severity
list_tools() unconditionally sleeps for initial_wait (8 seconds) on every invocation, not just the first. When FleetTaskEnv.reset_async() is called more than once on the same instance (e.g., multi-episode use), each reset pays the 8-second penalty even though MCP services are already running. There's no mechanism to skip the wait after a successful first call.
Add _tool_errors, _verifier_stdout, _verifier_error to FleetTaskEnv so SkyRL can build hints from failed rollout feedback without an LLM call. - Tool errors accumulated in step_async() on MCP errors and exceptions - Verifier stdout/error captured in _compute_reward() after verifier runs - Verifier exceptions also captured in _verifier_error (not just failures) - All feedback properties reset in reset_async() to prevent cross-episode leakage - Properties: verifier_stdout, verifier_error, tool_errors_list Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Expose verifier feedback properties for hint generation
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 7 total unresolved issues (including 6 from previous reviews).
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Test expects ValueError during init but error deferred to reset
- Added an explicit env_key validation in FleetTaskEnv.init so missing env_key now raises ValueError during construction as the test expects.
Or push these changes by commenting:
@cursor push 19403ebbf6
Preview (19403ebbf6)
diff --git a/src/envs/fleet_env/task_env.py b/src/envs/fleet_env/task_env.py
--- a/src/envs/fleet_env/task_env.py
+++ b/src/envs/fleet_env/task_env.py
@@ -153,6 +153,8 @@
raise ValueError(
"Fleet API key required (pass api_key or set FLEET_API_KEY)"
)
+ if not self.task.get("env_key"):
+ raise ValueError("Task config missing env_key")
self._step_count = 0
self._done = FalseThis Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.
| task = {"task_key": "test", "prompt": "test"} | ||
| # The error is raised during __init__ when _build_env_spec is called | ||
| with pytest.raises(ValueError, match="missing env_key"): | ||
| FleetTaskEnv(task, api_key="test") |
There was a problem hiding this comment.
Test expects ValueError during init but error deferred to reset
Medium Severity
test_build_env_spec_raises_without_env_key expects FleetTaskEnv(task, api_key="test") to raise ValueError("missing env_key") during __init__. However, __init__ never calls _build_env_spec() — provisioning is deferred to _ensure_provisioned() which runs during reset_async(). The __init__ only accesses self.env_key which returns "unknown" via .get("env_key", "unknown"). The pytest.raises block will fail because no exception is raised.
Add describe_db() and query_db() methods (sync + async) to FleetEnvClient, delegating to the Fleet SDK's SQLiteResource. This enables querying provisioned environment databases (seed/current) from outside the container via HTTP, which is needed for task generation workflows where the model explores DB data before designing tasks. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
describe_db_async/query_db_async were wrapping sync describe_db/query_db in asyncio.to_thread, but when _fleet_env is an AsyncEnv (from AsyncFleet.make), .db().describe() and .query() are async coroutines. Now detects whether the resource method is a coroutine and awaits directly instead of using to_thread. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When the Fleet MCP server returns {"base64_image": null} (failed screenshot),
_extract_tool_result would create an image_url block with url=None, causing
downstream AttributeError in SkyRL's extract_images_from_conversation.
Now returns a text error message instead of propagating the null.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
There are 9 total unresolved issues (including 7 from previous reviews).
Autofix Details
Bugbot Autofix prepared fixes for both issues found in the latest run.
- ✅ Fixed: Example crashes with missing required parameters
- Made
FleetEnvClient.from_fleet()andfrom_fleet_async()accept optionaldata_key/data_versionand defaultimage_type="standard"so the quickstart call no longer raisesTypeError.
- Made
- ✅ Fixed: Massive duplication between sync and async provisioning methods
- Extracted shared provisioning helpers for data key spec, retry/error handling, MCP URL/tool construction, and provisioning telemetry so sync and async paths now reuse the same logic.
Or push these changes by commenting:
@cursor push 82f79a59c5
Preview (82f79a59c5)
diff --git a/src/envs/fleet_env/client.py b/src/envs/fleet_env/client.py
--- a/src/envs/fleet_env/client.py
+++ b/src/envs/fleet_env/client.py
@@ -9,6 +9,7 @@
import asyncio
import dataclasses
import logging
+import time
from typing import Any, Dict, List, Optional, Tuple, Type
try:
@@ -30,6 +31,15 @@
class FleetEnvClient(HTTPEnvClient[Action, Observation]):
"""Orchestrator-facing client for Fleet-hosted environments (HTTP only)."""
+ _MAX_FLEET_MAKE_RETRIES = 3
+ _FLEET_MAKE_RETRY_BASE_DELAY_S = 2.0
+ _TRANSIENT_FLEET_ERROR_HINTS = (
+ "health check",
+ "timeout",
+ "connection",
+ "temporarily",
+ )
+
def __init__(
self,
base_url: str,
@@ -47,14 +57,103 @@
self._api_key = api_key
self._mcp_urls = mcp_urls
+ @staticmethod
+ def _build_data_key_spec(
+ data_key: Optional[str], data_version: Optional[str]
+ ) -> Optional[str]:
+ # Fleet SDK expects data_key in "key:version" format.
+ if not data_key:
+ return None
+ return f"{data_key}:{data_version}" if data_version else data_key
+
+ @staticmethod
+ def _sdk_image_type(image_type: str) -> Optional[str]:
+ # Fleet SDK expects image_type=None for standard images.
+ return image_type if image_type == "mcp" else None
+
+ @staticmethod
+ def _get_mcp_urls(root: str, image_type: str) -> Tuple[str, ...]:
+ # Pick MCP endpoint based on modality:
+ # - computer_use (image_type="mcp"): aggregator on port 8081
+ # - tool_use: per-env MCP server on port 3003
+ if image_type == "mcp":
+ return (f"{root}api/v1/mcp",)
+ return (f"{root}mcp",)
+
@classmethod
+ def _retry_delay_on_make_error(
+ cls,
+ *,
+ env_key: str,
+ attempt: int,
+ error: Exception,
+ logger: logging.Logger,
+ client_name: str,
+ ) -> Optional[float]:
+ max_retries = cls._MAX_FLEET_MAKE_RETRIES
+ error_msg = str(error).lower()
+ is_transient = any(
+ hint in error_msg for hint in cls._TRANSIENT_FLEET_ERROR_HINTS
+ )
+ if attempt < max_retries - 1 and is_transient:
+ delay = cls._FLEET_MAKE_RETRY_BASE_DELAY_S * (2**attempt)
+ logger.warning(
+ f"[env={env_key}] {client_name}.make() failed (attempt {attempt + 1}/{max_retries}): {error}. "
+ f"Retrying in {delay:.1f}s..."
+ )
+ fleet_warning(
+ "fleet_make_retry",
+ attempt=attempt + 1,
+ max_retries=max_retries,
+ error_type=type(error).__name__,
+ error_message=str(error),
+ retry_delay_s=delay,
+ )
+ return delay
+
+ logger.error(
+ f"[env={env_key}] {client_name}.make() failed after {attempt + 1} attempt(s): {error}"
+ )
+ fleet_error(
+ "fleet_make_failed",
+ attempt=attempt + 1,
+ max_retries=max_retries,
+ error_type=type(error).__name__,
+ error_message=str(error),
+ )
+ return None
+
+ @staticmethod
+ def _emit_provisioning_completed(elapsed: float, instance_id: Any) -> None:
+ fleet_info(
+ "fleet_provisioning_completed",
+ provisioning_time_s=round(elapsed, 1),
+ instance_id=instance_id,
+ )
+
+ @classmethod
+ def _build_orchestrator_and_tools(
+ cls, env: Any, api_key: str, image_type: str, **kwargs: Any
+ ) -> Tuple["FleetEnvClient", FleetMCPTools]:
+ mcp_urls = cls._get_mcp_urls(root=env.urls.root, image_type=image_type)
+ orch = cls(
+ base_url=env.urls.manager.api,
+ fleet_env_handle=env,
+ api_key=api_key,
+ mcp_urls=mcp_urls,
+ **kwargs,
+ )
+ tools = FleetMCPTools(api_key=api_key, mcp_urls=mcp_urls)
+ return orch, tools
+
+ @classmethod
def from_fleet(
cls: Type["FleetEnvClient"],
api_key: str,
env_key: str,
- data_key: str,
- data_version: str,
- image_type: str,
+ data_key: Optional[str] = None,
+ data_version: Optional[str] = None,
+ image_type: str = "standard",
region: Optional[str] = None,
ttl_seconds: Optional[int] = 3600,
env_variables: Optional[Dict[str, Any]] = None,
@@ -72,106 +171,56 @@
# This ensures .close() and other lifecycle methods are synchronous.
fleet = Fleet(api_key=api_key)
- # Fleet SDK expects data_key in "key:version" format
- data_key_spec = None
- if data_key:
- if data_version:
- data_key_spec = f"{data_key}:{data_version}"
- else:
- data_key_spec = data_key
-
- import time
- import logging
-
+ data_key_spec = cls._build_data_key_spec(data_key=data_key, data_version=data_version)
_logger = logging.getLogger(__name__)
_logger.info(f"Creating Fleet instance: env_key={env_key}, ttl={ttl_seconds}s")
start = time.time()
- # Retry logic for transient Fleet API failures (e.g., health check failures)
- max_retries = 3
- retry_base_delay = 2.0 # seconds
+ # Retry logic for transient Fleet API failures (e.g., health check failures).
+ max_retries = cls._MAX_FLEET_MAKE_RETRIES
env = None
for attempt in range(max_retries):
try:
- # Fleet SDK expects image_type=None for standard images
- sdk_image_type = image_type if image_type == "mcp" else None
env = fleet.make(
env_key=env_key,
region=region,
ttl_seconds=ttl_seconds,
env_variables=env_variables,
- image_type=sdk_image_type,
+ image_type=cls._sdk_image_type(image_type),
data_key=data_key_spec,
)
break # Success
except Exception as e:
- error_msg = str(e)
- # Retry on transient errors (health check failures, timeouts, etc.)
- is_transient = any(
- x in error_msg.lower()
- for x in ["health check", "timeout", "connection", "temporarily"]
+ delay = cls._retry_delay_on_make_error(
+ env_key=env_key,
+ attempt=attempt,
+ error=e,
+ logger=_logger,
+ client_name="Fleet",
)
- if attempt < max_retries - 1 and is_transient:
- delay = retry_base_delay * (2**attempt)
- _logger.warning(
- f"[env={env_key}] Fleet.make() failed (attempt {attempt + 1}/{max_retries}): {e}. "
- f"Retrying in {delay:.1f}s..."
- )
- fleet_warning(
- "fleet_make_retry",
- attempt=attempt + 1,
- max_retries=max_retries,
- error_type=type(e).__name__,
- error_message=str(e),
- retry_delay_s=delay,
- )
- time.sleep(delay)
- else:
- _logger.error(
- f"[env={env_key}] Fleet.make() failed after {attempt + 1} attempt(s): {e}"
- )
- fleet_error(
- "fleet_make_failed",
- attempt=attempt + 1,
- max_retries=max_retries,
- error_type=type(e).__name__,
- error_message=str(e),
- )
+ if delay is None:
raise
+ time.sleep(delay)
elapsed = time.time() - start
instance_id = getattr(env, "instance_id", "unknown")
_logger.info(f"Fleet instance ready in {elapsed:.1f}s: {instance_id}")
+ cls._emit_provisioning_completed(elapsed=elapsed, instance_id=instance_id)
- root = env.urls.root
- # Pick MCP endpoint based on modality:
- # - computer_use: aggregator on port 8081 (has computer tool + API tools)
- # - tool_use: per-env MCP server on port 3003 (API tools only)
- if image_type == "mcp":
- mcp_urls = (f"{root}api/v1/mcp",)
- else:
- mcp_urls = (f"{root}mcp",)
-
- orch = cls(
- base_url=env.urls.manager.api,
- fleet_env_handle=env,
- api_key=api_key,
- mcp_urls=mcp_urls,
- **kwargs,
+ return cls._build_orchestrator_and_tools(
+ env=env, api_key=api_key, image_type=image_type, **kwargs
)
- tools = FleetMCPTools(api_key=api_key, mcp_urls=mcp_urls)
- return orch, tools
@classmethod
async def from_fleet_async(
cls: Type["FleetEnvClient"],
api_key: str,
env_key: str,
- data_key: str,
- data_version: str,
- image_type: str,
+ data_key: Optional[str] = None,
+ data_version: Optional[str] = None,
+ image_type: str = "standard",
region: Optional[str] = None,
ttl_seconds: Optional[int] = 3600,
env_variables: Optional[Dict[str, Any]] = None,
@@ -192,17 +241,7 @@
async_fleet = AsyncFleet(api_key=api_key)
- # Fleet SDK expects data_key in "key:version" format
- data_key_spec = None
- if data_key:
- if data_version:
- data_key_spec = f"{data_key}:{data_version}"
- else:
- data_key_spec = data_key
-
- import time
- import logging
-
+ data_key_spec = cls._build_data_key_spec(data_key=data_key, data_version=data_version)
_logger = logging.getLogger(__name__)
_logger.info(
@@ -210,14 +249,10 @@
)
start = time.time()
- # Retry logic with async sleep (non-blocking)
- max_retries = 3
- retry_base_delay = 2.0 # seconds
+ # Retry logic with async sleep (non-blocking).
+ max_retries = cls._MAX_FLEET_MAKE_RETRIES
env = None
- # Fleet SDK expects image_type=None for standard images
- sdk_image_type = image_type if image_type == "mcp" else None
-
for attempt in range(max_retries):
try:
env = await async_fleet.make(
@@ -225,72 +260,30 @@
region=region,
ttl_seconds=ttl_seconds,
env_variables=env_variables,
- image_type=sdk_image_type,
+ image_type=cls._sdk_image_type(image_type),
data_key=data_key_spec,
)
break # Success
except Exception as e:
- error_msg = str(e)
- # Retry on transient errors (health check failures, timeouts, etc.)
- is_transient = any(
- x in error_msg.lower()
- for x in ["health check", "timeout", "connection", "temporarily"]
+ delay = cls._retry_delay_on_make_error(
+ env_key=env_key,
+ attempt=attempt,
+ error=e,
+ logger=_logger,
+ client_name="AsyncFleet",
)
- if attempt < max_retries - 1 and is_transient:
- delay = retry_base_delay * (2**attempt)
- _logger.warning(
- f"[env={env_key}] AsyncFleet.make() failed (attempt {attempt + 1}/{max_retries}): {e}. "
- f"Retrying in {delay:.1f}s..."
- )
- fleet_warning(
- "fleet_make_retry",
- attempt=attempt + 1,
- max_retries=max_retries,
- error_type=type(e).__name__,
- error_message=str(e),
- retry_delay_s=delay,
- )
- await asyncio.sleep(delay)
- else:
- _logger.error(
- f"[env={env_key}] AsyncFleet.make() failed after {attempt + 1} attempt(s): {e}"
- )
- fleet_error(
- "fleet_make_failed",
- attempt=attempt + 1,
- max_retries=max_retries,
- error_type=type(e).__name__,
- error_message=str(e),
- )
+ if delay is None:
raise
+ await asyncio.sleep(delay)
elapsed = time.time() - start
instance_id = getattr(env, "instance_id", "unknown")
_logger.info(f"Fleet instance ready (async) in {elapsed:.1f}s: {instance_id}")
- fleet_info(
- "fleet_provisioning_completed",
- provisioning_time_s=round(elapsed, 1),
- instance_id=instance_id,
- )
+ cls._emit_provisioning_completed(elapsed=elapsed, instance_id=instance_id)
- root = env.urls.root
- # Pick MCP endpoint based on modality:
- # - computer_use (image_type="mcp"): aggregator on port 8081 (has computer tool + API tools)
- # - tool_use: per-env MCP server on port 3003 (API tools only)
- if image_type == "mcp":
- mcp_urls = (f"{root}api/v1/mcp",)
- else:
- mcp_urls = (f"{root}mcp",)
-
- orch = cls(
- base_url=env.urls.manager.api,
- fleet_env_handle=env,
- api_key=api_key,
- mcp_urls=mcp_urls,
- **kwargs,
+ return cls._build_orchestrator_and_tools(
+ env=env, api_key=api_key, image_type=image_type, **kwargs
)
- tools = FleetMCPTools(api_key=api_key, mcp_urls=mcp_urls)
- return orch, tools
# ------------------------------------------------------------------
# Database query methods (delegate to Fleet SDK's SQLiteResource)This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.
| api_key=api_key, | ||
| env_key=env_key, | ||
| ttl_seconds=600, # 10 min TTL | ||
| ) |
There was a problem hiding this comment.
Example crashes with missing required parameters
High Severity
The advertised quickstart example calls FleetEnvClient.from_fleet() with only api_key, env_key, and ttl_seconds, but the method signature requires data_key, data_version, and image_type as mandatory positional parameters with no defaults. This will immediately raise a TypeError at runtime, making the documented quickstart non-functional.
Additional Locations (1)
| **kwargs, | ||
| ) | ||
| tools = FleetMCPTools(api_key=api_key, mcp_urls=mcp_urls) | ||
| return orch, tools |
There was a problem hiding this comment.
Massive duplication between sync and async provisioning methods
Low Severity
from_fleet and from_fleet_async duplicate ~130 lines of nearly identical logic: retry loops, data_key spec construction, MCP URL selection, and orchestrator/tools creation. The only structural difference is fleet.make() vs await async_fleet.make() and time.sleep() vs await asyncio.sleep(). This already caused the telemetry inconsistency where only the async path emits fleet_provisioning_completed.
Additional Locations (1)
browser_use tasks now get the same treatment as computer_use: - MCP image type (triggers browser sidecar, no MCP image needed) - computer tool only (screenshot, click, type, scroll) - initial screenshot on reset - 1800s TTL Follows platform change: browsers are now sidecars on base images, MCP images are deprecated. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
browser_use tasks now create a standard Fleet env (web UI only) plus a separate browser lease via the Fleet browser API. The browser navigates to the env's web UI and provides the computer tool (screenshot, click, type, scroll) via its MCP endpoint. Flow: Fleet env (standard) → browser lease → navigate → healthcheck → use browser MCP for computer tool. On close, delete the lease. MCP images are deprecated — browsers are now sidecars on base images. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
FLEET_API_KEY is not accepted by the browser lease API (401). Use the shared fallback token from theseus api_token_config.py, same as the harness uses. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 10 total unresolved issues (including 9 from previous reviews).
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 725ddba. Configure here.
| return f"https://api.browser.{cluster_name}.fleetai.com" | ||
|
|
||
|
|
||
| _BROWSER_API_TOKEN_FALLBACK = "bb46d985-763a-47bf-a67a-c2c98ba6e1d9" |
There was a problem hiding this comment.
Hardcoded API token committed to source code
High Severity
_BROWSER_API_TOKEN_FALLBACK contains a hardcoded UUID credential (bb46d985-763a-47bf-a67a-c2c98ba6e1d9) used as a fallback authentication token for the browser lease API. This token is committed to the public repository and used in Authorization: Bearer headers, exposing it to anyone with repo access.
Reviewed by Cursor Bugbot for commit 725ddba. Configure here.



PR: Fleet environments (OpenEnv)
This PR documents and refines the Fleet runtime integration for OpenEnv.
What this enables
reset / step / statetools/list + tools/callWhat this is not
Main abstractions
FleetEnvClient(HTTP): orchestrator handle for reset/step/state.FleetMCPTools(MCP): agent handle for listing/calling tools.api/v1/mcpandmcp)convert_tool_format)Quickstart
pip install "openenv-core[fleet]"export FLEET_API_KEY="..."python examples/fleet_env_example.py <env_key>References
rfcs/001-abstractions.mdrfcs/003-mcp-support.mdTODOs / known sharp edges
api/v1/mcpvsmcp)