In my Spark streaming application, I am trying to stream data from Azure EventHub and then write it out to several directories in HDFS/blob storage based on that data, basically following the approach from multiple writeStream with spark streaming.
Here is the code:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

// Starts a file-sink streaming query that appends Avro output, partitioned by date.
def writeStreamer(input: DataFrame, checkPointFolder: String, output: String): StreamingQuery = {
  input
    .writeStream
    .format("com.databricks.spark.avro")
    .partitionBy("year", "month", "day")
    .option("checkpointLocation", checkPointFolder)
    .option("path", output)
    .outputMode(OutputMode.Append)
    .start()
}
writeStreamer(dtcFinalDF, "/qmctdl/DTC_CheckPoint", "/qmctdl/DTC_DATA")
val query1 = writeStreamer(canFinalDF, "/qmctdl/CAN_CheckPoint", "/qmctdl/CAN_DATA")
query1.awaitTermination()
What I am currently observing is that data is successfully written to the /qmctdl/CAN_DATA directory, but no data is being written to /qmctdl/DTC_DATA. What am I doing wrong here? Any help would be greatly appreciated.
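Would keeping a handle on both queries and blocking on the StreamingQueryManager, instead of awaiting only query1, make a difference? A minimal sketch of what I mean (assuming spark is the active SparkSession; the variable names dtcQuery and canQuery are just for illustration):

val dtcQuery = writeStreamer(dtcFinalDF, "/qmctdl/DTC_CheckPoint", "/qmctdl/DTC_DATA")
val canQuery = writeStreamer(canFinalDF, "/qmctdl/CAN_CheckPoint", "/qmctdl/CAN_DATA")
// Block the driver until any of the active streaming queries terminates,
// rather than waiting on a single query.
spark.streams.awaitAnyTermination()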