如何使用融合平台从 ksql avro 格式序列化/反序列化到 c#

我正在使用 KsqlDb 表格,表格如下:

KSQL-DB 查询
create table currency (id integer,name varchar) with (kafka_topic='currency',partitions=1,value_format='avro');

C# 模型

public class Currency
{
    public int Id{get;set;}
    public string Name{get;set;}
}

现在我想知道我应该如何使用 Confluent 库在 C# 中写入/读取此主题的数据:

写作

 IProducer<int,Currency> producer=....

 Currency cur=new Currency();

 Message<int,Currency> message = new Message<int,Currency>
            {
                Key = msg.Id,Timestamp = new Timestamp(DateTime.UtcNow,TimestampType.CreateTime),Value = msg
            };
 DeliveryResult<int,Currency> delivery =  await this.producer.ProduceAsync(topic,message);

阅读

IConsumer<int,Currency> iconsumer = new ConsumerBuilder<int,Currency>(config)
                .SetKeyDeserializer(Deserializers.Int32) //i assume i need to use the id from my dto
                .SetvalueDeserializer(...) //what deserializer
                .Build();

ConsumeResult<int,Currency> result = consumer.Consume();

Currency message =  // what deserializer JsonSerializer.Deserialize<Currency>(result.Message.Value);

我不知道该怎么做,所以我尝试寻找序列化程序。我找到了这个库 AvroSerializer ,但我不知道作者在哪里获取 schema

有关如何读取/写入与我的 ksqldb 模型匹配的特定主题的任何帮助?

更新

经过一些研究和一些答案后,我开始使用 schemaRegistry

var config = new ConsumerConfig
            {
                GroupId = kafkaConfig.ConsumerGroup,BootstrapServers = kafkaConfig.ServerUrl,AutoOffsetReset = AutoOffsetReset.Earliest
            };
var schemaRegistryConfig = new SchemaRegistryConfig
            {
                Url = kafkaConfig.SchemaRegistryUrl
            };
var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);

IConsumer<int,Currency> consumer = new ConsumerBuilder<int,Currency>(config)
            .SetKeyDeserializer(new AvroDeserializer<int>(schemaRegistry).AsSyncOverAsync())
            .SetvalueDeserializer(new AvroDeserializer<Currency>(schemaRegistry).AsSyncOverAsync())
            .Build();

ConsumeResult<int,Currency> result = consumer.Consume();

现在我收到另一个错误:

期望数据帧长度为 5 字节或更多但总数据大小 是 4 个字节

正如有人善意指出的那样,我似乎只从架构注册表中检索了 id。

我怎样才能:insert into currency (id,name) values (1,3) 并在 C# 中将其作为 POCO(如上所列)检索?

更新 2

在我找到这个 source 程序后,由于某种原因,我似乎无法将消息发布到表中。

发送消息没有报错但是没有发布到Kafka。

wcnm2132 回答:如何使用融合平台从 ksql avro 格式序列化/反序列化到 c#

我找到了这个库 AvroSerializer ,但我不知道作者从哪里获取架构。

不清楚为什么需要使用 Confluent 以外的库,但他们知道 from the Schema Registry。您可以使用 CachedSchemaRegistryClient 轻松获取架构字符串,但是您不应该在代码中使用它,因为解串器会自行从注册表下载。

如果您参考 examples/ in the confluent-kafka-dotnet repo for Specific Avro consumption,您可以看到它们从 User 文件生成 User.avsc 类,这似乎正是您想要在这里为 {{1} } 而不是自己写

,

我通过定义我的自定义序列化器解决了这个问题,从而实现了 ISerializer<T>IDeserializer<T> 接口,它们在它们的肚子里只是 System.Text.Json.JsonSerializerNewtonsoftJson 的包装器。

序列化器

public class MySerializer:ISerializer<T>
{
     byte[] Serialize(T data,SerializationContext context)
     {
          var str=System.Text.Json.JsonSerializer.Serialize(data); //you can also use Newtonsoft here
          var bytes=Encoding.UTF8.GetBytes(str);
          return bytes;
     }
}

使用

   var config = new ConsumerConfig
                {
                    GroupId = kafkaConfig.ConsumerGroup,BootstrapServers = kafkaConfig.ServerUrl,AutoOffsetReset = AutoOffsetReset.Earliest
                };
    IConsumer<int,Currency> consumer = new ConsumerBuilder<int,Currency>(config)
            .SetValueDeserializer(new MySerializer<Currency>())
            .Build();

ConsumeResult<int,Currency> result = consumer.Consume();

附注

在实现接口后,我什至没有在这里使用架构注册表

本文链接:https://www.f2er.com/17583.html

大家都在问