Spark streaming, reading from socket: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String

I am trying to use Spark Streaming (Spark 2.4.4) on Windows 10 to read multiple '\n'-delimited text lines from a TCP socket source (for test purposes so far). The words are to be counted and the current word counts printed periodically on the console. This is the standard Spark streaming test found in several books and web posts, but it seems to fail on the socket source:

The text strings are sent from a Java program along the lines of:

serverOutsock = new ServerSocket(9999);
// Establish connection; wait for Spark to connect
sockOut = serverOutsock.accept();
// Set UTF-8 as format
sockOutput = new OutputStreamWriter(sockOut.getOutputStream(),"UTF-8");
// Multiple Java Strings are now written (thousands of them) like
sockOutput.write(string+'\n');

On the Spark receiving side, the Scala code is:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._
val socketDF = spark.readStream.format("socket").option("host","localhost").option("port",9999).load
val words = socketDF.as[String].flatMap(_.split(" ")).coalesce(1)
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
      .trigger(Trigger.Continuous("1 second"))
      .outputMode("complete")
      .format("console")
      .start
      .awaitTermination

So I would like the current word counts written to the console once per second.

But I get the following error:

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String

Spark does not seem to process anything from the source (perhaps because of a cast on the source input?). At least nothing is written to the console. What could be the cause?

The full stack trace is:

Exception in thread "null-4" java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:46)
        at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
        at org.apache.spark.sql.execution.streaming.continuous.shuffle.RPCContinuousShuffleWriter.write(RPCContinuousShuffleWriter.scala:51)
        at org.apache.spark.sql.execution.streaming.continuous.ContinuousCoalesceRDD$$anonfun$4$$anon$1.run(ContinuousCoalesceRDD.scala:112)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I tried removing the coalesce(1) and replacing the Continuous trigger with a ProcessingTime trigger. The error then no longer occurs, but the console output becomes:


Batch: 0

+-----+-----+
|value|count|
+-----+-----+
+-----+-----+

That is, there is no output at all, even though many words were indeed pushed into the socket. Also, this output appears only once, and much later than after 1 second.
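
For reference, the ProcessingTime variant I tried looks roughly like this (a sketch of the change described above: only the trigger differs and the coalesce(1) is dropped, the rest of the pipeline is unchanged):

// Same socketDF as above, but without coalesce(1)
val words = socketDF.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
      .trigger(Trigger.ProcessingTime("1 second"))   // micro-batch trigger instead of Continuous
      .outputMode("complete")
      .format("console")
      .start
      .awaitTermination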
