反序列化Avro Spark

我正在使用以下代码利用dummy <- data.frame(categorical_1 = c("a","b","a","a"),categorical_2 = c(rep("one",5),rep("two",5)),numeric = sample(1:10,10)) 将数据流推送到Azure EventHub。此代码每5秒运行一次,并简单地将相同的两个Avro序列化项目??:

microsoft.Hadoop.Avro

这里使用的模式还是简单的:

  var strSchema = File.ReadAllText("schema.json");
  var avroSerializer = AvroSerializer.CreateGeneric(strSchema);
  var rootSchema = avroSerializer.WriterSchema as RecordSchema;

  var itemList = new List<AvroRecord>();

  dynamic record_one = new AvroRecord(rootSchema);
  record_one.FirstName = "Some";
  record_one.LastName = "Guy";
  itemList.Add(record_one);

  dynamic record_two = new AvroRecord(rootSchema);
  record_two.FirstName = "A.";
  record_two.LastName = "Person";
  itemList.Add(record_two);

  using (var buffer = new MemoryStream())
  {
      using (var writer = AvroContainer.CreateGenericWriter(strSchema,buffer,Codec.Null))
      {
          using (var streamWriter = new SequentialWriter<object>(writer,itemList.Count))
          {
              foreach (var item in itemList)
              {
                  streamWriter.Write(item);
              }
          }
      }

      eventHubClient.SendAsync(new EventData(buffer.ToArray()));
  }

我已经通过门户上的Azure Stream Analytics中的简单视图验证了这一切都很好:

反序列化Avro Spark

到目前为止还算不错,但是我不能为我一生正确地在Databricks中使用Scala下的{ "type": "record","name": "User","namespace": "SerDes","fields": [ { "name": "FirstName","type": "string" },{ "name": "LastName","type": "string" } ] } 命令来反序列化。

加载(完全相同)模式作为字符串:

from_avro()

配置EventHub

val sampleJsonSchema = dbutils.fs.head("/mnt/schemas/schema.json")

读取数据。

val connectionString = ConnectionStringBuilder("<CONNECTION_STRING>")
  .setEventHubName("<NAME_OF_EVENT_HUB>")
  .build

val eventHubsConf = EventHubsConf(connectionString).setStartingPosition(EventPosition.fromEndOfStream)
val eventhubs = spark.readStream.format("eventhubs").options(eventHubsConf.toMap).load()

因此,本质上讲,这里正在发生的事情..我正在使用与反序列化相同的模式对数据进行序列化,但是格式不正确。.在这一方面,文档非常少(在microsoft网站上非常少)。

heyongkun 回答:反序列化Avro Spark

问题

经过进一步的调查,(主要是在article的帮助下),我发现了我的问题所在:from_avro(data: Column,jsonFormatSchema: String)希望使用Spark模式格式而不是Avro模式格式。文档对此不是很清楚。

解决方案1 ​​

Databricks提供了一种方便的方法from_avro(column: Column,subject: String,schemaRegistryUrl: String)),可从kafka模式注册表中获取所需的avro模式,并自动将其转换为正确的格式。

不幸的是,它不能用于纯火花,也不能在没有kafka模式注册表的情况下使用。

解决方案2

使用spark提供的架构转换:

// define avro deserializer
class AvroDeserializer() extends AbstractKafkaAvroDeserializer {
  override def deserialize(payload: Array[Byte]): String = {
    val genericRecord = this.deserialize(payload).asInstanceOf[GenericRecord]
    genericRecord.toString
  }
}

// create deserializer instance
val deserializer = new AvroDeserializer()

// register deserializer
spark.udf.register("deserialize_avro",(bytes: Array[Byte]) =>
  deserializer.deserialize(bytes)
)

// get avro schema from registry (but I presume that it should also work with schema read from a local file)
val registryClient = new CachedSchemaRegistryClient(kafkaSchemaRegistryUrl,128)
val avroSchema = registryClient.getLatestSchemaMetadata(topic + "-value").getSchema
val sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))

// consume data 
df.selectExpr("deserialize_avro(value) as data")
  .select(from_json(col("data"),sparkSchema.dataType).as("data"))
  .select("data.*")
本文链接:https://www.f2er.com/3143062.html

大家都在问