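The multiprocessing.Pool() documentation (since Py27, inclusive) makes clear that it intentionally blocks while processing the queue of calls. In the example posted above, that queue is the iterator-generated set of just 4 calls, produced sequentially.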
The multiprocessing module documentation describes its Pool.map() method as follows:
map(func, iterable[, chunksize])
A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.
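To make the blocking behaviour concrete, here is a minimal sketch contrasting Pool.map(), which returns only once every result is ready, with Pool.map_async(), which hands back an AsyncResult straight away and moves the waiting to .get(). The worker square() and the helper blocking_vs_async() are hypothetical names made up for illustration:

```python
import multiprocessing as mp
import time


def square(x):
    # Hypothetical worker function, used only for illustration.
    time.sleep(0.1)
    return x * x


def blocking_vs_async(n=4):
    """Contrast Pool.map() (blocking) with Pool.map_async() (non-blocking)."""
    with mp.Pool(processes=2) as pool:
        # map() does not return until every result is ready.
        blocking = pool.map(square, range(n))

        # map_async() returns an AsyncResult immediately ...
        pending = pool.map_async(square, range(n))
        ready_right_away = pending.ready()  # usually False at this point
        # ... and the caller only blocks when it asks for the values.
        non_blocking = pending.get(timeout=30)
    return blocking, non_blocking, ready_right_away


if __name__ == "__main__":
    blocking, non_blocking, ready_right_away = blocking_vs_async()
    print("map():      ", blocking)
    print("map_async():", non_blocking, "| ready right after the call:", ready_right_away)
```

Both calls produce the same list; the difference is only where the calling process waits.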
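That is the behaviour to expect. On top of it, the different process-instantiation (start) methods add different amounts of overhead, related to how the child processes get copied or created.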
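A rough way to see those per-method overhead differences on your own machine is to time a Pool round-trip under every start method the platform offers. This is a sketch, not a benchmark: pool_startup_cost() is a hypothetical helper, and the measured span covers pool creation, one trivial map() and teardown.

```python
import multiprocessing as mp
import time


def pool_startup_cost(processes=2):
    """Rough wall-clock cost of a Pool round-trip under each start method.

    Illustrative only: numbers vary with platform, interpreter and how much
    the __main__ module imports.
    """
    costs = {}
    for method in mp.get_all_start_methods():
        ctx = mp.get_context(method)
        t0 = time.perf_counter()
        with ctx.Pool(processes=processes) as pool:
            # A trivial round-trip so the workers have really started.
            pool.map(abs, range(processes))
        # Leaving the with-block terminates the pool, so teardown is included.
        costs[method] = time.perf_counter() - t0
    return costs


if __name__ == "__main__":
    for method, seconds in pool_startup_cost().items():
        print("{0:>10s}: {1:.4f} s".format(method, seconds))
```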
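In any case, mp.cpu_count() need not equal the number of CPU cores on which the tasks of such .Pool()-instance workers will actually run, because of the O/S affinity settings (user- or process-related restriction policies):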
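Your code will have to "obey" the subset of CPU cores that any multiprocessing-requested subprocess is permitted to use, and their number is not higher than: len( os.sched_getaffinity( 0 ) )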
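A minimal sketch of sizing a pool from the affinity mask rather than from mp.cpu_count(). The names usable_cpu_count() and cube() are hypothetical, and the fallback to os.cpu_count() is an assumption for platforms that lack os.sched_getaffinity():

```python
import multiprocessing as mp
import os


def usable_cpu_count():
    """Number of CPUs this process may actually run on.

    os.sched_getaffinity() exists only on some platforms (e.g. Linux);
    elsewhere this sketch falls back to os.cpu_count().
    """
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    return os.cpu_count() or 1


def cube(x):
    # Hypothetical worker function.
    return x ** 3


if __name__ == "__main__":
    n = usable_cpu_count()
    print("mp.cpu_count() =", mp.cpu_count(), "| usable =", n)
    # Size the pool to the CPUs the O/S will actually let us use.
    with mp.Pool(processes=n) as pool:
        print(pool.map(cube, range(n)))
```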
Best next step: re-evaluate your whole code-execution ecosystem:
import multiprocessing as mp                 # file: ~/p/StackOverflow_multiprocessing.py
import time, os, platform, inspect           # https://stackoverflow.com/questions/58738716/python-multiprocessing-pool-map-doesnt-work-parallel/58755642

def test_function( i = -1 ):
    thisframerecord   = inspect.stack()[0]   # 0 represents this line
    callerframerecord = inspect.stack()[1]   # 1 represents line at caller
    _INFO_   = inspect.getframeinfo(   thisframerecord[0] )
    _CALLER_ = inspect.getframeinfo( callerframerecord[0] )
    print( "{0:_>30.10f} ::".format( time.monotonic() ),
           "PID:{0:} with PPID:{1:} runs".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format( _INFO_.function, _INFO_.lineno, i ),
           "invoked from {0:}()-LINE[{1:_>4d}]".format( _CALLER_.function, _CALLER_.lineno )
           )
    time.sleep( 10 )
    thisframerecord = inspect.stack()[0]     # 0 represents this line
    _INFO_ = inspect.getframeinfo( thisframerecord[0] )
    print( "{0:_>30.10f} ::".format( time.monotonic() ),
           "PID:{0:} with PPID:{1:} ends".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format( _INFO_.function, _INFO_.lineno, i )
           )

if __name__ == '__main__':
    print( "{0:} cores reported by {1:}".format( mp.cpu_count(), "mp.cpu_count()" ) )
    # os.sched_getaffinity() is not available on every O/S
    print( "{0:} cores permit'd by {1:}".format( os.sched_getaffinity( 0 ), "os.sched_getaffinity(0)" ) )
    print( "O/S sub-process instantiation methods {0:} available".format( mp.get_all_start_methods() ) )
    print( "O/S will use this instantiation method {0:}".format( mp.get_start_method() ) )
    print( "{0:_>30.10f} :: will call .Pool()".format( time.monotonic() ) )
    #------mp.Pool()-----------------------------------------------------
    pool = mp.Pool( mp.cpu_count() )
    print( "{0:_>30.10f} :: pool.map() to be called".format( time.monotonic() ) )
    #---.map()--------------------------------------?
    pool.map( test_function, [ i for i in range( 4 ) ] )
    #---.map()--------------------------------------?
    print( "{0:_>30.10f} :: pool.map() call RETd".format( time.monotonic() ) )
    pool.close()
    print( "{0:_>30.10f} :: pool.close()-d".format( time.monotonic() ) )
    pool.join()
    print( "{0:_>30.10f} :: pool.join()-d".format( time.monotonic() ) )
    print( "EXECUTED on {0:}".format( platform.version() ) )
    print( "USING: python-{0:}:".format( platform.python_version() ) )
On a linux-class O/S you may see something like this:
(py3) Fri Nov 08 14:26:40 :~$ python ~/p/StackOverflow_multiprocessing.py
8 cores reported by mp.cpu_count()
{0, 1, 2, 3} cores permit'd by os.sched_getaffinity(0)
O/S sub-process instantiation methods ['fork', 'spawn', 'forkserver'] available
O/S will use this instantiation method fork
____________1284931.1678911699 :: will call .Pool()
____________1284931.2063829789 :: pool.map() to be called
____________1284931.2383207241 :: PID:15848 with PPID:15847 runs test_function( i = 0 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2506985001 :: PID:15849 with PPID:15847 runs test_function( i = 1 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2614207701 :: PID:15851 with PPID:15847 runs test_function( i = 2 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2671745829 :: PID:15850 with PPID:15847 runs test_function( i = 3 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284941.2504994699 :: PID:15848 with PPID:15847 ends test_function( i = 0 )-LINE[__16],
____________1284941.2550825749 :: PID:15849 with PPID:15847 ends test_function( i = 1 )-LINE[__16],
____________1284941.2698363690 :: PID:15851 with PPID:15847 ends test_function( i = 2 )-LINE[__16],
____________1284941.2776791099 :: PID:15850 with PPID:15847 ends test_function( i = 3 )-LINE[__16],
____________1284941.2780045229 :: pool.map() call RETd
____________1284941.2780527000 :: pool.close()-d
____________1284941.3343055181 :: pool.join()-d
EXECUTED on #1 SMP oSname M.m.n-o.p (YYYY-MM-DD)
USING: python-3.5.6:
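Check the hidden detail of what your O/S uses to call test_function(): here it is mapstar() (not a universally guaranteed choice), as used by the local SMP linux-class O/S with its default sub-process instantiation method, 'fork'.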
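If you would rather not depend on the platform default, you can request a start method explicitly through mp.get_context(). The sketch below (who_am_i() and run_with() are hypothetical helpers) asks for 'spawn', which is available on both POSIX and Windows, and has each worker report its PID, its parent's PID and the name of its calling function. That caller name comes from CPython's multiprocessing.pool internals, so treat it as informational only.

```python
import inspect
import multiprocessing as mp
import os


def who_am_i(i):
    # Report this worker's PID/PPID and the name of the function that called it.
    caller = inspect.stack()[1].function
    return i, os.getpid(), os.getppid(), caller


def run_with(method="spawn", n=4):
    # Ask for a specific start method instead of relying on the O/S default.
    ctx = mp.get_context(method)
    with ctx.Pool(processes=2) as pool:
        return pool.map(who_am_i, range(n))


if __name__ == "__main__":
    print("default start method:", mp.get_start_method())
    for i, pid, ppid, caller in run_with("spawn"):
        print("i = {0} PID:{1} PPID:{2} called from {3}()".format(i, pid, ppid, caller))
```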
I suspect you may be running into a common problem with multiprocessing:
Printing to a shared log or screen from multiple threads (or processes) of execution at the same time produces confusing results!
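One common remedy is to serialize the writes. The sketch below shares a single mp.Lock with every pool worker through the initializer/initargs parameters of Pool, so each worker prints one whole, flushed line at a time. The names _init_worker(), noisy_task() and run() are hypothetical:

```python
import multiprocessing as mp
import os
import time

_print_lock = None  # set in each worker by _init_worker()


def _init_worker(lock):
    # Runs once per worker process; keeps the shared lock in a global.
    global _print_lock
    _print_lock = lock


def noisy_task(i):
    time.sleep(0.05)
    line = "PID:{0} finished i = {1}".format(os.getpid(), i)
    # Only one process writes at a time, and each line is flushed whole.
    with _print_lock:
        print(line, flush=True)
    return i


def run(n=4):
    lock = mp.Lock()
    with mp.Pool(processes=2, initializer=_init_worker, initargs=(lock,)) as pool:
        return pool.map(noisy_task, range(n))


if __name__ == "__main__":
    print(run())
```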
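This also explains why you see different behaviour depending on the O/S. Different operating systems handle this in slightly different ways: the underlying buffering schemes, access control and so on all make a difference.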
You may well be getting the multiprocessing you want, but the printed output can be misleading.
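One way to check whether the work really ran in parallel, without trusting interleaved console output, is to have each task return its own start and end timestamps and let the parent decide. A minimal sketch, with hypothetical helpers timed_task() and overlap_report() and an arbitrary 1-second sleep standing in for real work:

```python
import multiprocessing as mp
import os
import time

WORK_SECONDS = 1.0  # arbitrary stand-in for real work


def timed_task(i):
    # Return when and where the work happened instead of printing it.
    start = time.time()
    time.sleep(WORK_SECONDS)
    return i, os.getpid(), start, time.time()


def overlap_report(n=4):
    with mp.Pool(processes=n) as pool:
        rows = pool.map(timed_task, range(n))
    latest_start = max(row[2] for row in rows)
    earliest_end = min(row[3] for row in rows)
    # True only if all n tasks were running at the same moment.
    return rows, latest_start < earliest_end


if __name__ == "__main__":
    rows, overlapped = overlap_report()
    for i, pid, start, end in rows:  # printed by the parent only
        print("i = {0} PID:{1} ran from {2:.3f} to {3:.3f}".format(i, pid, start, end))
    print("all tasks overlapped in time:", overlapped)
```

If every task started before the first one finished, all of them were running at the same moment. time.time() is used because it reads the system wall clock, so values from different processes can be compared.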
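I understand you provided this code as an example to demonstrate a real problem. So just go back to your original code and reconsider the often-overlooked fact above: printing (or logging to a file) accesses a shared resource. You may need locking, queueing or some other technique. Without knowing the details of your actual problem, I cannot offer more specific advice.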
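A queueing variant of the same idea, sketched under the assumption that the parent process is the only one allowed to touch stdout: workers push their messages into an mp.Queue handed over via the pool initializer, and the parent drains it. _init_worker(), quiet_task() and run() are hypothetical names; for logging rather than print(), the standard library's logging.handlers.QueueHandler and QueueListener follow the same pattern.

```python
import multiprocessing as mp
import os

_log_queue = None  # set in each worker by _init_worker()


def _init_worker(queue):
    # Runs once per worker process; keeps the shared queue in a global.
    global _log_queue
    _log_queue = queue


def quiet_task(i):
    # Instead of printing, hand the message to the parent process.
    _log_queue.put("PID:{0} handled i = {1}".format(os.getpid(), i))
    return i * 10


def run(n=4):
    queue = mp.Queue()
    with mp.Pool(processes=2, initializer=_init_worker, initargs=(queue,)) as pool:
        results = pool.map(quiet_task, range(n))
        # Drain while the workers are still alive: exactly one message per
        # task, with a timeout so a lost message cannot hang the parent.
        messages = [queue.get(timeout=10) for _ in range(n)]
    return results, messages


if __name__ == "__main__":
    results, messages = run()
    for message in messages:
        print(message)  # only the parent writes to stdout
    print(results)
```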