I am pushing a stream of data to Azure Event Hub with the following code, which leverages Microsoft.Hadoop.Avro. The code runs every 5 seconds and simply pushes the same two Avro-serialised items:
var strSchema = File.ReadAllText("schema.json");
var avroSerializer = AvroSerializer.CreateGeneric(strSchema);
var rootSchema = avroSerializer.WriterSchema as RecordSchema;

var itemList = new List<AvroRecord>();

dynamic record_one = new AvroRecord(rootSchema);
record_one.FirstName = "Some";
record_one.LastName = "Guy";
itemList.Add(record_one);

dynamic record_two = new AvroRecord(rootSchema);
record_two.FirstName = "A.";
record_two.LastName = "Person";
itemList.Add(record_two);

using (var buffer = new MemoryStream())
{
    using (var writer = AvroContainer.CreateGenericWriter(strSchema, buffer, Codec.Null))
    {
        using (var streamWriter = new SequentialWriter<object>(writer, itemList.Count))
        {
            foreach (var item in itemList)
            {
                streamWriter.Write(item);
            }
        }
    }

    // note: the result of SendAsync is not awaited here (fire-and-forget)
    eventHubClient.SendAsync(new EventData(buffer.ToArray()));
}
I have verified this all works via a simple view in Azure Stream Analytics on the Portal:
So far so good, but I cannot, for the life of me, correctly deserialize this in Databricks under Scala using the from_avro() command. The schema used here is simple too:

{
    "type": "record",
    "name": "User",
    "namespace": "SerDes",
    "fields": [
        { "name": "FirstName", "type": "string" },
        { "name": "LastName", "type": "string" }
    ]
}

Load the (exact same) schema as a string:
val sampleJsonSchema = dbutils.fs.head("/mnt/schemas/schema.json")

Configure the Event Hub:
val connectionString = ConnectionStringBuilder("<CONNECTION_STRING>")
    .setEventHubName("<NAME_OF_EVENT_HUB>")
    .build
val eventHubsConf = EventHubsConf(connectionString).setStartingPosition(EventPosition.fromEndOfStream)
Read the data:

val eventhubs = spark.readStream.format("eventhubs").options(eventHubsConf.toMap).load()
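For completeness, this is roughly how I am wiring from_avro() onto the stream (a sketch only: the exact import path for from_avro varies by Spark/Databricks runtime, and the binary "body" column and the field selection below are my assumptions):

```scala
// Sketch — assumes a Spark 2.4+ runtime with the Avro functions available.
// On newer runtimes the import is org.apache.spark.sql.avro.functions.from_avro.
import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.col

// The Event Hubs connector exposes the payload as a binary "body" column;
// from_avro decodes it against the JSON schema loaded above.
val decoded = eventhubs
  .select(from_avro(col("body"), sampleJsonSchema).as("user"))
  .select("user.FirstName", "user.LastName")
```

This requires a live Spark cluster and Event Hub to actually run, so it is shown here only to make the intended wiring explicit.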
So, essentially, this is what is happening: I am serialising the data with the very same schema I use for deserialization, yet something about the format is malformed. The documentation on this front is incredibly sparse (and very, very minimal on the Microsoft website).
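One check I can run on the consuming side (a sketch, and purely a hypothesis about the mismatch): Microsoft.Hadoop.Avro's AvroContainer writes the Avro object container file format, which begins with the 4-byte magic 'O', 'b', 'j', 0x01, whereas from_avro() expects a bare serialized datum. Peeking at the first bytes of an event body shows which of the two is actually arriving; the helper name below is my own.

```scala
// Hypothetical helper: detect the Avro object container file magic
// ('O', 'b', 'j', 0x01) at the start of an event body. A bare Avro
// datum — what from_avro() expects — does not carry this prefix.
def looksLikeAvroContainer(body: Array[Byte]): Boolean =
  body.length >= 4 &&
    body(0) == 'O'.toByte &&
    body(1) == 'b'.toByte &&
    body(2) == 'j'.toByte &&
    body(3) == 1.toByte
```

For example, `looksLikeAvroContainer("Obj".getBytes("UTF-8") :+ 1.toByte)` returns true, while a body holding only a raw datum would return false.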