[Data][1/2] - Dynamic work queue for traversals #64388
goutamvenkat-anyscale wants to merge 1 commit into
Signed-off-by: Goutam <goutam@anyscale.com>
Code Review
This pull request introduces a new dynamic work queue implementation (parallel_process_work_stealing) and corresponding unit tests to support parallel processing with dynamic work generation and load balancing. The review feedback highlights several critical robustness improvements: catching BaseException instead of Exception in worker threads to prevent silent failures, updating the type annotation of _WorkerError.exception to Optional[BaseException] to satisfy type checkers, implementing a global timeout pattern when joining threads during shutdown, and wrapping the completion signaler in a try-except block to avoid indefinite hangs on unexpected errors.
    except Exception as e:
        pool.output_queue.put(_WorkerError(e))
In _worker, catching only Exception means that any BaseException (such as KeyboardInterrupt or SystemExit) raised inside process_fn will not be caught. This causes the worker thread to die silently while the task is still marked as done in the finally block, leading to silent data loss and incomplete processing without the main thread knowing. Catching BaseException instead ensures all critical errors are propagated to the main thread.
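The difference between the two handlers can be reproduced outside the PR. The following is a minimal sketch, not the PR's code: `WorkerError`, `run_worker`, and `exits` are hypothetical names, and the worker is reduced to a single call. It assumes only the standard `queue` and `threading` modules.

```python
import queue
import threading


class WorkerError:
    """Hypothetical wrapper marking a queue item as a failure."""

    def __init__(self, exception):
        self.exception = exception


def run_worker(process_fn, catch):
    """Run process_fn in a thread and return everything it put on the queue."""
    out = queue.Queue()

    def worker():
        try:
            out.put(process_fn())
        except catch as e:
            # Only exceptions matching `catch` are reported to the main thread.
            out.put(WorkerError(e))

    t = threading.Thread(target=worker)
    t.start()
    t.join()
    return [out.get_nowait() for _ in range(out.qsize())]


def exits():
    # SystemExit derives from BaseException, not Exception.
    raise SystemExit(1)


narrow = run_worker(exits, Exception)      # nothing reaches the queue
broad = run_worker(exits, BaseException)   # the failure is reported
```

With the narrow handler the thread ends without leaving any trace on the queue, which is the silent-loss case the comment describes.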
Suggested change:
    except BaseException as e:
        pool.output_queue.put(_WorkerError(e))

    output queue are unambiguously distinguishable from legitimate result
    values."""

    exception: BaseException
The exception field in _WorkerError is typed as BaseException, but in _raise_if_error it is set to None to avoid reference cycles. This will cause type checkers like mypy to raise an error. The type should be updated to Optional[BaseException].
Suggested change:
    - exception: BaseException
    + exception: Optional[BaseException]
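As an illustration of why the annotation matters, here is a small sketch of a wrapper whose field is cleared before the exception is re-raised. The names `WorkerError` and `raise_if_error` are stand-ins, and the body is an assumption about the general pattern, not the PR's `_raise_if_error`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class WorkerError:
    # Optional because the field is reset to None once the error is consumed.
    exception: Optional[BaseException]


def raise_if_error(item):
    """Re-raise the wrapped exception, if any, after clearing the wrapper."""
    if isinstance(item, WorkerError):
        exc = item.exception
        # Drop the stored reference so the wrapper no longer keeps the
        # exception (and its traceback) alive.
        item.exception = None
        if exc is not None:
            raise exc


err = WorkerError(ValueError("boom"))
try:
    raise_if_error(err)
    raised = None
except ValueError as e:
    raised = e
```

With the field typed as plain `BaseException`, the assignment of `None` is the line a type checker would be expected to reject.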
    if join_timeout is not None:
        for t in self._threads:
            t.join(timeout=join_timeout)
In _WorkerPool.shutdown, joining threads sequentially with a fixed timeout can block the main thread for up to num_workers * join_timeout if multiple threads hang. It is highly recommended to use a global timeout pattern to ensure the total shutdown time does not exceed join_timeout.
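The effect of a shared deadline can be measured with a few deliberately stuck threads. This is a sketch under assumptions: `join_all` is a hypothetical helper, and the stuck workers are simulated with an `Event` that is only set after the measurement.

```python
import threading
import time


def join_all(threads, join_timeout):
    """Join threads under one shared deadline; return those still alive."""
    deadline = time.monotonic() + join_timeout
    for t in threads:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # budget spent; do not wait on the rest
        t.join(timeout=remaining)
    return [t for t in threads if t.is_alive()]


# Four threads that block until `stop` is set, simulating hung workers.
stop = threading.Event()
threads = [threading.Thread(target=stop.wait, daemon=True) for _ in range(4)]
for t in threads:
    t.start()

start = time.monotonic()
stuck = join_all(threads, join_timeout=0.2)
elapsed = time.monotonic() - start
stop.set()  # release the simulated workers
```

Joining each of the four threads with its own 0.2 s timeout would take about 0.8 s here; the shared deadline keeps the total near 0.2 s.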
Suggested change:
    if join_timeout is not None:
        import time
        end_time = time.monotonic() + join_timeout
        for t in self._threads:
            remaining = end_time - time.monotonic()
            if remaining <= 0:
                break
            t.join(timeout=remaining)

    if not _interruptible_join(pool.work_queue, pool.interrupted):
        return
    # Stop consumers
    pool.output_queue.put(SENTINEL)
    for _ in range(num_workers):
        # Stop producers
        pool.work_queue.put(SENTINEL)
In _signal_completion, if any unexpected exception occurs, the signaler thread will die silently, causing the main thread to block indefinitely on pool.output_queue.get(). Wrapping the signaler logic in a try-except block to propagate unexpected exceptions to the output_queue improves robustness and prevents hangs.
Suggested change:
    try:
        if not _interruptible_join(pool.work_queue, pool.interrupted):
            return
        # Stop consumers
        pool.output_queue.put(SENTINEL)
        for _ in range(num_workers):
            # Stop producers
            pool.work_queue.put(SENTINEL)
    except InterruptedError:
        pass
    except Exception as e:
        pool.output_queue.put(_WorkerError(e))
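A reduced version of the same idea, showing that a consumer blocked on the output queue is woken by the error instead of hanging. This is a sketch only: `signal_completion`, `SignalerError`, and `broken_wait` are hypothetical, and the wait step is passed in as a callable so a failure can be simulated.

```python
import queue
import threading

SENTINEL = object()


class SignalerError:
    """Hypothetical wrapper for an error raised in the signaler thread."""

    def __init__(self, exception):
        self.exception = exception


def signal_completion(wait_for_work, output_queue):
    try:
        wait_for_work()
        output_queue.put(SENTINEL)  # normal completion
    except Exception as e:
        # Report the failure so the consumer's get() returns.
        output_queue.put(SignalerError(e))


def broken_wait():
    raise RuntimeError("queue bookkeeping failed")


out = queue.Queue()
t = threading.Thread(target=signal_completion, args=(broken_wait, out))
t.start()
item = out.get(timeout=5)  # returns promptly with the wrapped error
t.join()
```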
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
Reviewed by Cursor Bugbot for commit c843a27.
    else:
        for item in iter(pool.output_queue.get, SENTINEL):
            _raise_if_error(item)
            yield item
Sentinel equality stops output early
Medium Severity
The main thread drains output_queue with iter(pool.output_queue.get, SENTINEL), which ends when a dequeued value compares equal to the sentinel via ==, not identity. A legitimate add_result value whose __eq__ returns true for the sentinel can terminate draining early, yielding incomplete results without error while workers may still be running.
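The behavior can be reproduced with a result type whose `__eq__` always returns true. The sketch below is illustrative: `AlwaysEqual`, `drain_by_equality`, and `drain_by_identity` are hypothetical names, and the identity-based loop is one possible alternative, not necessarily the fix the PR adopts.

```python
import queue

SENTINEL = object()


class AlwaysEqual:
    """A pathological result value that compares equal to anything."""

    def __eq__(self, other):
        return True

    def __hash__(self):
        return 0


def drain_by_equality(q):
    # iter(callable, sentinel) stops on the first value equal (==) to sentinel.
    return list(iter(q.get, SENTINEL))


def drain_by_identity(q):
    # Stops only when the dequeued object *is* the sentinel.
    items = []
    while True:
        item = q.get()
        if item is SENTINEL:
            return items
        items.append(item)


def make_queue():
    q = queue.Queue()
    for item in (1, AlwaysEqual(), 2, SENTINEL):
        q.put(item)
    return q


by_equality = drain_by_equality(make_queue())
by_identity = drain_by_identity(make_queue())
```

The equality-based drain stops at the `AlwaysEqual` instance and drops everything after it, while the identity-based drain returns all three results.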


Description
Adds parallel_process_work_stealing, a thread-pool-backed generator for parallelizing workloads where the full set of work items is discovered at runtime (e.g., recursive directory listing). Unlike make_async_gen, which maps over a static input iterator, this utility lets workers dynamically enqueue new work items back into a shared queue, enabling work-stealing-style load balancing across threads.
Adds dynamic_work_queue.py with parallel_process_work_stealing, _WorkerPool (thread lifecycle management), and _raise_if_error (cross-thread error propagation).
Supports optional deterministic ordering via preserve_order + order_key, grouping results by their originating seed item.
Adds comprehensive tests covering flat processing, recursive tree traversal, deep chains, error propagation, early stopping, empty inputs, and ordering guarantees.
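For readers unfamiliar with the pattern, the following is a rough sketch of a dynamic work queue built on a shared `queue.Queue`. It is not the PR's implementation: `process_dynamic`, its signature, and the `add_work`/`add_result` callback convention are assumptions made for illustration, and ordering, interruption, and shutdown handling are omitted.

```python
import queue
import threading

_DONE = object()


class _Failure:
    def __init__(self, exception):
        self.exception = exception


def process_dynamic(seeds, process_fn, num_workers=4):
    """Yield results emitted by process_fn(item, add_work, add_result)."""
    work = queue.Queue()
    out = queue.Queue()

    def worker():
        while True:
            item = work.get()
            try:
                if item is _DONE:
                    return
                # New work is enqueued before task_done() runs for the
                # parent item, so work.join() cannot return too early.
                process_fn(item, work.put, out.put)
            except BaseException as e:
                out.put(_Failure(e))
            finally:
                work.task_done()

    for seed in seeds:
        work.put(seed)
    threads = [threading.Thread(target=worker, daemon=True)
               for _ in range(num_workers)]
    for t in threads:
        t.start()

    def signal():
        work.join()            # all discovered work has been processed
        out.put(_DONE)         # stop the consumer
        for _ in threads:
            work.put(_DONE)    # stop the workers

    threading.Thread(target=signal, daemon=True).start()

    while True:
        item = out.get()
        if item is _DONE:
            return
        if isinstance(item, _Failure):
            raise item.exception
        yield item


# Usage: traverse a small in-memory tree whose shape is discovered at runtime.
TREE = {"root": ["a", "b"], "a": ["a1", "a2"], "b": [], "a1": [], "a2": []}


def visit(node, add_work, add_result):
    add_result(node)
    for child in TREE[node]:
        add_work(child)


found = sorted(process_dynamic(["root"], visit))
```

Results arrive in completion order in this sketch, so the usage example sorts them before comparing.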