I am working on a machine learning project. I started out with the scikit-learn (sklearn) library, and during model optimization I used sklearn's classic GridSearchCV class. By default it parallelizes using all the resources of the host running the API (via the joblib library). Here is an example:
from sklearn import datasets
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
from datetime import datetime
import numpy as np
def count_trials(param_grid):
    # The grid defines one model per combination of parameter values,
    # so the number of trials is the product of the value counts.
    total_trials = 1
    for k, v in param_grid.items():
        total_trials *= len(v)
    return total_trials
# Load data
digits = datasets.load_digits()
X,y = digits.data,digits.target
print("")
print("Iris data set: ")
print("X: {}".format(X.shape))
print("labels: {}".format(np.unique(y)))
print("")
param_grid = {"max_depth": [3,None],"max_features": ["auto"],"min_samples_split": [2,3,4,5,10,20],"min_samples_leaf": [1,6,"bootstrap": [True],"criterion": ["entropy"],"n_estimators": [40,80],}
cv = 5
n_models = count_trials(param_grid)
print("trying {} models,with CV = {}. A total of {} fits.".format(n_models,cv,n_models*cv))
start_time = datetime.now()
print("Starting at {}".format(start_time))
gs = GridSearchCV(estimator=RandomForestClassifier(), param_grid=param_grid, cv=cv, refit=True, scoring="accuracy", n_jobs=-1)
gs.fit(X,y)
end_time = datetime.now()
print("Ending at {}".format(end_time))
print("\n total time = {}\n".format(end_time - start_time))
I recently discovered that it has been extended to use the resources of a Spark cluster (the pyspark and spark_sklearn libraries). I managed to set up a Spark cluster consisting of one master and two workers. The code below runs the same task as before, but using the Spark cluster's resources:
from sklearn import datasets
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV as SKGridSearchCV
from spark_sklearn import GridSearchCV as SparkGridSearchCV
from pyspark import SparkConf,SparkContext
from datetime import datetime
import numpy as np
def get_context():
    sc_conf = SparkConf()
    sc_conf.setAppName("test-sklearn-spark-app")
    sc_conf.setMaster('spark://<master-IP>:7077')
    sc_conf.set('spark.cores.max', '40')
    sc_conf.set('spark.logConf', True)
    print(sc_conf.getAll())
    return SparkContext(conf=sc_conf)
def count_trials(param_grid):
    total_trials = 1
    for k, v in param_grid.items():
        total_trials *= len(v)
    return total_trials
# Load data (same digits data set and param_grid as above)
digits = datasets.load_digits()
X, y = digits.data, digits.target
param_grid = {
    "max_depth": [3, None],
    "max_features": ["auto"],
    "min_samples_split": [2, 3, 4, 5, 10, 20],
    "min_samples_leaf": [1, 6],
    "bootstrap": [True],
    "criterion": ["entropy"],
    "n_estimators": [40, 80],
}
cv = 5
n_models = count_trials(param_grid)
print("trying {} models, with CV = {}. A total of {} fits.".format(n_models, cv, n_models * cv))
sc = get_context()
gs = SparkGridSearchCV(sc=sc, estimator=RandomForestClassifier(), param_grid=param_grid, cv=cv, refit=True, scoring="accuracy", n_jobs=-1)
start_time = datetime.now()
print("Starting at {}".format(start_time))
gs.fit(X,y)
end_time = datetime.now()
print("Ending at {}".format(end_time))
print("\n total time = {}\n".format(end_time - start_time))
where <master-IP> is the IP of the master node. This code runs perfectly, using all the available resources in the Spark cluster.
Then I set up a Kubernetes cluster with one master node and one worker node, and ran the same code as before, but replacing the line
sc_conf.setMaster('spark://<master-IP>:7077')
with
sc_conf.setMaster('k8s://<master-IP>:<PORT>')
where <master-IP> and <PORT> are the values I obtained by running the following command on the master node:
kubectl cluster-info
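(In case it is relevant: my understanding from the Spark-on-Kubernetes documentation is that the URL scheme defaults to https when omitted, so I assume the two forms below are equivalent; I have not verified this on my cluster.)

sc_conf.setMaster('k8s://<master-IP>:<PORT>')
sc_conf.setMaster('k8s://https://<master-IP>:<PORT>')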
The problem is that my code no longer works. It shows the following error message:
19/11/07 12:57:32 ERROR Utils: Uncaught exception in thread kubernetes-executor-snapshots-subscribers-1
org.apache.spark.SparkException: Must specify the executor container image
at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep$$anonfun$5.apply(BasicExecutorFeatureStep.scala:40)
at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep$$anonfun$5.apply(BasicExecutorFeatureStep.scala:40)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep.<init>(BasicExecutorFeatureStep.scala:40)
at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder$$anonfun$$lessinit$greater$default$1$1.apply(KubernetesExecutorBuilder.scala:26)
at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder$$anonfun$$lessinit$greater$default$1$1.apply(KubernetesExecutorBuilder.scala:26)
at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder.buildFromFeatures(KubernetesExecutorBuilder.scala:43)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$org$apache$spark$scheduler$cluster$k8s$ExecutorPodsAllocator$$onNewSnapshots$1.apply$mcVI$sp(ExecutorPodsAllocator.scala:133)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsAllocator$$onNewSnapshots(ExecutorPodsAllocator.scala:126)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$start$1.apply(ExecutorPodsAllocator.scala:68)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$start$1.apply(ExecutorPodsAllocator.scala:68)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$$anonfun$org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$$callSubscriber$1.apply$mcV$sp(ExecutorPodsSnapshotsStoreImpl.scala:102)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$$callSubscriber(ExecutorPodsSnapshotsStoreImpl.scala:99)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$$anonfun$addSubscriber$1.apply$mcV$sp(ExecutorPodsSnapshotsStoreImpl.scala:71)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$$anon$1.run(ExecutorPodsSnapshotsStoreImpl.scala:107)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
It seems to be saying that I must specify a Docker image for the executors, but I don't know how to do that.
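From what I can find in the Spark-on-Kubernetes documentation, the image seems to be set through the spark.kubernetes.container.image property (with spark.kubernetes.driver.container.image / spark.kubernetes.executor.container.image as per-role variants). Below is my best guess at what get_context should look like; <spark-py-image> is a placeholder, since I don't know which image to use (presumably one built with $SPARK_HOME/bin/docker-image-tool.sh and pushed to a registry my nodes can pull from):

def get_context():
    sc_conf = SparkConf()
    sc_conf.setAppName("test-sklearn-spark-app")
    sc_conf.setMaster('k8s://<master-IP>:<PORT>')
    # Placeholder image name: I assume this must point to a Spark image
    # containing Python (and sklearn/spark_sklearn?) that the Kubernetes
    # nodes can pull.
    sc_conf.set('spark.kubernetes.container.image', '<spark-py-image>')
    sc_conf.set('spark.cores.max', '40')
    sc_conf.set('spark.logConf', True)
    print(sc_conf.getAll())
    return SparkContext(conf=sc_conf)

Is that the right property, and if so, how do I build a suitable image that includes pyspark, sklearn, and spark_sklearn?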
Does anyone have experience with this? I have been searching the web, but haven't found an answer.
Thanks in advance.