Abstract worker protocol from gRPC #512

Draft
gabotechs wants to merge 1 commit into main from gabrielmusat/abstract-grpc

@gabotechs gabotechs commented Jun 29, 2026

Even if this PR looks big, it's mostly just mechanical changes that have been written fully by hand (no AI). The vast majority of this PR is moving files and code blocks around so that we can abstract away gRPC from the worker protocol.

There are just a few non-mechanical changes that were necessary, though:

1. Introduce ProducerHeadSpec (protocol level) as an intermediary between ProducerHead (planner level) and pb::ProducerHead (gRPC-specific level)

The problem is that ProducerHead (planner level) contains Vec<Arc<dyn PhysicalExpr>>, which needs a TaskContext and PhysicalExtensionCodecs to be encoded and decoded. People implementing their own transport protocols would therefore have to handle TaskContext and PhysicalExtensionCodec in their own code, which is not ideal. Instead, we give protocol implementors ProducerHeadSpec (protocol level), which carries an already serialized Vec<u8> instead of Vec<Arc<dyn PhysicalExpr>>, so they don't need to worry about encoding those themselves.
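
To make the split concrete, here is a minimal std-only sketch of the idea. It is not the PR's code: the `Expr` trait, the `Column` type, the `exprs` field names and the toy length-prefixed encoding are all assumptions made for illustration, and the real `to_spec` takes a `SessionConfig` and relies on DataFusion's codecs.

```rust
use std::fmt::Debug;
use std::sync::Arc;

// Hypothetical stand-in for DataFusion's `PhysicalExpr`; the real trait is much richer.
trait Expr: Debug {
    fn name(&self) -> String;
}

#[derive(Debug)]
struct Column(String);

impl Expr for Column {
    fn name(&self) -> String {
        self.0.clone()
    }
}

// Planner level: holds live expression objects (hypothetical field name).
struct ProducerHead {
    exprs: Vec<Arc<dyn Expr>>,
}

// Protocol level: the same information, already serialized to bytes.
#[derive(Debug, Clone, PartialEq)]
struct ProducerHeadSpec {
    exprs: Vec<u8>,
}

impl ProducerHead {
    // Toy encoding: each expression becomes a one-byte length plus its UTF-8 name.
    // Names longer than 255 bytes are not handled in this sketch.
    fn to_spec(&self) -> ProducerHeadSpec {
        let mut bytes = Vec::new();
        for e in &self.exprs {
            let name = e.name();
            bytes.push(name.len() as u8);
            bytes.extend_from_slice(name.as_bytes());
        }
        ProducerHeadSpec { exprs: bytes }
    }
}

// A transport only ever sees opaque bytes, so it needs no codec and no context.
fn transport_roundtrip(spec: &ProducerHeadSpec) -> ProducerHeadSpec {
    let wire: Vec<u8> = spec.exprs.clone(); // pretend this crosses the network
    ProducerHeadSpec { exprs: wire }
}
```

The point of the sketch is that `transport_roundtrip` never touches `Expr`: a custom transport can move a `ProducerHeadSpec` around without knowing how expressions are encoded.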

2. Metrics collection deadlock

This is addressed in a preliminary PR:

Before this refactor, worker metrics were emitted from Worker::execute_task when the last partition stream for a task was dropped. That implicitly assumed every producer partition stream would be materialized and then dropped. Once execute_task started returning already-demuxed streams and the local/in-memory path was simplified, custom_routing_join exposed a case where query results had completed but some upstream partition streams had not reached that last-drop path. The metrics oneshot never resolved, so the coordinator kept waiting for the worker-to-coordinator stream while draining pending tasks.

The fix is to treat coordinator-channel EOS as the task finalization signal for metrics/state cleanup. When the coordinator closes the query-scoped channel, the worker sends the collected metrics through the existing oneshot and invalidates task state. The coordinator also drops the query-end guard before draining pending tasks so that workers can observe the EOS it is waiting on.
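
The ordering described above can be sketched with plain threads and `std::sync::mpsc` channels. This is only an illustration under stated assumptions: `CoordinatorMsg`, `TaskMetrics`, `worker` and `run_query` are hypothetical names, the sender half of the channel plays the role of the query-end guard, and the real code is async rather than thread-based.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Hypothetical message and metrics types, for illustration only.
enum CoordinatorMsg {
    Ping,
}

#[derive(Debug, PartialEq)]
struct TaskMetrics {
    msgs_seen: usize,
}

// Worker side: read the query-scoped channel until it closes (EOS), and only
// then send metrics. Finalization no longer depends on partition streams
// being materialized and dropped.
fn worker(coordinator_rx: Receiver<CoordinatorMsg>, metrics_tx: Sender<TaskMetrics>) {
    let mut msgs_seen = 0;
    // `recv` returns Err once every sender is dropped: that is the EOS signal.
    while let Ok(CoordinatorMsg::Ping) = coordinator_rx.recv() {
        msgs_seen += 1;
    }
    // Ignore the error if the coordinator already went away.
    let _ = metrics_tx.send(TaskMetrics { msgs_seen });
}

// Coordinator side: `coordinator_tx` stands in for the query-end guard.
fn run_query() -> TaskMetrics {
    let (coordinator_tx, coordinator_rx) = channel();
    let (metrics_tx, metrics_rx) = channel();
    let handle = thread::spawn(move || worker(coordinator_rx, metrics_tx));

    coordinator_tx.send(CoordinatorMsg::Ping).unwrap();
    coordinator_tx.send(CoordinatorMsg::Ping).unwrap();

    // Drop the guard BEFORE draining. Waiting on `metrics_rx` first would
    // deadlock: the worker waits for EOS, and EOS waits for this drop.
    drop(coordinator_tx);

    let metrics = metrics_rx.recv().expect("worker sends metrics on EOS");
    handle.join().unwrap();
    metrics
}
```

Moving the `drop(coordinator_tx)` line below `metrics_rx.recv()` reproduces the cycle in miniature: each side blocks on a signal that only the other side can produce.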

Comment thread src/common/time.rs
Comment on lines -4 to +9
-pub(crate) fn now_ns() -> u64 {
+pub(crate) fn now_ns<T>() -> T
+where
+    T: 'static + Copy,
+    u128: AsPrimitive<T>,
+{

now_ns() now needs to be used both in places that require a u64 and in places that require a usize, so it is generic over the return type and works for both. Thanks, num_traits.
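
A rough std-only sketch of the same idea follows. The body of the real `now_ns` is not shown in this thread, so the clock source here is an assumption, and the local `FromU128` trait is a hypothetical stand-in for the `num_traits::AsPrimitive` bound used in the PR.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical local stand-in for `num_traits::AsPrimitive`: a lossy `as` cast
// from u128 into the target integer type.
trait FromU128 {
    fn from_u128(v: u128) -> Self;
}

impl FromU128 for u64 {
    fn from_u128(v: u128) -> Self {
        v as u64
    }
}

impl FromU128 for usize {
    fn from_u128(v: u128) -> Self {
        v as usize
    }
}

// Nanoseconds since the Unix epoch, returned in whatever integer type the
// caller asks for. Falls back to 0 if the system clock is before the epoch.
fn now_ns<T: FromU128>() -> T {
    let nanos: u128 = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    T::from_u128(nanos)
}
```

Callers pick the type at the call site, for example `let t: u64 = now_ns();` or `let t: usize = now_ns();`. As with any `as` cast, the conversion truncates if the target type is too narrow.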

}

impl ProducerHead {
    pub(crate) fn to_spec(&self, cfg: &SessionConfig) -> Result<ProducerHeadSpec> {

See point 1 in PR description.

Comment on lines -68 to -81
// Unlike TaskContext, a DataFusion RuntimeEnv does not allow to introduce user-defined extensions.
// For the default implementation of the ChannelResolvers, we cannot inject one DefaultChannelResolver
// per TaskContext, as this holds reference to Tonic channels that must outlive a single TaskContext.
//
// The Tonic channels need to be established and reused under a whole RuntimeEnv scope, not a single
// TaskContext, which forces us to put the default implementation in a static global variable that
// stores and reuses tonic channels per RuntimeEnv's pointer address.
static DEFAULT_CHANNEL_RESOLVER_PER_RUNTIME: LazyLock<
    moka::sync::Cache<
        /* Arc<RuntimeEnv> pointer address */ usize,
        /* ChannelResolver that reuses built channels */ Arc<DefaultChannelResolver>,
    >,
> = LazyLock::new(|| moka::sync::Cache::builder().max_capacity(256).build());

None of the code in this file was deleted; it was just moved untouched somewhere else.
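
For readers unfamiliar with the pattern in the moved code, here is a minimal std-only sketch of a global cache keyed by an `Arc` pointer address. It is an approximation, not the moved code: `RuntimeEnv` and `ChannelResolver` are hypothetical stand-ins, and an unbounded `Mutex<HashMap>` replaces the bounded `moka` cache to avoid external crates.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

// Hypothetical stand-ins for `RuntimeEnv` and `DefaultChannelResolver`.
struct RuntimeEnv {
    name: String,
}

struct ChannelResolver {
    runtime_addr: usize,
}

// Global map keyed by the `Arc<RuntimeEnv>` pointer address. The original uses
// a bounded `moka` cache; a plain unbounded map keeps this sketch std-only.
static RESOLVERS: OnceLock<Mutex<HashMap<usize, Arc<ChannelResolver>>>> = OnceLock::new();

// Returns the resolver shared by every caller holding the same runtime,
// creating it on first use.
fn resolver_for(runtime: &Arc<RuntimeEnv>) -> Arc<ChannelResolver> {
    let key = Arc::as_ptr(runtime) as usize;
    let mut map = RESOLVERS
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .unwrap();
    let resolver = map
        .entry(key)
        .or_insert_with(|| Arc::new(ChannelResolver { runtime_addr: key }))
        .clone();
    resolver
}
```

Two clones of the same `Arc<RuntimeEnv>` map to the same resolver, while distinct runtimes get distinct ones. One caveat of keying by address in general is that an address can be reused once the original allocation is freed, which is presumably one reason to bound the cache.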

Comment on lines +1 to +6
#!/usr/bin/env bash

set -e

repo_root=$(git rev-parse --show-toplevel)
cd "$repo_root" && cargo run --manifest-path src/protocol/grpc/observability/gen/Cargo.toml

Not new, moved untouched, not sure why this shows as an actual diff.

Comment on lines +162 to +178
impl TaskData {
    /// Returns the number of partitions remaining to be processed.
    fn num_partitions_remaining(&self) -> usize {
        self.num_partitions_remaining.load(Ordering::SeqCst)
    }

    /// Returns the total number of partitions in this task.
    fn total_partitions(&self) -> usize {
        match self.final_plan.get() {
            Some(Ok(plan)) => plan.output_partitioning().partition_count(),
            _ => self
                .base_plan
                .properties()
                .output_partitioning()
                .partition_count(),
        }
    }

Not new; moved untouched from another place.

Comment thread src/worker/task_data.rs
Comment on lines -112 to -127
impl TaskData {
    /// Returns the number of partitions remaining to be processed.
    pub(crate) fn num_partitions_remaining(&self) -> usize {
        self.num_partitions_remaining.load(Ordering::SeqCst)
    }

    /// Returns the total number of partitions in this task.
    pub(crate) fn total_partitions(&self) -> usize {
        match self.final_plan.get() {
            Some(Ok(plan)) => plan.output_partitioning().partition_count(),
            _ => self
                .base_plan
                .properties()
                .output_partitioning()
                .partition_count(),
        }

Moved somewhere else

};
let Some(producer_head) = body.producer_head.as_ref().cloned() else {
    return internal_err!("Missing producer_head");
};

The diff is very unfortunate in this file...

Not much actually changed: most of its gRPC-specific contents were copy-pasted to worker_service.rs.

There is one thing that got removed, related to point 2) in the PR description. I'll add a comment some lines below.

Comment thread src/worker/impl_execute_task.rs Outdated
Comment on lines -97 to -112
let stream = on_drop_stream(stream, move || {
    // Stream was dropped before fully consumed -- see https://github.com/datafusion-contrib/datafusion-distributed/issues/412
    // Send metrics via the coordinator channel so they are not lost.
    if num_partitions_remaining.fetch_sub(1, Ordering::SeqCst) == 1 {
        // Fire-and-forget background tokio task to handle async
        // invalidate() within synchronous on_drop_stream.
        #[allow(clippy::disallowed_methods)]
        tokio::spawn(async move {
            task_data_entries.invalidate(&key).await;
        });
        task_data_metrics.mark_execution_finished();
        if send_metrics {
            send_metrics_via_channel(&metrics_tx, &plan, d_ctx, &task_data_metrics);
        }
    }
});

This whole on_drop_stream block existed before and has actually been removed. The finalization logic now lives in impl_coordinator_chanel.rs because of point 2) in the PR description.
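
The removed mechanism is essentially a last-drop countdown. A minimal std-only sketch, assuming hypothetical names (`PartitionGuard`, `make_guards`) and a plain counter in place of the real metrics and invalidation calls, shows both how it works and why it is fragile:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Hypothetical guard standing in for one partition stream wrapped by `on_drop_stream`.
struct PartitionGuard {
    remaining: Arc<AtomicUsize>,
    finalized: Arc<AtomicUsize>,
}

impl Drop for PartitionGuard {
    fn drop(&mut self) {
        // `fetch_sub` returns the previous value, so `== 1` means this was the last one.
        if self.remaining.fetch_sub(1, Ordering::SeqCst) == 1 {
            // The real code invalidated task state and sent metrics here.
            self.finalized.fetch_add(1, Ordering::SeqCst);
        }
    }
}

// Creates guards for `materialized` out of `total` partitions. The counter
// starts at `total`, as if every partition stream were going to exist.
fn make_guards(
    total: usize,
    materialized: usize,
    finalized: &Arc<AtomicUsize>,
) -> Vec<PartitionGuard> {
    let remaining = Arc::new(AtomicUsize::new(total));
    (0..materialized)
        .map(|_| PartitionGuard {
            remaining: Arc::clone(&remaining),
            finalized: Arc::clone(finalized),
        })
        .collect()
}
```

With `make_guards(3, 3, ..)` the finalizer runs exactly once when the last guard drops. With `make_guards(3, 2, ..)` it never runs, which mirrors the case where some partition streams are never materialized.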

Comment on lines -165 to -179
struct RemoteWorkerConnection {
    task: Arc<SpawnedTask<()>>,
    not_consumed_streams: Arc<AtomicUsize>,
    cancel_token: CancellationToken,
    per_partition_rx: DashMap<usize, UnboundedReceiver<WorkerMsg>>,

    first_poll_notify: Arc<Notify>,
    // Signals the demux task that buffered memory has been freed by a consumer.
    mem_available_notify: Arc<Notify>,

    // Metrics collection stuff.
    memory_reservation: Arc<MemoryReservation>,
    elapsed_compute: Time,
}

Pretty much all of this file was moved to worker_client.rs. There are just a few bits left, which were greatly simplified: it no longer makes sense to have the WorkerConnection abstraction here, since the worker protocol + gRPC layer already provides all the abstractions we want.

///
/// ```
/// # use datafusion_distributed::Worker;
/// # use tonic::transport::Server;

Not removed, just copy-pasted somewhere else

@stuhood stuhood left a comment

This looks great: thanks a lot for picking up the torch on this one: I agree that it ended up better encapsulated.

Do you expect #432 to land before this one, or was basing it atop #432 an exercise to ensure that extracting trait WorkerChannel would not hamper landing that one?

@mdashti drafted paradedb#27 atop this one to additionally add an in-process/in-memory worker implementation, which allows the full test suite to run without grpc.

How can we help to get this landed?

Base automatically changed from gabrielmusat/dynamic-task-count to main July 1, 2026 06:27
@gabotechs gabotechs force-pushed the gabrielmusat/abstract-grpc branch 2 times, most recently from 112c941 to 6b563ad Compare July 1, 2026 16:43
@gabotechs gabotechs force-pushed the gabrielmusat/abstract-grpc branch 2 times, most recently from 6d7efd8 to 83fd703 Compare July 2, 2026 07:40
@gabotechs gabotechs changed the base branch from main to gabrielmusat/fix-deadlock July 2, 2026 07:41
@gabotechs gabotechs force-pushed the gabrielmusat/abstract-grpc branch 4 times, most recently from cb5bda7 to 8a3bfba Compare July 2, 2026 08:03
@gabotechs gabotechs force-pushed the gabrielmusat/fix-deadlock branch 2 times, most recently from 5e6a0a4 to 6166ac9 Compare July 2, 2026 13:33
@gabotechs gabotechs force-pushed the gabrielmusat/abstract-grpc branch 7 times, most recently from 5294d64 to 125b90f Compare July 3, 2026 07:52
gabotechs added a commit that referenced this pull request Jul 3, 2026
Closes #521

The old lifecycle assumed every producer partition stream would be
materialized and dropped. That still tends to hold in the current `main`
branch because the WorkerConnectionPool owns eager demux/drop behavior,
so there is no natural failing test here.

In the following PR:
- #512

The gRPC abstraction refactor changes stream ownership and returns
already-demuxed streams through the protocol boundary, which exposes the
latent issue: query output can finish while some producer partition
streams never reach the final drop path, leaving the metrics oneshot
unresolved.

Coordinator-channel EOS is the better task-finalization signal because
it represents query-scoped shutdown independently of whether every
partition stream was consumed or dropped normally.

Deadlock sequence (WARN: explained by Codex, I still don't understand it
100%)

Why the bug does not reproduce in `main`:

1. Metrics are sent from the worker when the worker-side execution
streams for a task finish or are dropped.
2. In `main`, the remote gRPC path owns the full execute-task request
lifecycle internally: once `WorkerConnectionPool` initializes a remote
connection, the request is driven by the connection/demux machinery.
3. Local self-calls are also effectively per-partition: the local path
asks `execute_task` for one partition at a time instead of returning a
batch of partition streams where some may never be polled.
4. Because of that, the streams that count toward task finalization are
normally materialized and eventually dropped.
5. The metrics oneshot resolves before the coordinator needs the
query-scoped coordinator->worker stream to close.
6. `drain_pending_tasks()` can complete, and only after that the
query-end guard is dropped. There is no dependency cycle.

Why the bug reproduces in #512:

1. Metrics are still tied to the old final stream-drop path.
2. The refactor makes `WorkerChannel::execute_task` return
already-demuxed partition streams through the protocol boundary.
3. The gRPC implementation keeps those streams lazy: it does not send
the actual `execute_task` RPC until one returned partition stream is
first polled.
4. In shapes like `custom_routing_join`, DataFusion may construct some
remote partition streams that are never polled.
5. For those streams, the first-poll gate never opens, so the worker
never receives the corresponding `execute_task` request.
6. Since the worker never receives that request, it never materializes
the worker-side execution streams whose drops would decrement the
task-finalization counter.
7. The worker therefore never sends metrics for that task, leaving the
worker->coordinator response stream open.
8. The coordinator is waiting in `drain_pending_tasks()` for that
response stream to finish, while the query-end guard is still alive.
9. If metrics are moved to coordinator-channel EOS without also dropping
the query-end guard before draining, the worker then waits for
coordinator->worker EOS, but that EOS is gated by the same query-end
guard. That creates the cycle.
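
Steps 3 to 5 hinge on the first-poll gate. A simplified, synchronous sketch of that behavior, using an `Iterator` in place of an async stream and giving each stream its own gate (both are simplifying assumptions, as are all names below):

```rust
use std::cell::Cell;
use std::rc::Rc;

// Hypothetical lazy partition stream: the "RPC" is only sent on the first poll.
struct LazyPartitionStream {
    rpcs_sent: Rc<Cell<usize>>,
    gate_open: bool,
    rows: std::vec::IntoIter<u32>,
}

impl LazyPartitionStream {
    // Constructing the stream does no work and sends nothing.
    fn new(rpcs_sent: Rc<Cell<usize>>, rows: Vec<u32>) -> Self {
        Self {
            rpcs_sent,
            gate_open: false,
            rows: rows.into_iter(),
        }
    }
}

impl Iterator for LazyPartitionStream {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        if !self.gate_open {
            // First poll: this is where the request would actually go out.
            self.gate_open = true;
            self.rpcs_sent.set(self.rpcs_sent.get() + 1);
        }
        self.rows.next()
    }
}
```

A stream that is constructed and then dropped without ever being polled leaves `rpcs_sent` untouched, so in this model the worker would never learn about it and could not run any drop-based finalization for it.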

The following changes are made here:
- Finalize worker task metrics when the coordinator channel reaches EOS
instead of from the last partition stream drop.
- Drop the query-end guard before draining coordinator background tasks
so workers can observe the EOS being waited on.
- Keep task invalidation tied to coordinator-channel shutdown.

I think this is something #509 also noticed.
Base automatically changed from gabrielmusat/fix-deadlock to main July 3, 2026 10:13
@gabotechs gabotechs force-pushed the gabrielmusat/abstract-grpc branch 2 times, most recently from cb55f58 to 6395232 Compare July 3, 2026 14:47
@gabotechs gabotechs force-pushed the gabrielmusat/abstract-grpc branch from 6395232 to 954e2ac Compare July 3, 2026 15:04