这是我第一次尝试并发编程,我需要一点帮助来解决问题。
我正在尝试自行实现一个学习曲线功能,例如在sklearn.model_selection模块中找到的功能。我有两个循环,一个循环遍历要使用的样本数(本例中为[em,size_i 的分数[0,1]),另一个循环为每个循环重复 fit_and_score n_sample, j 次。
目标是使 fit_and_score 函数并行化,该函数将通过模型,n_samples,训练和测试集进行评分并调用评分器。我已经实现的方法(请参见下面的代码)运行良好,但是比以前的串行代码慢许多倍。我假设这是由于序列化传递给 fit_and_score 函数的所有参数而产生的开销。
由于这些参数不会在 j 迭代中发生变化,因此我认为必须可以将它们传递一次,从而减少序列化开销。但是,我找不到解决方法。
这是我的代码:
def lc_fit_and_score(size_i,model,X_train,y_train,X_test,y_test,scorer,strat):
model_i = clone(model)
if size_i == 1:
#last iteration
X_i,y_i = shuffle_in_unison_2d(X_train,y_train)
else:
X_i,_,y_i,_ = train_test_split(X_train,train_size=size_i,stratify=strat)
model_i.fit(X_i,y_i.ravel())
return scorer(model_i,X_i,y_i),scorer(model_i,y_test)
def learning_curve_mp(X_train,**kwargs):
n_jobs = kwargs.pop('n_jobs',1)
n_jobs = len(os.sched_getaffinity(0)) if n_jobs==-1 else n_jobs
n = kwargs.pop('n',20)
cv = kwargs.pop('cv',10)
y_window = kwargs.pop('y_window',None)
test_idx = kwargs.pop('test_idx',[])
fname = kwargs.pop('fname',None)
order = kwargs.pop('order',None)
if kwargs:
raise TypeError("Invalid parameters passed: {}".format(kwargs))
strat = y_train if is_classifier(model) else np.array(y_train > 0,dtype=int)
n_samples = []
train_loss = np.zeros((n,cv))
test_loss = np.zeros((n,cv))
if n_jobs == 1:
# serial version
for i,size_i in enumerate(np.linspace(0,1,n+1)[1:]):
for j in range(cv):
train,test = lc_fit_and_score(size_i,strat)
train_loss[i,j] = train
test_loss[i,j] = test
n_samples.append(int(X_train.shape[0]*size_i))
else:
# parallel version
for i,n+1)[1:]):
pool = Pool(n_jobs,initializer=init_pool,initargs=(,))
result_objects = [pool.apply_async(lc_fit_and_score,args=(size_i,strat)) for x in range(cv)]
pool.close()
pool.join()
train_loss[i] = [x.get()[0] for x in result_objects]
test_loss[i] = [x.get()[1] for x in result_objects]
n_samples.append(int(X_train.shape[0]*size_i))
理想地,遍历样本数量的循环也可以并行化,但是我也不确定如何做到这一点。任何帮助表示赞赏!