NOTE(review): line breaks and indentation were reconstructed from a copy of this patch that had been flattened onto one line. Hunk line counts match the hunk headers; the indentation of the engine.py context lines is assumed -- verify against the file before applying.

diff --git a/src/stepwise/engine.py b/src/stepwise/engine.py
index 3467835..5f6496b 100644
--- a/src/stepwise/engine.py
+++ b/src/stepwise/engine.py
@@ -3449,6 +3449,7 @@ def _process_completion(self, job: Job, run: StepRun) -> None:
                     "step": run.step_name,
                     "rule": rule.name,
                 })
+                self._cleanup_job_sessions(job.id, job)
                 return
 
         # No rule matched — behavior depends on whether advance rules exist
diff --git a/tests/test_resource_manager_lifecycle.py b/tests/test_resource_manager_lifecycle.py
index 68214c9..7d2dee8 100644
--- a/tests/test_resource_manager_lifecycle.py
+++ b/tests/test_resource_manager_lifecycle.py
@@ -15,6 +15,7 @@
 from stepwise.executors import ExecutorRegistry
 from stepwise.models import (
     ExecutorRef,
+    ExitRule,
     JobStatus,
     StepDefinition,
     WorkflowDefinition,
@@ -109,6 +110,34 @@ async def test_completed_job_triggers_release(self):
         assert result.status == JobStatus.COMPLETED
         assert mgr.released_jobs == [job.id]
 
+    @pytest.mark.asyncio
+    async def test_abandoned_job_triggers_release(self):
+        """A job abandoned via exit rule on a completed step releases resources.
+
+        Regression test: the abandon arm of exit-rule resolution in
+        _process_completion marked the job FAILED without draining resource
+        managers, leaking the job's ACP agent processes for the life of the
+        server. Asserts FAILED status and that released_jobs == [job.id].
+        """
+        register_step_fn("noop", lambda inputs: {})
+        mgr = FakeResourceManager()
+        engine = _make_engine_with_manager(mgr)
+        wf = WorkflowDefinition(steps={
+            "s1": StepDefinition(
+                name="s1", outputs=[],
+                executor=ExecutorRef("callable", {"fn_name": "noop"}),
+                exit_rules=[
+                    ExitRule("cancel", "always", {"action": "abandon"}, priority=10),
+                ],
+            ),
+        })
+        job = engine.create_job("t-abandon", wf)
+
+        result = await run_job(engine, job.id)
+
+        assert result.status == JobStatus.FAILED
+        assert mgr.released_jobs == [job.id]
+
     @pytest.mark.asyncio
     async def test_failing_manager_does_not_break_engine(self):
         """If release_for_job raises, the job still completes cleanly."""