From aef76698e45767c53f97352a11b12f73952a92c1 Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Tue, 19 May 2026 00:26:53 +0200 Subject: [PATCH] Fix infinite job retry when fleet is at capacity --- .../pipeline_tasks/jobs_submitted.py | 10 +++++----- .../pipeline_tasks/test_submitted_jobs.py | 19 +++++++++++-------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 76738a8cf..8ecdc0e03 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -588,12 +588,12 @@ async def _apply_assignment_result( return fleet_spec = get_fleet_spec(fleet_model) if not can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec): - logger.debug( - "%s: fleet %s is full, retrying assignment", - fmt(context.job_model), - fleet_model.name, + await _terminate_submitted_job( + session=session, + job_model=job_model, + reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY, + message="Fleet is at capacity", ) - await _reset_job_lock_for_retry(session=session, item=item) return instance_model = _create_placeholder_instance( fleet_model=fleet_model, diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py index c4baf6f89..fd9cf3b58 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py @@ -1202,14 +1202,18 @@ async def test_assignment_creates_placeholder_instance_for_new_capacity( assert placeholder.offer is None assert placeholder.instance_num == 0 - async def test_assignment_retries_when_fleet_is_full( - self, test_db, session: AsyncSession, worker: JobSubmittedWorker + @pytest.mark.parametrize("fleet_type", ["cloud", "ssh"]) + async def test_job_fails_when_fleet_is_full( + self, test_db, session: AsyncSession, worker: JobSubmittedWorker, fleet_type: str ): project = await create_project(session=session) user = await create_user(session=session) repo = await create_repo(session=session, project_id=project.id) - fleet_spec = get_fleet_spec() - fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1) + if fleet_type == "cloud": + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1) + else: + fleet_spec = get_fleet_spec(get_ssh_fleet_configuration(hosts=["10.0.0.1"])) fleet = await create_fleet(session=session, project=project, spec=fleet_spec) await create_instance( session=session, @@ -1230,10 +1234,9 @@ async def test_assignment_retries_when_fleet_is_full( await _process_job(session=session, worker=worker, job_model=job) job = await _get_job(session, job.id) - # Assignment retried — job not committed as assigned - assert job.status == JobStatus.SUBMITTED - assert not job.instance_assigned - assert job.instance is None + assert job.status == JobStatus.TERMINATING + assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY + assert job.termination_reason_message == "Fleet is at capacity" # No placeholder must be committed when the fleet is full. res = await session.execute( select(InstanceModel).where(