The code at
var result = deliveryReport.Result;
hangs indefinitely. This code worked until this weekend, then suddenly stopped working.
This is the value of deliveryReport after producer.ProduceAsync returns:
deliveryReport = Id = 2, Status = WaitingForActivation, Method = "{null}", Result = "{Not yet computed}"
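For context, WaitingForActivation is the status a promise-style task reports until something completes it, so the dump above most likely means the delivery report had not arrived yet. A minimal sketch of that state, assuming a TaskCompletionSource as a stand-in for the producer's pending task (the PendingTaskDemo class is hypothetical and no Kafka types are involved):

```csharp
using System;
using System.Threading.Tasks;

public static class PendingTaskDemo
{
    public static void Main()
    {
        // Stand-in for a delivery report that has not arrived yet.
        var pending = new TaskCompletionSource<string>();
        Console.WriteLine(pending.Task.Status);      // WaitingForActivation
        Console.WriteLine(pending.Task.IsCompleted); // False

        // Reading pending.Task.Result at this point would block the calling thread.
        // Once a result is set, the task completes and .Result returns immediately.
        pending.SetResult("delivered");
        Console.WriteLine(pending.Task.Status);      // RanToCompletion
        Console.WriteLine(pending.Task.Result);      // delivered
    }
}
```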
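I am connecting over a VPN to a Kafka server in France. I am not sure which version it runs, but I do know that it runs on a Linux VM.
I am using Confluent v 11.5, librdkafka.redist v 11.5, and System.Runtime.CompilerServices.Unsafe v 4.0.0.
Here is the code block: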
foreach (var measureName in measureStreams)
{
    string subRequest;
    Guid requestId = Guid.NewGuid();
    if (subscribeSwitch)
        subRequest = String.Format(@"{{""username"":""{0}"",""password"":""{1}"",""subscriptionRequestId"":""{2}"",""request"":""{3}"",""measuresStream"":""{4}"",""version"":""{5}""}}", sSUid, sspwd, requestId.ToString(), measureName, subscribeType, version);
    else
        // The posted line used {0} and {2} with a single argument, which throws FormatException.
        // Assumption: username is sSUid and measuresStreams is measureName.
        subRequest = String.Format(@"{{""username"":""{0}"",""measuresStreams"":""{1}""}}", sSUid, measureName);
    string key = i++.ToString();
    // Queue the message; the returned task completes when the delivery report arrives.
    var deliveryReport = producer.ProduceAsync(topicName, key, subRequest);
    // The posted call had three placeholders but only two arguments; key was missing.
    _log.DebugFormat("TopicName={0} | Key={1} | Request={2}", topicName, key, subRequest);
    var result = deliveryReport.Result; // synchronously waits for message to be produced; this is the line that hangs.
    _log.DebugFormat("Result = {0}", result.Value);
    _log.Debug($"> Partition: {result.Partition}, Offset: {result.Offset} MeasureStream: {measureName} TopicRequest: {topicName}");
    producer.Flush(100);
    var partitionOffset = producer.QueryWatermarkOffsets(new TopicPartition(topicName, 0), TimeSpan.FromSeconds(10));
}
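One way to keep the loop from hanging forever while investigating is to bound the wait on the task instead of reading .Result directly. The following is only a sketch: BoundedWait and TryGetResult are hypothetical names, and a never-completing TaskCompletionSource stands in for the pending delivery report so that the example runs without a broker. It does not address why the report never arrives; it only turns the hang into a visible timeout.

```csharp
using System;
using System.Threading.Tasks;

public static class BoundedWait
{
    // Hypothetical helper: waits up to `timeout` for the task to finish.
    // Returns false on timeout instead of blocking indefinitely.
    // If the task faulted, Task.Wait rethrows the failure as an AggregateException.
    public static bool TryGetResult<T>(Task<T> task, TimeSpan timeout, out T result)
    {
        if (task.Wait(timeout))
        {
            result = task.Result; // already completed, so this does not block
            return true;
        }
        result = default(T);
        return false;
    }

    public static void Main()
    {
        // Stand-in for a delivery report that never arrives.
        Task<string> neverCompletes = new TaskCompletionSource<string>().Task;

        string value;
        bool delivered = TryGetResult(neverCompletes, TimeSpan.FromMilliseconds(200), out value);
        Console.WriteLine(delivered ? value : "timed out waiting for the delivery report");
    }
}
```

In the loop above, the same helper could be called with deliveryReport and a longer timeout (for example 30 seconds, an arbitrary choice), logging the topic and key whenever it returns false.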
Any pointers would be greatly appreciated.