我正在使用 KsqlDb
表格,表格如下:
KSQL-DB 查询create table currency (id integer,name varchar) with (kafka_topic='currency',partitions=1,value_format='avro');
C# 模型
public class Currency
{
public int Id{get;set;}
public string Name{get;set;}
}
现在我想知道我应该如何使用 Confluent 库在 C# 中写入/读取此主题的数据:
写作
IProducer<int,Currency> producer=....
Currency cur=new Currency();
Message<int,Currency> message = new Message<int,Currency>
{
Key = msg.Id,Timestamp = new Timestamp(DateTime.UtcNow,TimestampType.CreateTime),Value = msg
};
DeliveryResult<int,Currency> delivery = await this.producer.ProduceAsync(topic,message);
阅读
IConsumer<int,Currency> iconsumer = new ConsumerBuilder<int,Currency>(config)
.SetKeyDeserializer(Deserializers.Int32) //i assume i need to use the id from my dto
.SetvalueDeserializer(...) //what deserializer
.Build();
ConsumeResult<int,Currency> result = consumer.Consume();
Currency message = // what deserializer JsonSerializer.Deserialize<Currency>(result.Message.Value);
我不知道该怎么做,所以我尝试寻找序列化程序。我找到了这个库 AvroSerializer ,但我不知道作者在哪里获取 schema
。
有关如何读取/写入与我的 ksqldb
模型匹配的特定主题的任何帮助?
更新
经过一些研究和一些答案后,我开始使用 schemaRegistry
var config = new ConsumerConfig
{
GroupId = kafkaConfig.ConsumerGroup,BootstrapServers = kafkaConfig.ServerUrl,AutoOffsetReset = AutoOffsetReset.Earliest
};
var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = kafkaConfig.SchemaRegistryUrl
};
var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
IConsumer<int,Currency> consumer = new ConsumerBuilder<int,Currency>(config)
.SetKeyDeserializer(new AvroDeserializer<int>(schemaRegistry).AsSyncOverAsync())
.SetvalueDeserializer(new AvroDeserializer<Currency>(schemaRegistry).AsSyncOverAsync())
.Build();
ConsumeResult<int,Currency> result = consumer.Consume();
现在我收到另一个错误:
期望数据帧长度为 5 字节或更多但总数据大小 是 4 个字节
正如有人善意指出的那样,我似乎只从架构注册表中检索了 id。
我怎样才能:insert into currency (id,name) values (1,3)
并在 C# 中将其作为 POCO(如上所列)检索?
更新 2
在我找到这个 source 程序后,由于某种原因,我似乎无法将消息发布到表中。
发送消息没有报错但是没有发布到Kafka。