Spark Streaming应用程序上的多个writestream

在我的Spark流应用程序中,我试图从Azure EventHub流化数据,然后根据该数据写入hdfs blob中的几个目录。基本上遵循链接multiple writeStream with spark streaming

下面是代码:

def writeStreamer(input: DataFrame,checkPointFolder: String,output: String): StreamingQuery = {
  input
    .writeStream
    .format("com.databricks.spark.avro")
    .partitionBy("year","month","day")
    .option("checkpointLocation",checkPointFolder)
    .option("path",output)
    .outputMode(OutputMode.Append)
    .start()
}

writeStreamer(dtcFinalDF,"/qmctdl/DTC_CheckPoint","/qmctdl/DTC_DATA")

val query1 = writeStreamer(canFinalDF,"/qmctdl/CAN_CheckPoint","/qmctdl/CAN_DATA")

query1.awaitTermination()

我目前观察到的是,数据已成功写入“ / qmctdl / CAN_DATA目录,但是没有数据被写入” / qmctdl / DTC_DATA。我在这里做错了什么,任何帮助将不胜感激。

GUDUNVHAI 回答:Spark Streaming应用程序上的多个writestream

看看这个答案: Executing separate streaming queries in spark structured streaming

我不了解Azure EventHub,但基本上,我认为一个流正在读取所有数据,而另一个流则不会获得任何数据。

,

您可以尝试

spark.streams.awaitAnyTermination() 

代替

query1.awaittTermination()
本文链接:https://www.f2er.com/3146624.html

大家都在问