Skip to content

Commit 8f46c5e

Browse files
authored
Add IFunctionStore to FlowsManager and rework suspend/interrupt lifecycle (#129)
1 parent 6171e22 commit 8f46c5e

5 files changed

Lines changed: 115 additions & 48 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -559,19 +559,22 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti
559559
inner: async Task<string> (string _, Workflow workflow) =>
560560
{
561561

562+
var flowTimeouts = new FlowTimeouts();
563+
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
562564
var queueManager = new QueueManager(
563565
workflow.FlowId,
564566
workflow.StoredId,
565567
functionStore.MessageStore,
566568
exceptionThrowingSerializer,
567569
workflow.Effect,
568570
unhandledExceptionHandler,
569-
new FlowTimeouts(),
571+
flowTimeouts,
570572
() => DateTime.UtcNow,
571573
SettingsWithDefaults.Default,
572-
new FlowsManager(functionStore, () => DateTime.UtcNow)
574+
flowsManager
573575
);
574576

577+
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
575578
var queueClient = await queueManager.CreateQueueClient();
576579

577580
var message = await queueClient.Pull<GoodMessage>(
@@ -620,6 +623,7 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
620623
{
621624
storedId = workflow.StoredId;
622625
var minimumTimeout = new FlowTimeouts();
626+
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
623627
var queueManager = new QueueManager(
624628
workflow.FlowId,
625629
workflow.StoredId,
@@ -630,9 +634,11 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
630634
minimumTimeout,
631635
() => DateTime.UtcNow,
632636
SettingsWithDefaults.Default,
633-
new FlowsManager(functionStore, () => DateTime.UtcNow)
637+
flowsManager
634638
);
635639

640+
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, minimumTimeout);
641+
636642
var queueClient = await queueManager.CreateQueueClient();
637643

638644
// Verify timeout is not set before pull
@@ -677,19 +683,22 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc
677683
inner: async Task<string> (string _, Workflow workflow) =>
678684
{
679685

686+
var flowTimeouts = new FlowTimeouts();
687+
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
680688
var queueManager = new QueueManager(
681689
workflow.FlowId,
682690
workflow.StoredId,
683691
functionStore.MessageStore,
684692
DefaultSerializer.Instance,
685693
workflow.Effect,
686694
unhandledExceptionHandler,
687-
new FlowTimeouts(),
695+
flowTimeouts,
688696
() => DateTime.UtcNow,
689697
SettingsWithDefaults.Default,
690-
new FlowsManager(functionStore, () => DateTime.UtcNow)
698+
flowsManager
691699
);
692700

701+
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
693702
var queueClient = await queueManager.CreateQueueClient();
694703

695704
// Pull envelope for specific receiver

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs

Lines changed: 49 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,22 @@ protected async Task MessagesSunshineScenario(Task<IFunctionStore> functionStore
3434
inner: async Task<string> (string _, Workflow workflow) =>
3535
{
3636

37+
var flowTimeouts = new FlowTimeouts();
38+
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
3739
var queueManager = new QueueManager(
3840
workflow.FlowId,
3941
workflow.StoredId,
4042
functionStore.MessageStore,
4143
DefaultSerializer.Instance,
4244
workflow.Effect,
4345
unhandledExceptionHandler,
44-
new FlowTimeouts(),
46+
flowTimeouts,
4547
() => DateTime.UtcNow,
4648
SettingsWithDefaults.Default,
47-
new FlowsManager(functionStore, () => DateTime.UtcNow)
49+
flowsManager
4850
);
4951

52+
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
5053
queueClient = await queueManager.CreateQueueClient();
5154
var message = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
5255

@@ -84,6 +87,7 @@ protected async Task QueueClientReturnsNullAfterTimeout(Task<IFunctionStore> fun
8487
{
8588

8689
var flowTimeouts = new FlowTimeouts();
90+
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
8791
var queueManager = new QueueManager(
8892
workflow.FlowId,
8993
workflow.StoredId,
@@ -94,10 +98,9 @@ protected async Task QueueClientReturnsNullAfterTimeout(Task<IFunctionStore> fun
9498
flowTimeouts,
9599
() => DateTime.UtcNow,
96100
SettingsWithDefaults.Default,
97-
new FlowsManager(functionStore, () => DateTime.UtcNow)
101+
flowsManager
98102
);
99103

100-
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
101104
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
102105
var queueClient = await queueManager.CreateQueueClient();
103106
var message = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId(), TimeSpan.FromMilliseconds(100));
@@ -131,6 +134,7 @@ protected async Task MessagesFirstOfTypesReturnsNoneForFirstOfTypesOnTimeout(Tas
131134
{
132135

133136
var flowTimeouts = new FlowTimeouts();
137+
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
134138
var queueManager = new QueueManager(
135139
workflow.FlowId,
136140
workflow.StoredId,
@@ -141,10 +145,9 @@ protected async Task MessagesFirstOfTypesReturnsNoneForFirstOfTypesOnTimeout(Tas
141145
flowTimeouts,
142146
() => DateTime.UtcNow,
143147
SettingsWithDefaults.Default,
144-
new FlowsManager(functionStore, () => DateTime.UtcNow)
148+
flowsManager
145149
);
146150

147-
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
148151
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
149152
var queueClient = await queueManager.CreateQueueClient();
150153
var message = await queueClient.Pull<object>(
@@ -182,19 +185,22 @@ protected async Task MessagesFirstOfTypesReturnsFirstForFirstOfTypesOnFirst(Task
182185
inner: async Task<string> (string _, Workflow workflow) =>
183186
{
184187

188+
var flowTimeouts = new FlowTimeouts();
189+
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
185190
var queueManager = new QueueManager(
186191
workflow.FlowId,
187192
workflow.StoredId,
188193
functionStore.MessageStore,
189194
DefaultSerializer.Instance,
190195
workflow.Effect,
191196
unhandledExceptionHandler,
192-
new FlowTimeouts(),
197+
flowTimeouts,
193198
() => DateTime.UtcNow,
194199
SettingsWithDefaults.Default,
195-
new FlowsManager(functionStore, () => DateTime.UtcNow)
200+
flowsManager
196201
);
197202

203+
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
198204
var queueClient = await queueManager.CreateQueueClient();
199205
var message = await queueClient.Pull<object>(
200206
workflow,
@@ -232,19 +238,22 @@ protected async Task MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond(Ta
232238
inner: async Task<string> (string _, Workflow workflow) =>
233239
{
234240

241+
var flowTimeouts = new FlowTimeouts();
242+
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
235243
var queueManager = new QueueManager(
236244
workflow.FlowId,
237245
workflow.StoredId,
238246
functionStore.MessageStore,
239247
DefaultSerializer.Instance,
240248
workflow.Effect,
241249
unhandledExceptionHandler,
242-
new FlowTimeouts(),
250+
flowTimeouts,
243251
() => DateTime.UtcNow,
244252
SettingsWithDefaults.Default,
245-
new FlowsManager(functionStore, () => DateTime.UtcNow)
253+
flowsManager
246254
);
247255

256+
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
248257
var queueClient = await queueManager.CreateQueueClient();
249258
var message = await queueClient.Pull<string>(
250259
workflow,
@@ -283,19 +292,22 @@ protected async Task SecondEventWithExistingIdempotencyKeyIsIgnored(Task<IFuncti
283292
{
284293
storedId = workflow.StoredId;
285294

295+
var flowTimeouts = new FlowTimeouts();
296+
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
286297
var queueManager = new QueueManager(
287298
workflow.FlowId,
288299
workflow.StoredId,
289300
functionStore.MessageStore,
290301
DefaultSerializer.Instance,
291302
workflow.Effect,
292303
unhandledExceptionHandler,
293-
new FlowTimeouts(),
304+
flowTimeouts,
294305
() => DateTime.UtcNow,
295306
SettingsWithDefaults.Default,
296-
new FlowsManager(functionStore, () => DateTime.UtcNow)
307+
flowsManager
297308
);
298309

310+
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
299311
var queueClient = await queueManager.CreateQueueClient();
300312
var message1 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
301313
var message2 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
@@ -336,19 +348,22 @@ protected async Task QueueClientCanPullMultipleMessages(Task<IFunctionStore> fun
336348
{
337349
storedId = workflow.StoredId;
338350

351+
var flowTimeouts = new FlowTimeouts();
352+
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
339353
var queueManager = new QueueManager(
340354
workflow.FlowId,
341355
workflow.StoredId,
342356
functionStore.MessageStore,
343357
DefaultSerializer.Instance,
344358
workflow.Effect,
345359
unhandledExceptionHandler,
346-
new FlowTimeouts(),
360+
flowTimeouts,
347361
() => DateTime.UtcNow,
348362
SettingsWithDefaults.Default,
349-
new FlowsManager(functionStore, () => DateTime.UtcNow)
363+
flowsManager
350364
);
351365

366+
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
352367
var queueClient = await queueManager.CreateQueueClient();
353368

354369
var message1 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
@@ -387,19 +402,22 @@ protected async Task BatchedMessagesIsDeliveredToAwaitingFlows(Task<IFunctionSto
387402
async Task (workflow) =>
388403
{
389404

405+
var flowTimeouts = new FlowTimeouts();
406+
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
390407
var queueManager = new QueueManager(
391408
workflow.FlowId,
392409
workflow.StoredId,
393410
functionStore.MessageStore,
394411
DefaultSerializer.Instance,
395412
workflow.Effect,
396413
unhandledExceptionHandler,
397-
new FlowTimeouts(),
414+
flowTimeouts,
398415
() => DateTime.UtcNow,
399416
SettingsWithDefaults.Default with { MessagesDefaultMaxWaitForCompletion = TimeSpan.FromMinutes(1) },
400-
new FlowsManager(functionStore, () => DateTime.UtcNow)
417+
flowsManager
401418
);
402419

420+
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
403421
var queueClient = await queueManager.CreateQueueClient();
404422
lock (queueClients)
405423
queueClients[workflow.FlowId.Instance.Value] = queueClient;
@@ -450,19 +468,22 @@ protected async Task MultipleMessagesCanBeAppendedOneAfterTheOther(Task<IFunctio
450468
async Task (workflow) =>
451469
{
452470

471+
var flowTimeouts = new FlowTimeouts();
472+
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
453473
var queueManager = new QueueManager(
454474
workflow.FlowId,
455475
workflow.StoredId,
456476
functionStore.MessageStore,
457477
DefaultSerializer.Instance,
458478
workflow.Effect,
459479
unhandledExceptionHandler,
460-
new FlowTimeouts(),
480+
flowTimeouts,
461481
() => DateTime.UtcNow,
462482
SettingsWithDefaults.Default,
463-
new FlowsManager(functionStore, () => DateTime.UtcNow)
483+
flowsManager
464484
);
465485

486+
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
466487
var queueClient = await queueManager.CreateQueueClient();
467488

468489
while (true)
@@ -515,19 +536,22 @@ protected async Task PingPongMessagesCanBeExchangedMultipleTimes(Task<IFunctionS
515536
async Task (workflow) =>
516537
{
517538

539+
var flowTimeouts = new FlowTimeouts();
540+
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
518541
var queueManager = new QueueManager(
519542
workflow.FlowId,
520543
workflow.StoredId,
521544
functionStore.MessageStore,
522545
DefaultSerializer.Instance,
523546
workflow.Effect,
524547
unhandledExceptionHandler,
525-
new FlowTimeouts(),
548+
flowTimeouts,
526549
() => DateTime.UtcNow,
527550
SettingsWithDefaults.Default,
528-
new FlowsManager(functionStore, () => DateTime.UtcNow)
551+
flowsManager
529552
);
530553

554+
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
531555
var queueClient = await queueManager.CreateQueueClient();
532556

533557
for (var i = 0; i < 10; i++)
@@ -542,19 +566,22 @@ async Task (workflow) =>
542566
async Task (workflow) =>
543567
{
544568

569+
var flowTimeouts = new FlowTimeouts();
570+
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
545571
var queueManager = new QueueManager(
546572
workflow.FlowId,
547573
workflow.StoredId,
548574
functionStore.MessageStore,
549575
DefaultSerializer.Instance,
550576
workflow.Effect,
551577
unhandledExceptionHandler,
552-
new FlowTimeouts(),
578+
flowTimeouts,
553579
() => DateTime.UtcNow,
554580
SettingsWithDefaults.Default,
555-
new FlowsManager(functionStore, () => DateTime.UtcNow)
581+
flowsManager
556582
);
557583

584+
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
558585
var queueClient = await queueManager.CreateQueueClient();
559586

560587
for (var i = 0; i < 10; i++)

0 commit comments

Comments
 (0)