如何在(Py)Spark结构化流中捕获不正确的(损坏的)JSON记录?

我有一个 Azure Eventhub ,它正在流式传输数据(JSON格式)。 我将其读取为Spark数据帧,并使用from_json(col("body"),schema)解析传入的“ body”,其中schema是预定义的。在代码中,它看起来像:

from pyspark.sql.functions import col,from_json
from pyspark.sql.types import *

schema = StructType().add(...) # define the incoming JSON schema 

df_stream_input = (spark
.readStream
.format("eventhubs")
.options(**ehConfInput)
.load()
.select(from_json(col("body").cast("string"),schema)
)

现在=如果传入的JSON模式与定义的模式之间存在不一致(例如,源eventhub开始以新格式发送数据而不会发出通知), {{1} }函数不会抛出错误=,而是将from_json()放入字段,这些字段在我的NULL定义中,但在eventhub发送的JSON中不存在。

我想捕获此信息并将其记录在某处(Spark的log4j,Azure Monitor,警告电子邮件等)。

我的问题是:实现这一目标的最佳方法是什么?

我的一些想法:

  1. 我首先想到的是拥有一个UDF,它检查schema,如果有任何问题,它将引发Exception。我相信不可能通过PySpark将日志发送到log4j,因为“ spark”上下文无法在UDF中(在工作人员上)启动,并且一个人想使用默认值:

    log4jLogger = sc._jvm.org.apache.log4j logger = log4jLogger.LogManager.getLogger('PySpark Logger')

  2. 我能想到的第二件事是使用“ foreach / foreachBatch”功能并将此检查逻辑放在这里。

但是我觉得这两种方法都像..过于自定义一样-我希望Spark可以为这些目的内置一些东西。

yunhaigufan 回答:如何在(Py)Spark结构化流中捕获不正确的(损坏的)JSON记录?

tl; dr 您必须自己使用foreachBatchcolumnNameOfCorruptRecord运算符来执行此检查逻辑。


事实证明,我误以为case _: BadRecordException => null 选项可能是一个答案。它不起作用。

首先,由于this,该按钮无法正常工作:

PERMISSIVE

其次,由于this仅禁用了任何其他解析模式(包括似乎与columnNameOfCorruptRecord选项一起使用的new JSONOptions(options + ("mode" -> FailFastMode.name),timeZoneId.get)) ):

foreach

换句话说,您唯一的选择是使用列表中的第二项,即foreachBatchfrom_json并自己处理损坏的记录。

解决方案可以使用body,同时保留初始null列。 JSON错误的任何记录都将以结果列foreach*def handleCorruptRecords: // if json == null the body was corrupt // handle it df_stream_input = (spark .readStream .format("eventhubs") .options(**ehConfInput) .load() .select("body",from_json(col("body").cast("string"),schema).as("json")) ).foreach(handleCorruptRecords).start() 结束,例如

{{1}}
本文链接:https://www.f2er.com/3129286.html

大家都在问