Skip to content

Commit 95f53a3

Browse files
committed
Added flow state parameter to FlowsManager's RemoveFlow-method
1 parent 7761400 commit 95f53a3

2 files changed

Lines changed: 6 additions & 5 deletions

File tree

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,11 @@ public FlowState AddFlow(StoredId id, QueueManager queueManager, FlowTimeouts ti
5050
return _dict[id] = new FlowState(id, queueManager, threads: 1, waitingThreads: 0, timeouts);
5151
}
5252

53-
public void RemoveFlow(StoredId id)
53+
public void RemoveFlow(StoredId id, FlowState flowState)
5454
{
5555
lock (_lock)
56-
_dict.Remove(id);
56+
if (_dict.TryGetValue(id, out var existingState) && flowState == existingState)
57+
_dict.Remove(id);
5758
}
5859

5960
public async Task Interrupt(IReadOnlyList<StoredId> ids)

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ await PersistResultAndEnsureSuccess(
118118
}
119119
finally
120120
{
121-
_flowsManager.RemoveFlow(storedId);
121+
_flowsManager.RemoveFlow(storedId, flowState);
122122
}
123123
});
124124

@@ -208,7 +208,7 @@ public async Task<InnerScheduled<TReturn>> ScheduleRestart(StoredId storedId)
208208
}
209209
finally
210210
{
211-
_flowsManager.RemoveFlow(storedId);
211+
_flowsManager.RemoveFlow(storedId, flowState);
212212
}
213213
});
214214

@@ -250,7 +250,7 @@ internal async Task ScheduleRestart(StoredId storedId, RestartedFunction rf, Act
250250
tcs.TrySetCanceled();
251251
}
252252
catch (Exception exception) { _unhandledExceptionHandler.Invoke(_flowType, exception); tcs.TrySetException(exception); }
253-
finally{ _flowsManager.RemoveFlow(storedId); onCompletion(); }
253+
finally{ _flowsManager.RemoveFlow(storedId, flowState); onCompletion(); }
254254
});
255255
}
256256

0 commit comments

Comments
 (0)