Finalize worker metrics on coordinator channel EOS#524
Conversation
e733def to
5e6a0a4
Compare
5e6a0a4 to
6166ac9
Compare
jayshrivastava
left a comment
There was a problem hiding this comment.
This looks right but I need some help filling in some blanks.
Before, query_coordinator.drain_pending_tasks().await?; would have to finish before dropping the _guard. The reason we deadlock is fundamentally because, after your changes, we don't guarantee that every partition in a task is run. The partition count in the worker would not hit 0, so the worker -> coordinator stream would be kept alive. Since you've deleted this partition counting, the problem goes away. We now drop the guard in the coordinator which kicks off everything, including metrics collection.
I'd like to understand why not all partitions are executed
- The PR desc mentions
custom_routing_join. What's that? Does that cause some partitions to possibly not be executed? - In your original AQE PR you mentioned join order swapping, did you actually implement that? I forgot
- You also mentioned that some tasks may be set but not executed. Ex. if the
SamplerExecsees no rows.
Ah yeah sorry, I should have stated that more clearly, it's one of the integration test that is showing the problem in the other PR (#512). It shows it because that test has a JOIN that drops some partitions of the probe side mid-query (I think).
Nope, I have a very drafty draft (https://github.com/datafusion-contrib/datafusion-distributed/tree/gabrielmusat/aqe), but no, not yet
Not really, I think it mostly happens just on JOINs that decide to drop the probe side early. Today we force-execute all the partitions of the probe side regarding of whether they will be consumed at some point or not, however future implementations of the worker protocol might decide to not force-execute them, which is what happens on the next PR. |
43d7c33 to
2b108ea
Compare
|
Updated the PR description with a better explanation about what happened. |
Closes #521
The old lifecycle assumed every producer partition stream would be materialized and dropped. That still tends to hold in the current
mainbranch because the WorkerConnectionPool owns eager demux/drop behavior, so there is no natural failing test here.In the following PR:
The gRPC abstraction refactor changes stream ownership and returns already-demuxed streams through the protocol boundary, which exposes the latent issue: query output can finish while some producer partition streams never reach the final drop path, leaving the metrics oneshot unresolved.
Coordinator-channel EOS is the better task-finalization signal because it represents query-scoped shutdown independently of whether every partition stream was consumed or dropped normally.
Deadlock sequence (WARN: explained by Codex, I still don't understand it 100%)
Why the bug does not reproduce in
main:main, the remote gRPC path owns the full execute-task request lifecycle internally: onceWorkerConnectionPoolinitializes a remote connection, the request is driven by the connection/demux machinery.execute_taskfor one partition at a time instead of returning a batch of partition streams where some may never be polled.drain_pending_tasks()can complete, and only after that the query-end guard is dropped. There is no dependency cycle.Why the bug reproduces in #512:
WorkerChannel::execute_taskreturn already-demuxed partition streams through the protocol boundary.execute_taskRPC until one returned partition stream is first polled.custom_routing_join, DataFusion may construct some remote partition streams that are never polled.execute_taskrequest.drain_pending_tasks()for that response stream to finish, while the query-end guard is still alive.The following changes are made here:
I think this is something #509 also noticed