在python 3.4.7的pool.map函数中添加其他随机参数作为参数

我想对大型数据集使用多重处理来查找两列的乘积,并使用参数中给定参数过滤数据集。我构建了一个测试集,但无法在该集上进行多重处理。

首先,我尝试在parallelize_dataframe函数中对数据集进行除法,然后在subset_col函数中应用乘法函数和过滤器函数。稍后,我将完整的数据集添加回parallelize_dataframe中。

import numpy as np
import pandas as pd
from multiprocessing import Pool
from multiprocessing import Lock

df = pd.DataFrame({'col1': [1,1,1],'col2': ['aa','aa','bb','cc','cc'],'col3': [1,2,3,4,5,6,7,8,9,10],'col4': [21,22,23,24,25,26,27,28,29,30]})



def subset_col(df,p):
    print("Working with number: " + str(p))
    df[col5] = df[col3]*df[col4]
    df= df[df['col1'] == p]


def parallelize_dataframe(df,p,func,n_cores=80):
    df_split = np.array_split(df,n_cores)
    pool = Pool(n_cores)
    df = pd.concat(pool.map(func,df_split,p))
    pool.close()
    pool.join()
    return df


df3 = parallelize_dataframe(df,subset_col)


结果应为col3和col4的乘积,其中col1使用值过滤。但是我总是会出错:

File "<stdin>",line 1,in <module>
File "<stdin>",line 4,in parallelize_dataframe
struct.error: 'i' format requires -2147483648 <= number <= 2147483647 

但是,如果我从所有函数中删除过滤器“ p”,它就可以正常工作。有人可以帮我调试吗?

dilixinxi123 回答:在python 3.4.7的pool.map函数中添加其他随机参数作为参数

根据multiprocessing.Pool.map的官方文档,它“仅支持一个 iterable 参数”。因此,您需要更改subset_col的接口以采用单个参数。另外,您忘记了使列成为字符串,从而导致名称错误。为了减少计算量,应在相乘之前进行过滤。然后应返回一个值,除非您的函数仅通过副作用进行操作(我假设您不希望这样做,因为您将池结果串联在一起)。

def subset_col(pair):
    df,p = pair
    print("Working with number: " + str(p))
    df = df[df['col1'] == p].copy()
    df['col5'] = df['col3']
    return df

接下来,我们将需要修正您调用pool.map的方式,因为它只根据您的工作接受2个参数(第3个,最后一个参数为chunksize)。由于您希望每个流程使用相同的p,因此我们将dfs与每个{的重复值p压缩在一起。另外,考虑使用上下文管理器来处理关闭资源。

def parallelize_dataframe(df,p,func,n_cores=None):
    if n_cores is None:
        n_cores = os.cpu_count()

    dfs = np.array_split(df,n_cores)
    pairs = zip(dfs,itertools.repeat(p))
    with Pool(n_cores) as pool:
        result = pool.map(func,pairs)

    df = pd.concat(result)
    return df

这现在可以正确返回新的数据帧。但是我绝对怀疑您拥有一台具有80核的机器。考虑使用n_cores=None

实现os.cpu_count以使Python dynamically figure out的计算机上有多少个内核
df3 = parallelize_dataframe(df,1,subset_col)

根据您对Pool.starmap变体的要求:

def subset_col(df,p):
    # remove unpacking line
    ...

def parallelize_dataframe(df,n_cores=None):
    ...
    # change `pool.map(...)` to `pool.starmap(...)`
    ...

不过,您应该注意,Pool没有为imap提供imap_unorderedstarmap的替代方案,它们都是惰性评估版本,它们是否保留顺序都不同是否。

本文链接:https://www.f2er.com/3154291.html

大家都在问