我正在使用Databricks,并且在数据框中有一列,我需要使用外部Web服务调用为每条记录更新该列。在这种情况下,它使用Azure机器学习服务SDK并进行服务调用。这段代码在未作为Spark的UDF(例如python)运行时可以正常工作,但是当我尝试将其作为UDF调用时,它将引发序列化错误。如果我使用lambda和带有rdd的地图,也会发生同样的情况。
该模型使用fastText,可以通过正常的http调用或使用AMLS的WebService SDK从Postman或python很好地调用它-仅当它是UDF时,它会失败并显示以下消息:
TypeError:无法腌制_thread._local对象
我能想到的唯一解决方法是依次遍历数据帧中的每个记录,并通过调用来更新记录,但这并不是很有效。我不知道这是一个火花错误还是因为该服务正在加载快速文本模型。当我使用UDF并模拟返回值时,它仍然有效。
底部错误...
from azureml.core.webservice import Webservice,AciWebservice
from azureml.core import Workspace
def predictModelValue2(summary,modelName,modelLabel):
raw_data = '[{"label": "' + modelLabel + '","model": "' + modelName + '","as_full_account": "' + summary + '"}]'
prediction = service.run(raw_data)
return prediction
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf
predictModelValueUDF = udf(predictModelValue2)
DVIRCRAMFItemsDFScored1 = DVIRCRAMFItemsDF.withColumn("Result",predictModelValueUDF("Summary","ModelName","ModelLabel"))
TypeError:无法腌制_thread._local对象
在处理上述异常期间,发生了另一个异常:
PicklingError Traceback(最近的通话) 最后) ----> 2 x = df.withColumn(“ Result”,ForecastModelValueUDF(“ Summary”, “ ModelName”,“ ModelLabel”))
包装器中的/databricks/spark/python/pyspark/sql/udf.py(* args) (194)第194章 195 def包装器(* args): -> 196返回self(* args) 197 198包装器。名称 = self._name
呼叫中的/databricks/spark/python/pyspark/sql/udf.py(自身,* cols) 172 173 def 通话(自身,* cols): -> 174 judf =自我._judf 175 sc = SparkContext._active_spark_context 176 return Column(judf.apply(_to_seq(sc,cols,_to_java_column)))
_judf中的/databricks/spark/python/pyspark/sql/udf.py(自) 156#并且对性能的影响应该最小。 157如果self._judf_placeholder为None: -> 158 self._judf_placeholder = self._create_judf() 159返回self._judf_placeholder 160
_create_judf中的/databricks/spark/python/pyspark/sql/udf.py(自已) 165点= spark.sparkContext 166 -> 167 wrapper_func = _wrap_function(sc,self.func,self.returnType) 第168章 第169章(六更) _wrap_function(sc,中的
/databricks/spark/python/pyspark/sql/udf.py func,returnType) 33 def _wrap_function(sc,func,returnType): 34命令=(func,returnType) ---> 35个pickled_command,broadcast_vars,env,包括= _prepare_for_python_RDD(sc,command) 36 return sc._jvm.PythonFunction(bytearray(pickled_command),env,包括sc.pythonExec, 37 sc.pythonVer,broadcast_vars,sc._javaaccumulator)
_prepare_for_python_RDD中的/databricks/spark/python/pyspark/rdd.py(sc, 命令)2461#序列化的命令将被压缩 广播2462 ser = CloudPickleSerializer() -> 2463 pickled_command = ser.dumps(命令)2464如果len(pickled_command)> sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc):#默认值1M
转储中的
2465#广播的生命周期与创建的生命周期相同 PythonRDD/databricks/spark/python/pyspark/serializers.py(self,obj) 709 msg =“无法序列化对象:%s:%s”%(例如,类。名称,emsg) (710)第710章 -> 711提高pickle.PicklingError(msg) 712 713
PicklingError:无法序列化对象:TypeError:无法腌制 _thread._local对象