将python函数传递给pyspark中的Scala RDD

我有一个Scala库,(简单地说)它接收一个函数,将其应用于RDD并返回另一个RDD

def runFunction(rdd: RDD,function: Any => Any) = {
    ....
    val res = rdd.map(function) 
    ...
}

在scala中,用法为

import mylibrary.runFunction
runFunction(myRdd,myScalaFun)

此库打包在一个jar中,我现在也想在python中使用它。我想做的是在Python中加载该库并将其传递给python函数。在Python中的用法是:

spark._jvm.mylibrary.runFunction(myPythonRdd,myPythonFun)

这将允许我使用python函数以及Scala函数,而无需将整个库移植到python。通过在Python和JVM之间来回运行的Spark功能是否可以实现这一点?

mademeicidouyaozhucc 回答:将python函数传递给pyspark中的Scala RDD

PySpark中的Python和JVM通讯方式有些细微之处。桥使用Java对象,即JavaRDD而不是RDD,并且这些对象需要在Scala中进行显式拆箱。由于您的Scala函数采用RDD,因此您需要在Scala中编写一个包装,该包装会收到JavaRDD并首先执行拆箱操作:

def runFunctionWrapper(jrdd: JavaRDD,...) = {
  runFunction(jrdd.rdd,...)
}

然后称呼它

spark._jvm.mylibrary.runFunctionWrapper(myPythonRdd._jrdd,...)

请注意,根据Python约定,_jrdd被视为Python RDD类的私有成员,因此,这实际上依赖于未记录的实现细节。 _jvm的{​​{1}}成员也是如此。

真正的问题是使Scala回调Python以应用SparkContext。在PySpark中,Python RDD的function方法创建一个map()的实例,该实例保存对Python映射器函数及其环境的腌制引用。然后,将每个RDD分区进行序列化,并将腌制后的材料通过TCP发送到与Spark执行程序位于同一位置的Python进程,在该进程中反序列化并迭代该分区。最后,结果再次被序列化并发送回执行器。整个过程由org.apache.spark.api.python .PythonFunction的实例协调。这与围绕Python函数构建包装并将其传递给org.apache.spark.api.python.PythonRunner实例的map()方法非常不同。

我认为最好是在Python中简单地复制RDD的功能,或者(在性能上更好)在Scala中复制runFunction的功能。或者,如果您可以交互式地进行操作,请遵循@EnzoBnl的建议,并使用Zeppelin或Polynote等多语言笔记本环境。

本文链接:https://www.f2er.com/3103246.html

大家都在问