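Problem:
The sender sends nearly 300,000 packets using the socket options below, but the receiver gets stuck partway through and does not receive all of the packets. If we add Sleep(2), i.e. wait 2 ms after each send, we sometimes receive all the packets, but the transfer takes much longer.
If we restart the Receiver, the application receives packets again, but that is not a solution. I have tried various ZMQ_RATE values on both the Sender and the Receiver.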
Environment setup:
(The sender and receiver are connected within a single subnet through a D-Link switch. The media speed is 1 Gbps.)

Sender:
- JZMQ (ZMQ C library, OpenPGM)
- ZMQ_RATE: 30 Mbps (megabits per second)
- Packet size: 1024 bytes
- ZMQ_RECOVERY_IVL: 2 minutes
- Send flag: 0 (blocking mode)
- Sleep(2 ms): sometimes it works without any issue, but the transfer takes more time
- Platform: Windows

Receiver:
- ZMQ C++ (ZMQ C library, OpenPGM)
- ZMQ_RATE: 30 Mbps (megabits per second)
- ZMQ_RCVTIMEO: 3 seconds
- Receive flag: 0 (blocking mode)
- Platform: Windows
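A quick back-of-envelope check of these numbers may help frame the question. The sketch below is an illustration under stated assumptions, not a model of OpenPGM: it ignores PGM/UDP/IP header overhead and OS timer granularity, takes the rate in kilobits per second (the unit ZMQ_RATE uses), and assumes that because send() only queues a message, a per-message sleep overlaps with transmission, so roughly the slower of the two dominates. The function name `estimate` is hypothetical.

```cpp
#include <algorithm>
#include <cstdint>

// Back-of-envelope only: ignores PGM/UDP/IP header overhead and OS timer
// granularity. rate_kbps uses the same unit as ZMQ_RATE (kilobits/second).
// Preconditions: messages > 0, msg_bytes > 0, rate_kbps > 0.
struct TransferEstimate {
    double wire_seconds;    // time to push all payload bits at rate_kbps
    double pause_seconds;   // total time spent in per-message sleeps
    double total_seconds;   // roughly the slower of the two (send() only queues)
    double effective_kbps;  // payload rate actually achieved
};

TransferEstimate estimate(std::uint64_t messages, std::uint64_t msg_bytes,
                          double rate_kbps, double pause_ms) {
    const double bits = static_cast<double>(messages) *
                        static_cast<double>(msg_bytes) * 8.0;
    TransferEstimate e{};
    e.wire_seconds   = bits / (rate_kbps * 1000.0);
    e.pause_seconds  = static_cast<double>(messages) * pause_ms / 1000.0;
    e.total_seconds  = std::max(e.wire_seconds, e.pause_seconds);
    e.effective_kbps = bits / e.total_seconds / 1000.0;
    return e;
}
```

For 300,000 messages of 1024 bytes at 30,000 kbps, the wire time alone is about 82 s. With a 2 ms pause per message the pauses alone add up to 600 s, which caps the payload rate at about 4,096 kbps (roughly 4 Mbps), far below the configured 30 Mbps. That would be consistent with the observation that Sleep(2) "works but is slow", and on Windows a Sleep(2) may well last longer than 2 ms.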
What could be the problem?
Is ZeroMQ PGM multicast not a stable library?
JZMQ Sender:

```java
import org.zeromq.ZMQ;

public class PgmSender {
    public static void main(String[] args) {
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket socket = context.socket(ZMQ.PUB);
        socket.setRate(80000);                // ZMQ_RATE in kbit/s: 80,000 kbit/s = 80 Mbit/s
        socket.setRecoveryInterval(60 * 60);  // ZMQ_RECOVERY_IVL: milliseconds in libzmq 3.x and later
        socket.setSendTimeOut(-1);            // block indefinitely on send
        socket.setSendBufferSize(1024 * 64);  // ZMQ_SNDBUF: 64 KiB kernel send buffer
        socket.bind("pgm://local_IP;239.255.0.20:30001");

        byte[] bytesToSend = new byte[1024];  // 1024-byte payload
        int count = 0;
        while (count < 300000) {
            socket.send(bytesToSend, 0);      // flags = 0: blocking send
            count++;
        }
    }
}
```

ZMQ C++ Receiver:

```cpp
// ZMQCPP-PGM-receive.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <stdio.h>
#include "zmq.hpp"

int main( int argc, char* argv[] )
{
    try {
        zmq::context_t context( 1 );

        // Socket to talk to server
        printf( "Connecting to server..." );
        zmq::socket_t *s1 = new zmq::socket_t( context, ZMQ_SUB );

        int recvTimeout = 3000;                    // ZMQ_RCVTIMEO in ms
        s1->setsockopt( ZMQ_RCVTIMEO, &recvTimeout, sizeof( int ) );
        int recvRate = 80000;                      // ZMQ_RATE in kbit/s
        s1->setsockopt( ZMQ_RATE, &recvRate, sizeof( int ) );
        int recsec = 60 * 60;
        // s1->setsockopt( ZMQ_RECOVERY_IVL, &recsec, sizeof( recsec ) );
        s1->connect( "pgm://local_IP;239.255.0.20:30001" );
        s1->setsockopt( ZMQ_SUBSCRIBE, NULL, 0 );  // subscribe to all messages
        printf( "done. \n" );

        int seq = 0;
        while ( true ) {
            zmq::message_t msgbuff;
            int ret = s1->recv( &msgbuff, 0 );     // false when ZMQ_RCVTIMEO expires
            if ( !ret ) {
                printf( "No message received (timeout)\n" );
                continue;
            }
            // size() returns size_t, so cast it to match %d
            printf( "Seq(%d) Received data size=%d\n", seq, (int) msgbuff.size() );
            ++seq;
        }
    }
    catch ( zmq::error_t &e ) {
        printf( "An error occurred: %s\n", e.what() );
        return 1;
    }
    catch ( std::exception &e ) {
        printf( "An error occurred: %s\n", e.what() );
        return 1;
    }
    return 0;
}
```
Solution
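That is not a good practice. Before blaming the library maintainers for not having tested PGM/EPGM thoroughly before release, or for doing poor work at any point during development, one should first make sure the application design itself is well understood, robustly engineered, properly diagnosed, and performance/latency-tested in a reality check against the actual deployment ecosystem, typically some combination of { localhost | home subnet | remote network | remote host(s) }.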
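The [PUB] sending side deserves due attention:
If nothing else, this part of the documentation should ring every alarm bell and blow every whistle: due caution is warranted wherever a few mock-up SLOCs do insufficient resource management, and especially with brute-force attempts at non-blocking, ultra-fast send loops: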
ØMQ does not guarantee that the socket will accept as many as ZMQ_SNDHWM messages, and the actual limit may be as much as 60-70% lower depending on the flow of messages on the socket.
So it may well be that your [PUB] sender drops messages before they are ever sent onto the wire.
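One way to keep a tight send loop from outrunning the rate limiter (and so from piling messages up against the high-water mark) is to pace the application against an absolute schedule instead of a fixed Sleep(2) after every send. The class below is a hypothetical helper, not part of ZeroMQ or JZMQ, and it is a swapped-in pacing technique rather than anything the answer above prescribes: message number `seq` becomes due at `seq * interval`, where the interval is the time one message needs at `rate_kbps` (the ZMQ_RATE unit). Header overhead is ignored, so in practice one would pace somewhat below ZMQ_RATE.

```cpp
#include <chrono>
#include <cstdint>

// Hypothetical helper (not a ZeroMQ API): schedules message #seq no earlier
// than seq * interval after the first send, so the application hands the
// [PUB] socket payload no faster than rate_kbps. Because the schedule is
// absolute, oversleeping on one message is made up on the next ones instead
// of accumulating as it does with a fixed Sleep() per send.
class SendPacer {
public:
    // Precondition: rate_kbps > 0.
    SendPacer(std::uint64_t rate_kbps, std::uint64_t msg_bytes)
        : interval_(std::chrono::nanoseconds(
              msg_bytes * 8ULL * 1000000ULL / rate_kbps)) {}

    // Time budget for one message at the configured rate.
    std::chrono::nanoseconds interval() const { return interval_; }

    // How long to wait before sending message #seq, given the time elapsed
    // since the first send; zero means "send now".
    std::chrono::nanoseconds wait_before(std::uint64_t seq,
                                         std::chrono::nanoseconds elapsed) const {
        const std::chrono::nanoseconds due =
            interval_ * static_cast<std::int64_t>(seq);
        return due > elapsed ? std::chrono::nanoseconds(due - elapsed)
                             : std::chrono::nanoseconds(0);
    }

private:
    std::chrono::nanoseconds interval_;
};
```

With 1024-byte messages the interval is about 273 µs at 30,000 kbps and 102.4 µs at 80,000 kbps, compared with the 2 ms of the Sleep(2) workaround. In a send loop one would sleep for `wait_before(count, now - start)` before each send.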
The next warning concerns O/S privileges:
The pgm transport implementation requires access to raw IP sockets. Additional privileges may be required on some operating systems for this operation. Applications not requiring direct interoperability with other PGM implementations are encouraged to use the epgm transport instead which does not require any special privileges.
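Following that advice means changing only the transport prefix of the endpoint string, on both the sender and the receiver, since epgm encapsulates PGM in UDP and does not talk to a plain pgm peer. The helper below is hypothetical and only illustrates the endpoint format used in the question, `<transport>://<interface>;<multicast-group>:<port>`; `local_IP` stays the placeholder it is in the question.

```cpp
#include <string>

// Hypothetical helper: builds a ZeroMQ multicast endpoint of the form
// "<transport>://<interface>;<multicast-group>:<port>", as in the question's
// "pgm://local_IP;239.255.0.20:30001". Use the same transport on both sides.
std::string multicast_endpoint(const std::string& transport,
                               const std::string& iface,
                               const std::string& group,
                               unsigned port) {
    return transport + "://" + iface + ";" + group + ":" + std::to_string(port);
}
```

For example, `multicast_endpoint("epgm", "local_IP", "239.255.0.20", 30001)` yields `epgm://local_IP;239.255.0.20:30001`, which would replace the `pgm://` string in both the `bind()` and the `connect()` call.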
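Next, the [SUB] receiver:
A few more tweaks will help. The [PUB] sender can be sniffed with inline status/tracing instrumentation similar to what is proposed below for the [SUB] receiver: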
```cpp
// ZMQCPP-PGM-receive.cpp : Defines the entry point for the console application.
// MODs: https://stackoverflow.com/q/44526517/3666197
#include "stdafx.h"
#include <stdio.h>
#include <stdint.h>
#include <chrono>                                   // since C++ 11
#include "zmq.hpp"

typedef std::chrono::high_resolution_clock nanoCLK;

#define ZMQ_IO_THREAD_POOL_SIZE                       8
#define ZMQ_AFINITY_PLAIN_ROUNDROBIN_UNMANAGED_RISKY  0
#define ZMQ_AFINITY_LO_PRIO_POOL  ( 0 | 1 )
#define ZMQ_AFINITY_HI_PRIO_POOL  ( 0 | 0 | 2 )
#define ZMQ_AFINITY_MC_EPGM_POOL  ( 0 | 0 | 0 | 4 | 8 | 0 | 0 | 64 | 128 )

// elapsed time in [ns], as a printf-friendly long long (%lld)
static long long ns( nanoCLK::duration d )
{
    return (long long) std::chrono::duration_cast<std::chrono::nanoseconds>( d ).count();
}

int main( int argc, char* argv[] )
{
    auto TEST_start = nanoCLK::now();
    auto RECV_start = TEST_start;
    auto RECV_ret   = TEST_start;
    auto RECV_last  = TEST_start;

    try {
        zmq::context_t context( ZMQ_IO_THREAD_POOL_SIZE );
        printf( "Connecting to server..." );

        int major, minor, patch;
        zmq::version( &major, &minor, &patch );
        printf( "Using ZeroMQ( %d.%d.%d )", major, minor, patch );

        // on the stack, so the socket is closed before the context terminates
        zmq::socket_t s1( context, ZMQ_SUB );     // Socket to talk to server

        uint64_t zmqAffinity = 0;        // [ #] bitmap onto the IO-thread pool (ref. #define-s above); ZMQ_AFFINITY takes uint64_t
        int64_t  recvMaxSize = 9876;     // [ B] ZMQ_MAXMSGSIZE takes int64_t
        int zmqLinger   = 0,             // [ms]
            recvBuffer  = 2 * 123456,    // [ B]
            recvHwMark  = 123456,        // [ #] max number of MSGs allowed to be queued per connected peer
            recvRate    = 80000 * 10,    // [kbps]
            recvTimeout = 3000,          // [ms] before ret EAGAIN { 0: NO_BLOCK | -1: INF | N: wait [ms] }
            recoverMSEC = 60 * 60;       // [ms]

        s1.setsockopt( ZMQ_AFFINITY,   &zmqAffinity, sizeof( zmqAffinity ) );
        s1.setsockopt( ZMQ_LINGER,     &zmqLinger,   sizeof( int ) );
        s1.setsockopt( ZMQ_MAXMSGSIZE, &recvMaxSize, sizeof( recvMaxSize ) );
        s1.setsockopt( ZMQ_RCVBUF,     &recvBuffer,  sizeof( int ) );
        s1.setsockopt( ZMQ_RCVHWM,     &recvHwMark,  sizeof( int ) );
        s1.setsockopt( ZMQ_RCVTIMEO,   &recvTimeout, sizeof( int ) );
        s1.setsockopt( ZMQ_RATE,       &recvRate,    sizeof( int ) );
     // s1.setsockopt( ZMQ_RECOVERY_IVL, &recoverMSEC, sizeof( int ) );
        s1.connect( "pgm://local_IP;239.255.0.20:30001" );
        s1.setsockopt( ZMQ_SUBSCRIBE, "", 0 );
        printf( "done.\n" );

        int seq = 0;
        while ( true ) {
            zmq::message_t msgbuff;
            RECV_last  = RECV_ret;                // previous recv() return
            RECV_start = nanoCLK::now();
            bool ret   = s1.recv( &msgbuff, 0 );  // false on timeout (EAGAIN)
            RECV_ret   = nanoCLK::now();

            // LOOP_ovhd: time spent outside recv() since its previous return
            if ( !ret )
                printf( "[T0+ %14lld [ns]]: [SUB] did not receive any message within set timeout(%d). RC == %d LOOP_ovhd == %6lld [ns] RECV_wait == %10lld [ns]\n",
                        ns( RECV_ret - TEST_start ), recvTimeout, (int) ret,
                        ns( RECV_start - RECV_last ), ns( RECV_ret - RECV_start ) );
            else
                printf( "[T0+ %14lld [ns]]: [SUB] did receive a message SEQ#(%6d.) DATA[%6d] B. RC == %d LOOP_ovhd == %6lld [ns] RECV_wait == %10lld [ns]\n",
                        ns( RECV_ret - TEST_start ), ++seq, (int) msgbuff.size(), (int) ret,
                        ns( RECV_start - RECV_last ), ns( RECV_ret - RECV_start ) );
        }
    }
    catch( zmq::error_t &e ) {
        printf( "[T0+ %14lld [ns]]: [EXC.ZMQ] An error occurred: %s\nWill RET(1)", ns( nanoCLK::now() - TEST_start ), e.what() );
        return 1;
    }
    catch( std::exception &e ) {
        printf( "[T0+ %14lld [ns]]: [EXC.std] An error occurred: %s\nWill RET(1)", ns( nanoCLK::now() - TEST_start ), e.what() );
        return 1;
    }
    return 0;
}
```
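The timing instrumentation above shows where the receiver waits, but not which messages never arrived. A complementary diagnostic, sketched here under a loud assumption, is to have the sender write its loop counter into the first 4 bytes of each payload (little-endian) and let the receiver count gaps. The question's sender transmits all-zero buffers, so this requires a sender-side change; `read_seq_le` and `GapTracker` are hypothetical names, not ZeroMQ APIs.

```cpp
#include <cstdint>

// Hypothetical: decodes a sequence number the sender is ASSUMED to have
// written into the first 4 bytes of the payload, little-endian.
inline std::uint32_t read_seq_le(const unsigned char* data) {
    return  static_cast<std::uint32_t>(data[0])
         | (static_cast<std::uint32_t>(data[1]) << 8)
         | (static_cast<std::uint32_t>(data[2]) << 16)
         | (static_cast<std::uint32_t>(data[3]) << 24);
}

// Hypothetical: counts sequence numbers skipped between received messages.
// Messages published before the first one seen are not counted, because a
// late-joining subscriber cannot know about them.
class GapTracker {
public:
    // Returns how many sequence numbers were skipped just before `seq`.
    std::uint64_t observe(std::uint32_t seq) {
        const std::uint64_t s = seq;
        std::uint64_t gap = 0;
        if (have_last_ && s > last_ + 1) gap = s - last_ - 1;
        if (!have_last_ || s > last_) { last_ = s; have_last_ = true; }
        ++received_;
        missing_ += gap;
        return gap;
    }
    std::uint64_t received() const { return received_; }
    std::uint64_t missing() const { return missing_; }

private:
    bool have_last_ = false;
    std::uint64_t last_ = 0;
    std::uint64_t received_ = 0;
    std::uint64_t missing_ = 0;
};
```

In the receive loop one would call `tracker.observe(read_seq_le(static_cast<const unsigned char*>(msgbuff.data())))` whenever `msgbuff.size() >= 4`, and print `missing()` on each timeout. That helps tell messages the [PUB] side dropped before the wire apart from a receiver that simply stalled.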