Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/stepwise/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3449,6 +3449,7 @@ def _process_completion(self, job: Job, run: StepRun) -> None:
"step": run.step_name,
"rule": rule.name,
})
self._cleanup_job_sessions(job.id, job)
return

# No rule matched — behavior depends on whether advance rules exist
Expand Down
29 changes: 29 additions & 0 deletions tests/test_resource_manager_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from stepwise.executors import ExecutorRegistry
from stepwise.models import (
ExecutorRef,
ExitRule,
JobStatus,
StepDefinition,
WorkflowDefinition,
Expand Down Expand Up @@ -109,6 +110,34 @@ async def test_completed_job_triggers_release(self):
assert result.status == JobStatus.COMPLETED
assert mgr.released_jobs == [job.id]

@pytest.mark.asyncio
async def test_abandoned_job_triggers_release(self):
"""A job abandoned via exit rule on a completed step releases resources.

Regression test: the abandon arm of exit-rule resolution in
_process_completion marked the job FAILED without draining
resource managers, leaking the job's ACP agent processes for
the life of the server.
"""
register_step_fn("noop", lambda inputs: {})
mgr = FakeResourceManager()
engine = _make_engine_with_manager(mgr)
wf = WorkflowDefinition(steps={
"s1": StepDefinition(
name="s1", outputs=[],
executor=ExecutorRef("callable", {"fn_name": "noop"}),
exit_rules=[
ExitRule("cancel", "always", {"action": "abandon"}, priority=10),
],
),
})
job = engine.create_job("t-abandon", wf)

result = await run_job(engine, job.id)

assert result.status == JobStatus.FAILED
assert mgr.released_jobs == [job.id]

@pytest.mark.asyncio
async def test_failing_manager_does_not_break_engine(self):
"""If release_for_job raises, the job still completes cleanly."""
Expand Down