I am trying to use Spark Streaming (Spark 2.4.4) on Windows 10 to read multiple '\n'-delimited lines of text from a TCP socket source (for testing purposes so far). The words should be counted, and the current word counts displayed on the console periodically. This is a standard Spark Streaming exercise that can be found in several books and web posts, but it seems to fail on the socket source:
The text strings are sent from a Java program, e.g.:
serverOutsock = new ServerSocket(9999);
// Establish connection; wait for Spark to connect
sockOut = serverOutsock.accept();
// Set UTF-8 as format
sockOutput = new OutputStreamWriter(sockOut.getOutputStream(),"UTF-8");
// Multiple Java Strings are now written (thousands of them) like
sockOutput.write(string+'\n');
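For completeness, a minimal self-contained loopback version of the sender (with a throwaway reader thread standing in for Spark's socket source, and sample strings in place of the real data; port and test strings are assumptions) would look like:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class Sender {
    public static void main(String[] args) throws Exception {
        ServerSocket serverOutsock = new ServerSocket(0); // any free port
        int port = serverOutsock.getLocalPort();

        // Throwaway reader thread standing in for Spark's socket source
        Thread reader = new Thread(() -> {
            try (Socket s = new Socket("localhost", port);
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(s.getInputStream(), "UTF-8"))) {
                String line;
                while ((line = in.readLine()) != null) {
                    System.out.println(line);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        reader.start();

        // Establish connection; wait for the client to connect
        Socket sockOut = serverOutsock.accept();
        // Set UTF-8 as format
        OutputStreamWriter sockOutput =
                new OutputStreamWriter(sockOut.getOutputStream(), "UTF-8");
        // Sample strings (assumed); '\n' delimits records for the socket source
        for (String string : new String[] {"hello world", "hello spark"}) {
            sockOutput.write(string + '\n');
        }
        sockOutput.flush();
        sockOutput.close();
        reader.join();
        serverOutsock.close();
    }
}
```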
On the Spark receiving side, the Scala code is:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._
val socketDF = spark.readStream.format("socket").option("host","localhost").option("port",9999).load
val words = socketDF.as[String].flatMap(_.split(" ")).coalesce(1)
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.trigger(Trigger.Continuous("1 second"))
.outputMode("complete")
.format("console")
.start
.awaitTermination
So I want the current word counts written to the console once per second. But I get an error:

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String

and Spark does not seem to process anything from the source (because of the failed cast on the source input?). At least nothing is written to the console. What could be the reason? The full stack trace follows:
Exception in thread "null-4" java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:46)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at org.apache.spark.sql.execution.streaming.continuous.shuffle.RPCContinuousShuffleWriter.write(RPCContinuousShuffleWriter.scala:51)
at org.apache.spark.sql.execution.streaming.continuous.ContinuousCoalesceRDD$$anonfun$4$$anon$1.run(ContinuousCoalesceRDD.scala:112)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I tried removing the coalesce(1) and replacing the Continuous trigger with a ProcessingTime trigger. With that, the error no longer occurs, but the console output becomes:
Batch: 0
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+
That is, there is no output even though many words were indeed injected into the socket. Also, this output appeared only once, and much later than 1 second after the start.