[Data] Two-mode block metadata fetch behind a single MetadataFetcher interface#64378
[Data] Two-mode block metadata fetch behind a single MetadataFetcher interface#64378xinyuangui2 wants to merge 7 commits into
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces an asynchronous metadata prefetching mechanism for the streaming executor in Ray Data. By shifting metadata fetches to a background thread via MetadataPrefetcher, the main scheduling loop is prevented from blocking on straggling metadata references. The changes add MetadataFetcher strategies (ThreadedMetadataFetcher and InlineMetadataFetcher), update DataOpTask to manage deferred emissions and generator lifecycles, and integrate these components into the streaming executor state processing. Extensive unit tests are also added to verify correctness, ordering, and error handling under the new threaded mode. I have no feedback to provide as there are no review comments.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
…taFetcher interface
Moves the blocking ray.get(meta_ref) out of the streaming scheduler's
process_completed_tasks onto a background thread, and exposes it as two explicit
modes selected by one env var with the per-mode divergence isolated in a single
small interface.
- New MetadataFetcher interface (metadata_fetcher.py) with two impls:
- InlineMetadataFetcher: synchronous, master-identical — fetch metadata inline
via ray.get and emit immediately, budgeting off meta.size_bytes; the
done-callback fires inline at end-of-stream and a task failure re-raises.
- ThreadedMetadataFetcher: defers each pulled pair and fetches its metadata on
a background MetadataPrefetcher thread, budgeting off the block's local
object_size; completion is postponed until the deferred pairs emit.
- make_metadata_fetcher() picks the mode from
RAY_DATA_METADATA_PREFETCH_ON_THREAD (default: threaded).
- MetadataPrefetcher: per-operator FIFO emission preserving order; fetches the
refs ray.wait(fetch_local=True) reports ready; fetch failures surfaced for
max_errored_blocks accounting rather than raised.
- DataOpTask.on_data_ready keeps one shared pull loop and delegates per-pair
fetch/emit to fetcher.in_loop_get_size; completion fires inline only when the
fetcher doesn't defer it. Adds an object_size_ready_callback test seam.
- process_completed_tasks / StreamingExecutor take the fetcher; both default to
the inline fetcher when none is passed.
- Tests: existing sync tests run against the inline default; threaded tests
drive ThreadedMetadataFetcher; adds FakeMetadataFetcher branch-coverage,
object_size_ready_callback seam, and make_metadata_fetcher mode-selection
tests.
Signed-off-by: xgui <xgui@anyscale.com>
f8af7e1 to
c9ce291
Compare
- MetadataFetcher now owns deferred pairs (drop the deferred_emits param from on_data_ready / in_loop_get_size); submit() flushes the fetcher's own list. - on_data_ready requires a metadata_fetcher (no internal default); drop default_metadata_fetcher / the shared inline singleton. - Replace the defers_completion branch with a polymorphic in_loop_done(task) (inline fires the done-callback + re-raises; threaded is a no-op). - Trim redundant comments (TaskGeneratorState statuses, end-of-stream, task-failure, module/class docstrings). - Tests: rename test_process_completed_tasks -> _inline and the threaded variant -> test_process_completed_tasks_threaded; drive the deferred tests through the real emit flow (prefetcher.emit_ready) instead of manual emit_block; drop the cross-task ordering test (only per-op order is guaranteed); assert exact block sizes (108/208/...) instead of approx. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Signed-off-by: xgui <xgui@anyscale.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 85fe4a2. Configure here.
- Trim interface docstrings (drop subclass-behavior note from in_loop_done, drop "(no-op unless overridden)" and the "owned here" comment). - Rename ThreadedMetadataFetcher._deferred -> _pending_deferred. - Fix: run metadata_fetcher.submit()/register_drained() in a finally so a re-raised max_errored_blocks error can't strand pairs already deferred into the fetcher this iteration. - Tests: rename test_on_data_ready_deferred_does_not_emit_inline -> test_on_data_ready_deferred_threading; drop the redundant object_size_ready_callback_seam test; add a multi-step threaded process_completed_tasks test. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Signed-off-by: xgui <xgui@anyscale.com>
…polish - Merge metadata_prefetcher.py into metadata_fetcher.py: ThreadedMetadataFetcher now owns the background-thread fetch loop directly (delete the separate MetadataPrefetcher class and its file). - Drop "master-identical"/historical phrasing (this is the new master). - Rename register_drained -> check_if_drained (it only registers drained tasks). - Simplify the finally comment (don't spell out the error type). - Tests: drive the threaded tests through ThreadedMetadataFetcher directly (no separate prefetcher); _drain_completed_tasks_threaded raises on timeout instead of returning silently. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Signed-off-by: xgui <xgui@anyscale.com>
…elper - Rename _drain_completed_tasks_threaded -> _process_completed_tasks_threaded: the single helper that starts the threaded fetcher, runs process_completed_tasks, drains to completion (raising on timeout), and stops the fetcher — the threaded counterpart of _process_completed_tasks_sync. - Drive the executor-level tests (update_operator_states_drains_upstream, output_backpressure_policy_tracking, the zero-limit unblock test) through it, since threaded is the executor default. The inline test and the on_data_ready micro-tests stay on _process_completed_tasks_sync / InlineMetadataFetcher. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Signed-off-by: xgui <xgui@anyscale.com>
pyrefly flags relying on a transitively-loaded ray.exceptions (implicit-import). Import it explicitly so the GetTimeoutError references resolve. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Signed-off-by: xgui <xgui@anyscale.com>
- register_if_drained (was check_if_drained): it registers drained tasks. - DataOpTask.add_pending_metadata_ref (was mark_pending): clearer that it records one more pending metadata ref awaiting emission. - Add an ASCII data-flow diagram to ThreadedMetadataFetcher showing the two threads, the shared request queue, and the per-operator emit FIFO. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Signed-off-by: xgui <xgui@anyscale.com>

Why
At scale the streaming scheduler blocks on
ray.get(meta_ref)for everyblock-metadata pair inside
process_completed_tasks— the largest avoidablecost in the scheduling loop. This moves that fetch off the scheduling thread,
behind a single interface with two explicit modes so the old synchronous path
stays available unchanged.
Flow
DataOpTask.on_data_readyowns one shared pull loop; the per-pair fetch/emitand the completion signal are delegated to the
MetadataFetcher, so the twomodes differ only behind the interface (not in the loop).
flowchart TB subgraph SCHED["process_completed_tasks — scheduling thread (per iteration)"] ODR["DataOpTask.on_data_ready()<br/>shared pull loop: pull (block_ref, meta_ref)"] IGS["fetcher.in_loop_get_size(pair) → budget size"] DONE["on drain: fetcher.in_loop_done(task)"] SUB["after per-op loop:<br/>fetcher.submit(op) + register_if_drained()"] ALB["end of call: fetcher.after_loop_batch()"] ODR --> IGS --> DONE --> SUB --> ALB end IGS -->|inline| INL["InlineMetadataFetcher<br/>ray.get(meta_ref) + emit now<br/>budget = meta.size_bytes"] DONE -->|inline| INLD["fire done-callback now<br/>(re-raise task failure)"] IGS -->|threaded| THR["ThreadedMetadataFetcher<br/>defer pair; budget = local object_size (no RPC)"] DONE -->|threaded| THRD["no-op (completion postponed)"] SUB -->|threaded| PF[["ThreadedMetadataFetcher background thread<br/>ray.wait(fetch_local=True) → ray.get(meta_refs)"]] PF -. publish bytes .-> ALBT ALB -->|threaded| ALBT["emit ready pairs (per-op FIFO order)<br/>+ fire postponed done-callbacks"]What
MetadataFetcherinterface (metadata_fetcher.py), selected byRAY_DATA_METADATA_PREFETCH_ON_THREAD(default: threaded):InlineMetadataFetcher— synchronous: fetch metadata inline viaray.getand emit immediately, budget offmeta.size_bytes;in_loop_donefires the done-callback at end-of-stream and re-raises a task failure.
ThreadedMetadataFetcher— defers each pulled pair and fetches itsmetadata on its own background thread, budgets off the block's local
object_size; completion is postponed until the deferred pairs emit, andthe per-operator FIFO preserves emission order. (See the class docstring for
a data-flow diagram of the two threads, the shared queue, and the FIFO.)
DataOpTask.on_data_readykeeps one shared pull loop and delegates per-pairfetch/emit to
in_loop_get_sizeand completion toin_loop_done(inlinefires it, threaded is a no-op).
process_completed_tasks/StreamingExecutorrequire aMetadataFetcher;the executor builds one via
make_metadata_fetcher(). Backgroundmetadata-fetch failures go through the same
max_errored_blocksaccounting asinline errors, and
submit/register_if_drainedrun in afinallyso athrown error can't strand already-deferred pairs.
Most executor-level tests drive the threaded mode through a shared
_process_completed_tasks_threadedhelper; a few stay inline(
_process_completed_tasks_sync). AFakeFetchertest driveson_data_ready'sper-pair branches deterministically, plus a mode-selection test.
Test
| variant | total sched (s) | Δ |
|---|--:|--:|--:|--:|--:|--:|
| 2000_actors_1ops | 86 -> 58 | -32% |
| 2000_actors_15ops | 1099 -> 726 | -33% |
| 2000_tasks_1ops | 77 -> 50 | -34% |
| 2000_tasks_15ops | 1142 → 747 | -34% |
| 5000_actors_1ops | 204 → 218|
| 5000_actors_15ops | 3003 → 1834 | -38% |
| 5000_tasks_1ops | 229 → 189 | -17% |
| 5000_tasks_15ops | 2813 → 1863 | -33% |