Skip to content

feat: Fleet client#1

Open
dzorlu wants to merge 88 commits into
mainfrom
deniz/fleet_client
Open

feat: Fleet client#1
dzorlu wants to merge 88 commits into
mainfrom
deniz/fleet_client

Conversation

@dzorlu

@dzorlu dzorlu commented Dec 12, 2025

Copy link
Copy Markdown
Collaborator

PR: Fleet environments (OpenEnv)

This PR documents and refines the Fleet runtime integration for OpenEnv.

What this enables

  • Run OpenEnv environments on Fleet (remote) with no local Docker.
  • Keep a strict split between:
    • Orchestration (HTTP): reset / step / state
    • Agent actions (MCP): tools/list + tools/call

What this is not

  • This is not the local “Dockerized env server + env container” setup.
  • There is no container/provider abstraction here; Fleet hosts the runtime remotely (HTTP env server + MCP service). The client only connects.

Main abstractions

  • FleetEnvClient (HTTP): orchestrator handle for reset/step/state.
  • FleetMCPTools (MCP): agent handle for listing/calling tools.
    • Unions tools across Fleet’s MCP endpoints (today often api/v1/mcp and mcp)
    • Returns tools in OpenAI “tools” dict format (via convert_tool_format)
    • Routes tool calls to the owning endpoint (cached after discovery)

Quickstart

  • Install: pip install "openenv-core[fleet]"
  • Set: export FLEET_API_KEY="..."
  • Run: python examples/fleet_env_example.py <env_key>

References

  • RFC 001: rfcs/001-abstractions.md
  • RFC 003: rfcs/003-mcp-support.md

TODOs / known sharp edges

  • Endpoint discovery (avoid hardcoding api/v1/mcp vs mcp)
  • Reset inconsistencies across some env keys (better errors + compatibility notes)
  • Tool-name collision policy across endpoints
  • Retries/backoff and clearer “endpoint down” failure modes

@dzorlu dzorlu changed the title Deniz/fleet client feat: Fleet client Dec 13, 2025
Deniz and others added 15 commits December 17, 2025 17:29
- FleetTaskEnv wraps FleetEnvClient with task-oriented interface
- Accepts task configs from export_training_tasks.py
- Creates versioned environments on reset
- Injects task prompt into observations
- Executes verifier for reward computation on episode completion
- Supports both sync and async step methods
- Factory functions: make_fleet_task_env, from_json_file
- Tests: 20 unit tests for init, specs, verifiers, factories

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
The MCP images don't exist for all environment versions, causing
FleetVersionNotFoundError when trying to create environments.
Changing the default to None allows the Fleet SDK to use standard
images which are available for all versions.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
FleetEnvClient.from_fleet() was not accepting data_key/data_version
parameters, causing them to be passed through **kwargs to HTTPEnvClient
which doesn't accept them.

- Add data_key and data_version as explicit parameters
- Pass them to fleet.make()
- Update task_env.py to pass them separately

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Fleet SDK expects data_key in "key:version" format, not as separate
parameters. Updated from_fleet() to combine them before calling
fleet.make().

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
HTTPEnvClient.reset() doesn't support seed parameter yet.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Increases default timeout from 15s to 60s for Fleet API calls.
This prevents timeouts during environment initialization.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Previously reset() did partial work and reset_async() added tool fetching.
Now reset_async() does all the work (including fetching tools) and reset()
is just a sync wrapper that calls it via run_until_complete().

This ensures both methods return identical results including tools.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
MCP's call_tool() returns a CallToolResult Pydantic object, not plain text.
This was causing ugly repr strings to be passed to agents like:
  "meta=None content=[TextContent(type='text', text='...')] ..."

Now properly extracts:
- Text content from result.content[].text
- Tries JSON parsing for structured results
- Falls back to structuredContent if available
- Handles isError cases

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Tests for:
- FleetMCPClient._extract_tool_result():
  - Single text content extraction
  - JSON parsing from text
  - Multiple text contents
  - Error result handling
  - Structured content fallback
  - Empty result handling

- FleetTaskEnv reset:
  - reset_async() returns tools
  - reset() calls reset_async() (sync wrapper)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Move fleet.make() and list_tools() into FleetTaskEnv.__init__()
- Tools are now fetched at env creation, not during reset
- reset_async() calls _orch.reset() with error handling, returns cached tools
- Use asyncio.run() for Python 3.13 compatibility
- Update tests for new initialization pattern

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Log task_key and verifier code preview when verifier fails
- Catch syntax errors separately with clear message
- Show which functions were found if 'verify' is missing

Helps debug issues like "Verifier code must define a 'verify' function"

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Replace custom _execute_verifier_local() with Fleet SDK's Task.verify_detailed()
which properly sets up the verifier namespace with:
- Environment type annotation
- Helper functions (normalized_contains, etc.)
- Proper function discovery (not just "verify" function)

This fixes "name 'Environment' is not defined" errors during verifier execution.

Changes:
- _compute_reward: Create Fleet SDK Task and call verify_detailed()
- Support both 'verifier_code' and 'verifier_func' field names
- Add comprehensive logging for debugging
- Remove broken _execute_verifier_local method

Tests:
- Update all verifier tests to mock Fleet SDK Task.verify_detailed()
- Add tests for various edge cases (no verifier, no orch, exceptions)
- Fix fixture to avoid asyncio.run() conflicts with pytest-asyncio

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Deniz and others added 9 commits January 26, 2026 11:01
- Add retry with exponential backoff (3 attempts, 1s/2s/4s delays)
- Log errors instead of silently swallowing exceptions
- Log warning when some clients fail but others succeed
- Log error after all retries exhausted

This fixes silent failures when MCP connections are flaky, which caused
'no tools found' errors in SkyRL training.
call_tool now retries with exponential backoff (3 attempts, 1s/2s/4s)
on connection errors, similar to list_tools.

ValueError (tool not found) is not retried.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Adds exponential backoff retry (3 attempts, 2s base delay) around
fleet.make() to handle transient Fleet API errors like health check
failures that can occur during instance provisioning.

Only retries on transient errors (health check, timeout, connection).
Permanent errors are raised immediately.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add Toolathlon-style context management tools for long trajectories:
- check_context: Check visible/total turn counts
- manage_context: Drop old turns to free up context space
- search_history: Search all history (including dropped)
- search_tool_output: Search truncated tool output
- view_tool_output: Paginate through truncated output

The ContextManager class can be used by any training framework that
maintains chat_history. It tracks full history and handles truncated
tool outputs.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Computer-use tasks require MCP-enabled container images (e.g., famazon:mcp0.0.7)
which have scrot installed for screenshots and the MCP server with 'computer' tool
for mouse/keyboard control.
When `partial_reward=True`, failed verifier runs compute a fractional
score from the error/success accumulators instead of binary 0/1.
Passing tasks are unaffected. Off by default.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@dzorlu dzorlu reopened this Mar 11, 2026
@dzorlu dzorlu marked this pull request as ready for review March 11, 2026 05:42
ttl_seconds: Optional[int] = 3600,
env_variables: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Tuple["FleetEnvClient", FleetMCPTools]:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Required parameters missing defaults break example and tests

High Severity

from_fleet requires data_key, data_version, and image_type as mandatory positional parameters with no defaults. However, the example in fleet_env_example.py calls FleetEnvClient.from_fleet(api_key=..., env_key=..., ttl_seconds=600) without providing these three arguments, and the tests similarly call from_fleet(api_key="k", env_key="e"). This will raise a TypeError at runtime. These parameters likely need default values (e.g., data_key: Optional[str] = None, data_version: Optional[str] = None, image_type: str = "standard"), since the method body already handles falsy data_key gracefully.

Additional Locations (1)
Fix in Cursor Fix in Web


elapsed = time.time() - start
instance_id = getattr(env, "instance_id", "unknown")
_logger.info(f"Fleet instance ready in {elapsed:.1f}s: {instance_id}")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sync from_fleet missing provisioning telemetry event

Medium Severity

The sync from_fleet method logs provisioning completion only via _logger.info but does not emit the fleet_provisioning_completed telemetry event. The async counterpart from_fleet_async correctly calls fleet_info("fleet_provisioning_completed", ...) at the equivalent point. This asymmetry means sync-provisioned instances are invisible in Logfire dashboards for provisioning latency tracking.

Fix in Cursor Fix in Web

assert env._tools_cache == []

# Should have logged warning
assert "computer_use modality but no 'computer' tool found" in caplog.text

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test expects success but code raises RuntimeError

Medium Severity

test_computer_use_clears_tools_when_no_computer_tool expects reset_async() to return normally and asserts env._tools_cache == []. However, the production code in task_env.py raises RuntimeError("computer_use modality but no 'computer' tool found...") when no computer tool is found for computer_use modality. The test assertion at line 447 is unreachable — the test either passes vacuously due to mock setup issues or will fail once mocks are corrected.

Fix in Cursor Fix in Web

Deniz and others added 2 commits March 12, 2026 15:28
SkyRL can end trajectories early (context overflow, its own max_turns)
without OpenEnv knowing. Previously the model got 0 reward even if the
task was completed. Now close_async()/close() run the verifier when
step_async() never computed a reward, storing the result in
self.final_reward for SkyRL to read.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feat: Run verifier at close() for orphaned rollouts
obs = await env.reset_async()

# Should have all 3 tools
assert len(env._tools_cache) == 3

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test asserts wrong tool count after filtering

Medium Severity

test_tool_use_does_not_filter asserts len(env._tools_cache) == 3, expecting all tools to remain for tool_use modality. But the production code at task_env.py lines 340-346 explicitly filters out the "computer" tool for tool_use modality. With mock tools including {"name": "computer", ...}, the filter t.get("name") != "computer" removes it, leaving only 2 tools. The test name and assertion contradict the intended code behavior.

Fix in Cursor Fix in Web

Comment thread PR_README.md
- Reset inconsistencies across some env keys (better errors + compatibility notes)
- Tool-name collision policy across endpoints
- Retries/backoff and clearer “endpoint down” failure modes

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR-specific readme committed to repository

Low Severity

PR_README.md contains PR-specific documentation ("This PR documents and refines…", "TODOs / known sharp edges") that duplicates the PR description. This appears to be temporary PR documentation that shouldn't be merged into the main branch.

Fix in Cursor Fix in Web

Deniz and others added 10 commits March 13, 2026 12:26
Carlisle tasks (354 total, 8 in eval) require models to call
submit_final_answer to submit results, but this tool is a
harness-level synthetic injected by the orchestrator's SessionWorkflow,
not an MCP tool. OpenEnv connects directly to MCP servers, so the tool
was missing — causing 0% scores across all carlisle tasks in training.

Changes:
- Inject submit_final_answer into tool list when prompt references it
- Intercept calls locally (not routed to MCP), store the answer
- Pass final_answer to verifier via Fleet SDK's verify_detailed()
- Run verifier in close()/close_async() for orphaned rollouts
- Add unit tests for the synthetic tool

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feat: Add submit_final_answer synthetic tool for carlisle tasks
Runs k × m rollouts of generated tasks on Fleet environments.
Given (prompt, verifier_code, env_key), creates FleetTaskEnv instances,
runs agent loops with model inference, and returns structured results
for reward computation (learnability variance + model separation).

Used as the inner loop of the task-scaling RL pipeline in SkyRL.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Instead of calling Anthropic directly and running a local agent loop,
the evaluator now:
1. Imports the generated task via fleet.import_task()
2. Creates a harness job via fleet.create_job()
3. Polls for completion
4. Extracts per-session verifier scores from job sessions

Uses real Fleet model IDs (claude-sonnet-4.5, claude-opus-4.5) instead
of the broken weak/strong mapping that required ANTHROPIC_API_KEY.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Fleet harness POST /v1/jobs requires model IDs in 'provider/model'
format (e.g., 'anthropic/claude-sonnet-4.5'), not just the model name.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The sync time.sleep() in _poll_job blocked the asyncio event loop,
preventing trajectory timeouts from cancelling evaluations. Using
asyncio.sleep() allows the event loop to properly handle cancellations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Fleet returns session model IDs without provider prefix (e.g., "claude-sonnet-4.5")
while we configure them with prefix ("anthropic/claude-sonnet-4.5"). Added
_match_model_id() to normalize and match by bare model name, so scores land
in the correct results_per_model bucket.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feat: Add TaskEvaluator for task generation inner loop
# Wait for MCP services to initialize (matches harness initial_wait=8)
if self.initial_wait > 0:
logger.info(f"Waiting {self.initial_wait:.0f}s for MCP services to initialize...")
await asyncio.sleep(self.initial_wait)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unconditional 8-second sleep on every list_tools call

Medium Severity

list_tools() unconditionally sleeps for initial_wait (8 seconds) on every invocation, not just the first. When FleetTaskEnv.reset_async() is called more than once on the same instance (e.g., multi-episode use), each reset pays the 8-second penalty even though MCP services are already running. There's no mechanism to skip the wait after a successful first call.

Fix in Cursor Fix in Web

Deniz and others added 2 commits March 15, 2026 18:26
Add _tool_errors, _verifier_stdout, _verifier_error to FleetTaskEnv so
SkyRL can build hints from failed rollout feedback without an LLM call.

- Tool errors accumulated in step_async() on MCP errors and exceptions
- Verifier stdout/error captured in _compute_reward() after verifier runs
- Verifier exceptions also captured in _verifier_error (not just failures)
- All feedback properties reset in reset_async() to prevent cross-episode leakage
- Properties: verifier_stdout, verifier_error, tool_errors_list

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Expose verifier feedback properties for hint generation

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 7 total unresolved issues (including 6 from previous reviews).

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Test expects ValueError during init but error deferred to reset
    • Added an explicit env_key validation in FleetTaskEnv.init so missing env_key now raises ValueError during construction as the test expects.

Create PR

Or push these changes by commenting:

@cursor push 19403ebbf6
Preview (19403ebbf6)
diff --git a/src/envs/fleet_env/task_env.py b/src/envs/fleet_env/task_env.py
--- a/src/envs/fleet_env/task_env.py
+++ b/src/envs/fleet_env/task_env.py
@@ -153,6 +153,8 @@
             raise ValueError(
                 "Fleet API key required (pass api_key or set FLEET_API_KEY)"
             )
+        if not self.task.get("env_key"):
+            raise ValueError("Task config missing env_key")
 
         self._step_count = 0
         self._done = False

This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.

task = {"task_key": "test", "prompt": "test"}
# The error is raised during __init__ when _build_env_spec is called
with pytest.raises(ValueError, match="missing env_key"):
FleetTaskEnv(task, api_key="test")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test expects ValueError during init but error deferred to reset

Medium Severity

test_build_env_spec_raises_without_env_key expects FleetTaskEnv(task, api_key="test") to raise ValueError("missing env_key") during __init__. However, __init__ never calls _build_env_spec() — provisioning is deferred to _ensure_provisioned() which runs during reset_async(). The __init__ only accesses self.env_key which returns "unknown" via .get("env_key", "unknown"). The pytest.raises block will fail because no exception is raised.

Fix in Cursor Fix in Web

Deniz and others added 3 commits March 17, 2026 18:01
Add describe_db() and query_db() methods (sync + async) to
FleetEnvClient, delegating to the Fleet SDK's SQLiteResource.
This enables querying provisioned environment databases (seed/current)
from outside the container via HTTP, which is needed for task
generation workflows where the model explores DB data before
designing tasks.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
describe_db_async/query_db_async were wrapping sync describe_db/query_db
in asyncio.to_thread, but when _fleet_env is an AsyncEnv (from
AsyncFleet.make), .db().describe() and .query() are async coroutines.
Now detects whether the resource method is a coroutine and awaits
directly instead of using to_thread.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When the Fleet MCP server returns {"base64_image": null} (failed screenshot),
_extract_tool_result would create an image_url block with url=None, causing
downstream AttributeError in SkyRL's extract_images_from_conversation.

Now returns a text error message instead of propagating the null.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 2 potential issues.

There are 9 total unresolved issues (including 7 from previous reviews).

Autofix Details

Bugbot Autofix prepared fixes for both issues found in the latest run.

  • ✅ Fixed: Example crashes with missing required parameters
    • Made FleetEnvClient.from_fleet() and from_fleet_async() accept optional data_key/data_version and default image_type="standard" so the quickstart call no longer raises TypeError.
  • ✅ Fixed: Massive duplication between sync and async provisioning methods
    • Extracted shared provisioning helpers for data key spec, retry/error handling, MCP URL/tool construction, and provisioning telemetry so sync and async paths now reuse the same logic.

Create PR

Or push these changes by commenting:

@cursor push 82f79a59c5
Preview (82f79a59c5)
diff --git a/src/envs/fleet_env/client.py b/src/envs/fleet_env/client.py
--- a/src/envs/fleet_env/client.py
+++ b/src/envs/fleet_env/client.py
@@ -9,6 +9,7 @@
 import asyncio
 import dataclasses
 import logging
+import time
 from typing import Any, Dict, List, Optional, Tuple, Type
 
 try:
@@ -30,6 +31,15 @@
 class FleetEnvClient(HTTPEnvClient[Action, Observation]):
     """Orchestrator-facing client for Fleet-hosted environments (HTTP only)."""
 
+    _MAX_FLEET_MAKE_RETRIES = 3
+    _FLEET_MAKE_RETRY_BASE_DELAY_S = 2.0
+    _TRANSIENT_FLEET_ERROR_HINTS = (
+        "health check",
+        "timeout",
+        "connection",
+        "temporarily",
+    )
+
     def __init__(
         self,
         base_url: str,
@@ -47,14 +57,103 @@
         self._api_key = api_key
         self._mcp_urls = mcp_urls
 
+    @staticmethod
+    def _build_data_key_spec(
+        data_key: Optional[str], data_version: Optional[str]
+    ) -> Optional[str]:
+        # Fleet SDK expects data_key in "key:version" format.
+        if not data_key:
+            return None
+        return f"{data_key}:{data_version}" if data_version else data_key
+
+    @staticmethod
+    def _sdk_image_type(image_type: str) -> Optional[str]:
+        # Fleet SDK expects image_type=None for standard images.
+        return image_type if image_type == "mcp" else None
+
+    @staticmethod
+    def _get_mcp_urls(root: str, image_type: str) -> Tuple[str, ...]:
+        # Pick MCP endpoint based on modality:
+        # - computer_use (image_type="mcp"): aggregator on port 8081
+        # - tool_use: per-env MCP server on port 3003
+        if image_type == "mcp":
+            return (f"{root}api/v1/mcp",)
+        return (f"{root}mcp",)
+
     @classmethod
+    def _retry_delay_on_make_error(
+        cls,
+        *,
+        env_key: str,
+        attempt: int,
+        error: Exception,
+        logger: logging.Logger,
+        client_name: str,
+    ) -> Optional[float]:
+        max_retries = cls._MAX_FLEET_MAKE_RETRIES
+        error_msg = str(error).lower()
+        is_transient = any(
+            hint in error_msg for hint in cls._TRANSIENT_FLEET_ERROR_HINTS
+        )
+        if attempt < max_retries - 1 and is_transient:
+            delay = cls._FLEET_MAKE_RETRY_BASE_DELAY_S * (2**attempt)
+            logger.warning(
+                f"[env={env_key}] {client_name}.make() failed (attempt {attempt + 1}/{max_retries}): {error}. "
+                f"Retrying in {delay:.1f}s..."
+            )
+            fleet_warning(
+                "fleet_make_retry",
+                attempt=attempt + 1,
+                max_retries=max_retries,
+                error_type=type(error).__name__,
+                error_message=str(error),
+                retry_delay_s=delay,
+            )
+            return delay
+
+        logger.error(
+            f"[env={env_key}] {client_name}.make() failed after {attempt + 1} attempt(s): {error}"
+        )
+        fleet_error(
+            "fleet_make_failed",
+            attempt=attempt + 1,
+            max_retries=max_retries,
+            error_type=type(error).__name__,
+            error_message=str(error),
+        )
+        return None
+
+    @staticmethod
+    def _emit_provisioning_completed(elapsed: float, instance_id: Any) -> None:
+        fleet_info(
+            "fleet_provisioning_completed",
+            provisioning_time_s=round(elapsed, 1),
+            instance_id=instance_id,
+        )
+
+    @classmethod
+    def _build_orchestrator_and_tools(
+        cls, env: Any, api_key: str, image_type: str, **kwargs: Any
+    ) -> Tuple["FleetEnvClient", FleetMCPTools]:
+        mcp_urls = cls._get_mcp_urls(root=env.urls.root, image_type=image_type)
+        orch = cls(
+            base_url=env.urls.manager.api,
+            fleet_env_handle=env,
+            api_key=api_key,
+            mcp_urls=mcp_urls,
+            **kwargs,
+        )
+        tools = FleetMCPTools(api_key=api_key, mcp_urls=mcp_urls)
+        return orch, tools
+
+    @classmethod
     def from_fleet(
         cls: Type["FleetEnvClient"],
         api_key: str,
         env_key: str,
-        data_key: str,
-        data_version: str,
-        image_type: str,
+        data_key: Optional[str] = None,
+        data_version: Optional[str] = None,
+        image_type: str = "standard",
         region: Optional[str] = None,
         ttl_seconds: Optional[int] = 3600,
         env_variables: Optional[Dict[str, Any]] = None,
@@ -72,106 +171,56 @@
         # This ensures .close() and other lifecycle methods are synchronous.
         fleet = Fleet(api_key=api_key)
 
-        # Fleet SDK expects data_key in "key:version" format
-        data_key_spec = None
-        if data_key:
-            if data_version:
-                data_key_spec = f"{data_key}:{data_version}"
-            else:
-                data_key_spec = data_key
-
-        import time
-        import logging
-
+        data_key_spec = cls._build_data_key_spec(data_key=data_key, data_version=data_version)
         _logger = logging.getLogger(__name__)
 
         _logger.info(f"Creating Fleet instance: env_key={env_key}, ttl={ttl_seconds}s")
         start = time.time()
 
-        # Retry logic for transient Fleet API failures (e.g., health check failures)
-        max_retries = 3
-        retry_base_delay = 2.0  # seconds
+        # Retry logic for transient Fleet API failures (e.g., health check failures).
+        max_retries = cls._MAX_FLEET_MAKE_RETRIES
         env = None
 
         for attempt in range(max_retries):
             try:
-                # Fleet SDK expects image_type=None for standard images
-                sdk_image_type = image_type if image_type == "mcp" else None
                 env = fleet.make(
                     env_key=env_key,
                     region=region,
                     ttl_seconds=ttl_seconds,
                     env_variables=env_variables,
-                    image_type=sdk_image_type,
+                    image_type=cls._sdk_image_type(image_type),
                     data_key=data_key_spec,
                 )
                 break  # Success
             except Exception as e:
-                error_msg = str(e)
-                # Retry on transient errors (health check failures, timeouts, etc.)
-                is_transient = any(
-                    x in error_msg.lower()
-                    for x in ["health check", "timeout", "connection", "temporarily"]
+                delay = cls._retry_delay_on_make_error(
+                    env_key=env_key,
+                    attempt=attempt,
+                    error=e,
+                    logger=_logger,
+                    client_name="Fleet",
                 )
-                if attempt < max_retries - 1 and is_transient:
-                    delay = retry_base_delay * (2**attempt)
-                    _logger.warning(
-                        f"[env={env_key}] Fleet.make() failed (attempt {attempt + 1}/{max_retries}): {e}. "
-                        f"Retrying in {delay:.1f}s..."
-                    )
-                    fleet_warning(
-                        "fleet_make_retry",
-                        attempt=attempt + 1,
-                        max_retries=max_retries,
-                        error_type=type(e).__name__,
-                        error_message=str(e),
-                        retry_delay_s=delay,
-                    )
-                    time.sleep(delay)
-                else:
-                    _logger.error(
-                        f"[env={env_key}] Fleet.make() failed after {attempt + 1} attempt(s): {e}"
-                    )
-                    fleet_error(
-                        "fleet_make_failed",
-                        attempt=attempt + 1,
-                        max_retries=max_retries,
-                        error_type=type(e).__name__,
-                        error_message=str(e),
-                    )
+                if delay is None:
                     raise
+                time.sleep(delay)
 
         elapsed = time.time() - start
         instance_id = getattr(env, "instance_id", "unknown")
         _logger.info(f"Fleet instance ready in {elapsed:.1f}s: {instance_id}")
+        cls._emit_provisioning_completed(elapsed=elapsed, instance_id=instance_id)
 
-        root = env.urls.root
-        # Pick MCP endpoint based on modality:
-        # - computer_use: aggregator on port 8081 (has computer tool + API tools)
-        # - tool_use: per-env MCP server on port 3003 (API tools only)
-        if image_type == "mcp":
-            mcp_urls = (f"{root}api/v1/mcp",)
-        else:
-            mcp_urls = (f"{root}mcp",)
-
-        orch = cls(
-            base_url=env.urls.manager.api,
-            fleet_env_handle=env,
-            api_key=api_key,
-            mcp_urls=mcp_urls,
-            **kwargs,
+        return cls._build_orchestrator_and_tools(
+            env=env, api_key=api_key, image_type=image_type, **kwargs
         )
-        tools = FleetMCPTools(api_key=api_key, mcp_urls=mcp_urls)
-        return orch, tools
 
     @classmethod
     async def from_fleet_async(
         cls: Type["FleetEnvClient"],
         api_key: str,
         env_key: str,
-        data_key: str,
-        data_version: str,
-        image_type: str,
+        data_key: Optional[str] = None,
+        data_version: Optional[str] = None,
+        image_type: str = "standard",
         region: Optional[str] = None,
         ttl_seconds: Optional[int] = 3600,
         env_variables: Optional[Dict[str, Any]] = None,
@@ -192,17 +241,7 @@
 
         async_fleet = AsyncFleet(api_key=api_key)
 
-        # Fleet SDK expects data_key in "key:version" format
-        data_key_spec = None
-        if data_key:
-            if data_version:
-                data_key_spec = f"{data_key}:{data_version}"
-            else:
-                data_key_spec = data_key
-
-        import time
-        import logging
-
+        data_key_spec = cls._build_data_key_spec(data_key=data_key, data_version=data_version)
         _logger = logging.getLogger(__name__)
 
         _logger.info(
@@ -210,14 +249,10 @@
         )
         start = time.time()
 
-        # Retry logic with async sleep (non-blocking)
-        max_retries = 3
-        retry_base_delay = 2.0  # seconds
+        # Retry logic with async sleep (non-blocking).
+        max_retries = cls._MAX_FLEET_MAKE_RETRIES
         env = None
 
-        # Fleet SDK expects image_type=None for standard images
-        sdk_image_type = image_type if image_type == "mcp" else None
-
         for attempt in range(max_retries):
             try:
                 env = await async_fleet.make(
@@ -225,72 +260,30 @@
                     region=region,
                     ttl_seconds=ttl_seconds,
                     env_variables=env_variables,
-                    image_type=sdk_image_type,
+                    image_type=cls._sdk_image_type(image_type),
                     data_key=data_key_spec,
                 )
                 break  # Success
             except Exception as e:
-                error_msg = str(e)
-                # Retry on transient errors (health check failures, timeouts, etc.)
-                is_transient = any(
-                    x in error_msg.lower()
-                    for x in ["health check", "timeout", "connection", "temporarily"]
+                delay = cls._retry_delay_on_make_error(
+                    env_key=env_key,
+                    attempt=attempt,
+                    error=e,
+                    logger=_logger,
+                    client_name="AsyncFleet",
                 )
-                if attempt < max_retries - 1 and is_transient:
-                    delay = retry_base_delay * (2**attempt)
-                    _logger.warning(
-                        f"[env={env_key}] AsyncFleet.make() failed (attempt {attempt + 1}/{max_retries}): {e}. "
-                        f"Retrying in {delay:.1f}s..."
-                    )
-                    fleet_warning(
-                        "fleet_make_retry",
-                        attempt=attempt + 1,
-                        max_retries=max_retries,
-                        error_type=type(e).__name__,
-                        error_message=str(e),
-                        retry_delay_s=delay,
-                    )
-                    await asyncio.sleep(delay)
-                else:
-                    _logger.error(
-                        f"[env={env_key}] AsyncFleet.make() failed after {attempt + 1} attempt(s): {e}"
-                    )
-                    fleet_error(
-                        "fleet_make_failed",
-                        attempt=attempt + 1,
-                        max_retries=max_retries,
-                        error_type=type(e).__name__,
-                        error_message=str(e),
-                    )
+                if delay is None:
                     raise
+                await asyncio.sleep(delay)
 
         elapsed = time.time() - start
         instance_id = getattr(env, "instance_id", "unknown")
         _logger.info(f"Fleet instance ready (async) in {elapsed:.1f}s: {instance_id}")
-        fleet_info(
-            "fleet_provisioning_completed",
-            provisioning_time_s=round(elapsed, 1),
-            instance_id=instance_id,
-        )
+        cls._emit_provisioning_completed(elapsed=elapsed, instance_id=instance_id)
 
-        root = env.urls.root
-        # Pick MCP endpoint based on modality:
-        # - computer_use (image_type="mcp"): aggregator on port 8081 (has computer tool + API tools)
-        # - tool_use: per-env MCP server on port 3003 (API tools only)
-        if image_type == "mcp":
-            mcp_urls = (f"{root}api/v1/mcp",)
-        else:
-            mcp_urls = (f"{root}mcp",)
-
-        orch = cls(
-            base_url=env.urls.manager.api,
-            fleet_env_handle=env,
-            api_key=api_key,
-            mcp_urls=mcp_urls,
-            **kwargs,
+        return cls._build_orchestrator_and_tools(
+            env=env, api_key=api_key, image_type=image_type, **kwargs
         )
-        tools = FleetMCPTools(api_key=api_key, mcp_urls=mcp_urls)
-        return orch, tools
 
     # ------------------------------------------------------------------
     # Database query methods (delegate to Fleet SDK's SQLiteResource)

This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.

api_key=api_key,
env_key=env_key,
ttl_seconds=600, # 10 min TTL
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Example crashes with missing required parameters

High Severity

The advertised quickstart example calls FleetEnvClient.from_fleet() with only api_key, env_key, and ttl_seconds, but the method signature requires data_key, data_version, and image_type as mandatory positional parameters with no defaults. This will immediately raise a TypeError at runtime, making the documented quickstart non-functional.

Additional Locations (1)
Fix in Cursor Fix in Web

**kwargs,
)
tools = FleetMCPTools(api_key=api_key, mcp_urls=mcp_urls)
return orch, tools

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Massive duplication between sync and async provisioning methods

Low Severity

from_fleet and from_fleet_async duplicate ~130 lines of nearly identical logic: retry loops, data_key spec construction, MCP URL selection, and orchestrator/tools creation. The only structural difference is fleet.make() vs await async_fleet.make() and time.sleep() vs await asyncio.sleep(). This already caused the telemetry inconsistency where only the async path emits fleet_provisioning_completed.

Additional Locations (1)
Fix in Cursor Fix in Web

Deniz and others added 3 commits April 13, 2026 22:19
browser_use tasks now get the same treatment as computer_use:
- MCP image type (triggers browser sidecar, no MCP image needed)
- computer tool only (screenshot, click, type, scroll)
- initial screenshot on reset
- 1800s TTL

Follows platform change: browsers are now sidecars on base images,
MCP images are deprecated.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
browser_use tasks now create a standard Fleet env (web UI only) plus
a separate browser lease via the Fleet browser API. The browser
navigates to the env's web UI and provides the computer tool
(screenshot, click, type, scroll) via its MCP endpoint.

Flow: Fleet env (standard) → browser lease → navigate → healthcheck
→ use browser MCP for computer tool. On close, delete the lease.

MCP images are deprecated — browsers are now sidecars on base images.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
FLEET_API_KEY is not accepted by the browser lease API (401).
Use the shared fallback token from theseus api_token_config.py,
same as the harness uses.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 10 total unresolved issues (including 9 from previous reviews).

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit 725ddba. Configure here.

return f"https://api.browser.{cluster_name}.fleetai.com"


_BROWSER_API_TOKEN_FALLBACK = "bb46d985-763a-47bf-a67a-c2c98ba6e1d9"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardcoded API token committed to source code

High Severity

_BROWSER_API_TOKEN_FALLBACK contains a hardcoded UUID credential (bb46d985-763a-47bf-a67a-c2c98ba6e1d9) used as a fallback authentication token for the browser lease API. This token is committed to the public repository and used in Authorization: Bearer headers, exposing it to anyone with repo access.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 725ddba. Configure here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant