Skip to content

FLOGO-18450: Concurrent execution of parallel transition branches#218

Open
awakchau-tibco wants to merge 3 commits into
masterfrom
flogo-18450-concurrent-parallel-branches
Open

FLOGO-18450: Concurrent execution of parallel transition branches#218
awakchau-tibco wants to merge 3 commits into
masterfrom
flogo-18450-concurrent-parallel-branches

Conversation

@awakchau-tibco

Copy link
Copy Markdown
Collaborator

What

Adds opt-in concurrent execution of activities on parallel transition branches (fan-out) in the flow action. Branches modeled as parallel now run at the same time instead of one activity being processed at a time.

Why

The flow drains work sequentially, so independent branches were serialized — total latency equaled the sum of branch times. Concurrent execution reduces this to roughly the slowest branch, especially impactful for I/O-bound branches (delay / REST / DB).

Design & safety

  • Gated behind the existing FLOGO_FLOW_CONCURRENT_TASK_EXECUTION env flag (default off). When off, the sequential DoStep loop is byte-for-byte unchanged, so no existing flow regresses unless explicitly opted in.
  • When on, FlowAction.Run drives a new IndependentInstance.RunConcurrent worker pool (min(GOMAXPROCS, 32) workers) that runs ready tasks concurrently while preserving join semantics (a task runs only after all its incoming links resolve) and order-independent results.
  • support/env.go: GetConcurrentExecution(); instance/util.go IsConcurrentTaskExcutionEnabled delegates to it (one source of truth, shared with the change-tracker lock).
  • Two-lock design on IndependentInstance: coarse stateLock (traversal/scheduling, taskInsts/linkInsts/subflows) + attrsLock RWMutex (shared scope + returnData), both nil in sequential mode. Lock order: stateLock -> changeTracker -> attrsLock.
  • stepID / wiCounter / subflowCtr promoted to atomics.
  • Per-task cancellable context (TaskInst.evalCtx) so context-aware activities abort on sibling failure.
  • Drain-then-fail: the first unhandled branch error cancels siblings, the pool waits for in-flight branches to return, then the global error handler fires once.
  • execTask and the new execTaskConcurrent share extracted evalTaskBehavior + handleEvalResult helpers (removes duplication).

Testing

  • Unit + integration tests (parallelism, join-fires-once, drain-then-fail, sequential parity), ~87% coverage of the new code; clean under go test -race.
  • Fixed pre-existing map-order-flaky TestGetErrorObject_* (def.Tasks()[0] -> def.GetTask("LogStart")).

Performance

  • Benchmarks + a when-to-enable guide in instance/PARALLEL_BENCHMARKS.md. Crossover ≈ 50–100 µs of work per branch; recommend enabling only for 2+ branches each doing blocking I/O or ≳ ~100 µs of CPU.

🤖 Generated with Claude Code

awakchau-tibco and others added 3 commits June 10, 2026 00:21
Activities on parallel transitions (fan-out branches) previously executed
sequentially even though the flow models them as parallel. This adds opt-in
concurrent execution of ready branches, gated behind the existing
FLOGO_FLOW_CONCURRENT_TASK_EXECUTION env flag (default off).

When the flag is off the sequential DoStep loop is byte-for-byte unchanged, so
no existing flow regresses unless explicitly opted in. When on, FlowAction.Run
drives a new IndependentInstance.RunConcurrent worker pool that runs ready tasks
concurrently while preserving join semantics (a task runs only after all its
incoming links resolve) and order-independent results.

- support/env.go: GetConcurrentExecution(); instance/util.go
  IsConcurrentTaskExcutionEnabled delegates to it (one source of truth, shared
  with the change-tracker lock).
- Two-lock design on IndependentInstance: coarse stateLock (traversal/scheduling,
  taskInsts/linkInsts/subflows) + attrsLock RWMutex (shared scope + returnData),
  both nil in sequential mode. Lock order: stateLock -> changeTracker -> attrsLock.
- stepID/wiCounter/subflowCtr promoted to atomics.
- Per-task cancellable context (TaskInst.evalCtx) so context-aware activities
  abort on sibling failure.
- Drain-then-fail: first unhandled branch error cancels siblings, the pool waits
  for in-flight branches to return, then the global error handler fires once.
- execTask and the new execTaskConcurrent share extracted evalTaskBehavior +
  handleEvalResult helpers (removes duplication).
- Tests: unit + integration (parallelism, join-once, drain-then-fail, sequential
  parity), ~87% coverage of the new code; clean under -race. Also fixed
  pre-existing map-order-flaky TestGetErrorObject_* (def.Tasks()[0] ->
  def.GetTask("LogStart")).
- Benchmarks + a when-to-enable guide (instance/PARALLEL_BENCHMARKS.md).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant