Confluent 11.5 C#.net Kafka订阅请求挂起

位于的代码 var result = deliveryReport.Result; 无限期地挂起。该代码一直工作到本周末为止,然后突然停止了。

这是交付报告在生产者之后的价值。ProduceAsync returns: deliveryReport = Id = 2,Status = WaitingForactivation,Method = "{null}",Result = "{Not yet computed}"

我正在通过VPN连接到法国的Kafka服务器,但不确定其版本,我确定它可以在Linux VM上运行。

我正在使用

Confluent v 11.5librdkafka.redist v 11.5System.Runtime.CompilerServices.Unsafe v 4.0.0

这是代码块:

            foreach (var measureName in measureStreams)
            {
                String subRequest = null;
                Guid requestId = Guid.NewGuid();
                if (subscribeSwitch)
                    subRequest = String.Format(@"{{""username"":""{0}"",""password"":""{1}"",""subscriptionRequestId"":""{2}"",""request"":""{3}"",""measuresStream"":""{4}"",""version"":""{5}""}}",sSUid,sspwd,requestId.ToString(),measureName,subscribeType,version);
                else
                    subRequest = String.Format(@"{{""username"":""{0}"",""measuresStreams"":""{2}""}}",measureName);
                string key = i++.ToString();
                var deliveryReport = producer.ProduceAsync(topicName,key,subRequest);
                _log.DebugFormat("TopicName={0} | Key={1} | Request={2}",topicName,subRequest);
                var result = deliveryReport.Result; // synchronously waits for message to be produced.
                _log.DebugFormat("Result = {0}",result.Value);
                _log.DebugFormat($"> Partition: {result.Partition},Offset: {result.Offset} MeasureStream: {measureName} TopicRequest: {topicName}");
                producer.Flush(100);
                var partitionOffset = producer.QueryWatermarkOffsets(new TopicPartition(topicName,0),TimeSpan.FromSeconds(10));
            }

任何指针将不胜感激。

wxp1818118 回答:Confluent 11.5 C#.net Kafka订阅请求挂起

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/3027017.html

大家都在问