Skip to content

Commit 6171e22

Browse files
authored
Add IFunctionStore to FlowsManager and call ResetInterrupted on interrupt (#128)
1 parent e7cc3bb commit 6171e22

6 files changed

Lines changed: 37 additions & 36 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,7 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti
569569
new FlowTimeouts(),
570570
() => DateTime.UtcNow,
571571
SettingsWithDefaults.Default,
572-
new FlowsManager(() => DateTime.UtcNow)
572+
new FlowsManager(functionStore, () => DateTime.UtcNow)
573573
);
574574

575575
var queueClient = await queueManager.CreateQueueClient();
@@ -630,7 +630,7 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
630630
minimumTimeout,
631631
() => DateTime.UtcNow,
632632
SettingsWithDefaults.Default,
633-
new FlowsManager(() => DateTime.UtcNow)
633+
new FlowsManager(functionStore, () => DateTime.UtcNow)
634634
);
635635

636636
var queueClient = await queueManager.CreateQueueClient();
@@ -687,7 +687,7 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc
687687
new FlowTimeouts(),
688688
() => DateTime.UtcNow,
689689
SettingsWithDefaults.Default,
690-
new FlowsManager(() => DateTime.UtcNow)
690+
new FlowsManager(functionStore, () => DateTime.UtcNow)
691691
);
692692

693693
var queueClient = await queueManager.CreateQueueClient();

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ protected async Task MessagesSunshineScenario(Task<IFunctionStore> functionStore
4444
new FlowTimeouts(),
4545
() => DateTime.UtcNow,
4646
SettingsWithDefaults.Default,
47-
new FlowsManager(() => DateTime.UtcNow)
47+
new FlowsManager(functionStore, () => DateTime.UtcNow)
4848
);
4949

5050
queueClient = await queueManager.CreateQueueClient();
@@ -94,10 +94,10 @@ protected async Task QueueClientReturnsNullAfterTimeout(Task<IFunctionStore> fun
9494
flowTimeouts,
9595
() => DateTime.UtcNow,
9696
SettingsWithDefaults.Default,
97-
new FlowsManager(() => DateTime.UtcNow)
97+
new FlowsManager(functionStore, () => DateTime.UtcNow)
9898
);
9999

100-
var flowsManager = new FlowsManager(() => DateTime.UtcNow);
100+
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
101101
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
102102
var queueClient = await queueManager.CreateQueueClient();
103103
var message = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId(), TimeSpan.FromMilliseconds(100));
@@ -141,10 +141,10 @@ protected async Task MessagesFirstOfTypesReturnsNoneForFirstOfTypesOnTimeout(Tas
141141
flowTimeouts,
142142
() => DateTime.UtcNow,
143143
SettingsWithDefaults.Default,
144-
new FlowsManager(() => DateTime.UtcNow)
144+
new FlowsManager(functionStore, () => DateTime.UtcNow)
145145
);
146146

147-
var flowsManager = new FlowsManager(() => DateTime.UtcNow);
147+
var flowsManager = new FlowsManager(functionStore, () => DateTime.UtcNow);
148148
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
149149
var queueClient = await queueManager.CreateQueueClient();
150150
var message = await queueClient.Pull<object>(
@@ -192,7 +192,7 @@ protected async Task MessagesFirstOfTypesReturnsFirstForFirstOfTypesOnFirst(Task
192192
new FlowTimeouts(),
193193
() => DateTime.UtcNow,
194194
SettingsWithDefaults.Default,
195-
new FlowsManager(() => DateTime.UtcNow)
195+
new FlowsManager(functionStore, () => DateTime.UtcNow)
196196
);
197197

198198
var queueClient = await queueManager.CreateQueueClient();
@@ -242,7 +242,7 @@ protected async Task MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond(Ta
242242
new FlowTimeouts(),
243243
() => DateTime.UtcNow,
244244
SettingsWithDefaults.Default,
245-
new FlowsManager(() => DateTime.UtcNow)
245+
new FlowsManager(functionStore, () => DateTime.UtcNow)
246246
);
247247

248248
var queueClient = await queueManager.CreateQueueClient();
@@ -293,7 +293,7 @@ protected async Task SecondEventWithExistingIdempotencyKeyIsIgnored(Task<IFuncti
293293
new FlowTimeouts(),
294294
() => DateTime.UtcNow,
295295
SettingsWithDefaults.Default,
296-
new FlowsManager(() => DateTime.UtcNow)
296+
new FlowsManager(functionStore, () => DateTime.UtcNow)
297297
);
298298

299299
var queueClient = await queueManager.CreateQueueClient();
@@ -346,7 +346,7 @@ protected async Task QueueClientCanPullMultipleMessages(Task<IFunctionStore> fun
346346
new FlowTimeouts(),
347347
() => DateTime.UtcNow,
348348
SettingsWithDefaults.Default,
349-
new FlowsManager(() => DateTime.UtcNow)
349+
new FlowsManager(functionStore, () => DateTime.UtcNow)
350350
);
351351

352352
var queueClient = await queueManager.CreateQueueClient();
@@ -397,7 +397,7 @@ async Task (workflow) =>
397397
new FlowTimeouts(),
398398
() => DateTime.UtcNow,
399399
SettingsWithDefaults.Default with { MessagesDefaultMaxWaitForCompletion = TimeSpan.FromMinutes(1) },
400-
new FlowsManager(() => DateTime.UtcNow)
400+
new FlowsManager(functionStore, () => DateTime.UtcNow)
401401
);
402402

403403
var queueClient = await queueManager.CreateQueueClient();
@@ -460,7 +460,7 @@ async Task (workflow) =>
460460
new FlowTimeouts(),
461461
() => DateTime.UtcNow,
462462
SettingsWithDefaults.Default,
463-
new FlowsManager(() => DateTime.UtcNow)
463+
new FlowsManager(functionStore, () => DateTime.UtcNow)
464464
);
465465

466466
var queueClient = await queueManager.CreateQueueClient();
@@ -525,7 +525,7 @@ async Task (workflow) =>
525525
new FlowTimeouts(),
526526
() => DateTime.UtcNow,
527527
SettingsWithDefaults.Default,
528-
new FlowsManager(() => DateTime.UtcNow)
528+
new FlowsManager(functionStore, () => DateTime.UtcNow)
529529
);
530530

531531
var queueClient = await queueManager.CreateQueueClient();
@@ -552,7 +552,7 @@ async Task (workflow) =>
552552
new FlowTimeouts(),
553553
() => DateTime.UtcNow,
554554
SettingsWithDefaults.Default,
555-
new FlowsManager(() => DateTime.UtcNow)
555+
new FlowsManager(functionStore, () => DateTime.UtcNow)
556556
);
557557

558558
var queueClient = await queueManager.CreateQueueClient();

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ await store.EffectsStore.GetEffectResults(storedId),
332332
session,
333333
clearChildren: true
334334
);
335-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
335+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store, () => DateTime.UtcNow), storedId);
336336

337337
effect.TryGet<int>("alias", out _).ShouldBeFalse();
338338

@@ -380,7 +380,7 @@ public async Task ExistingEffectsFuncIsOnlyInvokedAfterGettingValue(Task<IFuncti
380380
storageSession: null,
381381
clearChildren: true
382382
);
383-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
383+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store, () => DateTime.UtcNow), storedId);
384384

385385
// Verify the effect is immediately available (eager loading)
386386
effect.TryGet<int>("test_alias", out var result).ShouldBeTrue();
@@ -714,7 +714,7 @@ public async Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(Task<
714714
session,
715715
clearChildren: true
716716
);
717-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
717+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(store, () => DateTime.UtcNow), storedId);
718718

719719
var result = await effect.Capture(() => "hello world", ResiliencyLevel.AtLeastOnceDelayFlush);
720720
result.ShouldBe("hello world");

Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/PrintEffectsTests.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void PrintSingleCompletedEffect()
4343
clearChildren: true
4444
);
4545

46-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
46+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
4747
var output = effect.ExecutionTree();
4848

4949
var expected = "└─ ✓ [1]\n";
@@ -75,7 +75,7 @@ public void PrintEffectWithAlias()
7575
clearChildren: true
7676
);
7777

78-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
78+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
7979
var output = effect.ExecutionTree();
8080

8181
var expected = "└─ ✓ [1] my-effect\n";
@@ -111,7 +111,7 @@ public void PrintFailedEffect()
111111
clearChildren: true
112112
);
113113

114-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
114+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
115115
var output = effect.ExecutionTree();
116116

117117
var expected = "└─ ✗ [1] failed-operation (System.InvalidOperationException)\n";
@@ -143,7 +143,7 @@ public void PrintStartedEffect()
143143
clearChildren: true
144144
);
145145

146-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
146+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
147147
var output = effect.ExecutionTree();
148148

149149
var expected = "└─ ⋯ [1] in-progress\n";
@@ -189,7 +189,7 @@ public void PrintEffectHierarchy()
189189
clearChildren: true
190190
);
191191

192-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
192+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
193193
var output = effect.ExecutionTree();
194194

195195
var expected =
@@ -245,7 +245,7 @@ public void PrintDeepEffectHierarchy()
245245
clearChildren: true
246246
);
247247

248-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
248+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
249249
var output = effect.ExecutionTree();
250250

251251
var expected =
@@ -295,7 +295,7 @@ public void PrintMultipleRootEffects()
295295
clearChildren: true
296296
);
297297

298-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
298+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
299299
var output = effect.ExecutionTree();
300300

301301
var expected =
@@ -330,7 +330,7 @@ public void PrintComplexEffectTree()
330330
clearChildren: true
331331
);
332332

333-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
333+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
334334
var output = effect.ExecutionTree();
335335

336336
var expected =
@@ -365,7 +365,7 @@ public void PrintEffectTreeWithMissingIntermediateEffect()
365365
clearChildren: false
366366
);
367367

368-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
368+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
369369
var output = effect.ExecutionTree();
370370

371371
var expected =
@@ -398,7 +398,7 @@ public void PrintEffectTreeWithMultipleMissingAncestors()
398398
clearChildren: false
399399
);
400400

401-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
401+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(new InMemoryFunctionStore(), () => DateTime.UtcNow), storedId);
402402
var output = effect.ExecutionTree();
403403

404404
var expected =

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@ public class FlowsManager : IDisposable
1313
{
1414
private readonly Dictionary<StoredId, FlowState> _dict = new();
1515
private readonly Lock _lock = new();
16+
private readonly IFunctionStore _functionStore;
1617
private readonly UtcNow _utcNow;
1718
private volatile bool _disposed;
1819

19-
public FlowsManager(UtcNow utcNow)
20+
public FlowsManager(IFunctionStore functionStore, UtcNow utcNow)
2021
{
22+
_functionStore = functionStore;
2123
_utcNow = utcNow;
2224
_ = Task.Run(TimeoutCheckLoop);
2325
}
@@ -54,20 +56,19 @@ public void RemoveFlow(StoredId id)
5456
_dict.Remove(id);
5557
}
5658

57-
public void Interrupt(IEnumerable<StoredId> ids)
59+
public async Task Interrupt(IReadOnlyList<StoredId> ids)
5860
{
61+
await _functionStore.ResetInterrupted(ids);
62+
5963
lock (_lock)
60-
{
6164
foreach (var id in ids)
6265
{
6366
if (!_dict.TryGetValue(id, out var flowState))
6467
continue;
65-
68+
6669
flowState.Interrupt();
6770
Task.Run(() => flowState.QueueManager.FetchMessagesOnce());
6871
}
69-
70-
}
7172
}
7273

7374
public void StartThread(StoredId id)

Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public FunctionsRegistry(IFunctionStore functionStore, Settings? settings = null
4040
_shutdownCoordinator = new ShutdownCoordinator();
4141
_settings = SettingsWithDefaults.Default.Merge(settings);
4242
var utcNow = _settings.UtcNow;
43-
_flowsManager = new FlowsManager(utcNow);
43+
_flowsManager = new FlowsManager(_functionStore, utcNow);
4444

4545
ClusterInfo = new ClusterInfo(ReplicaId.NewId());
4646

0 commit comments

Comments
 (0)