[Data] Two-mode block metadata fetch behind a single MetadataFetcher interface #64378

Open
xinyuangui2 wants to merge 7 commits into ray-project:master from xinyuangui2:xgui/metadata-fetch-modes

@xinyuangui2 xinyuangui2 commented Jun 26, 2026

Contributor

Why

At scale, the streaming scheduler blocks on ray.get(meta_ref) for every
block-metadata pair inside process_completed_tasks, which is the largest
avoidable cost in the scheduling loop. This PR moves that fetch off the
scheduling thread and puts it behind a single interface with two explicit
modes, so the old synchronous path stays available unchanged.

Flow

DataOpTask.on_data_ready owns one shared pull loop; the per-pair fetch/emit
and the completion signal are delegated to the MetadataFetcher, so the two
modes differ only behind the interface (not in the loop).

flowchart TB
    subgraph SCHED["process_completed_tasks — scheduling thread (per iteration)"]
      ODR["DataOpTask.on_data_ready()<br/>shared pull loop: pull (block_ref, meta_ref)"]
      IGS["fetcher.in_loop_get_size(pair) → budget size"]
      DONE["on drain: fetcher.in_loop_done(task)"]
      SUB["after per-op loop:<br/>fetcher.submit(op) + register_if_drained()"]
      ALB["end of call: fetcher.after_loop_batch()"]
      ODR --> IGS --> DONE --> SUB --> ALB
    end

    IGS -->|inline| INL["InlineMetadataFetcher<br/>ray.get(meta_ref) + emit now<br/>budget = meta.size_bytes"]
    DONE -->|inline| INLD["fire done-callback now<br/>(re-raise task failure)"]

    IGS -->|threaded| THR["ThreadedMetadataFetcher<br/>defer pair; budget = local object_size (no RPC)"]
    DONE -->|threaded| THRD["no-op (completion postponed)"]
    SUB -->|threaded| PF[["ThreadedMetadataFetcher background thread<br/>ray.wait(fetch_local=True) → ray.get(meta_refs)"]]
    PF -. publish bytes .-> ALBT
    ALB -->|threaded| ALBT["emit ready pairs (per-op FIFO order)<br/>+ fire postponed done-callbacks"]
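
The split shown in the flowchart can be sketched without Ray. This is a minimal illustration, not the PR's code: the method names in_loop_get_size and in_loop_done come from the description, while the signatures, the Pair type, the class names InlineSketch and DeferringSketch, and the in-memory stand-ins for ray.get and the local object size are all assumptions.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Callable, Iterable, List


@dataclass
class Pair:
    """Hypothetical stand-in for one (block_ref, meta_ref) pair."""
    block_id: str
    meta_size_bytes: int    # size a blocking metadata fetch would report
    local_object_size: int  # size known locally, without an RPC


class MetadataFetcher(ABC):
    """Assumed shape of the interface; only the method names are from the PR."""

    @abstractmethod
    def in_loop_get_size(self, pair: Pair) -> int:
        """Handle one pulled pair and return the bytes charged to the budget."""

    @abstractmethod
    def in_loop_done(self, done_callback: Callable[[], None]) -> None:
        """Called once the task's output stream is drained."""


class InlineSketch(MetadataFetcher):
    def __init__(self, emit: Callable[[Pair], None]) -> None:
        self._emit = emit

    def in_loop_get_size(self, pair: Pair) -> int:
        self._emit(pair)           # stands in for ray.get(meta_ref) + emit
        return pair.meta_size_bytes

    def in_loop_done(self, done_callback: Callable[[], None]) -> None:
        done_callback()            # completion fires immediately


class DeferringSketch(MetadataFetcher):
    def __init__(self) -> None:
        self.pending: List[Pair] = []

    def in_loop_get_size(self, pair: Pair) -> int:
        self.pending.append(pair)  # fetched later, off the scheduling thread
        return pair.local_object_size

    def in_loop_done(self, done_callback: Callable[[], None]) -> None:
        pass                       # completion is postponed until emission


def on_data_ready(
    pairs: Iterable[Pair],
    fetcher: MetadataFetcher,
    done_callback: Callable[[], None],
) -> int:
    """Shared pull loop: the same code runs for both modes."""
    consumed = 0
    for pair in pairs:
        consumed += fetcher.in_loop_get_size(pair)
    fetcher.in_loop_done(done_callback)
    return consumed
```

The point of the shape is that on_data_ready contains no mode check: swapping the fetcher changes when metadata is fetched and when completion fires, but not the loop.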

What

  • New MetadataFetcher interface (metadata_fetcher.py), selected by
    RAY_DATA_METADATA_PREFETCH_ON_THREAD (default: threaded):
    • InlineMetadataFetcher — synchronous: fetch metadata inline via
      ray.get and emit immediately, budget off meta.size_bytes; in_loop_done
      fires the done-callback at end-of-stream and re-raises a task failure.
    • ThreadedMetadataFetcher — defers each pulled pair and fetches its
      metadata on its own background thread, budgets off the block's local
      object_size; completion is postponed until the deferred pairs emit, and
      the per-operator FIFO preserves emission order. (See the class docstring for
      a data-flow diagram of the two threads, the shared queue, and the FIFO.)
  • DataOpTask.on_data_ready keeps one shared pull loop and delegates per-pair
    fetch/emit to in_loop_get_size and completion to in_loop_done (inline
    fires it, threaded is a no-op).
  • process_completed_tasks / StreamingExecutor require a MetadataFetcher;
    the executor builds one via make_metadata_fetcher(). Background
    metadata-fetch failures go through the same max_errored_blocks accounting as
    inline errors, and submit/register_if_drained run in a finally so a
    thrown error can't strand already-deferred pairs.
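
As a rough sketch of the mode selection described above: the environment variable name and the threaded default are taken from this description, but the accepted values and the helper name select_mode are assumptions. The real parsing lives in make_metadata_fetcher() and may differ.

```python
import os
from typing import Mapping, Optional

ENV_VAR = "RAY_DATA_METADATA_PREFETCH_ON_THREAD"

# ASSUMPTION: these spellings turn the threaded mode off. The PR text does
# not say which values the real implementation accepts.
_FALSEY = {"0", "false", "no", "off"}


def select_mode(environ: Optional[Mapping[str, str]] = None) -> str:
    """Return "threaded" or "inline" (hypothetical helper, not the PR's API)."""
    environ = os.environ if environ is None else environ
    raw = environ.get(ENV_VAR)
    if raw is None:
        return "threaded"  # default stated in the PR description
    return "inline" if raw.strip().lower() in _FALSEY else "threaded"
```

Passing the environment as a mapping keeps the selection testable without mutating os.environ, which is one plausible way a mode-selection test could be written.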

Most executor-level tests drive the threaded mode through a shared
_process_completed_tasks_threaded helper; a few stay inline
(_process_completed_tasks_sync). A FakeFetcher test drives on_data_ready's
per-pair branches deterministically, and a separate test covers mode selection.

Test

| variant | total sched (s), before → after | Δ |
|---|--:|--:|
| 2000_actors_1ops | 86 → 58 | -32% |
| 2000_actors_15ops | 1099 → 726 | -33% |
| 2000_tasks_1ops | 77 → 50 | -34% |
| 2000_tasks_15ops | 1142 → 747 | -34% |
| 5000_actors_1ops | 204 → 218 | |
| 5000_actors_15ops | 3003 → 1834 | -38% |
| 5000_tasks_1ops | 229 → 189 | -17% |
| 5000_tasks_15ops | 2813 → 1863 | -33% |

@xinyuangui2 xinyuangui2 requested a review from a team as a code owner June 26, 2026 18:34

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces an asynchronous metadata prefetching mechanism for the streaming executor in Ray Data. By shifting metadata fetches to a background thread via MetadataPrefetcher, the main scheduling loop is prevented from blocking on straggling metadata references. The changes add MetadataFetcher strategies (ThreadedMetadataFetcher and InlineMetadataFetcher), update DataOpTask to manage deferred emissions and generator lifecycles, and integrate these components into the streaming executor state processing. Extensive unit tests are also added to verify correctness, ordering, and error handling under the new threaded mode. I have no feedback to provide as there are no review comments.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

…taFetcher interface

Moves the blocking ray.get(meta_ref) out of the streaming scheduler's
process_completed_tasks onto a background thread, and exposes it as two explicit
modes selected by one env var with the per-mode divergence isolated in a single
small interface.

- New MetadataFetcher interface (metadata_fetcher.py) with two impls:
  - InlineMetadataFetcher: synchronous, master-identical — fetch metadata inline
    via ray.get and emit immediately, budgeting off meta.size_bytes; the
    done-callback fires inline at end-of-stream and a task failure re-raises.
  - ThreadedMetadataFetcher: defers each pulled pair and fetches its metadata on
    a background MetadataPrefetcher thread, budgeting off the block's local
    object_size; completion is postponed until the deferred pairs emit.
  - make_metadata_fetcher() picks the mode from
    RAY_DATA_METADATA_PREFETCH_ON_THREAD (default: threaded).
- MetadataPrefetcher: per-operator FIFO emission preserving order; fetches the
  refs ray.wait(fetch_local=True) reports ready; fetch failures surfaced for
  max_errored_blocks accounting rather than raised.
- DataOpTask.on_data_ready keeps one shared pull loop and delegates per-pair
  fetch/emit to fetcher.in_loop_get_size; completion fires inline only when the
  fetcher doesn't defer it. Adds an object_size_ready_callback test seam.
- process_completed_tasks / StreamingExecutor take the fetcher; both default to
  the inline fetcher when none is passed.
- Tests: existing sync tests run against the inline default; threaded tests
  drive ThreadedMetadataFetcher; adds FakeMetadataFetcher branch-coverage,
  object_size_ready_callback seam, and make_metadata_fetcher mode-selection
  tests.

Signed-off-by: xgui <xgui@anyscale.com>
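
To illustrate the per-operator FIFO emission mentioned in this commit message, here is a small single-threaded sketch. It is not the PR's data structure: the class FifoEmitter, its method signatures, and the string identifiers are assumptions (only the name emit_ready appears in this PR), and a real implementation that publishes results from a background thread would also need synchronization.

```python
from collections import deque
from typing import Deque, Dict, List, Set, Tuple


class FifoEmitter:
    """Emit pairs in the order they were deferred, per operator."""

    def __init__(self) -> None:
        self._fifo: Dict[str, Deque[str]] = {}
        self._ready: Set[str] = set()

    def defer(self, op: str, pair_id: str) -> None:
        self._fifo.setdefault(op, deque()).append(pair_id)

    def mark_ready(self, pair_id: str) -> None:
        # In the threaded mode this would happen on the background thread.
        self._ready.add(pair_id)

    def emit_ready(self) -> List[Tuple[str, str]]:
        emitted = []
        for op, pending in self._fifo.items():
            # Only the head of each operator's queue may be emitted, so a
            # pair that finished early waits for the pairs ahead of it.
            while pending and pending[0] in self._ready:
                pair_id = pending.popleft()
                self._ready.discard(pair_id)
                emitted.append((op, pair_id))
        return emitted
```

With pairs a and b deferred for one operator, marking b ready first emits nothing for that operator; both come out, in order, once a is ready. Ordering across operators is not constrained, which matches the later note that only per-op order is guaranteed.
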
@xinyuangui2 xinyuangui2 force-pushed the xgui/metadata-fetch-modes branch from f8af7e1 to c9ce291 Compare June 26, 2026 18:40
@ray-gardener ray-gardener Bot added the data Ray Data-related issues label Jun 26, 2026
- MetadataFetcher now owns deferred pairs (drop the deferred_emits param from
  on_data_ready / in_loop_get_size); submit() flushes the fetcher's own list.
- on_data_ready requires a metadata_fetcher (no internal default); drop
  default_metadata_fetcher / the shared inline singleton.
- Replace the defers_completion branch with a polymorphic in_loop_done(task)
  (inline fires the done-callback + re-raises; threaded is a no-op).
- Trim redundant comments (TaskGeneratorState statuses, end-of-stream,
  task-failure, module/class docstrings).
- Tests: rename test_process_completed_tasks -> _inline and the threaded
  variant -> test_process_completed_tasks_threaded; drive the deferred tests
  through the real emit flow (prefetcher.emit_ready) instead of manual
  emit_block; drop the cross-task ordering test (only per-op order is
  guaranteed); assert exact block sizes (108/208/...) instead of approx.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.

Reviewed by Cursor Bugbot for commit 85fe4a2.

Comment thread python/ray/data/_internal/execution/streaming_executor_state.py Outdated
@xinyuangui2 xinyuangui2 added the go add ONLY when ready to merge, run all tests label Jun 27, 2026
xinyuangui2 and others added 5 commits June 27, 2026 21:38
- Trim interface docstrings (drop subclass-behavior note from in_loop_done,
  drop "(no-op unless overridden)" and the "owned here" comment).
- Rename ThreadedMetadataFetcher._deferred -> _pending_deferred.
- Fix: run metadata_fetcher.submit()/register_drained() in a finally so a
  re-raised max_errored_blocks error can't strand pairs already deferred into
  the fetcher this iteration.
- Tests: rename test_on_data_ready_deferred_does_not_emit_inline ->
  test_on_data_ready_deferred_threading; drop the redundant
  object_size_ready_callback_seam test; add a multi-step threaded
  process_completed_tasks test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
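
The finally fix in this commit can be pictured with a toy loop. Everything here is illustrative: RecordingFetcher, process_iteration, the "bad" prefix used to simulate an errored block, and the RuntimeError are assumptions, not the executor's actual types.

```python
from typing import Iterable, List


class RecordingFetcher:
    """Toy fetcher that tracks what was deferred and what was submitted."""

    def __init__(self) -> None:
        self.deferred: List[str] = []
        self.submitted: List[str] = []

    def defer(self, pair: str) -> None:
        self.deferred.append(pair)

    def submit(self) -> None:
        # Hand everything deferred so far to the (imaginary) background thread.
        self.submitted.extend(self.deferred)
        self.deferred.clear()


def process_iteration(
    pairs: Iterable[str], fetcher: RecordingFetcher, max_errored_blocks: int = 0
) -> None:
    errored = 0
    try:
        for pair in pairs:
            if pair.startswith("bad"):
                errored += 1
                if errored > max_errored_blocks:
                    raise RuntimeError(f"too many errored blocks: {errored}")
                continue
            fetcher.defer(pair)
    finally:
        # Runs on both the normal and the error path, so pairs deferred
        # before the exception are still submitted.
        fetcher.submit()
```

Without the finally, an exception raised partway through the loop would leave the earlier pairs sitting in the deferred list with nothing left to flush them.
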
…polish

- Merge metadata_prefetcher.py into metadata_fetcher.py: ThreadedMetadataFetcher
  now owns the background-thread fetch loop directly (delete the separate
  MetadataPrefetcher class and its file).
- Drop "master-identical"/historical phrasing (this is the new master).
- Rename register_drained -> check_if_drained (it only registers drained tasks).
- Simplify the finally comment (don't spell out the error type).
- Tests: drive the threaded tests through ThreadedMetadataFetcher directly
  (no separate prefetcher); _drain_completed_tasks_threaded raises on timeout
  instead of returning silently.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
…elper

- Rename _drain_completed_tasks_threaded -> _process_completed_tasks_threaded:
  the single helper that starts the threaded fetcher, runs
  process_completed_tasks, drains to completion (raising on timeout), and stops
  the fetcher — the threaded counterpart of _process_completed_tasks_sync.
- Drive the executor-level tests (update_operator_states_drains_upstream,
  output_backpressure_policy_tracking, the zero-limit unblock test) through it,
  since threaded is the executor default. The inline test and the on_data_ready
  micro-tests stay on _process_completed_tasks_sync / InlineMetadataFetcher.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
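
The start, run, drain, stop shape of such a test helper might look like the following. This is a sketch under assumptions: ToyThreadedFetcher and process_threaded are hypothetical names, and doubling an integer stands in for the metadata fetch. It only shows the lifecycle and the raise-on-timeout behaviour described in these commits.

```python
import queue
import threading
import time
from typing import List, Optional


class ToyThreadedFetcher:
    def __init__(self) -> None:
        self._requests: "queue.Queue[Optional[int]]" = queue.Queue()
        self._results: "queue.Queue[int]" = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._requests.put(None)  # sentinel asks the worker to exit
        self._thread.join()

    def submit(self, item: int) -> None:
        self._requests.put(item)

    def _run(self) -> None:
        while True:
            item = self._requests.get()
            if item is None:
                return
            self._results.put(item * 2)  # stand-in for the metadata fetch

    def emit_ready(self) -> List[int]:
        ready = []
        while True:
            try:
                ready.append(self._results.get_nowait())
            except queue.Empty:
                return ready


def process_threaded(items: List[int], timeout_s: float = 5.0) -> List[int]:
    fetcher = ToyThreadedFetcher()
    fetcher.start()
    try:
        for item in items:
            fetcher.submit(item)
        emitted: List[int] = []
        deadline = time.monotonic() + timeout_s
        while len(emitted) < len(items):
            if time.monotonic() > deadline:
                # Fail loudly instead of returning a partial result.
                raise TimeoutError("threaded fetcher did not drain in time")
            emitted.extend(fetcher.emit_ready())
            time.sleep(0.001)
        return emitted
    finally:
        fetcher.stop()
```

Raising on timeout matters for tests: a helper that returned silently with a partial result could let an assertion pass against incomplete output.
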
pyrefly flags relying on a transitively-loaded ray.exceptions (implicit-import).
Import it explicitly so the GetTimeoutError references resolve.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
- register_if_drained (was check_if_drained): it registers drained tasks.
- DataOpTask.add_pending_metadata_ref (was mark_pending): clearer that it
  records one more pending metadata ref awaiting emission.
- Add an ASCII data-flow diagram to ThreadedMetadataFetcher showing the two
  threads, the shared request queue, and the per-operator emit FIFO.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>

Labels

data: Ray Data-related issues
go: add ONLY when ready to merge, run all tests

1 participant