使用的代码:
from pyspark.sql.types import FloatType
from scipy import stats
from scipy.stats import norm
mylist = [0.083,0.219,0.126]
df = spark.createDataFrame(mylist,FloatType())
df.show()
norm_ppf = F.udf(lambda x: float(norm.ppf(x)))
df.withColumn("var2",norm_ppf(df['value'])).show()
系统上已经安装了模块 scipy。我们如何解决这个问题?有什么方法可以在 pyspark 的列上应用 norm.ppf 函数
错误信息:
/var/opt/teradata/cloudera/parcels/CDH/lib/spark/python/pyspark/sql/dataframe.py
第 376 章
如果 isinstance(truncate,bool) 和 truncate: --> 378
打印(self._jdf.showString(n,20,垂直))379否则:
第380话
垂直))
/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py 中
调用(self,*args) 1255 answer = self.gateway_client.send_command(command) 1256
return_value = get_return_value( -> 1257 答案,
self.gateway_client,self.target_id,self.name) 1258 1259
对于 temp_args 中的 temp_arg:
/var/opt/teradata/cloudera/parcels/CDH/lib/spark/python/pyspark/sql/utils.py
在 deco(*a,**kw) 61 def deco(*a,**kw): 62
尝试:---> 63 返回 f(*a,**kw) 64
除了 py4j.protocol.Py4JJavaError 为 e: 65 s =
e.java_exception.toString()
/usr/local/lib/python3.6/site-packages/py4j/protocol.py 中
get_return_value(answer,gateway_client,target_id,name) 326
raise Py4JJavaError( 327 "发生错误
在调用 {0}{1}{2} 时。\n"。--> 328
格式(target_id,".",name),value) 329 else:
330 引发 Py4JError(Py4JJavaError:一个错误
调用 o2145.showString 时发生。 :
org.apache.spark.SparkException:由于阶段失败,作业中止:
阶段 19.0 中的任务 0 失败了 4 次,最近一次失败:丢失任务
0.3 阶段 19.0(TID 82,d8-td-cdh.boigroup.net,执行程序 38):org.apache.spark.api.python.PythonException:Traceback(最近
最后调用):文件
"/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py",第 361 行,在 main func、profiler、deserializer、serializer =
read_udfs(pickleSer,infile,eval_type) 文件
"/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py",第 236 行,在 read_udfs arg_offsets 中,udf =
read_single_udf(pickleSer,eval_type,runner_conf) 文件
"/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py",第 163 行,在 read_single_udf f 中,return_type =
read_command(pickleSer,infile) 文件
"/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py",第 64 行,在 read_command 命令中 =
serializer._read_with_length(file) 文件
"/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py",第 172 行,在 _read_with_length 中返回 self.loads(obj) 文件
"/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py",第 577 行,在负载中返回 pickle.loads(obj,encoding=encoding)
ModuleNotFoundError: 没有名为“scipy”的模块
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
在
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
在
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
在
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasnext(PythonRunner.scala:406)
在
org.apache.spark.InterruptibleIterator.hasnext(InterruptibleIterator.scala:37)
在 scala.collection.Iterator$$anon$12.hasnext(Iterator.scala:440)
在 scala.collection.Iterator$$anon$11.hasnext(Iterator.scala:409)
在 scala.collection.Iterator$$anon$11.hasnext(Iterator.scala:409)
在
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processnext(未知
来源)在
org.apache.spark.sql.execution.BufferedRowIterator.hasnext(BufferedRowIterator.java:43)
在
org.apache.spark.sql.execution.WholeStagecodegenExec$$anonfun$11$$anon$1.hasnext(WholeStagecodegenExec.scala:624)
在
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
在
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
在
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
在
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
在
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
在 org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 在
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
在 org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 在
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
在 org.apache.spark.scheduler.Task.run(Task.scala:121) 在
org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
在 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1363)
在
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
在
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
在
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
在 java.lang.Thread.run(Thread.java:748) 驱动程序堆栈跟踪:
在
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
在
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
在
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
在
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
在 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
在
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
在
org.apache.spark.scheduler.DAGScheduler$$anonfun$handletasksetfailed$1.apply(DAGScheduler.scala:929)
在
org.apache.spark.scheduler.DAGScheduler$$anonfun$handletasksetfailed$1.apply(DAGScheduler.scala:929)
在 scala.Option.foreach(Option.scala:257) 在
org.apache.spark.scheduler.DAGScheduler.handletasksetfailed(DAGScheduler.scala:929)
在
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
在
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
在
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
在 org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
在
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2102)
在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2121)
在
org.apache.spark.sql.execution.SparkPlan.executetake(SparkPlan.scala:365)
在
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
在
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
在
org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
在
org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
在 org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
在
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
在
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
在
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
在 org.apache.spark.sql.Dataset.withaction(Dataset.scala:3363) 在
org.apache.spark.sql.Dataset.head(Dataset.scala:2544) 在
org.apache.spark.sql.Dataset.take(Dataset.scala:2758) 在
org.apache.spark.sql.Dataset.getRows(Dataset.scala:254) 在
org.apache.spark.sql.Dataset.showString(Dataset.spark:291) 在
sun.reflect.NativeMethodaccessorImpl.invoke0(Native Method) 在
sun.reflect.NativeMethodaccessorImpl.invoke(NativeMethodaccessorImpl.java:62)
在
sun.reflect.DelegatingMethodaccessorImpl.invoke(DelegatingMethodaccessorImpl.java:43)
在 java.lang.reflect.Method.invoke(Method.java:498) 在
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
在 py4j.Gateway.invoke(Gateway.java:282) 在
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在
py4j.GatewayConnection.run(GatewayConnection.java:238) 在
java.lang.Thread.run(Thread.java:748) norm_cdf =
F.udf(lambda x: float(norm.cdf(x))) df.withColumn("var2",norm_cdf(df['value'])).show()