Skip to content

Commit d332cb7

Browse files
authored
Replace SignalSuspendedToInvoker callback with SuspendedTask on FlowState (#131)
1 parent 8f46c5e commit d332cb7

5 files changed

Lines changed: 28 additions & 27 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti
574574
flowsManager
575575
);
576576

577-
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
577+
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
578578
var queueClient = await queueManager.CreateQueueClient();
579579

580580
var message = await queueClient.Pull<GoodMessage>(
@@ -637,7 +637,7 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
637637
flowsManager
638638
);
639639

640-
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, minimumTimeout);
640+
flowsManager.AddFlow(workflow.StoredId, queueManager,minimumTimeout);
641641

642642
var queueClient = await queueManager.CreateQueueClient();
643643

@@ -698,7 +698,7 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc
698698
flowsManager
699699
);
700700

701-
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
701+
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
702702
var queueClient = await queueManager.CreateQueueClient();
703703

704704
// Pull envelope for specific receiver

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ protected async Task MessagesSunshineScenario(Task<IFunctionStore> functionStore
4949
flowsManager
5050
);
5151

52-
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
52+
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
5353
queueClient = await queueManager.CreateQueueClient();
5454
var message = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
5555

@@ -101,7 +101,7 @@ protected async Task QueueClientReturnsNullAfterTimeout(Task<IFunctionStore> fun
101101
flowsManager
102102
);
103103

104-
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
104+
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
105105
var queueClient = await queueManager.CreateQueueClient();
106106
var message = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId(), TimeSpan.FromMilliseconds(100));
107107

@@ -148,7 +148,7 @@ protected async Task MessagesFirstOfTypesReturnsNoneForFirstOfTypesOnTimeout(Tas
148148
flowsManager
149149
);
150150

151-
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
151+
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
152152
var queueClient = await queueManager.CreateQueueClient();
153153
var message = await queueClient.Pull<object>(
154154
workflow,
@@ -200,7 +200,7 @@ protected async Task MessagesFirstOfTypesReturnsFirstForFirstOfTypesOnFirst(Task
200200
flowsManager
201201
);
202202

203-
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
203+
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
204204
var queueClient = await queueManager.CreateQueueClient();
205205
var message = await queueClient.Pull<object>(
206206
workflow,
@@ -253,7 +253,7 @@ protected async Task MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond(Ta
253253
flowsManager
254254
);
255255

256-
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
256+
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
257257
var queueClient = await queueManager.CreateQueueClient();
258258
var message = await queueClient.Pull<string>(
259259
workflow,
@@ -307,7 +307,7 @@ protected async Task SecondEventWithExistingIdempotencyKeyIsIgnored(Task<IFuncti
307307
flowsManager
308308
);
309309

310-
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
310+
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
311311
var queueClient = await queueManager.CreateQueueClient();
312312
var message1 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
313313
var message2 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
@@ -363,7 +363,7 @@ protected async Task QueueClientCanPullMultipleMessages(Task<IFunctionStore> fun
363363
flowsManager
364364
);
365365

366-
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
366+
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
367367
var queueClient = await queueManager.CreateQueueClient();
368368

369369
var message1 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
@@ -417,7 +417,7 @@ async Task (workflow) =>
417417
flowsManager
418418
);
419419

420-
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
420+
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
421421
var queueClient = await queueManager.CreateQueueClient();
422422
lock (queueClients)
423423
queueClients[workflow.FlowId.Instance.Value] = queueClient;
@@ -483,7 +483,7 @@ async Task (workflow) =>
483483
flowsManager
484484
);
485485

486-
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
486+
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
487487
var queueClient = await queueManager.CreateQueueClient();
488488

489489
while (true)
@@ -551,7 +551,7 @@ async Task (workflow) =>
551551
flowsManager
552552
);
553553

554-
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
554+
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
555555
var queueClient = await queueManager.CreateQueueClient();
556556

557557
for (var i = 0; i < 10; i++)
@@ -581,7 +581,7 @@ async Task (workflow) =>
581581
flowsManager
582582
);
583583

584-
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
584+
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
585585
var queueClient = await queueManager.CreateQueueClient();
586586

587587
for (var i = 0; i < 10; i++)

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowState.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using System;
21
using System.Threading;
32
using System.Threading.Tasks;
43
using Cleipnir.ResilientFunctions.Helpers;
@@ -7,33 +6,33 @@
76

87
namespace Cleipnir.ResilientFunctions.CoreRuntime;
98

10-
internal class FlowState
9+
public class FlowState
1110
{
1211
private readonly Lock _lock = new();
12+
private readonly TaskCompletionSource _suspendedTcs = new();
1313

1414
public StoredId Id { get; }
15-
public Action SignalSuspendedToInvoker { get; }
1615
public QueueManager QueueManager { get; }
1716
public int Threads { get; private set; }
1817
public int SuspendedThreads { get; private set; }
1918
public FlowTimeouts Timeouts { get; }
2019
public AsyncSignal InterruptSignal { get; } = new();
2120
public bool Suspended { get; private set; }
21+
public Task SuspendedTask { get; }
2222

2323
public FlowState(
2424
StoredId id,
25-
Action signalSuspendedToInvoker,
2625
QueueManager queueManager,
2726
int threads,
2827
int suspendedThreads,
2928
FlowTimeouts timeouts)
3029
{
3130
Id = id;
32-
SignalSuspendedToInvoker = signalSuspendedToInvoker;
3331
QueueManager = queueManager;
3432
Threads = threads;
3533
SuspendedThreads = suspendedThreads;
3634
Timeouts = timeouts;
35+
SuspendedTask = _suspendedTcs.Task;
3736
}
3837

3938
public void NewThreadStarted()
@@ -83,4 +82,6 @@ public bool Suspend()
8382
lock (_lock)
8483
return Threads == SuspendedThreads && (Suspended = true);
8584
}
85+
86+
public void NotifySuspension() => _suspendedTcs.TrySetResult();
8687
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ private async Task TimeoutCheckLoop()
4444

4545
public void Dispose() => _disposed = true;
4646

47-
public void AddFlow(StoredId id, Action suspend, QueueManager queueManager, FlowTimeouts timeouts)
47+
public FlowState AddFlow(StoredId id, QueueManager queueManager, FlowTimeouts timeouts)
4848
{
4949
lock (_lock)
50-
_dict[id] = new FlowState(id, suspend, queueManager, threads: 1, suspendedThreads: 0, timeouts);
50+
return _dict[id] = new FlowState(id, queueManager, threads: 1, suspendedThreads: 0, timeouts);
5151
}
5252

5353
public void RemoveFlow(StoredId id)

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,12 @@ public async Task<InnerScheduled<TReturn>> ScheduleInvoke(FlowInstance flowInsta
6262
return _invocationHelper.CreateInnerScheduled([flowId], parentWorkflow, detach);
6363

6464
var tcs = new TaskCompletionSource<TReturn>();
65-
_flowsManager.AddFlow(
65+
var flowState = _flowsManager.AddFlow(
6666
storedId,
67-
suspend: () => tcs.TrySetException(new InvocationSuspendedException(flowId)),
6867
queueManager,
6968
timeouts
7069
);
70+
_ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId)));
7171
_ = Task.Run(async () =>
7272
{
7373
try
@@ -162,12 +162,12 @@ public async Task<InnerScheduled<TReturn>> ScheduleRestart(StoredId storedId)
162162
var flowId = new FlowId(_flowType, humanInstanceId);
163163

164164
var tcs = new TaskCompletionSource<TReturn>();
165-
_flowsManager.AddFlow(
165+
var flowState = _flowsManager.AddFlow(
166166
storedId,
167-
suspend: () => tcs.TrySetException(new InvocationSuspendedException(flowId)),
168167
queueManager,
169168
timeouts
170169
);
170+
_ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId)));
171171
_ = Task.Run(async () =>
172172
{
173173
CurrentFlow._workflow.Value = workflow;
@@ -221,12 +221,12 @@ internal async Task ScheduleRestart(StoredId storedId, RestartedFunction rf, Act
221221
var flowId = new FlowId(_flowType, humanInstanceId);
222222

223223
var tcs = new TaskCompletionSource<TReturn>();
224-
_flowsManager.AddFlow(
224+
var flowState = _flowsManager.AddFlow(
225225
storedId,
226-
suspend: () => tcs.TrySetException(new InvocationSuspendedException(flowId)),
227226
queueManager,
228227
timeouts
229228
);
229+
_ = flowState.SuspendedTask.ContinueWith(_ => tcs.TrySetException(new InvocationSuspendedException(flowId)));
230230
_ = Task.Run(async () =>
231231
{
232232
CurrentFlow._workflow.Value = workflow;

0 commit comments

Comments
 (0)