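I am using Structured Streaming (Spark 2.4.0) to read Avro messages from Kafka, with the Confluent Schema Registry to fetch the schema.
I cannot access deeply nested fields.
The schema, in compact avsc form, looks like this: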
{"type":"record","name":"KafkaMessage","namespace":"avro.pojo","fields":[{"name":"context","type":["null",{"type":"record","name":"Context","fields":[{"name":"businessInteractionId","type":["null","string"]},{"name":"referenceNumber","type":["null","string"]},{"name":"serviceName","type":["null","string"]},{"name":"status","type":["null","string"]},{"name":"sourceSystems","type":["null",{"type":"array","items":{"type":"record","name":"SourceSystem","fields":[{"name":"orderId","type":["null","string"]},{"name":"revisionNumber","type":["null","string"]},{"name":"systemId","type":["null","string"]}]}}]},{"name":"sysDate","type":["null","string"]}]}]}]}
The schema as parsed in Spark:
context
 |-- businessInteractionId: string (nullable = true)
 |-- referenceNumber: string (nullable = true)
 |-- serviceName: string (nullable = true)
 |-- sourceSystems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- orderId: string (nullable = true)
 |    |    |-- revisionNumber: string (nullable = true)
 |    |    |-- systemId: string (nullable = true)
 |-- status: string (nullable = true)
 |-- sysDate: string (nullable = true)
My approach: cast the returned object to GenericRecord, and cast the array to GenericData.Array[GenericRecord].
Code:
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.common.serialization.Deserializer
import org.apache.spark.sql.functions.col
import spark.implicits._ // encoders for Array[Byte] and CaseMessage

val client = new CachedSchemaRegistryClient(schemaRegUrl, 100)
val brdDeser = spark.sparkContext.broadcast(
  new KafkaAvroDeserializer(client).asInstanceOf[Deserializer[GenericRecord]])

val results = df.select(col("value").as[Array[Byte]]).map { rawBytes: Array[Byte] =>
  // Read the raw bytes from Spark, then use the Confluent deserializer to get the record back
  val deser = brdDeser.value
  val decoded = deser.deserialize(topics, rawBytes)
  val context_GR = decoded.get("context").asInstanceOf[GenericRecord]
  val c_businessInteractionId = context_GR.get("businessInteractionId").toString // this works
  val c1_sourceSystems = context_GR
    .get("sourceSystems")
    .asInstanceOf[GenericData.Array[GenericRecord]]
  val c_orderId = c1_sourceSystems.get(0).get("orderId").toString // NullPointerException
  val c_revisionNumber = c1_sourceSystems.get(0).get("revisionNumber").toString
  val c_systemId = c1_sourceSystems.get(0).get("systemId").toString
  new CaseMessage(c_businessInteractionId, c_orderId, c_revisionNumber, c_systemId)
}

case class CaseMessage(
  c_businessInteractionId: String,
  c_orderId: String,
  c_revisionNumber: String,
  c_systemId: String)
Every time c_orderId is evaluated, I get a java.lang.NullPointerException.
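Since Spark reports every field in the schema as nullable, one possible cause (not confirmed here) is that sourceSystems, its first element, or orderId is null for some messages, so that calling .get(0) or .toString on it throws. The following is a minimal sketch of null-safe access under that assumption. The object name NullSafeAvroAccess, the helpers str, firstSourceSystem and orderId, and the trimmed-down schema are all hypothetical and exist only for illustration; only the Avro calls themselves (GenericRecord.get and the java.util.List interface that GenericData.Array implements) come from the library.

```scala
import org.apache.avro.generic.GenericRecord
import scala.collection.JavaConverters._

object NullSafeAvroAccess {
  // Hypothetical, trimmed-down schema used only for this illustration.
  // Every field is a ["null", ...] union, so `get` may legitimately return null.
  val schemaJson: String =
    """{"type":"record","name":"Context","fields":[
      |{"name":"businessInteractionId","type":["null","string"],"default":null},
      |{"name":"sourceSystems","type":["null",{"type":"array","items":
      |  {"type":"record","name":"SourceSystem","fields":[
      |    {"name":"orderId","type":["null","string"],"default":null}]}}],"default":null}
      |]}""".stripMargin

  // Wraps a possibly-null field value in Option; toString turns Avro's Utf8 into a String.
  def str(rec: GenericRecord, field: String): Option[String] =
    Option(rec.get(field)).map(_.toString)

  // First array element, or None if the array is null, empty, or holds a null element.
  // GenericData.Array implements java.util.List, so the cast targets the interface.
  def firstSourceSystem(ctx: GenericRecord): Option[GenericRecord] =
    Option(ctx.get("sourceSystems"))
      .map(_.asInstanceOf[java.util.List[GenericRecord]])
      .flatMap(_.asScala.headOption)
      .flatMap(Option(_))

  // Null-safe equivalent of c1_sourceSystems.get(0).get("orderId").toString
  def orderId(ctx: GenericRecord): Option[String] =
    firstSourceSystem(ctx).flatMap(str(_, "orderId"))
}
```

With helpers like these, the map function could fall back to a default, for example NullSafeAvroAccess.orderId(context_GR).getOrElse(""), or carry Option[String] fields in the case class instead of plain String. Logging which of the three levels is actually null would narrow down the real cause.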