Skip to content

Commit 68ae905

Browse files
committed
Centralize suspension through FlowsManager.Suspend
1 parent 34b24b2 commit 68ae905

12 files changed

Lines changed: 85 additions & 59 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,8 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti
572572
unhandledExceptionHandler,
573573
new FlowTimeouts(),
574574
() => DateTime.UtcNow,
575-
SettingsWithDefaults.Default
575+
SettingsWithDefaults.Default,
576+
new FlowsManager(() => DateTime.UtcNow)
576577
);
577578

578579
var queueClient = await queueManager.CreateQueueClient();
@@ -633,7 +634,8 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
633634
unhandledExceptionHandler,
634635
minimumTimeout,
635636
() => DateTime.UtcNow,
636-
SettingsWithDefaults.Default
637+
SettingsWithDefaults.Default,
638+
new FlowsManager(() => DateTime.UtcNow)
637639
);
638640

639641
var queueClient = await queueManager.CreateQueueClient();
@@ -690,7 +692,8 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc
690692
unhandledExceptionHandler,
691693
new FlowTimeouts(),
692694
() => DateTime.UtcNow,
693-
SettingsWithDefaults.Default
695+
SettingsWithDefaults.Default,
696+
new FlowsManager(() => DateTime.UtcNow)
694697
);
695698

696699
var queueClient = await queueManager.CreateQueueClient();

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ protected async Task MessagesSunshineScenario(Task<IFunctionStore> functionStore
4343
unhandledExceptionHandler,
4444
new FlowTimeouts(),
4545
() => DateTime.UtcNow,
46-
SettingsWithDefaults.Default
46+
SettingsWithDefaults.Default,
47+
new FlowsManager(() => DateTime.UtcNow)
4748
);
4849

4950
queueClient = await queueManager.CreateQueueClient();
@@ -92,7 +93,8 @@ protected async Task QueueClientReturnsNullAfterTimeout(Task<IFunctionStore> fun
9293
unhandledExceptionHandler,
9394
flowTimeouts,
9495
() => DateTime.UtcNow,
95-
SettingsWithDefaults.Default
96+
SettingsWithDefaults.Default,
97+
new FlowsManager(() => DateTime.UtcNow)
9698
);
9799

98100
var flowsManager = new FlowsManager(() => DateTime.UtcNow);
@@ -138,7 +140,8 @@ protected async Task MessagesFirstOfTypesReturnsNoneForFirstOfTypesOnTimeout(Tas
138140
unhandledExceptionHandler,
139141
flowTimeouts,
140142
() => DateTime.UtcNow,
141-
SettingsWithDefaults.Default
143+
SettingsWithDefaults.Default,
144+
new FlowsManager(() => DateTime.UtcNow)
142145
);
143146

144147
var flowsManager = new FlowsManager(() => DateTime.UtcNow);
@@ -189,7 +192,8 @@ protected async Task MessagesFirstOfTypesReturnsFirstForFirstOfTypesOnFirst(Task
189192
unhandledExceptionHandler,
190193
new FlowTimeouts(),
191194
() => DateTime.UtcNow,
192-
SettingsWithDefaults.Default
195+
SettingsWithDefaults.Default,
196+
new FlowsManager(() => DateTime.UtcNow)
193197
);
194198

195199
var queueClient = await queueManager.CreateQueueClient();
@@ -239,7 +243,8 @@ protected async Task MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond(Ta
239243
unhandledExceptionHandler,
240244
new FlowTimeouts(),
241245
() => DateTime.UtcNow,
242-
SettingsWithDefaults.Default
246+
SettingsWithDefaults.Default,
247+
new FlowsManager(() => DateTime.UtcNow)
243248
);
244249

245250
var queueClient = await queueManager.CreateQueueClient();
@@ -290,7 +295,8 @@ protected async Task SecondEventWithExistingIdempotencyKeyIsIgnored(Task<IFuncti
290295
unhandledExceptionHandler,
291296
new FlowTimeouts(),
292297
() => DateTime.UtcNow,
293-
SettingsWithDefaults.Default
298+
SettingsWithDefaults.Default,
299+
new FlowsManager(() => DateTime.UtcNow)
294300
);
295301

296302
var queueClient = await queueManager.CreateQueueClient();
@@ -342,7 +348,8 @@ protected async Task QueueClientCanPullMultipleMessages(Task<IFunctionStore> fun
342348
unhandledExceptionHandler,
343349
new FlowTimeouts(),
344350
() => DateTime.UtcNow,
345-
SettingsWithDefaults.Default
351+
SettingsWithDefaults.Default,
352+
new FlowsManager(() => DateTime.UtcNow)
346353
);
347354

348355
var queueClient = await queueManager.CreateQueueClient();
@@ -392,7 +399,8 @@ async Task (workflow) =>
392399
unhandledExceptionHandler,
393400
new FlowTimeouts(),
394401
() => DateTime.UtcNow,
395-
SettingsWithDefaults.Default
402+
SettingsWithDefaults.Default,
403+
new FlowsManager(() => DateTime.UtcNow)
396404
);
397405

398406
var queueClient = await queueManager.CreateQueueClient();
@@ -454,7 +462,8 @@ async Task (workflow) =>
454462
unhandledExceptionHandler,
455463
new FlowTimeouts(),
456464
() => DateTime.UtcNow,
457-
SettingsWithDefaults.Default
465+
SettingsWithDefaults.Default,
466+
new FlowsManager(() => DateTime.UtcNow)
458467
);
459468

460469
var queueClient = await queueManager.CreateQueueClient();
@@ -518,7 +527,8 @@ async Task (workflow) =>
518527
unhandledExceptionHandler,
519528
new FlowTimeouts(),
520529
() => DateTime.UtcNow,
521-
SettingsWithDefaults.Default
530+
SettingsWithDefaults.Default,
531+
new FlowsManager(() => DateTime.UtcNow)
522532
);
523533

524534
var queueClient = await queueManager.CreateQueueClient();
@@ -544,7 +554,8 @@ async Task (workflow) =>
544554
unhandledExceptionHandler,
545555
new FlowTimeouts(),
546556
() => DateTime.UtcNow,
547-
SettingsWithDefaults.Default
557+
SettingsWithDefaults.Default,
558+
new FlowsManager(() => DateTime.UtcNow)
548559
);
549560

550561
var queueClient = await queueManager.CreateQueueClient();

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ await store.EffectsStore.GetEffectResults(storedId),
332332
session,
333333
clearChildren: true
334334
);
335-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts());
335+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
336336

337337
effect.TryGet<int>("alias", out _).ShouldBeFalse();
338338

@@ -380,7 +380,7 @@ public async Task ExistingEffectsFuncIsOnlyInvokedAfterGettingValue(Task<IFuncti
380380
storageSession: null,
381381
clearChildren: true
382382
);
383-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts());
383+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
384384

385385
// Verify the effect is immediately available (eager loading)
386386
effect.TryGet<int>("test_alias", out var result).ShouldBeTrue();
@@ -714,7 +714,7 @@ public async Task CaptureUsingAtLeastOnceWithoutFlushResiliencyDelaysFlush(Task<
714714
session,
715715
clearChildren: true
716716
);
717-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts());
717+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
718718

719719
var result = await effect.Capture(() => "hello world", ResiliencyLevel.AtLeastOnceDelayFlush);
720720
result.ShouldBe("hello world");

Core/Cleipnir.ResilientFunctions.Tests/UtilsTests/PrintEffectsTests.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void PrintSingleCompletedEffect()
4343
clearChildren: true
4444
);
4545

46-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts());
46+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
4747
var output = effect.ExecutionTree();
4848

4949
var expected = "└─ ✓ [1]\n";
@@ -75,7 +75,7 @@ public void PrintEffectWithAlias()
7575
clearChildren: true
7676
);
7777

78-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts());
78+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
7979
var output = effect.ExecutionTree();
8080

8181
var expected = "└─ ✓ [1] my-effect\n";
@@ -111,7 +111,7 @@ public void PrintFailedEffect()
111111
clearChildren: true
112112
);
113113

114-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts());
114+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
115115
var output = effect.ExecutionTree();
116116

117117
var expected = "└─ ✗ [1] failed-operation (System.InvalidOperationException)\n";
@@ -143,7 +143,7 @@ public void PrintStartedEffect()
143143
clearChildren: true
144144
);
145145

146-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts());
146+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
147147
var output = effect.ExecutionTree();
148148

149149
var expected = "└─ ⋯ [1] in-progress\n";
@@ -189,7 +189,7 @@ public void PrintEffectHierarchy()
189189
clearChildren: true
190190
);
191191

192-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts());
192+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
193193
var output = effect.ExecutionTree();
194194

195195
var expected =
@@ -245,7 +245,7 @@ public void PrintDeepEffectHierarchy()
245245
clearChildren: true
246246
);
247247

248-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts());
248+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
249249
var output = effect.ExecutionTree();
250250

251251
var expected =
@@ -295,7 +295,7 @@ public void PrintMultipleRootEffects()
295295
clearChildren: true
296296
);
297297

298-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts());
298+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
299299
var output = effect.ExecutionTree();
300300

301301
var expected =
@@ -330,7 +330,7 @@ public void PrintComplexEffectTree()
330330
clearChildren: true
331331
);
332332

333-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts());
333+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
334334
var output = effect.ExecutionTree();
335335

336336
var expected =
@@ -365,7 +365,7 @@ public void PrintEffectTreeWithMissingIntermediateEffect()
365365
clearChildren: false
366366
);
367367

368-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts());
368+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
369369
var output = effect.ExecutionTree();
370370

371371
var expected =
@@ -398,7 +398,7 @@ public void PrintEffectTreeWithMultipleMissingAncestors()
398398
clearChildren: false
399399
);
400400

401-
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts());
401+
var effect = new Effect(effectResults, utcNow: () => DateTime.UtcNow, new FlowTimeouts(), new FlowsManager(() => DateTime.UtcNow), storedId);
402402
var output = effect.ExecutionTree();
403403

404404
var expected =

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Diagnostics.CodeAnalysis;
34
using System.Threading;
45
using System.Threading.Tasks;
6+
using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands;
57
using Cleipnir.ResilientFunctions.Queuing;
68
using Cleipnir.ResilientFunctions.Storage;
79

@@ -118,4 +120,7 @@ public void SuspendThread(StoredId id)
118120
}
119121
}
120122

123+
[DoesNotReturn]
124+
public async Task Suspend(StoredId id) => throw new SuspendInvocationException();
125+
121126
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ await _functionStore.BulkScheduleFunctions(
393393
public MessageWriter CreateMessageWriter(StoredId storedId)
394394
=> new MessageWriter(storedId, _functionStore.MessageStore, Serializer);
395395

396-
public Effect CreateEffect(StoredId storedId, FlowId flowId, IReadOnlyList<StoredEffect> storedEffects, FlowTimeouts flowTimeouts, IStorageSession? storageSession)
396+
public Effect CreateEffect(StoredId storedId, FlowId flowId, IReadOnlyList<StoredEffect> storedEffects, FlowTimeouts flowTimeouts, IStorageSession? storageSession, FlowsManager flowsManager)
397397
{
398398
var effectsStore = _functionStore.EffectsStore;
399399

@@ -406,8 +406,8 @@ public Effect CreateEffect(StoredId storedId, FlowId flowId, IReadOnlyList<Store
406406
storageSession,
407407
_clearChildren
408408
);
409-
410-
var effect = new Effect(effectResults, UtcNow, flowTimeouts);
409+
410+
var effect = new Effect(effectResults, UtcNow, flowTimeouts, flowsManager, storedId);
411411
return effect;
412412
}
413413

@@ -430,11 +430,11 @@ public async Task<ExistingSemaphores> CreateExistingSemaphores(FlowId flowId)
430430
return new ExistingSemaphores(MapToStoredId(flowId), _functionStore, existingEffects);
431431
}
432432

433-
public DistributedSemaphores CreateSemaphores(StoredId storedId, Effect effect)
434-
=> new(effect, _functionStore.SemaphoreStore, storedId, Interrupt);
433+
public DistributedSemaphores CreateSemaphores(StoredId storedId, Effect effect, FlowsManager flowsManager)
434+
=> new(effect, _functionStore.SemaphoreStore, storedId, Interrupt, flowsManager);
435435

436-
public QueueManager CreateQueueManager(FlowId flowId, StoredId storedId, Effect effect, FlowTimeouts timeouts, UnhandledExceptionHandler unhandledExceptionHandler)
437-
=> new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, unhandledExceptionHandler, timeouts, UtcNow, _settings);
436+
public QueueManager CreateQueueManager(FlowId flowId, StoredId storedId, Effect effect, FlowTimeouts timeouts, UnhandledExceptionHandler unhandledExceptionHandler, FlowsManager flowsManager)
437+
=> new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, unhandledExceptionHandler, timeouts, UtcNow, _settings, flowsManager);
438438

439439
public StoredId MapToStoredId(FlowId flowId) => StoredId.Create(_storedType, flowId.Instance.Value);
440440

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -277,17 +277,19 @@ await _invocationHelper.PersistFunctionInStore(
277277
success = persisted;
278278

279279
var flowTimeouts = new FlowTimeouts();
280+
280281
var effect = _invocationHelper.CreateEffect(
281282
storedId,
282283
flowId,
283284
initialState == null ? [] : _invocationHelper.MapInitialEffects(initialState.Effects, flowId),
284285
flowTimeouts,
285-
storageSession
286+
storageSession,
287+
_flowsManager
286288
);
287289

288290
var correlations = _invocationHelper.CreateCorrelations(flowId);
289-
var semaphores = _invocationHelper.CreateSemaphores(storedId, effect);
290-
var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowTimeouts, _unhandledExceptionHandler);
291+
var semaphores = _invocationHelper.CreateSemaphores(storedId, effect, _flowsManager);
292+
var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowTimeouts, _unhandledExceptionHandler, _flowsManager);
291293
disposables.Add(queueManager);
292294
var messageWriter = _invocationHelper.CreateMessageWriter(storedId);
293295
var workflow = new Workflow(flowId, storedId, effect, _utilities, correlations, semaphores, queueManager, _invocationHelper.UtcNow, messageWriter, _flowsManager);
@@ -333,11 +335,12 @@ private async Task<PreparedReInvocation> PrepareForReInvocation(StoredId storedI
333335
disposables.Add(isWorkflowRunningDisposable);
334336

335337
var flowTimeouts = new FlowTimeouts();
336-
var effect = _invocationHelper.CreateEffect(storedId, flowId, effects, flowTimeouts, storageSession);
338+
339+
var effect = _invocationHelper.CreateEffect(storedId, flowId, effects, flowTimeouts, storageSession, _flowsManager);
337340

338341
var correlations = _invocationHelper.CreateCorrelations(flowId);
339-
var semaphores = _invocationHelper.CreateSemaphores(storedId, effect);
340-
var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowTimeouts, _unhandledExceptionHandler);
342+
var semaphores = _invocationHelper.CreateSemaphores(storedId, effect, _flowsManager);
343+
var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowTimeouts, _unhandledExceptionHandler, _flowsManager);
341344
disposables.Add(queueManager);
342345
var messageWriter = _invocationHelper.CreateMessageWriter(storedId);
343346

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
using System;
22
using System.Threading.Tasks;
33
using Cleipnir.ResilientFunctions.Domain;
4-
using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands;
54
using Cleipnir.ResilientFunctions.Messaging;
65
using Cleipnir.ResilientFunctions.Helpers;
76
using Cleipnir.ResilientFunctions.Queuing;
@@ -62,7 +61,7 @@ async Task Inner()
6261
{
6362
var delay = (expiry.ToDateTime() - _utcNow()).RoundUpToZero();
6463
if (delay > TimeSpan.Zero)
65-
throw new SuspendInvocationException();
64+
await _flowsManager.Suspend(StoredId);
6665
}
6766

6867
await Effect.Upsert(timeoutId, -1L, alias, flush: false);

0 commit comments

Comments
 (0)