Abstract worker protocol from gRPC#512
Conversation
| pub(crate) fn now_ns() -> u64 { | ||
| pub(crate) fn now_ns<T>() -> T | ||
| where | ||
| T: 'static + Copy, | ||
| u128: AsPrimitive<T>, | ||
| { |
There was a problem hiding this comment.
This now_ns() now needs to be used in places that require u64 and other places that require a usize, so it can work on both now. Thanks num_traits
| } | ||
|
|
||
| impl ProducerHead { | ||
| pub(crate) fn to_spec(&self, cfg: &SessionConfig) -> Result<ProducerHeadSpec> { |
There was a problem hiding this comment.
See point 1 in PR description.
| // Unlike TaskContext, a DataFusion RuntimeEnv does not allow to introduce user-defined extensions. | ||
| // For the default implementation of the ChannelResolvers, we cannot inject one DefaultChannelResolver | ||
| // per TaskContext, as this holds reference to Tonic channels that must outlive a single TaskContext. | ||
| // | ||
| // The Tonic channels need to be established and reused under a whole RuntimeEnv scope, not a single | ||
| // TaskContext, which forces us to put the default implementation in a static global variable that | ||
| // stores and reuses tonic channels per RuntimeEnv's pointer address. | ||
| static DEFAULT_CHANNEL_RESOLVER_PER_RUNTIME: LazyLock< | ||
| moka::sync::Cache< | ||
| /* Arc<RuntimeEnv> pointer address */ usize, | ||
| /* ChannelResolver that reuses built channels */ Arc<DefaultChannelResolver>, | ||
| >, | ||
| > = LazyLock::new(|| moka::sync::Cache::builder().max_capacity(256).build()); | ||
|
|
There was a problem hiding this comment.
All the code in this file was not deleted, it was just moved untouched somewhere else
| #!/usr/bin/env bash | ||
|
|
||
| set -e | ||
|
|
||
| repo_root=$(git rev-parse --show-toplevel) | ||
| cd "$repo_root" && cargo run --manifest-path src/protocol/grpc/observability/gen/Cargo.toml |
There was a problem hiding this comment.
Not new, moved untouched, not sure why this shows as an actual diff.
| impl TaskData { | ||
| /// Returns the number of partitions remaining to be processed. | ||
| fn num_partitions_remaining(&self) -> usize { | ||
| self.num_partitions_remaining.load(Ordering::SeqCst) | ||
| } | ||
|
|
||
| /// Returns the total number of partitions in this task. | ||
| fn total_partitions(&self) -> usize { | ||
| match self.final_plan.get() { | ||
| Some(Ok(plan)) => plan.output_partitioning().partition_count(), | ||
| _ => self | ||
| .base_plan | ||
| .properties() | ||
| .output_partitioning() | ||
| .partition_count(), | ||
| } | ||
| } |
There was a problem hiding this comment.
Not new, moved from other place untouched.
| impl TaskData { | ||
| /// Returns the number of partitions remaining to be processed. | ||
| pub(crate) fn num_partitions_remaining(&self) -> usize { | ||
| self.num_partitions_remaining.load(Ordering::SeqCst) | ||
| } | ||
|
|
||
| /// Returns the total number of partitions in this task. | ||
| pub(crate) fn total_partitions(&self) -> usize { | ||
| match self.final_plan.get() { | ||
| Some(Ok(plan)) => plan.output_partitioning().partition_count(), | ||
| _ => self | ||
| .base_plan | ||
| .properties() | ||
| .output_partitioning() | ||
| .partition_count(), | ||
| } |
There was a problem hiding this comment.
Moved somewhere else
| }; | ||
| let Some(producer_head) = body.producer_head.as_ref().cloned() else { | ||
| return internal_err!("Missing producer_head"); | ||
| }; |
There was a problem hiding this comment.
Diff is very unfortunate in this file...
Not much changed actually, just most of its contents that were gRPC-specific where copy-pasted to worker_service.rs.
There do is one thing that got removed related to point 2) in the PR descritpion, I'll add a comment some lines below
| let stream = on_drop_stream(stream, move || { | ||
| // Stream was dropped before fully consumed -- see https://github.com/datafusion-contrib/datafusion-distributed/issues/412 | ||
| // Send metrics via the coordinator channel so they are not lost. | ||
| if num_partitions_remaining.fetch_sub(1, Ordering::SeqCst) == 1 { | ||
| // Fire-and-forget background tokio task to handle async | ||
| // invalidate() within synchronous on_drop_stream. | ||
| #[allow(clippy::disallowed_methods)] | ||
| tokio::spawn(async move { | ||
| task_data_entries.invalidate(&key).await; | ||
| }); | ||
| task_data_metrics.mark_execution_finished(); | ||
| if send_metrics { | ||
| send_metrics_via_channel(&metrics_tx, &plan, d_ctx, &task_data_metrics); | ||
| } | ||
| } | ||
| }); |
There was a problem hiding this comment.
This whole on_drop_stream was there before and it was actually removed. It's now on impl_coordinator_chanel.rs because of point 2) in the PR description.
| struct RemoteWorkerConnection { | ||
| task: Arc<SpawnedTask<()>>, | ||
| not_consumed_streams: Arc<AtomicUsize>, | ||
| cancel_token: CancellationToken, | ||
| per_partition_rx: DashMap<usize, UnboundedReceiver<WorkerMsg>>, | ||
|
|
||
| first_poll_notify: Arc<Notify>, | ||
| // Signals the demux task that buffered memory has been freed by a consumer. | ||
| mem_available_notify: Arc<Notify>, | ||
|
|
||
| // Metrics collection stuff. | ||
| memory_reservation: Arc<MemoryReservation>, | ||
| elapsed_compute: Time, | ||
| } | ||
|
|
There was a problem hiding this comment.
Pretty much all this file was moved to worker_client.rs. There's just a few bits left that were greatly simplified as it no longer makes sense to have here the WorkerConnection abstraction, the worker protocol + gRPC layer provides already all the abstractions we want.
| /// | ||
| /// ``` | ||
| /// # use datafusion_distributed::Worker; | ||
| /// # use tonic::transport::Server; |
There was a problem hiding this comment.
Not removed, just copy-pasted somewhere else
85ae73e to
a2aed2e
Compare
a2aed2e to
ede9176
Compare
ca5df89 to
2203b3b
Compare
ede9176 to
fa78bca
Compare
29bbd80 to
9a3269e
Compare
fa78bca to
569bbf6
Compare
9a3269e to
440e018
Compare
569bbf6 to
0e56a1b
Compare
There was a problem hiding this comment.
This looks great: thanks a lot for picking up the torch on this one: I agree that it ended up better encapsulated.
Do you expect #432 to land before this one, or was basing it atop #432 an exercise to ensure that extracting trait WorkerChannel would not hamper landing that one?
@mdashti drafted paradedb#27 atop this one to additionally add an in-process/in-memory worker implementation, which allows the full test suite to run without grpc.
How can we help to get this landed?
112c941 to
6b563ad
Compare
6d7efd8 to
83fd703
Compare
cb5bda7 to
8a3bfba
Compare
5e6a0a4 to
6166ac9
Compare
5294d64 to
125b90f
Compare
Closes #521 The old lifecycle assumed every producer partition stream would be materialized and dropped. That still tends to hold in the current `main` branch because the WorkerConnectionPool owns eager demux/drop behavior, so there is no natural failing test here. In the following PR: - #512 The gRPC abstraction refactor changes stream ownership and returns already-demuxed streams through the protocol boundary, which exposes the latent issue: query output can finish while some producer partition streams never reach the final drop path, leaving the metrics oneshot unresolved. Coordinator-channel EOS is the better task-finalization signal because it represents query-scoped shutdown independently of whether every partition stream was consumed or dropped normally. Deadlock sequence (WARN: explained by Codex, I still don't understand it 100%) Why the bug does not reproduce in `main`: 1. Metrics are sent from the worker when the worker-side execution streams for a task finish or are dropped. 2. In `main`, the remote gRPC path owns the full execute-task request lifecycle internally: once `WorkerConnectionPool` initializes a remote connection, the request is driven by the connection/demux machinery. 3. Local self-calls are also effectively per-partition: the local path asks `execute_task` for one partition at a time instead of returning a batch of partition streams where some may never be polled. 4. Because of that, the streams that count toward task finalization are normally materialized and eventually dropped. 5. The metrics oneshot resolves before the coordinator needs the query-scoped coordinator->worker stream to close. 6. `drain_pending_tasks()` can complete, and only after that the query-end guard is dropped. There is no dependency cycle. Why the bug reproduces in #512: 1. Metrics are still tied to the old final stream-drop path. 2. The refactor makes `WorkerChannel::execute_task` return already-demuxed partition streams through the protocol boundary. 3. The gRPC implementation keeps those streams lazy: it does not send the actual `execute_task` RPC until one returned partition stream is first polled. 4. In shapes like `custom_routing_join`, DataFusion may construct some remote partition streams that are never polled. 5. For those streams, the first-poll gate never opens, so the worker never receives the corresponding `execute_task` request. 6. Since the worker never receives that request, it never materializes the worker-side execution streams whose drops would decrement the task-finalization counter. 7. The worker therefore never sends metrics for that task, leaving the worker->coordinator response stream open. 8. The coordinator is waiting in `drain_pending_tasks()` for that response stream to finish, while the query-end guard is still alive. 9. If metrics are moved to coordinator-channel EOS without also dropping the query-end guard before draining, the worker then waits for coordinator->worker EOS, but that EOS is gated by the same query-end guard. That creates the cycle. The following changes are made here: - Finalize worker task metrics when the coordinator channel reaches EOS instead of from the last partition stream drop. - Drop the query-end guard before draining coordinator background tasks so workers can observe the EOS being waited on. - Keep task invalidation tied to coordinator-channel shutdown. I think this is something #509 also noticed
cb55f58 to
6395232
Compare
6395232 to
954e2ac
Compare
Even if this PR looks big, it's mostly just mechanical changes that have been written fully by hand (no AI). The vast majority of this PR is moving files and code blocks around so that we can abstract away gRPC from the worker protocol.
There's just a few non-mechanical changes that where necessary though:
1. introduce
ProducerHeadSpec(protocol level) as intermediary betweenProducerHead(planner level) andpb::ProducerHead(grpc-specific level)introduce the intermediate
ProducerHeadSpec(protocol level) as a middle step betweenProducerHead(planner level) andpb::ProducerHead(grpc-specific level). The problem is that theProducerHead(planner level) containsVec<Arc<dyn PhysicalExpr>>, that need the presence of aTaskContextandPhysicalExtensionCodecs to be encoded/decoded. This means that people implementing their own transport protocols would need to deal with all theseTaskContextandPhysicalExtensionCodecinside their code for dealing with it, which is not ideal. Instead, we provide protocol implementors withProducerHeadSpec(protocol level) with an already serializedVec<u8>instead ofVec<Arc<dyn PhysicalExpr>>, so that they don't need to worry about encoding those themselves.2. Metrics collection deadlock
This is addressed in a preliminary PR:
Before this refactor, worker metrics were emitted from
Worker::execute_taskwhen the last partition stream for a task was dropped. That implicitly assumed every producer partition stream would be materialized and then dropped. Onceexecute_taskstarted returning already-demuxed streams and the local/in-memory path was simplified,custom_routing_joinexposed a case where query results had completed but some upstream partition streams had not reached that last-drop path. The metrics oneshot never resolved, so the coordinator kept waiting for the worker-to-coordinator stream while draining pending tasks.The fix is to treat coordinator-channel EOS as the task finalization signal for metrics/state cleanup. When the coordinator closes the query-scoped channel, the worker sends the collected metrics through the existing oneshot and invalidates task state. The coordinator also drops the query-end guard before draining pending tasks so that workers can observe the EOS it is waiting on.