Skip to content

Commit fc2aa65

Browse files
committed
Move suspension into FlowTimeouts.AddTimeout via maxWait parameter
1 parent 68ae905 commit fc2aa65

4 files changed

Lines changed: 17 additions & 23 deletions

File tree

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowTimeouts.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Threading;
55
using System.Threading.Tasks;
66
using Cleipnir.ResilientFunctions.Domain;
7+
using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands;
78

89
namespace Cleipnir.ResilientFunctions.CoreRuntime;
910

@@ -24,7 +25,7 @@ public DateTime? MinimumTimeout
2425
private DateTime? GetMinimumTimeout()
2526
=> Timeouts.Values.Count != 0 ? Timeouts.Values.Min(t => t.Item1) : (DateTime?)null;
2627

27-
public Task AddTimeout(EffectId effectId, DateTime timeout)
28+
public async Task AddTimeout(EffectId effectId, DateTime timeout, TimeSpan? maxWait = null)
2829
{
2930
TaskCompletionSource tcs;
3031
lock (_lock)
@@ -33,7 +34,15 @@ public Task AddTimeout(EffectId effectId, DateTime timeout)
3334
Timeouts[effectId] = Tuple.Create(timeout, tcs);
3435
}
3536

36-
return tcs.Task;
37+
if (maxWait == null || timeout <= DateTime.UtcNow)
38+
{
39+
await tcs.Task;
40+
return;
41+
}
42+
43+
var completed = await Task.WhenAny(tcs.Task, Task.Delay(maxWait.Value));
44+
if (completed != tcs.Task)
45+
throw new SuspendInvocationException();
3746
}
3847

3948
public void RemoveTimeout(EffectId effectId)

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,8 @@ async Task Inner()
5454
return;
5555
}
5656

57-
var timeoutTask = Effect.FlowTimeouts.AddTimeout(timeoutId, expiry.ToDateTime());
58-
if (!suspend)
59-
await timeoutTask;
60-
else
61-
{
62-
var delay = (expiry.ToDateTime() - _utcNow()).RoundUpToZero();
63-
if (delay > TimeSpan.Zero)
64-
await _flowsManager.Suspend(StoredId);
65-
}
57+
var maxWait = suspend ? TimeSpan.Zero : (TimeSpan?)null;
58+
await Effect.FlowTimeouts.AddTimeout(timeoutId, expiry.ToDateTime(), maxWait);
6659

6760
await Effect.Upsert(timeoutId, -1L, alias, flush: false);
6861
Effect.FlowTimeouts.RemoveTimeout(timeoutId);

Core/Cleipnir.ResilientFunctions/Domain/Effect.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ public class Effect(EffectResults effectResults, UtcNow utcNow, FlowTimeouts flo
2424
internal IEnumerable<EffectId> EffectIds => effectResults.EffectIds;
2525
internal FlowTimeouts FlowTimeouts => flowTimeouts;
2626

27-
internal Task Suspend() => flowsManager.Suspend(storedId);
28-
2927
internal WorkStatus? GetStatus(int id)
3028
{
3129
var effectId = CreateEffectId(id);

Core/Cleipnir.ResilientFunctions/Domain/RetryPolicy.cs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,7 @@ public async Task<T> Invoke<T>(Func<Task<T>> work, Effect effect, UtcNow utcNow,
108108
var hasDelayUntil = effect.TryGet<long>(delayUntilId, out var delayUntilValue);
109109
var delayUntil = hasDelayUntil ? delayUntilValue.ToDateTime() : DateTime.MinValue;
110110
if (hasDelayUntil && delayUntil > utcNow())
111-
{
112-
flowTimeouts.AddTimeout(delayUntilId, delayUntil);
113-
await effect.Suspend();
114-
}
111+
await flowTimeouts.AddTimeout(delayUntilId, delayUntil, maxWait: TimeSpan.Zero);
115112

116113
var iterationId = effect.CreateEffectId(1);
117114
var iteration = await effect.CreateOrGet(iterationId, 0, alias: null, flush: false);
@@ -149,12 +146,9 @@ await effect.Upserts(
149146
throw;
150147

151148
if (delay >= suspendThreshold)
152-
{
153-
flowTimeouts.AddTimeout(delayUntilId, delayUntil);
154-
await effect.Suspend();
155-
}
156-
157-
await Task.Delay(delay);
149+
await flowTimeouts.AddTimeout(delayUntilId, delayUntil, maxWait: TimeSpan.Zero);
150+
else
151+
await Task.Delay(delay);
158152
}
159153
}
160154
}

0 commit comments

Comments
 (0)