Using spark_sklearn on a Kubernetes cluster

I am working on a machine learning project. I started out with the scikit-learn (sklearn) library, and during model tuning I used sklearn's classic GridSearchCV class. At the moment it parallelizes the search using all the resources of the host the code runs on (via the joblib library). Below is an example:

from sklearn                  import datasets
from sklearn.ensemble         import RandomForestClassifier
from sklearn.model_selection  import GridSearchCV
from datetime                 import datetime
import numpy as np

def count_trials(param_grid):
    # GridSearchCV tries every combination of the parameter values,
    # so the number of candidate models is the product of the grid sizes.
    total_trials = 1
    for k,v in param_grid.items():
        total_trials *= len(v)

    return total_trials

# Load data
digits = datasets.load_digits()

X,y = digits.data,digits.target

print("")
print("Iris data set: ")
print("X:      {}".format(X.shape))
print("labels: {}".format(np.unique(y)))
print("")

param_grid = {"max_depth":         [3,None],
              "max_features":      ["auto"],
              "min_samples_split": [2,3,4,5,10,20],
              "min_samples_leaf":  [1,6],
              "bootstrap":         [True],
              "criterion":         ["entropy"],
              "n_estimators":      [40,80]}
cv = 5

n_models = count_trials(param_grid)

print("trying {} models,with CV = {}. A total of {} fits.".format(n_models,cv,n_models*cv))

start_time = datetime.now()
print("Starting at {}".format(start_time))
gs = GridSearchCV(estimator = RandomForestClassifier(),param_grid=param_grid,cv=cv,refit=True,scoring="accuracy",n_jobs = -1)
gs.fit(X,y)
end_time   = datetime.now()

print("Ending   at {}".format(end_time))
print("\n total time = {}\n".format(end_time - start_time))

I recently found out that this has been extended to use the resources of a Spark cluster (via the pyspark and spark_sklearn libraries). I managed to set up a Spark cluster consisting of one master and two workers. The code below runs the same task as before, but using the Spark cluster's resources.

from sklearn                  import datasets
from sklearn.ensemble         import RandomForestClassifier
from sklearn.model_selection  import GridSearchCV as SKGridSearchCV
from spark_sklearn            import GridSearchCV as SparkGridSearchCV
from pyspark                  import SparkConf,SparkContext
from datetime                 import datetime
import numpy as np

def get_context():
    sc_conf = SparkConf()
    sc_conf.setAppName("test-sklearn-spark-app")
    sc_conf.setMaster('spark://<master-IP>:7077')
    sc_conf.set('spark.cores.max','40')
    sc_conf.set('spark.logConf',True)
    print(sc_conf.getAll())

    return SparkContext(conf=sc_conf)

def count_trials(param_grid):
    # Number of candidate models = product of the grid sizes (same helper as above).
    total_trials = 1
    for k,v in param_grid.items():
        total_trials *= len(v)

    return total_trials

# Load the same digits data set and parameter grid as in the scikit-learn version above
digits = datasets.load_digits()

X,y = digits.data,digits.target

param_grid = {"max_depth":         [3,None],
              "max_features":      ["auto"],
              "min_samples_split": [2,3,4,5,10,20],
              "min_samples_leaf":  [1,6],
              "bootstrap":         [True],
              "criterion":         ["entropy"],
              "n_estimators":      [40,80]}
cv = 5

n_models = count_trials(param_grid)

print("trying {} models,with CV = {}. A total of {} fits.".format(n_models,cv,n_models*cv))

sc = get_context()
gs = SparkGridSearchCV(sc = sc,estimator = RandomForestClassifier(),param_grid=param_grid,cv=cv,refit=True,scoring="accuracy",n_jobs = -1)

start_time = datetime.now()
print("Starting at {}".format(start_time))
gs.fit(X,y)
end_time   = datetime.now()

print("Ending   at {}".format(end_time))
print("\n total time = {}\n".format(end_time - start_time))

where master-IP is the IP of the master node. This code runs perfectly, using all the resources available in the Spark cluster.

I then set up a Kubernetes cluster with one master node and one worker node, and ran the same code as before, but replacing the line

sc_conf.setMaster('spark://<master-IP>:7077')

with

sc_conf.setMaster('k8s://<master-IP>:<PORT>')

where master-IP and PORT are the values I obtained by running the following command on the master node:

kubectl  cluster-info

The problem is that my code no longer works. It shows the following error message:

19/11/07 12:57:32 ERROR Utils: Uncaught exception in thread kubernetes-executor-snapshots-subscribers-1
org.apache.spark.SparkException: Must specify the executor container image
    at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep$$anonfun$5.apply(BasicExecutorFeatureStep.scala:40)
    at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep$$anonfun$5.apply(BasicExecutorFeatureStep.scala:40)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep.<init>(BasicExecutorFeatureStep.scala:40)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder$$anonfun$$lessinit$greater$default$1$1.apply(KubernetesExecutorBuilder.scala:26)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder$$anonfun$$lessinit$greater$default$1$1.apply(KubernetesExecutorBuilder.scala:26)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder.buildFromFeatures(KubernetesExecutorBuilder.scala:43)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$org$apache$spark$scheduler$cluster$k8s$ExecutorPodsAllocator$$onNewSnapshots$1.apply$mcVI$sp(ExecutorPodsAllocator.scala:133)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsAllocator$$onNewSnapshots(ExecutorPodsAllocator.scala:126)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$start$1.apply(ExecutorPodsAllocator.scala:68)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$start$1.apply(ExecutorPodsAllocator.scala:68)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$$anonfun$org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$$callSubscriber$1.apply$mcV$sp(ExecutorPodsSnapshotsStoreImpl.scala:102)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$$callSubscriber(ExecutorPodsSnapshotsStoreImpl.scala:99)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$$anonfun$addSubscriber$1.apply$mcV$sp(ExecutorPodsSnapshotsStoreImpl.scala:71)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$$anon$1.run(ExecutorPodsSnapshotsStoreImpl.scala:107)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

It seems to be saying that I have to specify a Docker image, but I do not know how to do that.

Does anyone have any experience with this? I have been searching the web but have not found an answer.

Thanks in advance.


wudan713's answer: Using spark_sklearn on a Kubernetes cluster

I would suggest that you read the doc first.

When you run Spark on Kubernetes, your processes are submitted to the Kubernetes cluster inside Docker containers, and those container images have to be known at job-submission time. Set them in your SparkConf:

  • spark.kubernetes.container.image
  • spark.kubernetes.driver.container.image
  • spark.kubernetes.executor.container.image

Also make sure your Kubernetes cluster can pull these images; the simplest way is to push them to DockerHub. Follow the guide to build the Spark Docker images.
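
As a rough illustration only, here is a minimal sketch of how the question's get_context() could carry that setting; the image name <dockerhub-user>/spark-py:v2.4.4 is a placeholder and not taken from the original post:

from pyspark import SparkConf,SparkContext

def get_context():
    sc_conf = SparkConf()
    sc_conf.setAppName("test-sklearn-spark-app")
    sc_conf.setMaster('k8s://<master-IP>:<PORT>')
    # Placeholder image name: build and push your own PySpark image first,
    # then reference it here so the executor pods can be created from it.
    sc_conf.set('spark.kubernetes.container.image','<dockerhub-user>/spark-py:v2.4.4')
    sc_conf.set('spark.cores.max','40')
    return SparkContext(conf=sc_conf)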

It looks like you are running the job in "client" mode, so also take the networking notes into account. Basically, you need to make sure that the driver process (which probably runs on your local machine) can be reached from inside the Kubernetes network, in particular from the executor Pods, and that is not entirely obvious. Likewise, your driver process needs network access to the executor Pods. In practice, submitting Spark jobs in client mode from a local workstation to a remote Kubernetes cluster is genuinely tricky, so I would suggest trying cluster mode first.
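
To make the networking point concrete, here is a minimal sketch (not from the original post) of the extra entries a client-mode driver typically needs on top of the image setting above; spark.driver.host and spark.driver.port are standard Spark settings, while the address and port values below are placeholders for something the executor pods can actually reach:

# Added inside get_context(), on top of the sketch above (client mode only)
sc_conf.set('spark.driver.host','<driver-address-reachable-from-executor-pods>')  # placeholder address
sc_conf.set('spark.driver.port','29413')                                          # placeholder fixed port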

If you want to submit the job in cluster mode, you need to make sure that the job artifact (in your case the Python script) and its dependencies are accessible from both the Spark driver and the executor Pods; the simplest way is to put the script and all its dependencies inside the Spark Docker image, on the Spark classpath.
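
For cluster mode, a hedged sketch of what the submission command could look like; the image name, namespace and script path are placeholders (the local:// scheme tells Spark the script is already baked into the Docker image, which is the approach described above):

bin/spark-submit \
  --master k8s://<master-IP>:<PORT> \
  --deploy-mode cluster \
  --name test-sklearn-spark-app \
  --conf spark.kubernetes.container.image=<dockerhub-user>/spark-py:v2.4.4 \
  --conf spark.kubernetes.namespace=default \
  local:///opt/app/grid_search_spark.py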

Apart from that, it should work for you in the usual way.

You can also take a look at the Helm chart of Spark on Kubernetes cluster, which includes Jupyter notebook integration and makes running interactive Spark sessions on Kubernetes much easier.

