FLOGO-18450: Concurrent execution of parallel transition branches#218
Open
awakchau-tibco wants to merge 3 commits into
Open
FLOGO-18450: Concurrent execution of parallel transition branches#218awakchau-tibco wants to merge 3 commits into
awakchau-tibco wants to merge 3 commits into
Conversation
Activities on parallel transitions (fan-out branches) previously executed
sequentially even though the flow models them as parallel. This adds opt-in
concurrent execution of ready branches, gated behind the existing
FLOGO_FLOW_CONCURRENT_TASK_EXECUTION env flag (default off).
When the flag is off the sequential DoStep loop is byte-for-byte unchanged, so
no existing flow regresses unless explicitly opted in. When on, FlowAction.Run
drives a new IndependentInstance.RunConcurrent worker pool that runs ready tasks
concurrently while preserving join semantics (a task runs only after all its
incoming links resolve) and order-independent results.
- support/env.go: GetConcurrentExecution(); instance/util.go
IsConcurrentTaskExcutionEnabled delegates to it (one source of truth, shared
with the change-tracker lock).
- Two-lock design on IndependentInstance: coarse stateLock (traversal/scheduling,
taskInsts/linkInsts/subflows) + attrsLock RWMutex (shared scope + returnData),
both nil in sequential mode. Lock order: stateLock -> changeTracker -> attrsLock.
- stepID/wiCounter/subflowCtr promoted to atomics.
- Per-task cancellable context (TaskInst.evalCtx) so context-aware activities
abort on sibling failure.
- Drain-then-fail: first unhandled branch error cancels siblings, the pool waits
for in-flight branches to return, then the global error handler fires once.
- execTask and the new execTaskConcurrent share extracted evalTaskBehavior +
handleEvalResult helpers (removes duplication).
- Tests: unit + integration (parallelism, join-once, drain-then-fail, sequential
parity), ~87% coverage of the new code; clean under -race. Also fixed
pre-existing map-order-flaky TestGetErrorObject_* (def.Tasks()[0] ->
def.GetTask("LogStart")).
- Benchmarks + a when-to-enable guide (instance/PARALLEL_BENCHMARKS.md).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
Adds opt-in concurrent execution of activities on parallel transition branches (fan-out) in the flow action. Branches modeled as parallel now run at the same time instead of one activity being processed at a time.
Why
The flow drains work sequentially, so independent branches were serialized — total latency equaled the sum of branch times. Concurrent execution reduces this to roughly the slowest branch, especially impactful for I/O-bound branches (delay / REST / DB).
Design & safety
FLOGO_FLOW_CONCURRENT_TASK_EXECUTIONenv flag (default off). When off, the sequentialDoSteploop is byte-for-byte unchanged, so no existing flow regresses unless explicitly opted in.FlowAction.Rundrives a newIndependentInstance.RunConcurrentworker pool (min(GOMAXPROCS, 32)workers) that runs ready tasks concurrently while preserving join semantics (a task runs only after all its incoming links resolve) and order-independent results.support/env.go:GetConcurrentExecution();instance/util.goIsConcurrentTaskExcutionEnableddelegates to it (one source of truth, shared with the change-tracker lock).IndependentInstance: coarsestateLock(traversal/scheduling, taskInsts/linkInsts/subflows) +attrsLockRWMutex (shared scope + returnData), both nil in sequential mode. Lock order:stateLock -> changeTracker -> attrsLock.stepID/wiCounter/subflowCtrpromoted to atomics.TaskInst.evalCtx) so context-aware activities abort on sibling failure.execTaskand the newexecTaskConcurrentshare extractedevalTaskBehavior+handleEvalResulthelpers (removes duplication).Testing
go test -race.TestGetErrorObject_*(def.Tasks()[0]->def.GetTask("LogStart")).Performance
instance/PARALLEL_BENCHMARKS.md. Crossover ≈ 50–100 µs of work per branch; recommend enabling only for 2+ branches each doing blocking I/O or ≳ ~100 µs of CPU.🤖 Generated with Claude Code