diff --git a/graph_net/agent/parallel_extract.py b/graph_net/agent/parallel_extract.py index ad1ab7384..0a6997814 100644 --- a/graph_net/agent/parallel_extract.py +++ b/graph_net/agent/parallel_extract.py @@ -226,9 +226,8 @@ def _orphan_watcher(): return while True: - try: - model_id = task_queue.get_nowait() - except queue.Empty: + model_id = task_queue.get() + if model_id is None: break print(f"{prefix} Extracting: {model_id}", flush=True) @@ -523,6 +522,9 @@ def main() -> int: task_queue: multiprocessing.Queue = multiprocessing.Queue() for mid in model_ids: task_queue.put(mid) + # Sentinel: each worker gets one None to signal shutdown + for _ in range(num_workers): + task_queue.put(None) # --- Launch workers --- result_queue: multiprocessing.Queue = multiprocessing.Queue()