feat: added a pluggable `WorkerTransport`, with Arrow Flight optional. #508
mdashti wants to merge 4 commits into
85c691d to a5f6672
    /// This is the extension point the in-crate shared-memory transport (and its in-process test) executes
    /// through: it owns the runtime and drives the head stage itself. The transport's dispatch must
    /// complete synchronously: a transport that spawns background delivery work, or a plan that
    /// declares work-unit feeds (which are pumped by that background work), is rejected rather than
    /// left to stall.
This references code that will never be upstreamed, right?
Right. Reworded it on its own terms in 6c4373d. It's the extension point an embedded in-process transport drives the head stage through.
    // One token per execution. In-process transports watch it to tear down the moment this
    // output stream drops, whether the consumer read to the end or abandoned it early (a LIMIT
    // gather). It rides the dispatch context so every worker fragment shares it.
    let token = CancellationToken::new();
    let context = Arc::new(task_ctx_with_extension(
        &context,
        DistributedCancellationToken(token.clone()),
    ));
Is this still necessary with the new per-stream cancellation that you implemented?
Different scope. This one fires once, when the whole output stream drops, so an in-process transport tears down on an early LIMIT gather, not just clean EOF. The per-stream cancel lives in the shm transport, which isn't in this PR.
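The "fires once, when the whole output stream drops" behaviour can be illustrated with a small standalone sketch. This is not the PR's code: `CancelFlag` and `OutputStream` are hypothetical stand-ins (a shared atomic flag in place of a real cancellation token, a plain iterator in place of a record batch stream), and the only point is that a `Drop` impl covers both clean EOF and early abandonment.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a cancellation token: a shared flag that workers can poll.
#[derive(Clone, Default)]
pub struct CancelFlag(Arc<AtomicBool>);

impl CancelFlag {
    pub fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Hypothetical output stream wrapper (an iterator here, for simplicity).
/// It fires the flag when it is dropped, however far the consumer read.
pub struct OutputStream<I> {
    inner: I,
    cancel: CancelFlag,
}

impl<I: Iterator> OutputStream<I> {
    pub fn new(inner: I, cancel: CancelFlag) -> Self {
        Self { inner, cancel }
    }
}

impl<I: Iterator> Iterator for OutputStream<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }
}

impl<I> Drop for OutputStream<I> {
    fn drop(&mut self) {
        // Runs on clean EOF and on early abandonment alike (for example a LIMIT
        // that stops reading after a few rows).
        self.cancel.cancel();
    }
}
```

A consumer that reads one item and then drops the stream leaves the flag set, which is the signal an in-process worker would watch in this toy model.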
    /// which closes the coordinator->worker streams and propagates EOS to the workers so they can clean
    /// up; the coordinator holds it until the query's result stream is drained.
    #[derive(Default)]
    pub(crate) struct FlightWorkerDispatch {
Is this supposed to be feature-flagged for flight, or is it not actually specific to that backend?
It is flight-gated. The `#[cfg(feature = "flight")]` sits on `mod query_coordinator;` in `coordinator/mod.rs`, so it's easy to miss from the struct. Added a module note in 6c4373d so the gating's visible here too.
A followup PR should probably split this module (and others with large swaths of feature flags) into transport and non-transport specific modules.
Agreed, that's a good followup. I'd rather keep this PR to the abstraction and leave the module split as its own change, to make this PR easier to review, too. The follow-up can be a refactor-only change.
Carries the extension points a transport needs beyond plan delivery and reads: a per-execution cancellation token that `DistributedExec::execute` attaches and fires when the output stream drops, `NetworkBoundary` partition routing (`route_partition` / `partitions_per_consumer_task` / `PartitionRoute`), and `DistributedExec::prepare_in_process_plan` for a transport that drives the head stage itself. The default stays Arrow Flight; in-memory and shared-memory transports plug in through these extension points without further changes to the core.
This PR adds a `WorkerTransport` that hosts its workers in the current process: plans are delivered with a direct `Worker::set_task_plan` call and partitions are read straight from the local task registry, with no gRPC underneath. It is the reference implementation for the transport extension points and the basis for running distributed plans without the Flight stack. To keep the dispatch paths from drifting, plan delivery is factored into transport-neutral pieces both transports share: `encode_task_plan` (task specialization + codec), the `Worker::set_task_plan` core that the Flight coordinator stream now wraps, and `collect_task_work_unit_feeds`. The in-memory read side runs one `execute_local_task` over the whole partition range and pumps each partition into a buffer, so a consumer that interleaves partition polls of a partitioned join can't leave partitions empty.
With `flight` off there is no `tonic` / `arrow-flight` and the in-memory transport is the default, so distributed plans still run. The integration suite runs over the in-memory transport in both build configurations: `start_localhost_context` builds an `InMemoryWorkerTransport` cluster instead of a gRPC one, and the gRPC harness moved to `start_localhost_flight_context`. A `unit-test-flight-transport` job sets `DATAFUSION_DISTRIBUTED_TEST_TRANSPORT=flight` to keep full Flight coverage. For the no-flight suite to run, not just build, the `tpch` / `tpcds` / `clickbench` / `stateful_data_cleanup` dataset tests move into the benchmarks crate: as a dev-dependency of the library they re-enabled `flight` on every test build through feature unification. With that gone, `cargo test --no-default-features --features integration --lib --tests` is genuinely Flight-free, and a `unit-test-no-flight` job runs it. The only tests still gated on `flight` are the ones that need a real wire: the `URLEmitter` routing tests, which assert per-URL worker identity, and the network-boundary connection metrics (`bytes_transferred`, latency). The rest, including the worker-hook and metrics tests, register `InMemoryWorkerTransport` directly and run either way.
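As a rough illustration of how a test harness could switch transports on `DATAFUSION_DISTRIBUTED_TEST_TRANSPORT`, here is a minimal sketch. The enum and both function names are hypothetical, and the parsing rules (case-insensitive match, anything other than `flight` falling back to in-memory) are assumptions; the PR's actual `start_localhost_context` logic is not shown in this thread.

```rust
/// Hypothetical enum naming the two transports the test suite can run over.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TestTransport {
    InMemory,
    Flight,
}

/// Pure selection logic, kept separate from the environment read so it can be tested.
/// Assumption: only the value "flight" (case-insensitive) selects the gRPC transport.
pub fn transport_from_value(value: Option<&str>) -> TestTransport {
    match value {
        Some(v) if v.eq_ignore_ascii_case("flight") => TestTransport::Flight,
        _ => TestTransport::InMemory,
    }
}

/// Reads the environment variable named in the PR description.
pub fn transport_from_env() -> TestTransport {
    let value = std::env::var("DATAFUSION_DISTRIBUTED_TEST_TRANSPORT").ok();
    transport_from_value(value.as_deref())
}
```

Keeping the decision in a pure function means the default (in-memory) can be asserted without mutating process-wide environment state in tests.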
a5f6672 to 53ff817
The `prepare_in_process_plan` doc referenced an out-of-crate transport that won't be upstreamed; reworded it to stand on its own. The `query_coordinator` module note makes its `flight` gating visible without opening `mod.rs`.
Such a huge amount of work! Thanks for putting this together 🙏
However, I want to steer the approach in a different direction. This PR reads more like a big-bang rewrite than a focused change that abstracts away the gRPC protocol, and it exposes a lot of the project's internals along the way.
I think this is mainly because there are better places to put the split point between what is a transport detail and what is actually part of the core project.
I think an approach closer to what the original issue describes, #483 (comment), has a better chance of providing a more elegant abstraction point for introducing customization at the worker level.
However, that is very easy for me to say when I have not been on the battlefield fighting this problem, so my expectations might be a bit skewed.
Some traits that I think we should see in a PR that abstracts away the worker protocol are:
- Zero public API changes. We can talk about what is worth making public in a follow-up, but I think the first PR should keep the change scoped to the internals.
- No rewrite of the existing abstractions. Rather than rewriting code that has been carefully designed to be future-proof, I'd try to just adapt it so it is not coupled to gRPC, which should yield very minor or zero code changes to those pieces.
- Make the split point at the lowest level possible, mirroring the current gRPC spec. The resulting abstraction should look almost identical to what `prost` autogenerates today from the worker.proto specification, except that it is not coupled to gRPC and consists of normal Rust methods.
- Almost no increase in LOC in the project. As the scope is not really to introduce new concepts, just to hide existing ones behind a trait, I would not expect a huge positive diff in LOC, at most a few hundred.
Since it's very easy for me to say this before trying anything out, I'll try to ship something that adheres to this vision and see what blockers I hit along the way. Maybe I'll find out that some of my assumptions are wrong.
Either way, thanks so much for this amount of work 🙏
    # The same suite over the Arrow-Flight gRPC transport. `start_localhost_context` switches to it
    # when `DATAFUSION_DISTRIBUTED_TEST_TRANSPORT=flight`, so both transports get full coverage.
    unit-test-flight-transport:
There might be a misconception here. This project does not use the Arrow Flight protocol; more info about that decision here:
We still rely on the arrow-flight library for IPC decoding, but we don't adhere to the Arrow Flight protocol.
    pub use common::{
        TreeNodeExt, deserialize_uuid, get_distributed_cancellation_token, serialize_uuid,
    };
    pub use config_extension_ext::get_config_extension_propagation_headers;
    pub use coordinator::{
        CoordinatorToWorkerMetrics, DistributedExec, EncodedTaskPlan, LatencyMetric, MetricsStore,
        encode_task_plan,
    };
    pub use distributed_ext::DistributedExt;
This does not seem right: it exposes a large number of internals that are not supposed to be part of the public API.
Thinking about splitting up work for driving this effort forward, a good first step would be to ship a change that has 0 changes to the public API, and evaluate public API changes in isolation.
    /// head stage itself. The transport's dispatch must complete synchronously: a transport that
    /// spawns background delivery work, or a plan that declares work-unit feeds (which are pumped by
    /// that background work), is rejected rather than left to stall.
    pub fn prepare_in_process_plan(
Plan preparation should be abstracted from the worker transport implementation.
I don't think we should be seeing methods named after specific transport protocols. Instead, transport details should be gathered behind a trait, and code whose domain is planning, execution, etc. should not care whether the transport protocol is in-process, gRPC, or something else.
Also, functions here should not be public. DistributedExec must behave as a normal ExecutionPlan regarding planning and execution. Users should not need to call foreign methods that are not part of DataFusion unless it's strictly necessary.
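To make the "transport details behind a trait" point concrete, here is a minimal standalone sketch. Everything in it is hypothetical: `WorkerChannel`, `InProcessChannel` and `run_task` are illustrative names, not the crate's `WorkerTransport` API, plans and partitions are modelled as raw bytes, and "execution" is a toy echo. It only shows the shape where the execution-side code depends on the trait and never names a protocol.

```rust
use std::collections::HashMap;

/// Hypothetical transport trait with one method per worker operation.
/// A gRPC-backed type and an in-process type would both implement it.
pub trait WorkerChannel {
    fn set_plan(&mut self, task: usize, plan: Vec<u8>) -> Result<(), String>;
    fn read_partition(&self, task: usize, partition: usize) -> Result<Vec<u8>, String>;
}

/// Toy in-process implementation: plans live in a local map, no network involved.
#[derive(Default)]
pub struct InProcessChannel {
    plans: HashMap<usize, Vec<u8>>,
}

impl WorkerChannel for InProcessChannel {
    fn set_plan(&mut self, task: usize, plan: Vec<u8>) -> Result<(), String> {
        self.plans.insert(task, plan);
        Ok(())
    }

    fn read_partition(&self, task: usize, partition: usize) -> Result<Vec<u8>, String> {
        let plan = self
            .plans
            .get(&task)
            .ok_or_else(|| format!("no plan registered for task {task}"))?;
        // Toy "execution": echo the plan bytes followed by the partition index.
        let mut out = plan.clone();
        out.push(partition as u8);
        Ok(out)
    }
}

/// Execution-side code: it only sees the trait, so it is identical for every transport.
pub fn run_task(
    channel: &mut dyn WorkerChannel,
    task: usize,
    plan: &[u8],
    partitions: usize,
) -> Result<Vec<Vec<u8>>, String> {
    channel.set_plan(task, plan.to_vec())?;
    (0..partitions)
        .map(|p| channel.read_partition(task, p))
        .collect()
}
```

With this shape, swapping the transport means passing a different `WorkerChannel` implementation to `run_task`; nothing named after a protocol leaks into the caller.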
    ///
    /// The [QueryCoordinator]'s lifetime is scoped to a single query , and will instantiate independent
    /// [StageCoordinator] scoped to each individual stage.
    pub(super) struct QueryCoordinator {
I don't think we should be dismantling QueryCoordinator. Probably the way to go is to make QueryCoordinator not coupled to gRPC, but I do think it still serves well as an abstraction.
Thinking about future work, this is still going to be needed for dynamic planning, so I don't think we can just delete it.
    /// `P_c`: how many partitions each consumer task reads in the sliced layout
    /// (`global = P_c * consumer_task + local`) that shuffle and broadcast reads use. Surfaced so a
    /// transport that has to place a produced partition does not re-derive it from node properties.
    /// Meaningless for [NetworkCoalesceExec], whose consumers read whole per-producer-task groups.
    fn partitions_per_consumer_task(&self) -> usize {
        self.properties().partitioning.partition_count()
    }
This method is not necessary. NetworkBoundary already extends ExecutionPlan, which has the necessary methods for people to retrieve the partition count.
    /// Boundaries whose consumers do not read that layout must override this with an error; the
    /// default would silently misroute them. A zero-partition boundary is a planner bug, so it
    /// errors instead of routing everything to task `0`.
    fn route_partition(&self, output_partition: usize) -> Result<PartitionRoute> {
🤔 I don't fully understand how this change is related to abstracting away the WorkerTransport. Is it really related?
If not, I'd suggest to avoid introducing unrelated changes in this PR and focus exclusively on abstracting away the gRPC protocol.
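For readers following the thread, the sliced layout quoted in the doc comments (`global = P_c * consumer_task + local`) can be inverted with a division and a remainder. The sketch below is a standalone illustration of that arithmetic only: `Route` and `route_sliced` are hypothetical names, the fields of the PR's `PartitionRoute` are not shown in this thread, and the zero-partition error simply mirrors what the doc comment describes.

```rust
/// Hypothetical result type: which consumer task reads a partition, and at which local index.
#[derive(Debug, PartialEq, Eq)]
pub struct Route {
    pub consumer_task: usize,
    pub local_partition: usize,
}

/// Inverts `global = P_c * consumer_task + local` for a given `P_c`.
/// A zero `P_c` is reported as an error instead of routing everything to task 0.
pub fn route_sliced(global: usize, partitions_per_consumer_task: usize) -> Result<Route, String> {
    if partitions_per_consumer_task == 0 {
        return Err("boundary has zero partitions per consumer task".to_string());
    }
    Ok(Route {
        consumer_task: global / partitions_per_consumer_task,
        local_partition: global % partitions_per_consumer_task,
    })
}
```

For example, with `P_c = 3`, global partition 7 maps to consumer task 2 at local index 1, since `3 * 2 + 1 = 7`.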
Gave it a try myself to implement something closer to what the original issue describes #483: I do think an approach closer to that has a better chance of being successful, and the disruptions are minimal, both in terms of LOC and new concepts.
What
This PR puts a pluggable `WorkerTransport` in front of the worker comms, with Arrow Flight as one implementation behind a default-on `flight` feature. It adds an in-memory transport and surfaces partition routing on `NetworkBoundary`.

Why
The transport is hardcoded to Flight. The coordinator ships plans over gRPC and the consumer reads partitions through an internal, non-swappable connection, so an embedder can't supply its own transport without reaching into crate internals. The crate also can't build without `tonic` / `arrow-flight`.

Closed #483

How
- `WorkerTransport` abstraction and a dispatch seam, with Flight as the reference implementation. No behaviour change for the Flight path.
- `route_partition` / `partitions_per_consumer_task` on `NetworkBoundary`, so a transport can place a produced partition without re-deriving the slice formula by hand.
- `start_localhost_context` honours `DATAFUSION_DISTRIBUTED_TEST_TRANSPORT=flight`, so the generic suite runs over either transport.
- `flight` feature gates `tonic` / `arrow-flight` and the gRPC surface. In-memory is the default when it's off.

Tests
CI runs the integration suite over Flight, over the in-memory transport, and once more with `--no-default-features`. `clippy` and `rustfmt` are clean in both feature configs.