NORM PPF 函数 pyspark

使用的代码:

from pyspark.sql.types import FloatType
from scipy import stats 
from scipy.stats import norm
mylist = [0.083,0.219,0.126] 
df = spark.createDataFrame(mylist,FloatType())
df.show() 
norm_ppf = F.udf(lambda x: float(norm.ppf(x)))
df.withColumn("var2",norm_ppf(df['value'])).show()

系统上已经安装了模块 scipy。我们如何解决这个问题?有什么方法可以在 pyspark 的列上应用 norm.ppf 函数

错误信息:

/var/opt/teradata/cloudera/parcels/CDH/lib/spark/python/pyspark/sql/dataframe.py 第 376 章 如果 isinstance(truncate,bool) 和 truncate: --> 378
打印(self._jdf.showString(n,20,垂直))379否则: 第380话 垂直))
/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py 中 调用(self,*args) 1255 answer = self.gateway_client.send_command(command) 1256
return_value = get_return_value( -> 1257 答案, self.gateway_client,self.target_id,self.name) 1258 1259 对于 temp_args 中的 temp_arg:
/var/opt/teradata/cloudera/parcels/CDH/lib/spark/python/pyspark/sql/utils.py 在 deco(*a,**kw) 61 def deco(*a,**kw): 62
尝试:---> 63 返回 f(*a,**kw) 64
除了 py4j.protocol.Py4JJavaError 为 e: 65 s = e.java_exception.toString()
/usr/local/lib/python3.6/site-packages/py4j/protocol.py 中 get_return_value(answer,gateway_client,target_id,name) 326
raise Py4JJavaError( 327 "发生错误 在调用 {0}{1}{2} 时。\n"。--> 328
格式(target_id,".",name),value) 329 else:
330 引发 Py4JError(Py4JJavaError:一个错误 调用 o2145.showString 时发生。 : org.apache.spark.SparkException:由于阶段失败,作业中止: 阶段 19.0 中的任务 0 失败了 4 次,最近一次失败:丢失任务 0.3 阶段 19.0(TID 82,d8-td-cdh.boigroup.net,执行程序 38):org.apache.spark.api.python.PythonException:Traceback(最近 最后调用):文件 "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py",第 361 行,在 main func、profiler、deserializer、serializer = read_udfs(pickleSer,infile,eval_type) 文件 "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py",第 236 行,在 read_udfs arg_offsets 中,udf = read_single_udf(pickleSer,eval_type,runner_conf) 文件 "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py",第 163 行,在 read_single_udf f 中,return_type = read_command(pickleSer,infile) 文件 "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py",第 64 行,在 read_command 命令中 = serializer._read_with_length(file) 文件 "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py",第 172 行,在 _read_with_length 中返回 self.loads(obj) 文件 "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py",第 577 行,在负载中返回 pickle.loads(obj,encoding=encoding) ModuleNotFoundError: 没有名为“scipy”的模块 org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452) 在 org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81) 在 org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64) 在 org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasnext(PythonRunner.scala:406) 在 org.apache.spark.InterruptibleIterator.hasnext(InterruptibleIterator.scala:37) 在 scala.collection.Iterator$$anon$12.hasnext(Iterator.scala:440)
在 scala.collection.Iterator$$anon$11.hasnext(Iterator.scala:409)
在 scala.collection.Iterator$$anon$11.hasnext(Iterator.scala:409)
在 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processnext(未知 来源)在 org.apache.spark.sql.execution.BufferedRowIterator.hasnext(BufferedRowIterator.java:43) 在 org.apache.spark.sql.execution.WholeStagecodegenExec$$anonfun$11$$anon$1.hasnext(WholeStagecodegenExec.scala:624) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) 在 org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) 在 org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
在 org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
在 org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
在 org.apache.spark.scheduler.Task.run(Task.scala:121) 在 org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407) 在 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1363) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748) 驱动程序堆栈跟踪:
在 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877) 在 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 在 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 在 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handletasksetfailed$1.apply(DAGScheduler.scala:929) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handletasksetfailed$1.apply(DAGScheduler.scala:929) 在 scala.Option.foreach(Option.scala:257) 在 org.apache.spark.scheduler.DAGScheduler.handletasksetfailed(DAGScheduler.scala:929) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049) 在 org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
在 org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2102)
在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2121)
在 org.apache.spark.sql.execution.SparkPlan.executetake(SparkPlan.scala:365) 在 org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) 在 org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383) 在 org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544) 在 org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544) 在 org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364) 在 org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) 在 org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) 在 org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) 在 org.apache.spark.sql.Dataset.withaction(Dataset.scala:3363) 在 org.apache.spark.sql.Dataset.head(Dataset.scala:2544) 在 org.apache.spark.sql.Dataset.take(Dataset.scala:27​​58) 在 org.apache.spark.sql.Dataset.getRows(Dataset.scala:254) 在 org.apache.spark.sql.Dataset.showString(Dataset.spark:291) 在 sun.reflect.NativeMethodaccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodaccessorImpl.invoke(NativeMethodaccessorImpl.java:62) 在 sun.reflect.DelegatingMethodaccessorImpl.invoke(DelegatingMethodaccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
在 py4j.Gateway.invoke(Gateway.java:282) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection.java:238) 在 java.lang.Thread.run(Thread.java:748) norm_cdf = F.udf(lambda x: float(norm.cdf(x))) df.withColumn("var2",norm_cdf(df['value'])).show()

businiao555 回答:NORM PPF 函数 pyspark

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/14148.html

大家都在问