Skip to content

Commit 9d1a25e

Browse files
authored
Use FlowState directly in QueueManager (#132)
1 parent 95f53a3 commit 9d1a25e

8 files changed

Lines changed: 116 additions & 145 deletions

File tree

.claude/settings.local.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
"Bash(git commit -m \"$\\(cat <<''EOF''\nRemoved unused QueueFlag and associated tests\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>\nEOF\n\\)\")",
2323
"Bash(git -C /Users/stidsborg/Repos/Cleipnir.ResilientFunctions diff)",
2424
"Bash(git -C /Users/stidsborg/Repos/Cleipnir.ResilientFunctions log --oneline -5)",
25-
"Bash(git commit:*)"
25+
"Bash(git commit:*)",
26+
"Bash(git checkout:*)",
27+
"Bash(git push:*)"
2628
],
2729
"deny": [],
2830
"ask": []

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -561,20 +561,20 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti
561561

562562
var flowTimeouts = new FlowTimeouts();
563563
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
564+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
564565
var queueManager = new QueueManager(
565566
workflow.FlowId,
566567
workflow.StoredId,
567568
functionStore.MessageStore,
568569
exceptionThrowingSerializer,
569570
workflow.Effect,
571+
flowState,
570572
unhandledExceptionHandler,
571573
flowTimeouts,
572574
() => DateTime.UtcNow,
573-
SettingsWithDefaults.Default,
574-
flowsManager
575+
SettingsWithDefaults.Default
575576
);
576577

577-
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
578578
var queueClient = await queueManager.CreateQueueClient();
579579

580580
var message = await queueClient.Pull<GoodMessage>(
@@ -624,20 +624,20 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
624624
storedId = workflow.StoredId;
625625
var minimumTimeout = new FlowTimeouts();
626626
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
627+
var flowState = flowsManager.CreateFlow(workflow.StoredId, minimumTimeout);
627628
var queueManager = new QueueManager(
628629
workflow.FlowId,
629630
workflow.StoredId,
630631
functionStore.MessageStore,
631632
DefaultSerializer.Instance,
632633
workflow.Effect,
634+
flowState,
633635
unhandledExceptionHandler,
634636
minimumTimeout,
635637
() => DateTime.UtcNow,
636-
SettingsWithDefaults.Default,
637-
flowsManager
638+
SettingsWithDefaults.Default
638639
);
639640

640-
flowsManager.AddFlow(workflow.StoredId, queueManager,minimumTimeout);
641641

642642
var queueClient = await queueManager.CreateQueueClient();
643643

@@ -685,20 +685,20 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc
685685

686686
var flowTimeouts = new FlowTimeouts();
687687
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
688+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
688689
var queueManager = new QueueManager(
689690
workflow.FlowId,
690691
workflow.StoredId,
691692
functionStore.MessageStore,
692693
DefaultSerializer.Instance,
693694
workflow.Effect,
695+
flowState,
694696
unhandledExceptionHandler,
695697
flowTimeouts,
696698
() => DateTime.UtcNow,
697-
SettingsWithDefaults.Default,
698-
flowsManager
699+
SettingsWithDefaults.Default
699700
);
700701

701-
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
702702
var queueClient = await queueManager.CreateQueueClient();
703703

704704
// Pull envelope for specific receiver

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -36,20 +36,20 @@ protected async Task MessagesSunshineScenario(Task<IFunctionStore> functionStore
3636

3737
var flowTimeouts = new FlowTimeouts();
3838
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
39+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
3940
var queueManager = new QueueManager(
4041
workflow.FlowId,
4142
workflow.StoredId,
4243
functionStore.MessageStore,
4344
DefaultSerializer.Instance,
4445
workflow.Effect,
46+
flowState,
4547
unhandledExceptionHandler,
4648
flowTimeouts,
4749
() => DateTime.UtcNow,
48-
SettingsWithDefaults.Default,
49-
flowsManager
50+
SettingsWithDefaults.Default
5051
);
5152

52-
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
5353
queueClient = await queueManager.CreateQueueClient();
5454
var message = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
5555

@@ -88,20 +88,20 @@ protected async Task QueueClientReturnsNullAfterTimeout(Task<IFunctionStore> fun
8888

8989
var flowTimeouts = new FlowTimeouts();
9090
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
91+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
9192
var queueManager = new QueueManager(
9293
workflow.FlowId,
9394
workflow.StoredId,
9495
functionStore.MessageStore,
9596
DefaultSerializer.Instance,
9697
workflow.Effect,
98+
flowState,
9799
unhandledExceptionHandler,
98100
flowTimeouts,
99101
() => DateTime.UtcNow,
100-
SettingsWithDefaults.Default,
101-
flowsManager
102+
SettingsWithDefaults.Default
102103
);
103104

104-
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
105105
var queueClient = await queueManager.CreateQueueClient();
106106
var message = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId(), TimeSpan.FromMilliseconds(100));
107107

@@ -135,20 +135,20 @@ protected async Task MessagesFirstOfTypesReturnsNoneForFirstOfTypesOnTimeout(Tas
135135

136136
var flowTimeouts = new FlowTimeouts();
137137
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
138+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
138139
var queueManager = new QueueManager(
139140
workflow.FlowId,
140141
workflow.StoredId,
141142
functionStore.MessageStore,
142143
DefaultSerializer.Instance,
143144
workflow.Effect,
145+
flowState,
144146
unhandledExceptionHandler,
145147
flowTimeouts,
146148
() => DateTime.UtcNow,
147-
SettingsWithDefaults.Default,
148-
flowsManager
149+
SettingsWithDefaults.Default
149150
);
150151

151-
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
152152
var queueClient = await queueManager.CreateQueueClient();
153153
var message = await queueClient.Pull<object>(
154154
workflow,
@@ -187,20 +187,20 @@ protected async Task MessagesFirstOfTypesReturnsFirstForFirstOfTypesOnFirst(Task
187187

188188
var flowTimeouts = new FlowTimeouts();
189189
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
190+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
190191
var queueManager = new QueueManager(
191192
workflow.FlowId,
192193
workflow.StoredId,
193194
functionStore.MessageStore,
194195
DefaultSerializer.Instance,
195196
workflow.Effect,
197+
flowState,
196198
unhandledExceptionHandler,
197199
flowTimeouts,
198200
() => DateTime.UtcNow,
199-
SettingsWithDefaults.Default,
200-
flowsManager
201+
SettingsWithDefaults.Default
201202
);
202203

203-
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
204204
var queueClient = await queueManager.CreateQueueClient();
205205
var message = await queueClient.Pull<object>(
206206
workflow,
@@ -240,20 +240,20 @@ protected async Task MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond(Ta
240240

241241
var flowTimeouts = new FlowTimeouts();
242242
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
243+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
243244
var queueManager = new QueueManager(
244245
workflow.FlowId,
245246
workflow.StoredId,
246247
functionStore.MessageStore,
247248
DefaultSerializer.Instance,
248249
workflow.Effect,
250+
flowState,
249251
unhandledExceptionHandler,
250252
flowTimeouts,
251253
() => DateTime.UtcNow,
252-
SettingsWithDefaults.Default,
253-
flowsManager
254+
SettingsWithDefaults.Default
254255
);
255256

256-
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
257257
var queueClient = await queueManager.CreateQueueClient();
258258
var message = await queueClient.Pull<string>(
259259
workflow,
@@ -294,20 +294,20 @@ protected async Task SecondEventWithExistingIdempotencyKeyIsIgnored(Task<IFuncti
294294

295295
var flowTimeouts = new FlowTimeouts();
296296
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
297+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
297298
var queueManager = new QueueManager(
298299
workflow.FlowId,
299300
workflow.StoredId,
300301
functionStore.MessageStore,
301302
DefaultSerializer.Instance,
302303
workflow.Effect,
304+
flowState,
303305
unhandledExceptionHandler,
304306
flowTimeouts,
305307
() => DateTime.UtcNow,
306-
SettingsWithDefaults.Default,
307-
flowsManager
308+
SettingsWithDefaults.Default
308309
);
309310

310-
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
311311
var queueClient = await queueManager.CreateQueueClient();
312312
var message1 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
313313
var message2 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
@@ -350,20 +350,20 @@ protected async Task QueueClientCanPullMultipleMessages(Task<IFunctionStore> fun
350350

351351
var flowTimeouts = new FlowTimeouts();
352352
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
353+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
353354
var queueManager = new QueueManager(
354355
workflow.FlowId,
355356
workflow.StoredId,
356357
functionStore.MessageStore,
357358
DefaultSerializer.Instance,
358359
workflow.Effect,
360+
flowState,
359361
unhandledExceptionHandler,
360362
flowTimeouts,
361363
() => DateTime.UtcNow,
362-
SettingsWithDefaults.Default,
363-
flowsManager
364+
SettingsWithDefaults.Default
364365
);
365366

366-
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
367367
var queueClient = await queueManager.CreateQueueClient();
368368

369369
var message1 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
@@ -404,20 +404,20 @@ async Task (workflow) =>
404404

405405
var flowTimeouts = new FlowTimeouts();
406406
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
407+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
407408
var queueManager = new QueueManager(
408409
workflow.FlowId,
409410
workflow.StoredId,
410411
functionStore.MessageStore,
411412
DefaultSerializer.Instance,
412413
workflow.Effect,
414+
flowState,
413415
unhandledExceptionHandler,
414416
flowTimeouts,
415417
() => DateTime.UtcNow,
416-
SettingsWithDefaults.Default with { MessagesDefaultMaxWaitForCompletion = TimeSpan.FromMinutes(1) },
417-
flowsManager
418+
SettingsWithDefaults.Default with { MessagesDefaultMaxWaitForCompletion = TimeSpan.FromMinutes(1) }
418419
);
419420

420-
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
421421
var queueClient = await queueManager.CreateQueueClient();
422422
lock (queueClients)
423423
queueClients[workflow.FlowId.Instance.Value] = queueClient;
@@ -470,20 +470,20 @@ async Task (workflow) =>
470470

471471
var flowTimeouts = new FlowTimeouts();
472472
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
473+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
473474
var queueManager = new QueueManager(
474475
workflow.FlowId,
475476
workflow.StoredId,
476477
functionStore.MessageStore,
477478
DefaultSerializer.Instance,
478479
workflow.Effect,
480+
flowState,
479481
unhandledExceptionHandler,
480482
flowTimeouts,
481483
() => DateTime.UtcNow,
482-
SettingsWithDefaults.Default,
483-
flowsManager
484+
SettingsWithDefaults.Default
484485
);
485486

486-
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
487487
var queueClient = await queueManager.CreateQueueClient();
488488

489489
while (true)
@@ -538,20 +538,20 @@ async Task (workflow) =>
538538

539539
var flowTimeouts = new FlowTimeouts();
540540
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
541+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
541542
var queueManager = new QueueManager(
542543
workflow.FlowId,
543544
workflow.StoredId,
544545
functionStore.MessageStore,
545546
DefaultSerializer.Instance,
546547
workflow.Effect,
548+
flowState,
547549
unhandledExceptionHandler,
548550
flowTimeouts,
549551
() => DateTime.UtcNow,
550-
SettingsWithDefaults.Default,
551-
flowsManager
552+
SettingsWithDefaults.Default
552553
);
553554

554-
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
555555
var queueClient = await queueManager.CreateQueueClient();
556556

557557
for (var i = 0; i < 10; i++)
@@ -568,20 +568,20 @@ async Task (workflow) =>
568568

569569
var flowTimeouts = new FlowTimeouts();
570570
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
571+
var flowState = flowsManager.CreateFlow(workflow.StoredId, flowTimeouts);
571572
var queueManager = new QueueManager(
572573
workflow.FlowId,
573574
workflow.StoredId,
574575
functionStore.MessageStore,
575576
DefaultSerializer.Instance,
576577
workflow.Effect,
578+
flowState,
577579
unhandledExceptionHandler,
578580
flowTimeouts,
579581
() => DateTime.UtcNow,
580-
SettingsWithDefaults.Default,
581-
flowsManager
582+
SettingsWithDefaults.Default
582583
);
583584

584-
flowsManager.AddFlow(workflow.StoredId, queueManager,flowTimeouts);
585585
var queueClient = await queueManager.CreateQueueClient();
586586

587587
for (var i = 0; i < 10; i++)

0 commit comments

Comments
 (0)