Skip to content

Agent: fix worker premature exit in parallel_extract.py with sentinel-based shutdown#723

Merged
luotao1 merged 1 commit into
PaddlePaddle:developfrom
luotao1:test
May 21, 2026
Merged

Agent: fix worker premature exit in parallel_extract.py with sentinel-based shutdown#723
luotao1 merged 1 commit into
PaddlePaddle:developfrom
luotao1:test

Conversation

@luotao1
Copy link
Copy Markdown
Collaborator

@luotao1 luotao1 commented May 21, 2026

PR Category

Feature Enhancement

Description

修复 parallel_extract.py 中 worker 提前退出的问题:引入哨兵值机制

问题描述

在运行 parallel_extract.py 处理大规模模型列表(数万个模型)时,我们观察到严重的 worker 失衡问题:大多数 worker 提前退出,打印 Worker finished (queue empty),最终只剩下 1-2 个 worker 处理全部剩余任务,导致抽取吞吐量下降了一个数量级。

在我们的 Batch 7 抽取任务中(70,724 个模型,16 个 worker),观察到的现象如下:

  • 14 个 worker 在最初几分钟内就提前退出
  • 只剩 2 个 worker 在后续过程中保持活跃
  • 由于并行度严重不足,整体处理时间被大幅拉长

各 Worker 任务处理情况(第五轮实测)

现在问题很清楚了!workers 处理模型数量极不均衡。

由于使用的是 multiprocessing.Queue.get_nowait() 抢任务机制,任务并非由主进程均匀分配,而是由各个 worker 自行争抢。在第五轮实际运行中(总计 22,827 次尝试),各 worker 实际处理数量如下:

处理模型数较多的 worker:

Worker 处理模型数 状态
CPU 5 5,280 已退出
CPU 9 4,977 仍在跑
CPU 13 3,385 已退出
CPU 4 2,179 已退出
CPU 12 2,146 已退出

处理模型数极少的 worker:

Worker 处理模型数 状态
CPU 0 0 已退出(启动后立即退出)
CPU 10 10 已退出
CPU 14 16 已退出
CPU 1 20 已退出
CPU 3 23 已退出
CPU 2 64 已退出

结果

  • 最快的 worker(CPU 5)处理了 5,280 个模型,而最慢的 worker(CPU 0)一个都没处理到就直接退出了
  • 14 个 worker 在抢到少量任务(甚至 0 个)后提前退出,剩余任务全部落到 CPU 9 等少数 worker 上
  • 16 个 worker 的理论并行度实际退化成了 2 个 worker 的串行处理,速度从约 514 个/小时 骤降至约 220 个/小时

根因分析

multiprocessing.Queue 内部使用了一个后台 feeder 线程将数据写入底层管道。当主进程一次性向队列放入大量任务时,feeder 线程需要一定时间才能将所有数据写入管道。

而 worker 函数使用了 get_nowait(),该方法在当前时刻队列为空时就会立即返回 Empty。这就产生了一个竞态条件:

  1. 主进程向队列中放入 N 个模型 ID
  2. Feeder 线程开始将数据写入管道(但尚未完成)
  3. 多个 worker 启动并调用 get_nowait()
  4. 执行速度快的 worker 抢到了前面几个任务
  5. 执行速度慢的 worker(或启动稍晚的 worker)看到 Empty错误地认为队列已经空了
  6. 它们打印 "Worker finished (queue empty)" 并退出
  7. Feeder 线程最终把剩余任务全部写入管道,但已经没有 worker 来消费了

解决方案

get_nowait() 轮询循环替换为阻塞式 get() + 哨兵值(sentinel)退出机制:

  1. 主进程:在所有模型 ID 入队后,向任务队列追加恰好 num_workersNone 哨兵值
  2. Worker 进程:使用阻塞式 task_queue.get() 获取任务。如果取到 None,则跳出循环并正常退出;否则正常处理模型

这一机制保证了:

  • Worker 永远不会因为队列暂时为空而退出
  • 每个 worker 只有在明确收到哨兵值时才会退出
  • 放入 num_workers 个哨兵值,确保即使有 worker 崩溃或提前完成,每个存活的 worker 最终都能收到一个哨兵值

影响评估

  • 可靠性:消除了 worker 饥饿问题,确保所有 worker 都能充分参与并行处理
  • 性能:对于大规模批量抽取任务,吞吐量应该随 worker 数量线性扩展,而不是跌到只剩 1-2 个活跃 worker
  • 向后兼容性:无 API 变更;哨兵值 None 不可能与合法的模型 ID 冲突(模型 ID 始终为非空字符串)
  • 对小规模任务无影响:对于小模型列表,竞态条件本就不太可能触发,因此行为不变

修复验证

在 machine B 上应用修复后重新启动 Batch 7 第六轮(35,105 个模型,16 CPU workers),运行约 1 小时后观察到的 worker 分布:

当前进度:1,779/35,105(约 5.1%),成功率 22.2%

16 个 worker 全部保持活跃,无一提前退出

Worker 已处理 Worker 已处理
Worker-0 17 Worker-8 164
Worker-7 40 Worker-1 169
Worker-3 51 Worker-9 169
Worker-13 56 Worker-5 162
Worker-10 66 Worker-15 162
Worker-14 88 Worker-6 152
Worker-12 94 Worker-4 137
Worker-11 131 Worker-2 136

修复前后对比

指标 修复前(第五轮) 修复后(第六轮)
活跃 worker 数 2/16 16/16
提前退出 worker 数 14 0
处理最多/最少 5,280 / 0 169 / 17
速度 ~220/小时 正常并行

Worker-0 只处理了 17 个是因为它正在处理较大的模型(每个耗时更长),这是正常的——multiprocessing.Queue 本身不保证均匀分配,但关键是所有 worker 都不会提前退出,它们会持续运行直到收到 None 哨兵值。

测试说明

本修复直接针对在 machine B 上 Batch 7 抽取任务中观察到的 worker 失衡问题。应用此修复后重新启动抽取进程,所有 16 个 worker 在整个运行期间保持活跃,无一人提前退出。

…-based shutdown

Problem:
When processing large model lists (tens of thousands), most workers exited
early with "queue empty" while only a few remained active. This caused severe
worker imbalance and dramatically slowed down extraction.

Root cause:
multiprocessing.Queue uses a background feeder thread to write items into the
pipe. When the main process puts many tasks at once, there is a delay before
all items are actually available in the pipe. Workers using get_nowait() see
Empty during this window and incorrectly assume the queue is drained, then exit.

Fix:
1. Replace get_nowait() with blocking get() in worker_fn().
2. Add a done sentinel (None): the main process appends num_workers None
   values to the task queue after all model IDs.
3. Each worker consumes tasks via get(); when it receives None, it breaks
   the loop and exits cleanly.

This guarantees all workers stay alive until every real task is processed,
eliminating the race condition between feeder thread writes and worker reads.
@paddle-bot
Copy link
Copy Markdown

paddle-bot Bot commented May 21, 2026

Thanks for your contribution!

@luotao1 luotao1 merged commit d23d036 into PaddlePaddle:develop May 21, 2026
3 checks passed
@luotao1 luotao1 deleted the test branch May 21, 2026 08:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants