spark_apply works on one dataset but not on another (both datasets have the same types and structure)

I am working with sparklyr on Databricks.

The problem I am facing is that spark_apply() throws an error when I run it on one dataset, but works fine on another dataset with the same structure and types. Am I missing something? The error message (reproduced below) is not very helpful.

Below is the simple spark_apply call:

 spark_apply(hr2, function(y) y * 2)

The schema and class of hr2 (schema output from sdf_schema(hr2)):

$LITRES
$LITRES$name
[1] "LITRES"

$LITRES$type
[1] "DoubleType"


class(hr2)
[1] "tbl_spark" "tbl_sql"   "tbl_lazy"  "tbl"    

The error:

Error : org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 6835.0 failed 4 times, most recent failure: Lost task 3.3 in stage 6835.0 (TID 132104, 10.139.64.8, executor 1): java.lang.Exception: sparklyr worker rscript failure with status 255, check worker logs for details.
    at sparklyr.Rscript.init(rscript.scala:106)
    at sparklyr.WorkerApply$$anon$2.run(workerapply.scala:116)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2355)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2343)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2342)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2342)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1096)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2574)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2510)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:893)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2240)
    at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:270)
    at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:280)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:80)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:86)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:55)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectResult(Dataset.scala:2828)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3440)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2795)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2795)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:3424)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:3419)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3419)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2795)
    at sparklyr.Utils$.collect(utils.scala:204)
    at sparklyr.Utils.collect(utils.scala)
    at sun.reflect.GeneratedMethodAccessor398.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at sparklyr.Invoke.invoke(invoke.scala:139)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
    at sparklyr.StreamHandler.read(stream.scala:66)
    at sparklyr.BackendHandler.channelRead0(handler.scala:51)
    at sparklyr.BackendHandler.channelRead0(handler.scala:4)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: sparklyr worker rscript failure with status 255, check worker logs for details.
    at sparklyr.Rscript.init(rscript.scala:106)
    at sparklyr.WorkerApply$$anon$2.run(workerapply.scala:116)

In addition: Warning messages:
1: Missing values are always removed in SQL.
Use `SUM(x, na.rm = TRUE)` to silence this warning
2: Missing values are always removed in SQL.
Use `SUM(x, na.rm = TRUE)` to silence this warning

Error log from the worker node:

19/11/11 16:12:30 INFO sparklyr: Worker (1805) is starting R process
19/11/11 16:12:31 INFO sparklyr: RScript (1805) is starting 
19/11/11 16:12:31 INFO sparklyr: RScript (1805) is connecting to backend using port 40277 
19/11/11 16:12:31 INFO sparklyr: Session (1805) accepted connection
19/11/11 16:12:31 INFO sparklyr: Session (1805) is waiting for sparklyr client to connect to port 40277
19/11/11 16:12:31 INFO sparklyr: RScript (1805) is querying ports from backend using port 40277 
19/11/11 16:12:31 INFO sparklyr: Session (1805) received command 0
19/11/11 16:12:31 INFO sparklyr: Session (1805) found requested session matches current session
19/11/11 16:12:31 INFO sparklyr: Session (1805) is creating backend and allocating system resources
19/11/11 16:12:31 INFO sparklyr: Session (1805) is using port 0 for backend channel
19/11/11 16:12:31 INFO sparklyr: Session (1805) created the backend
19/11/11 16:12:31 INFO sparklyr: Session (1805) is waiting for r process to end
19/11/11 16:12:31 INFO sparklyr: RScript (1805) found redirect gateway port 40277 
19/11/11 16:12:31 INFO sparklyr: RScript (1805) is connected to backend 
19/11/11 16:12:31 INFO sparklyr: RScript (1805) is connecting to backend session 
19/11/11 16:12:31 INFO sparklyr: RScript (1805) is connected to backend session 
19/11/11 16:12:31 INFO sparklyr: RScript (1805) created connection 
19/11/11 16:12:31 INFO sparklyr: RScript (1805) is connected 
19/11/11 16:12:31 INFO sparklyr: RScript (1805) retrieved worker context id 0 
19/11/11 16:12:31 INFO sparklyr: RScript (1805) retrieved worker context 
19/11/11 16:12:31 INFO sparklyr: RScript (1805) using bundle name packages.tar 
19/11/11 16:12:31 INFO sparklyr: RScript (1805) using bundle path /databricks/spark/work/app-20191111144228-0000/1/packages.tar 
19/11/11 16:12:31 INFO sparklyr: RScript (1805) updated .libPaths with bundle packages 
19/11/11 16:12:31 ERROR sparklyr: RScript (1805) terminated unexpectedly: java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.getTaskInputMetrics$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.beginUpdateFilesystemSQLMetrics$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:638)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
    at sparklyr.WorkerContext.getSourceArray(workercontext.scala:47)
    at sparklyr.WorkerContext.getSourceArrayLength(workercontext.scala:54)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at sparklyr.Invoke.invoke(invoke.scala:139)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
    at sparklyr.StreamHandler.read(stream.scala:66)
    at sparklyr.BackendHandler.channelRead0(handler.scala:51)
    at sparklyr.BackendHandler.channelRead0(handler.scala:4)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)

The same code does not throw an error when applied to the dummy dataset created below:

# Create an R data frame and copy it into Spark
library(sparklyr)

set.seed(21)
x <- rnorm(1e4)
x_df <- data.frame(x)
sc <- spark_connect(method = "databricks")
x_sprk <- copy_to(sc, x_df, name = "x_sql", overwrite = TRUE)

The schema and class of x_sprk are:

sdf_schema(x_sprk)
$PREV_VAL
$PREV_VAL$name
[1] "PREV_VAL"

$PREV_VAL$type
[1] "DoubleType"


class(x_sprk)
[1] "tbl_spark" "tbl_sql"   "tbl_lazy"  "tbl"      

The same spark_apply call as before, this time applied to x_sprk:

spark_apply(x_sprk, function(y) y * 2)

As mentioned, spark_apply works fine on this second dataset.

Could you please help me figure out what in the first dataset is causing the error? The dataset does not have any missing values.
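
For what it is worth, this is the kind of check (a hedged sketch; the dplyr verbs below are translated to Spark SQL, and is.na() becomes IS NULL) that can confirm the LITRES column really has no missing values:

library(dplyr)

# Count NULLs in LITRES without collecting the whole table
hr2 %>%
  summarise(n_missing = sum(as.integer(is.na(LITRES)), na.rm = TRUE)) %>%
  collect()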
