调用外部Web服务的Databricks UDF无法序列化(PicklingError)

我正在使用Databricks,并且在数据框中有一列,我需要使用外部Web服务调用为每条记录更新该列。在这种情况下,它使用Azure机器学习服务SDK并进行服务调用。这段代码在未作为Spark的UDF(例如python)运行时可以正常工作,但是当我尝试将其作为UDF调用时,它将引发序列化错误。如果我使用lambda和带有rdd的地图,也会发生同样的情况。

该模型使用fastText,可以通过正常的http调用或使用AMLS的WebService SDK从Postman或python很好地调用它-仅当它是UDF时,它会失败并显示以下消息:

TypeError:无法腌制_thread._local对象

我能想到的唯一解决方法是依次遍历数据帧中的每个记录,并通过调用来更新记录,但这并不是很有效。我不知道这是一个火花错误还是因为该服务正在加载快速文本模型。当我使用UDF并模拟返回值时,它仍然有效。

底部错误...

from azureml.core.webservice import Webservice,AciWebservice
from azureml.core import Workspace

def predictModelValue2(summary,modelName,modelLabel):  
    raw_data = '[{"label": "' + modelLabel + '","model": "' + modelName + '","as_full_account": "' + summary + '"}]'
    prediction = service.run(raw_data)
    return prediction

from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

predictModelValueUDF = udf(predictModelValue2)

DVIRCRAMFItemsDFScored1 = DVIRCRAMFItemsDF.withColumn("Result",predictModelValueUDF("Summary","ModelName","ModelLabel"))
  

TypeError:无法腌制_thread._local对象

     

在处理上述异常期间,发生了另一个异常:

     

PicklingError Traceback(最近的通话)   最后)   ----> 2 x = df.withColumn(“ Result”,ForecastModelValueUDF(“ Summary”,   “ ModelName”,“ ModelLabel”))

     包装器中的

/databricks/spark/python/pyspark/sql/udf.py(* args)       (194)第194章       195 def包装器(* args):   -> 196返回self(* args)       197       198包装器。名称 = self._name

     呼叫中的

/databricks/spark/python/pyspark/sql/udf.py(自身,* cols)       172       173 def 通话(自身,* cols):   -> 174 judf =自我._judf       175 sc = SparkContext._active_spark_context       176 return Column(judf.apply(_to_seq(sc,cols,_to_java_column)))

     _judf中的

/databricks/spark/python/pyspark/sql/udf.py(自)       156#并且对性能的影响应该最小。       157如果self._judf_placeholder为None:   -> 158 self._judf_placeholder = self._create_judf()       159返回self._judf_placeholder       160

     _create_judf中的

/databricks/spark/python/pyspark/sql/udf.py(自已)       165点= spark.sparkContext       166   -> 167 wrapper_func = _wrap_function(sc,self.func,self.returnType)       第168章       第169章(六更)      _wrap_function(sc,中的

/databricks/spark/python/pyspark/sql/udf.py   func,returnType)        33 def _wrap_function(sc,func,returnType):        34命令=(func,returnType)   ---> 35个pickled_command,broadcast_vars,env,包括= _prepare_for_python_RDD(sc,command)        36 return sc._jvm.PythonFunction(bytearray(pickled_command),env,包括sc.pythonExec,        37 sc.pythonVer,broadcast_vars,sc._javaaccumulator)

     _prepare_for_python_RDD中的

/databricks/spark/python/pyspark/rdd.py(sc,   命令)2461#序列化的命令将被压缩   广播2462 ser = CloudPickleSerializer()   -> 2463 pickled_command = ser.dumps(命令)2464如果len(pickled_command)>   sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc):#默认值1M
  2465#广播的生命周期与创建的生命周期相同   PythonRDD

     转储中的

/databricks/spark/python/pyspark/serializers.py(self,obj)       709 msg =“无法序列化对象:%s:%s”%(例如,名称,emsg)       (710)第710章   -> 711提高pickle.PicklingError(msg)       712       713

     

PicklingError:无法序列化对象:TypeError:无法腌制   _thread._local对象

skyson99599 回答:调用外部Web服务的Databricks UDF无法序列化(PicklingError)

我不是DataBricks或Spark的专家,但是当您触摸诸如service对象之类的复杂对象时,从本地笔记本上下文中进行酸洗功能总是有问题的。在这种特殊情况下,我建议删除对azureML service对象的依赖,而仅使用requests来调用服务。

从服务中提取密钥:

# retrieve the API keys. two keys were generated.
key1,key2 = service.get_keys()
scoring_uri = service.scoring_uri

您应该能够直接在UDF中使用这些字符串,而不会出现酸洗问题-here is an example,说明如何仅通过请求调用服务。以下内容适用于您的UDF:

import requests,json
def predictModelValue2(summary,modelName,modelLabel):  
  input_data = json.dumps({"summary": summary,"modelName":,....})

  headers = {'Content-Type':'application/json','Authorization': 'Bearer ' + key1}

  # call the service for scoring
  resp = requests.post(scoring_uri,input_data,headers=headers)

  return resp.text[1]

但是,在一个侧节点上:UDF将在数据帧中的每一行被调用,并且每次它将进行网络调用-这将非常慢。我建议寻找批处理执行的方法。正如您从构造的json service.run中看到的那样,该数组将接受一系列项目,因此您应该以100左右的批量调用它。

本文链接:https://www.f2er.com/3118319.html

大家都在问