Skip to content

Commit 4e93e3a

Browse files
committed
Move FlowsManager thread notification into Effect.RunParallelle
1 parent 58981a5 commit 4e93e3a

2 files changed

Lines changed: 11 additions & 10 deletions

File tree

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,7 @@ public Task<T> Message<T>(Func<T, bool> filter, TimeSpan? maxWait = null) where
108108

109109
public Task AppendMessage(object msg, string? idempotencyKey = null) => MessageWriter.AppendMessage(msg, idempotencyKey);
110110

111-
public Task<T> Parallelle<T>(Func<Task<T>> work)
112-
{
113-
_flowsManager.StartThread(StoredId);
114-
return Effect.RunParallelle(work).ContinueWith(t =>
115-
{
116-
_flowsManager.CompleteThread(StoredId);
117-
return t.GetAwaiter().GetResult();
118-
});
119-
}
111+
public Task<T> Parallelle<T>(Func<Task<T>> work) => Effect.RunParallelle(work);
120112

121113
public string ExecutionTree()
122114
{

Core/Cleipnir.ResilientFunctions/Domain/Effect.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,5 +276,14 @@ public async Task<TSeed> AggregateEach<T, TSeed>(
276276

277277
internal string ExecutionTree() => EffectPrinter.Print(effectResults);
278278

279-
public Task<T> RunParallelle<T>(Func<Task<T>> work) => Capture(() => Task.Run(work));
279+
public Task<T> RunParallelle<T>(Func<Task<T>> work)
280+
{
281+
flowsManager.StartThread(storedId);
282+
var task = Capture(() => Task.Run(work));
283+
return task.ContinueWith(t =>
284+
{
285+
flowsManager.CompleteThread(storedId);
286+
return t.GetAwaiter().GetResult();
287+
});
288+
}
280289
}

0 commit comments

Comments
 (0)