我有一个 Azure Eventhub ,它正在流式传输数据(JSON格式)。
我将其读取为Spark数据帧,并使用from_json(col("body"),schema)
解析传入的“ body”,其中schema
是预定义的。在代码中,它看起来像:
from pyspark.sql.functions import col,from_json
from pyspark.sql.types import *
schema = StructType().add(...) # define the incoming JSON schema
df_stream_input = (spark
.readStream
.format("eventhubs")
.options(**ehConfInput)
.load()
.select(from_json(col("body").cast("string"),schema)
)
现在=如果传入的JSON模式与定义的模式之间存在不一致(例如,源eventhub开始以新格式发送数据而不会发出通知), {{1} }函数不会抛出错误=,而是将from_json()
放入字段,这些字段在我的NULL
定义中,但在eventhub发送的JSON中不存在。。
我想捕获此信息并将其记录在某处(Spark的log4j,Azure Monitor,警告电子邮件等)。
我的问题是:实现这一目标的最佳方法是什么?
我的一些想法:
-
我首先想到的是拥有一个UDF,它检查
schema
,如果有任何问题,它将引发Exception。我相信不可能通过PySpark将日志发送到log4j,因为“ spark”上下文无法在UDF中(在工作人员上)启动,并且一个人想使用默认值:log4jLogger = sc._jvm.org.apache.log4j logger = log4jLogger.LogManager.getLogger('PySpark Logger')
-
我能想到的第二件事是使用“ foreach / foreachBatch”功能并将此检查逻辑放在这里。
但是我觉得这两种方法都像..过于自定义一样-我希望Spark可以为这些目的内置一些东西。