Agent: fix worker premature exit in parallel_extract.py with sentinel-based shutdown#723
Merged
Merged
Conversation
…-based shutdown Problem: When processing large model lists (tens of thousands), most workers exited early with "queue empty" while only a few remained active. This caused severe worker imbalance and dramatically slowed down extraction. Root cause: multiprocessing.Queue uses a background feeder thread to write items into the pipe. When the main process puts many tasks at once, there is a delay before all items are actually available in the pipe. Workers using get_nowait() see Empty during this window and incorrectly assume the queue is drained, then exit. Fix: 1. Replace get_nowait() with blocking get() in worker_fn(). 2. Add a done sentinel (None): the main process appends num_workers None values to the task queue after all model IDs. 3. Each worker consumes tasks via get(); when it receives None, it breaks the loop and exits cleanly. This guarantees all workers stay alive until every real task is processed, eliminating the race condition between feeder thread writes and worker reads.
|
Thanks for your contribution! |
Xreki
approved these changes
May 21, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
PR Category
Feature Enhancement
Description
修复 parallel_extract.py 中 worker 提前退出的问题:引入哨兵值机制
问题描述
在运行
parallel_extract.py处理大规模模型列表(数万个模型)时,我们观察到严重的 worker 失衡问题:大多数 worker 提前退出,打印Worker finished (queue empty),最终只剩下 1-2 个 worker 处理全部剩余任务,导致抽取吞吐量下降了一个数量级。在我们的 Batch 7 抽取任务中(70,724 个模型,16 个 worker),观察到的现象如下:
各 Worker 任务处理情况(第五轮实测)
现在问题很清楚了!workers 处理模型数量极不均衡。
由于使用的是
multiprocessing.Queue.get_nowait()抢任务机制,任务并非由主进程均匀分配,而是由各个 worker 自行争抢。在第五轮实际运行中(总计 22,827 次尝试),各 worker 实际处理数量如下:处理模型数较多的 worker:
处理模型数极少的 worker:
结果:
根因分析
multiprocessing.Queue内部使用了一个后台 feeder 线程将数据写入底层管道。当主进程一次性向队列放入大量任务时,feeder 线程需要一定时间才能将所有数据写入管道。而 worker 函数使用了
get_nowait(),该方法在当前时刻队列为空时就会立即返回Empty。这就产生了一个竞态条件:get_nowait()Empty,错误地认为队列已经空了解决方案
将
get_nowait()轮询循环替换为阻塞式get()+ 哨兵值(sentinel)退出机制:num_workers个None哨兵值task_queue.get()获取任务。如果取到None,则跳出循环并正常退出;否则正常处理模型这一机制保证了:
num_workers个哨兵值,确保即使有 worker 崩溃或提前完成,每个存活的 worker 最终都能收到一个哨兵值影响评估
None不可能与合法的模型 ID 冲突(模型 ID 始终为非空字符串)修复验证
在 machine B 上应用修复后重新启动 Batch 7 第六轮(35,105 个模型,16 CPU workers),运行约 1 小时后观察到的 worker 分布:
当前进度:1,779/35,105(约 5.1%),成功率 22.2%
16 个 worker 全部保持活跃,无一提前退出:
修复前后对比:
Worker-0 只处理了 17 个是因为它正在处理较大的模型(每个耗时更长),这是正常的——
multiprocessing.Queue本身不保证均匀分配,但关键是所有 worker 都不会提前退出,它们会持续运行直到收到None哨兵值。测试说明
本修复直接针对在 machine B 上 Batch 7 抽取任务中观察到的 worker 失衡问题。应用此修复后重新启动抽取进程,所有 16 个 worker 在整个运行期间保持活跃,无一人提前退出。