From 14b7a045e89da1e2071e66e065a27c994b833f52 Mon Sep 17 00:00:00 2001
From: luotao02
Date: Thu, 21 May 2026 14:53:14 +0800
Subject: [PATCH] Agent: fix worker premature exit in parallel_extract.py with sentinel-based shutdown

Problem:
When processing large model lists (tens of thousands), most workers exited
early with "queue empty" while only a few remained active. This caused severe
worker imbalance and dramatically slowed down extraction.

Root cause:
multiprocessing.Queue uses a background feeder thread to write items into the
pipe. When the main process puts many tasks at once, there is a delay before
all items are actually available in the pipe. Workers using get_nowait() see
Empty during this window, incorrectly assume the queue is drained, and exit.

Fix:
1. Replace get_nowait() with blocking get() in worker_fn().
2. Add a done sentinel (None): the main process appends num_workers None
   values to the task queue after all model IDs.
3. Each worker consumes tasks via get(); when it receives None, it breaks
   the loop and exits cleanly.

Because the sentinels are enqueued after every model ID, this guarantees all
workers stay alive until every real task has been dequeued, eliminating the
race condition between feeder thread writes and worker reads.
--- graph_net/agent/parallel_extract.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/graph_net/agent/parallel_extract.py b/graph_net/agent/parallel_extract.py index ad1ab7384..0a6997814 100644 --- a/graph_net/agent/parallel_extract.py +++ b/graph_net/agent/parallel_extract.py @@ -226,9 +226,8 @@ def _orphan_watcher(): return while True: - try: - model_id = task_queue.get_nowait() - except queue.Empty: + model_id = task_queue.get() + if model_id is None: break print(f"{prefix} Extracting: {model_id}", flush=True) @@ -523,6 +522,9 @@ def main() -> int: task_queue: multiprocessing.Queue = multiprocessing.Queue() for mid in model_ids: task_queue.put(mid) + # Sentinel: each worker gets one None to signal shutdown + for _ in range(num_workers): + task_queue.put(None) # --- Launch workers --- result_queue: multiprocessing.Queue = multiprocessing.Queue()