Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -588,12 +588,12 @@ async def _apply_assignment_result(
return
fleet_spec = get_fleet_spec(fleet_model)
if not can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec):
logger.debug(
"%s: fleet %s is full, retrying assignment",
fmt(context.job_model),
fleet_model.name,
await _terminate_submitted_job(
session=session,
job_model=job_model,
reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
message="Fleet is at capacity",
)
await _reset_job_lock_for_retry(session=session, item=item)
return
instance_model = _create_placeholder_instance(
fleet_model=fleet_model,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1202,14 +1202,18 @@ async def test_assignment_creates_placeholder_instance_for_new_capacity(
assert placeholder.offer is None
assert placeholder.instance_num == 0

async def test_assignment_retries_when_fleet_is_full(
self, test_db, session: AsyncSession, worker: JobSubmittedWorker
@pytest.mark.parametrize("fleet_type", ["cloud", "ssh"])
async def test_job_fails_when_fleet_is_full(
self, test_db, session: AsyncSession, worker: JobSubmittedWorker, fleet_type: str
):
project = await create_project(session=session)
user = await create_user(session=session)
repo = await create_repo(session=session, project_id=project.id)
fleet_spec = get_fleet_spec()
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1)
if fleet_type == "cloud":
fleet_spec = get_fleet_spec()
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1)
else:
fleet_spec = get_fleet_spec(get_ssh_fleet_configuration(hosts=["10.0.0.1"]))
fleet = await create_fleet(session=session, project=project, spec=fleet_spec)
await create_instance(
session=session,
Expand All @@ -1230,10 +1234,9 @@ async def test_assignment_retries_when_fleet_is_full(
await _process_job(session=session, worker=worker, job_model=job)

job = await _get_job(session, job.id)
# Assignment retried — job not committed as assigned
assert job.status == JobStatus.SUBMITTED
assert not job.instance_assigned
assert job.instance is None
assert job.status == JobStatus.TERMINATING
assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
assert job.termination_reason_message == "Fleet is at capacity"
# No placeholder must be committed when the fleet is full.
res = await session.execute(
select(InstanceModel).where(
Expand Down
Loading