Python多处理pool.map无法并行工作

我写了一个简单的并行python程序

import multiprocessing as mp
import time

def test_function(i):
    print("function starts" + str(i))
    time.sleep(1)
    print("function ends" + str(i))

if __name__ == '__main__':
    pool = mp.Pool(mp.cpu_count())
    pool.map(test_function,[i for i in range(4)])
    pool.close()
    pool.join()

我希望在输出中看到的内容:

function starts0
function starts2
function starts1
function starts3
function ends1
function ends3
function ends2
function ends0

我实际上看到的是

function starts1
function ends1
function starts3
function ends3
function starts2
function ends2
function starts0
function ends0

当我查看输出时,好像pool.map正在运行一个函数,等待完成,然后再运行另一个函数,但是当我计算整个程序的持续时间约为2秒钟时,除非test_function正在并行运行


编辑:

此代码在MacOS和Linux上运行良好,但未显示 windows 10 上的预期输出。 python版本是3.6.4

wwz1989 回答:Python多处理pool.map无法并行工作

multiprocessing.Pool() 文档(自Py27 incl.以来)有意阻止处理队列中很明显-of-calls,是由迭代器生成的Just -4-调用集创建的,由上述发布的示例顺序生成。

multiprocessing 模块的文档介绍了其 Pool.map() 方法:

  

map(func,iterable[,chunksize])
  
      与map()内置函数的并行等效项(尽管它仅支持一个可迭代的参数)。 它会阻塞直到结果准备就绪。

这应该是观察到的行为,而不同的实例化方法将产生不同的附加(与过程复制相关的)开销成本。

无论如何,mp.cpu_count()不需要 是因为这样的调度.Pool()实例工作人员的任务将继续执行的CPU核心数。亲和性的O / S(与用户/流程相关的限制策略)设置:

您的代码将必须“服从”那些multiprocessing请求的子进程允许利用的CPU核心子集,其数量为不高于: len( os.sched_getaffinity( 0 ) )


最佳下一步:重新评估您的整个代码执行生态系统

import multiprocessing as mp                                            # file: ~/p/StackOverflow_multiprocessing.py
import time,os,platform,inspect                                      # https://stackoverflow.com/questions/58738716/python-multiprocessing-pool-map-doesnt-work-parallel/58755642

def test_function( i = -1 ):
    pass;                                                                                                  thisframerecord = inspect.stack()[0] # 0 represents this line
    pass;                                                                                                callerframerecord = inspect.stack()[1] # 1 represents line at caller
    pass;                                                                 _INFO_ = inspect.getframeinfo(   thisframerecord[0] )
    pass;                                                               _CALLER_ = inspect.getframeinfo( callerframerecord[0] )
    print( "{0:_>30.10f} ::".format(              time.monotonic() ),"PID:{0:} with PPID:{1:} runs".format( os.getpid(),os.getppid() ),"{0:}( i = {2:} )-LINE[{1:_>4d}],".format(                     _INFO_.function,_INFO_.lineno,i ),"invoked from {0:}()-LINE[{1:_>4d}]".format(                 _CALLER_.function,_CALLER_.lineno )
            )
    time.sleep( 10 )
    pass;                                                                                                  thisframerecord = inspect.stack()[0] # 0 represents this line
    pass;                                                                 _INFO_ = inspect.getframeinfo(   thisframerecord[0] )                 # 1 represents line at caller
    print( "{0:_>30.10f} ::".format(              time.monotonic() ),"PID:{0:} with PPID:{1:} ends".format( os.getpid(),i )
            )

if __name__ == '__main__':
    print( "{0:} cores reported by {1:}".format( mp.cpu_count(),"mp.cpu_count()" ) )
    print( "{0:} cores permit'd by {1:}".format( os.sched_getaffinity(0),"os.sched_getaffinity(0)" ) )
    print( "O/S sub-process instantiation methods {0:} available".format( mp.get_all_start_methods() ) )
    print( "O/S will use this instantiation method {0:}".format( mp.get_start_method() ) )
    print( "{0:_>30.10f} :: will call .Pool()".format( time.monotonic() ) )
    #------mp.Pool()-----------------------------------------------------
    pool = mp.Pool( mp.cpu_count() )
    print( "{0:_>30.10f} :: pool.map() to be called".format( time.monotonic() ) )
    #---.map()--------------------------------------?
    #---.map(                                       ?
    pool.map( test_function,[i for i in range(4) ] )
    #---.map(                                       ?
    #---.map()--------------------------------------?
    print( "{0:_>30.10f} :: pool.map() call RETd".format( time.monotonic() ) )
    pool.close()
    #---.close()
    print( "{0:_>30.10f} :: pool.close()-d".format( time.monotonic() ) )
    pool.join()
    #---.join()
    print( "{0:_>30.10f} :: pool.join()-d".format( time.monotonic()          ) )
    print( "EXECUTED on {0:}".format(              platform.version()        ) )
    print( "USING: python-{0:}:".format(           platform.python_version() ) )

可能会在linux级操作系统上查看类似的内容:

(py3) Fri Nov 08 14:26:40 :~$ python ~/p/StackOverflow_multiprocessing.py
8 cores reported by mp.cpu_count()
{0,1,2,3} cores permit'd by os.sched_getaffinity(0)
O/S sub-process instantiation methods ['fork','spawn','forkserver'] available
O/S will use this instantiation method fork
____________1284931.1678911699 :: will call .Pool()
____________1284931.2063829789 :: pool.map() to be called
____________1284931.2383207241 :: PID:15848 with PPID:15847 runs test_function( i = 0 )-LINE[___7],invoked from mapstar()-LINE[__44]
____________1284931.2506985001 :: PID:15849 with PPID:15847 runs test_function( i = 1 )-LINE[___7],invoked from mapstar()-LINE[__44]
____________1284931.2614207701 :: PID:15851 with PPID:15847 runs test_function( i = 2 )-LINE[___7],invoked from mapstar()-LINE[__44]
____________1284931.2671745829 :: PID:15850 with PPID:15847 runs test_function( i = 3 )-LINE[___7],invoked from mapstar()-LINE[__44]
____________1284941.2504994699 :: PID:15848 with PPID:15847 ends test_function( i = 0 )-LINE[__16],____________1284941.2550825749 :: PID:15849 with PPID:15847 ends test_function( i = 1 )-LINE[__16],____________1284941.2698363690 :: PID:15851 with PPID:15847 ends test_function( i = 2 )-LINE[__16],____________1284941.2776791099 :: PID:15850 with PPID:15847 ends test_function( i = 3 )-LINE[__16],____________1284941.2780045229 :: pool.map() call RETd
____________1284941.2780527000 :: pool.close()-d
____________1284941.3343055181 :: pool.join()-d

EXECUTED on #1 SMP oSname M.m.n-o.p (YYYY-MM-DD)
USING: python-3.5.6:

检查隐藏的详细信息-您的操作系统使用什么来调用test_function()- mapstar() (不是普遍确定的选择)是本地SMP-linux-class操作系统选择其默认子过程实例化方法的方式,该方法通过' fork 执行>”。

,

我怀疑您可能在多处理过程中遇到了常见问题:

从多个执行线程(或进程)(同时)打印到共享日志/屏幕会产生令人困惑的结果!

这也解释了为什么您会看到不同的行为,具体取决于操作系统。不同的操作系统将以略有不同的方式解决此问题。基础缓冲方案,访问控制等将有所作为。

您可能正在获得所需的多重处理,但打印输出可能会产生误导。

我知道您提供了此代码作为示例,以演示一个实际问题。因此,只需返回原始代码并再次考虑上述经常被忽视的事实:打印(或记录到文件)正在访问共享资源。您可能需要锁定或排队或其他技术。在不知道您的实际问题的细节的情况下,没有其他建议。

本文链接:https://www.f2er.com/3148864.html

大家都在问