Skip to content

Commit 5c99fa9

Browse files
authored
Extract FlowState and centralize signal management (#123)
1 parent d500fd0 commit 5c99fa9

5 files changed

Lines changed: 133 additions & 57 deletions

File tree

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Cleipnir.ResilientFunctions.Helpers;
5+
using Cleipnir.ResilientFunctions.Queuing;
6+
using Cleipnir.ResilientFunctions.Storage;
7+
8+
namespace Cleipnir.ResilientFunctions.CoreRuntime;
9+
10+
internal class FlowState
11+
{
12+
private readonly Lock _lock = new();
13+
14+
public StoredId Id { get; }
15+
public Action Suspend { get; }
16+
public QueueManager QueueManager { get; }
17+
public int Threads { get; private set; }
18+
public int SuspendedThreads { get; private set; }
19+
public FlowTimeouts Timeouts { get; }
20+
public AsyncSignal Signal { get; } = new();
21+
22+
public FlowState(
23+
StoredId id,
24+
Action suspend,
25+
QueueManager queueManager,
26+
int threads,
27+
int suspendedThreads,
28+
FlowTimeouts timeouts)
29+
{
30+
Id = id;
31+
Suspend = suspend;
32+
QueueManager = queueManager;
33+
Threads = threads;
34+
SuspendedThreads = suspendedThreads;
35+
Timeouts = timeouts;
36+
}
37+
38+
public void NewThreadStarted()
39+
{
40+
lock (_lock)
41+
Threads++;
42+
}
43+
44+
public void ThreadCompleted()
45+
{
46+
lock (_lock)
47+
Threads--;
48+
}
49+
50+
public async Task ThreadSuspended()
51+
{
52+
lock (_lock)
53+
SuspendedThreads++;
54+
55+
await Signal.Wait();
56+
}
57+
58+
public void Interrupt()
59+
{
60+
lock (_lock)
61+
SuspendedThreads = 0;
62+
63+
Signal.Fire();
64+
}
65+
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs

Lines changed: 29 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,9 @@
99

1010
namespace Cleipnir.ResilientFunctions.CoreRuntime;
1111

12-
public record FlowStatus(StoredId Id, Action Suspend, QueueManager QueueManager, int Threads, int SuspendedThreads, FlowTimeouts Timeouts);
13-
1412
public class FlowsManager : IDisposable
1513
{
16-
private readonly Dictionary<StoredId, FlowStatus> _dict = new();
14+
private readonly Dictionary<StoredId, FlowState> _dict = new();
1715
private readonly Lock _lock = new();
1816
private readonly UtcNow _utcNow;
1917
private volatile bool _disposed;
@@ -28,14 +26,14 @@ private async Task TimeoutCheckLoop()
2826
{
2927
while (!_disposed)
3028
{
31-
var expiredStatuses = new List<FlowStatus>();
29+
var expiredStates = new List<FlowState>();
3230
var now = _utcNow();
3331
lock (_lock)
3432
foreach (var (_, status) in _dict)
3533
if (status.Timeouts.HasExpiredTimeouts(now))
36-
expiredStatuses.Add(status);
34+
expiredStates.Add(status);
3735

38-
foreach (var status in expiredStatuses)
36+
foreach (var status in expiredStates)
3937
status.Timeouts.SignalExpiredTimeouts(now);
4038

4139
await Task.Delay(10);
@@ -47,7 +45,7 @@ private async Task TimeoutCheckLoop()
4745
public void AddFlow(StoredId id, Action suspend, QueueManager queueManager, FlowTimeouts timeouts)
4846
{
4947
lock (_lock)
50-
_dict[id] = new FlowStatus(id, suspend, queueManager, Threads: 1, SuspendedThreads: 0, timeouts);
48+
_dict[id] = new FlowState(id, suspend, queueManager, threads: 1, suspendedThreads: 0, timeouts);
5149
}
5250

5351
public void RemoveFlow(StoredId id)
@@ -62,62 +60,53 @@ public void Interrupt(IEnumerable<StoredId> ids)
6260
{
6361
foreach (var id in ids)
6462
{
65-
if (!_dict.ContainsKey(id))
63+
if (!_dict.TryGetValue(id, out var flowState))
6664
continue;
67-
68-
var queueManager = _dict[id].QueueManager;
69-
InterruptThreads(id);
70-
Task.Run(() => queueManager.FetchMessagesOnce());
65+
66+
flowState.Interrupt();
67+
Task.Run(() => flowState.QueueManager.FetchMessagesOnce());
7168
}
69+
7270
}
7371
}
7472

7573
public void StartThread(StoredId id)
7674
{
7775
lock (_lock)
78-
{
79-
if (!_dict.ContainsKey(id))
80-
return;
81-
82-
var status = _dict[id];
83-
_dict[id] = status with { Threads = status.Threads + 1 };
84-
}
76+
if (_dict.TryGetValue(id, out var flowState))
77+
flowState.NewThreadStarted();
8578
}
8679

8780
public void CompleteThread(StoredId id)
8881
{
8982
lock (_lock)
90-
{
91-
if (!_dict.ContainsKey(id))
92-
return;
93-
94-
var status = _dict[id];
95-
_dict[id] = status with { Threads = status.Threads - 1 };
96-
}
83+
if (_dict.TryGetValue(id, out var flowState))
84+
flowState.ThreadCompleted();
9785
}
9886

99-
public void InterruptThreads(StoredId id)
87+
public async Task SuspendThread(StoredId id)
10088
{
89+
FlowState? flowState;
10190
lock (_lock)
102-
{
103-
if (!_dict.ContainsKey(id))
104-
return;
91+
_dict.TryGetValue(id, out flowState);
10592

106-
var status = _dict[id];
107-
_dict[id] = status with { SuspendedThreads = 0 };
108-
}
93+
if (flowState != null)
94+
await flowState.ThreadSuspended();
10995
}
11096

111-
public void SuspendThread(StoredId id)
97+
public Task GetInterruptedSignal(StoredId id)
11298
{
11399
lock (_lock)
114-
{
115-
if (!_dict.ContainsKey(id))
116-
return;
100+
return _dict.TryGetValue(id, out var flowState)
101+
? flowState.Signal.Wait()
102+
: Task.CompletedTask;
103+
}
117104

118-
var status = _dict[id];
119-
_dict[id] = status with { SuspendedThreads = status.SuspendedThreads + 1 };
120-
}
105+
public void SignalInterrupt(StoredId id)
106+
{
107+
lock (_lock)
108+
if (_dict.TryGetValue(id, out var flowState))
109+
flowState.Signal.Fire();
121110
}
122111

123112
[DoesNotReturn]

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ async Task Inner()
5656

5757
var maxWait = suspend ? TimeSpan.Zero : (TimeSpan?)null;
5858
await Effect.FlowTimeouts.AddTimeout(timeoutId, expiry.ToDateTime(), maxWait);
59-
59+
6060
await Effect.Upsert(timeoutId, -1L, alias, flush: false);
6161
Effect.FlowTimeouts.RemoveTimeout(timeoutId);
6262
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
4+
namespace Cleipnir.ResilientFunctions.Helpers;
5+
6+
public class AsyncSignal
7+
{
8+
private TaskCompletionSource _tcs = new();
9+
private readonly Lock _lock = new();
10+
11+
public async Task Wait()
12+
{
13+
Task task;
14+
lock (_lock)
15+
task = _tcs.Task;
16+
17+
await task;
18+
}
19+
20+
public void Fire()
21+
{
22+
TaskCompletionSource tcs;
23+
lock (_lock)
24+
{
25+
tcs = _tcs;
26+
_tcs = new();
27+
}
28+
29+
tcs.SetResult();
30+
}
31+
}

Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ public class QueueManager(
4242
private volatile bool _disposed;
4343

4444
private volatile Exception? _thrownException = null;
45-
private TaskCompletionSource _pulse = new(TaskCreationOptions.RunContinuationsAsynchronously);
4645

4746
private async Task Initialize()
4847
{
@@ -152,24 +151,28 @@ public async Task FetchMessagesOnce()
152151
}
153152
finally
154153
{
155-
PulseAll();
154+
flowsManager.SignalInterrupt(storedId);
156155
_semaphoreSlim.Release();
157156
}
158157
}
159158

160159
private (MessageData? Matched, int PositionToRemoveIndex, Task PulseTask) TryTakeMessage(MessagePredicate predicate)
161160
{
161+
var interruptedSignal = flowsManager.GetInterruptedSignal(storedId);
162+
162163
lock (_lock)
163164
{
164165
for (var i = 0; i < _toDeliver.Count; i++)
165166
if (predicate(_toDeliver[i].Envelope))
166167
{
167168
var matched = _toDeliver[i];
168169
_toDeliver.RemoveAt(i);
169-
return (matched, _nextToRemoveIndex++, _pulse.Task);
170+
var positionToRemoveIndex = _nextToRemoveIndex++;
171+
effect.FlushlessUpsert(_toRemoveNextIndex, _nextToRemoveIndex, alias: null);
172+
return (matched, positionToRemoveIndex, interruptedSignal);
170173
}
171174

172-
return (null, 0, _pulse.Task);
175+
return (null, 0, interruptedSignal);
173176
}
174177
}
175178

@@ -221,17 +224,6 @@ public async Task AfterFlush()
221224
}
222225
}
223226

224-
private void PulseAll()
225-
{
226-
TaskCompletionSource oldPulse;
227-
lock (_lock)
228-
{
229-
oldPulse = _pulse;
230-
_pulse = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
231-
}
232-
oldPulse.TrySetResult();
233-
}
234-
235227
public async Task<Envelope?> Subscribe(
236228
MessagePredicate predicate,
237229
DateTime? timeout,
@@ -266,7 +258,6 @@ private void PulseAll()
266258
effect.FlushlessUpserts(
267259
new List<EffectResult>(
268260
[
269-
new EffectResult(_toRemoveNextIndex, positionToRemoveIndex, Alias: null),
270261
new EffectResult(toRemoveId, matched.Position, Alias: null),
271262
new EffectResult(messageId, matched.MessageContentBytes, Alias: null),
272263
new EffectResult(messageTypeId, matched.MessageTypeBytes, Alias: null),

0 commit comments

Comments
 (0)