Finalize worker metrics on coordinator channel EOS#524

Merged
gabotechs merged 2 commits into
mainfrom
gabrielmusat/fix-deadlock
Jul 3, 2026

Conversation

@gabotechs

@gabotechs gabotechs commented Jul 2, 2026

Collaborator

Closes #521

The old lifecycle assumed every producer partition stream would be materialized and dropped. That still tends to hold in the current main branch because the WorkerConnectionPool owns eager demux/drop behavior, so there is no natural failing test here.

In the following PR:

The gRPC abstraction refactor changes stream ownership and returns already-demuxed streams through the protocol boundary, which exposes the latent issue: query output can finish while some producer partition streams never reach the final drop path, leaving the metrics oneshot unresolved.

Coordinator-channel EOS is the better task-finalization signal because it represents query-scoped shutdown independently of whether every partition stream was consumed or dropped normally.

Deadlock sequence (WARN: explained by Codex, I still don't understand it 100%)

Why the bug does not reproduce in main:

  1. Metrics are sent from the worker when the worker-side execution streams for a task finish or are dropped.
  2. In main, the remote gRPC path owns the full execute-task request lifecycle internally: once WorkerConnectionPool initializes a remote connection, the request is driven by the connection/demux machinery.
  3. Local self-calls are also effectively per-partition: the local path asks execute_task for one partition at a time instead of returning a batch of partition streams where some may never be polled.
  4. Because of that, the streams that count toward task finalization are normally materialized and eventually dropped.
  5. The metrics oneshot resolves before the coordinator needs the query-scoped coordinator->worker stream to close.
  6. drain_pending_tasks() can complete, and only after that the query-end guard is dropped. There is no dependency cycle.
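The drop-counting lifecycle described in these steps can be sketched roughly as follows. This is a minimal std-only model, not the project's code: `PartitionStream`, `metrics_sent_after_drops`, and the use of an `mpsc` channel in place of the metrics oneshot are all hypothetical stand-ins chosen for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;

/// Hypothetical stand-in for a worker-side partition stream.
struct PartitionStream {
    /// Number of partition streams that still have to drop before the task is final.
    remaining: Arc<AtomicUsize>,
    /// Stand-in for the metrics oneshot (assumption: the real code uses a oneshot).
    metrics_tx: Sender<&'static str>,
}

impl Drop for PartitionStream {
    fn drop(&mut self) {
        // fetch_sub returns the previous value, so 1 means this was the last stream.
        if self.remaining.fetch_sub(1, Ordering::SeqCst) == 1 {
            let _ = self.metrics_tx.send("metrics");
        }
    }
}

/// Materializes `materialized` of the `partitions` expected streams, drops them,
/// and reports whether the metrics message was ever sent.
fn metrics_sent_after_drops(partitions: usize, materialized: usize) -> bool {
    let remaining = Arc::new(AtomicUsize::new(partitions));
    let (tx, rx) = channel();
    let streams: Vec<PartitionStream> = (0..materialized)
        .map(|_| PartitionStream {
            remaining: Arc::clone(&remaining),
            metrics_tx: tx.clone(),
        })
        .collect();
    drop(streams); // every materialized stream decrements the counter
    drop(tx);
    rx.try_recv().is_ok()
}
```

In this model `metrics_sent_after_drops(4, 4)` returns `true`, while `metrics_sent_after_drops(4, 3)` returns `false`: a single stream that is never materialized leaves the counter above zero and the metrics channel unresolved.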

Why the bug reproduces in #512:

  1. Metrics are still tied to the old final stream-drop path.
  2. The refactor makes WorkerChannel::execute_task return already-demuxed partition streams through the protocol boundary.
  3. The gRPC implementation keeps those streams lazy: it does not send the actual execute_task RPC until one returned partition stream is first polled.
  4. In shapes like custom_routing_join, DataFusion may construct some remote partition streams that are never polled.
  5. For those streams, the first-poll gate never opens, so the worker never receives the corresponding execute_task request.
  6. Since the worker never receives that request, it never materializes the worker-side execution streams whose drops would decrement the task-finalization counter.
  7. The worker therefore never sends metrics for that task, leaving the worker->coordinator response stream open.
  8. The coordinator is waiting in drain_pending_tasks() for that response stream to finish, while the query-end guard is still alive.
  9. If metrics are moved to coordinator-channel EOS without also dropping the query-end guard before draining, the worker then waits for coordinator->worker EOS, but that EOS is gated by the same query-end guard. That creates the cycle.
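The first-poll gate from steps 3 to 5 can be illustrated with a small synchronous model. This is only a sketch under assumptions: `LazyPartitionStream` and `request_sent_after_polling` are hypothetical names, a plain `Iterator` stands in for an async stream, and the gate is modeled as one flag shared by all partition streams of a task.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical model of a lazily started remote partition stream.
struct LazyPartitionStream {
    /// Shared gate: set once the (simulated) execute_task request has been sent.
    request_sent: Rc<Cell<bool>>,
    rows: std::vec::IntoIter<u32>,
}

impl Iterator for LazyPartitionStream {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        // The first poll of any partition stream opens the gate.
        self.request_sent.set(true);
        self.rows.next()
    }
}

/// Builds `partitions` lazy streams, polls only the ones listed in `polled`,
/// and reports whether the request was ever sent.
fn request_sent_after_polling(partitions: usize, polled: &[usize]) -> bool {
    let gate = Rc::new(Cell::new(false));
    let mut streams: Vec<LazyPartitionStream> = (0..partitions)
        .map(|_| LazyPartitionStream {
            request_sent: Rc::clone(&gate),
            rows: vec![1, 2, 3].into_iter(),
        })
        .collect();
    for &i in polled {
        let _ = streams[i].next();
    }
    gate.get()
}
```

Here `request_sent_after_polling(3, &[])` returns `false`: constructing the streams is not enough, so a task whose streams are never polled never reaches the worker, and nothing on the worker side can count down toward finalization.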

The following changes are made here:

  • Finalize worker task metrics when the coordinator channel reaches EOS instead of from the last partition stream drop.
  • Drop the query-end guard before draining coordinator background tasks so workers can observe the EOS being waited on.
  • Keep task invalidation tied to coordinator-channel shutdown.
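Why the guard has to be dropped before draining can be shown with a small threaded model. It is a sketch under assumptions, not the actual implementation: `shutdown` is a hypothetical function, the `Sender` half of a std `mpsc` channel plays the role of the query-end guard, dropping it is the coordinator-channel EOS, and a receive timeout stands in for the deadlock.

```rust
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Returns true if draining completed, false if it timed out
/// (the timeout is a stand-in for the deadlock).
fn shutdown(drop_guard_before_drain: bool) -> bool {
    // Coordinator -> worker channel. Holding `guard` keeps it open.
    let (guard, coordinator_rx) = channel::<()>();
    // Worker -> coordinator response stream.
    let (response_tx, response_rx) = channel::<&'static str>();

    let worker = thread::spawn(move || {
        // Wait for coordinator-channel EOS: recv() errors once the sender is dropped.
        while coordinator_rx.recv().is_ok() {}
        // Finalize metrics on EOS, then close the response stream by dropping the sender.
        let _ = response_tx.send("metrics");
    });

    let mut guard = Some(guard);
    if drop_guard_before_drain {
        drop(guard.take()); // the worker can now observe EOS
    }

    // Stand-in for draining: wait until the response stream is closed.
    let drained = loop {
        match response_rx.recv_timeout(Duration::from_millis(500)) {
            Ok(_) => continue,
            Err(RecvTimeoutError::Disconnected) => break true,
            Err(RecvTimeoutError::Timeout) => break false,
        }
    };

    drop(guard); // release the worker in the stuck case so the thread can be joined
    worker.join().unwrap();
    drained
}
```

With `shutdown(true)` the worker sees EOS, sends its metrics, and the drain finishes. With `shutdown(false)` the coordinator waits on a response stream that can only close after an EOS it has not yet allowed, so the wait times out.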

I think this is something #509 also noticed

@gabotechs gabotechs force-pushed the gabrielmusat/fix-deadlock branch from 5e6a0a4 to 6166ac9 Compare July 2, 2026 13:33

@jayshrivastava jayshrivastava left a comment

Collaborator


This looks right but I need some help filling in some blanks.

Before, query_coordinator.drain_pending_tasks().await?; would have to finish before dropping the _guard. The reason we deadlock is fundamentally because, after your changes, we don't guarantee that every partition in a task is run. The partition count in the worker would not hit 0, so the worker -> coordinator stream would be kept alive. Since you've deleted this partition counting, the problem goes away. We now drop the guard in the coordinator which kicks off everything, including metrics collection.

I'd like to understand why not all partitions are executed

  • The PR desc mentions custom_routing_join. What's that? Does that cause some partitions to possibly not be executed?
  • In your original AQE PR you mentioned join order swapping, did you actually implement that? I forgot
  • You also mentioned that some tasks may be set but not executed. Ex. if the SamplerExec sees no rows.

Comment thread src/worker/impl_execute_task.rs
@gabotechs

Collaborator Author

The PR desc mentions custom_routing_join. What's that? Does that cause some partitions to possibly not be executed?

Ah yeah sorry, I should have stated that more clearly: it's one of the integration tests that shows the problem in the other PR (#512). It shows it because that test has a JOIN that drops some partitions of the probe side mid-query (I think).

In your original AQE PR you mentioned join order swapping, did you actually implement that? I forgot

Nope, I have a very drafty draft (https://github.com/datafusion-contrib/datafusion-distributed/tree/gabrielmusat/aqe), but no, not yet

You also mentioned that some tasks may be set but not executed. Ex. if the SamplerExec sees no rows.

Not really, I think it mostly happens just on JOINs that decide to drop the probe side early. Today we force-execute all the partitions of the probe side regardless of whether they will be consumed at some point or not. However, future implementations of the worker protocol might decide not to force-execute them, which is what happens in the next PR.
(disclaimer: I'm not 100% sure that statement is right, my understanding about this problem is a bit fuzzy)

Comment thread src/worker/impl_coordinator_channel.rs
Comment thread src/worker/impl_execute_task.rs
@gabotechs gabotechs force-pushed the gabrielmusat/fix-deadlock branch from 43d7c33 to 2b108ea Compare July 2, 2026 18:18
@gabotechs gabotechs merged commit ad0de11 into main Jul 3, 2026
31 checks passed
@gabotechs gabotechs deleted the gabrielmusat/fix-deadlock branch July 3, 2026 10:13
@gabotechs

Collaborator Author

Updated the PR description with a better explanation about what happened.

Development

Successfully merging this pull request may close these issues.

Dynamic planning can deadlock on set-but-never-executed stages

2 participants