multiprocessing.pool在关闭/加入后无限期挂起

我遇到了一个不确定的生产问题,其中multiprocessing.Pool被卡住,从不退回join

我设法将问题简化为这个小例子,并使其在某种程度上可以挂起。

工作示例:

#!/usr/bin/env python3
import os
import time
import multiprocessing.pool

def run_task(i):
    print(f'[{os.getpid()}] task({i}) complete')

if __name__ == '__main__':
    tasks = iter(range(10))
    processes = 4

    pool = multiprocessing.pool.Pool(processes=processes,maxtasksperchild=1)
    running = []
    while True:
        try:
            running = [ f for f in running if not f.ready() ]
            avail = processes - len(running)
            if avail:
                for _ in range(avail):
                    i = next(tasks)
                    print(f'[{os.getpid()}] add task({i})')
                    future = pool.apply_async(run_task,( i,))
                    running.append(future)
            else:
                time.sleep(0.1)
        except StopIteration:
            print(f'[{os.getpid()}] all tasks scheduled')
            break

    print(f'[{os.getpid()}] close and join pool')
    pool.close()
    pool.join()
    print(f'[{os.getpid()}] all done')

由于故障是不确定的,因此问题可能是时机之一。因此,我必须循环运行它才能使其挂起(尽管根据我的经验,它将挂在前几次迭代之一上)。

for i in {1..100}; do ./test.py; done   

挂起时的输出:

[15243] add task(0)
[15243] add task(1)
[15243] add task(2)
[15243] add task(3)
[15244] task(0) complete
[15245] task(1) complete
[15246] task(2) complete
[15247] task(3) complete
[15243] add task(4)
[15243] add task(5)
[15251] task(4) complete
[15243] add task(6)
[15243] add task(7)
[15252] task(5) complete
[15253] task(6) complete
[15243] add task(8)
[15243] add task(9)
[15243] all tasks scheduled
[15255] task(8) complete
[15256] task(9) complete
[15243] close and join pool     <-- hangs here indefinitely

主进程的gdb回溯:

#0  0x00007fb132b7c6c2 in __GI___waitpid (pid=22857,stat_loc=0x7fff8ef55d5c,options=0) at ../sysdeps/unix/sysv/linux/waitpid.c:30
#1  0x00000000005d10e5 in os_waitpid_impl (module=<optimised out>,options=0,pid=22857) at ../Modules/posixmodule.c:6941
#2  os_waitpid.lto_priv () at ../Modules/clinic/posixmodule.c.h:2995
#3  0x000000000050a84f in _PyCFunction_FastCallDict (kwargs=<optimised out>,nargs=<optimised out>,args=<optimised out>,func_obj=0x7fb132fea0d8) at ../Objects/methodobject.c:234
#4  _PyCFunction_FastCallKeywords (kwnames=<optimised out>,stack=<optimised out>,func=<optimised out>) at ../Objects/methodobject.c:294
#5  call_function.lto_priv () at ../Python/ceval.c:4851

子进程的gdb回溯:

#0  0x00007fb1328896d6 in futex_abstimed_wait_cancelable (private=0,abstime=0x0,expected=0,futex_word=0x1c68e40) at ../sysdeps/unix/sysv/linux/futex-internal.h:205
#1  do_futex_wait (sem=sem@entry=0x1c68e40,abstime=0x0) at sem_waitcommon.c:111
#2  0x00007fb1328897c8 in __new_sem_wait_slow (sem=0x1c68e40,abstime=0x0) at sem_waitcommon.c:181
#3  0x00000000005ab535 in PyThread_acquire_lock_timed (intr_flag=<optimised out>,microseconds=<optimised out>,lock=<optimised out>) at ../Python/thread_pthread.h:386
#4  PyThread_acquire_lock () at ../Python/thread_pthread.h:595
#5  0x0000000000446bf1 in _enter_buffered_busy (self=self@entry=0x7fb13307aa98) at ../Modules/_io/bufferedio.c:292
#6  0x00000000004ce743 in buffered_flush.lto_priv () at ../Python/thread_pthread.h:416

实施说明:

仅在有工作人员可用时计划任务:

在等待执行每个任务时,其优先级可以更改,因此我不能一开始就将所有任务排入队列。

点击running列表并检查AsyncResult.ready以确定我是否可以执行其他任务

maxtasksperchild = 1:

任务会泄漏内存,因此为了回收在每个任务运行后丢失的内存,我作弊并使用maxtasksperchild=1


观察:

睡眠与繁忙等待:

有趣的是,如果我将time.sleep(0.1)更改为忙碌等待,挂起就会消失。

wait = time.time() + 0.1
while time.time() < wait:
    pass

在父母的睡眠过程中是否可能错过了来自孩子的过程的信号?

maxtasksperchild = 1:

如果我重用原始的子进程,则挂起将消失。


因此,似乎每个任务完成后进程都将被销毁,而父进程却在睡眠之间存在一些相互作用。

作为生产中的快速解决方案,我已经改变了睡眠时间以忙于等待,但这感觉像是一个丑陋的黑客,我想了解到底是什么导致了挂起。

  • 为什么池永远不会从join返回?
  • 为什么忙于等待而不是“解决”问题?
  • 为什么每次都“修复”问题时重用进程而不是创建新进程?
q306400986 回答:multiprocessing.pool在关闭/加入后无限期挂起

我认为问题是一个例外,从技术上讲,它不应该存在,并且可能已经在更高版本的python中得到解决。

[15243] add task(4)
[15243] add task(5)
[15251] task(4) complete
[15243] add task(6)
[15243] add task(7)
[15252] task(5) complete
[15253] task(6) complete
[15243] add task(8)
[15243] add task(9)
[15243] all tasks scheduled <-- Exception Called but [15254] or task(7) is not completed
[15255] task(8) complete
[15256] task(9) complete
[15243] close and join pool     <-- hangs here indefinitely

在异常调用的那点发生了一些事情,这可能导致task(7)进入怪异状态,apply_async允许回调,这意味着3.6可能以不稳定的方式创建线程。

阻塞等待意味着您的主设备不睡觉,并且可能会更快地处理此问题。检查是否增加等待时间或使用apply()是否有所不同。

我不确定为什么要重用“解决”问题,但是可能只是访问时间更快且更容易处理。

本文链接:https://www.f2er.com/3107558.html

大家都在问