ACE之Reactor模式使用实例

前端之家收集整理的这篇文章主要介绍了ACE之Reactor模式使用实例，前端之家小编觉得挺不错的，现在分享给大家，也给大家做个参考。
  1. // ACE_Reactor_Client.cpp : 定义控制台应用程序的入口点。
  2. //
  3.  
  4. #include "stdafx.h"
  5.  
  6.  
  7. #include "ace/Reactor.h"
  8. #include "ace/SOCK_Connector.h"
  9. #include "ace/OS.h"
  10. #include "ace/Log_Msg.h"
  11. #include <string>
  12. #include <iostream>
  13. using namespace std;
  14.  
  15. class MyClient:public ACE_Event_Handler
  16. {
  17. public:
  18. bool do_connect(string ip,int port,int local_port)
  19. {
  20. ACE_SOCK_Connector connector;
  21. ACE_INET_Addr local_addr(local_port,"0.0.0.0");
  22.  
  23. ACE_INET_Addr addr(port,ip.c_str());
  24. ACE_Time_Value timeout(5,0);
  25. if(connector.connect(peer_sock,addr,&timeout,local_addr) != 0)
  26. {
  27. cout<<"connect fail."<<endl;
  28. return false;
  29. }
  30. int ret = ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK);
  31. if (ret != 0)
  32. {
  33. cout<<"local_port:"<<local_port<<" register_handler fail."<<endl;
  34. return false;
  35. }
  36. sprintf(buf,"%d",local_port);
  37. peer_sock.send(buf,strlen(buf)+1);
  38. return true;
  39. }
  40.  
  41. ACE_HANDLE get_handle(void) const
  42. {
  43. return peer_sock.get_handle();
  44. }
  45.  
  46. int handle_input (ACE_HANDLE fd)
  47. {
  48. int rev=0;
  49. ACE_Time_Value timeout(5,0);
  50. if((rev=peer_sock.recv(buf,sizeof(buf),&timeout))>0)
  51. {
  52. buf[rev]='\0';
  53. cout<<"recv: "<<buf<<endl;
  54. }
  55. ACE_INET_Addr raddr;
  56. peer_sock.get_local_addr(raddr);
  57. //ACE_DEBUG ((LM_DEBUG,ACE_TEXT ( " (%P|%t) close:%s %d\n " ),raddr.get_host_addr(),raddr.get_port_number()));
  58. sprintf(buf,raddr.get_port_number());
  59. peer_sock.send(buf,strlen(buf)+1);
  60. return 0;
  61. }
  62.  
  63. private:
  64. ACE_SOCK_Stream peer_sock;
  65. char buf[100];
  66. };
  67.  
  68. #include <ace/OS.h>
  69. #include <ace/Task.h>
  70.  
  71. class TTcpNetThread : public ACE_Task_Base
  72. {
  73. public:
  74. /// 运行
  75. int open();
  76.  
  77. /// 停止运行
  78. int close();
  79. protected:
  80. /// 线程函数
  81. virtual int svc();
  82. };
  83.  
  84. int TTcpNetThread::open() { return this->activate(); }
  85.  
  86. int TTcpNetThread::close()
  87. {
  88. ACE_Reactor::instance()->end_reactor_event_loop(); // 终止ACE_Proactor循环
  89.  
  90. this->wait(); // 等待清理现场
  91.  
  92. return 0;
  93. }
  94.  
  95. int TTcpNetThread::svc()
  96. {
  97. // Proactor的事件循环开始
  98. while(!ACE_Reactor::instance()->event_loop_done())
  99. {
  100. ACE_Reactor::instance()->handle_events();
  101. }
  102.  
  103. ACE_DEBUG((LM_DEBUG,ACE_TEXT("Network fin\n")));
  104.  
  105. return 0;
  106. }
  107.  
  108. /**********************************************************************************************
  109. 在Socket编程中,常见的事件就是"读就绪","写就绪",通过对这两个事件的捕获分发,可以实现Socket中的异步操作。
  110.  
  111. Socket编程中的事件处理器
  112.  
  113. 在前面我们已经介绍过,在ACE反应器框架中,任何事件处理器都必须派生自ACE_Event_Handler类,并通过重载其相应的回调事件处理函数来实现相应的回调处理。在Socket编程中,我们通常需要重载的函数有:
  114.  
  115. 1.handle_input()
  116. 当I/O句柄(比如UNIX中的文件描述符)上的输入可用时,反应器自动回调该方法
  117.  
  118. 2.handle_output()
  119. 当I/O设备的输出队列有可用空间时,反应器自动回调该方法
  120.  
  121. 3.handle_close()
  122. 当事件处理器中的事件从Reactor中移除的时候调用
  123.  
  124. 此外,为了使Reactor能通过I/O句柄找到对应的事件处理器,还必须重载其get_handle()方法以使得Reactor建立起I/O句柄和事件处理器的关联。
  125. ***********************************************************************************************/
  126. #pragma comment(lib,"ACEd.lib")
  127.  
  128. #define CLIENT_THREAD_NUM 4
  129.  
  130. int main(int argc,char *argv[])
  131. {
  132. for (int i=0;i<2000;i++)
  133. {
  134. MyClient *client = new MyClient;
  135. if (!client->do_connect("127.0.0.1",4567,10000+i))
  136. break;
  137. }
  138.  
  139. system("pause");
  140. TTcpNetThread netThread[CLIENT_THREAD_NUM];
  141.  
  142. for(int i = 0; i < CLIENT_THREAD_NUM; i++)
  143. {
  144. netThread[i].open();
  145. }
  146. while (getchar())
  147. {
  148. ACE_OS::sleep(1);
  149. }
  150.  
  151. /*while(true)
  152. {
  153. ACE_Reactor::instance()->handle_events();
  154. }*/
  155.  
  156. return 0;
  157. }

  1. // ACE_Reactor_Server.cpp : 定义控制台应用程序的入口点。
  2. //
  3.  
  4. #include "stdafx.h"
  5.  
  6. #include <ace/Reactor.h>
  7. #include <ace/SOCK_Connector.h>
  8. #include <ace/SOCK_Acceptor.h>
  9. #include <ace/Auto_Ptr.h>
  10. #include "ace/OS.h"
  11. #include "ace/Log_Msg.h"
  12. #include <list>
  13. #pragma comment(lib,"ACEd.lib")
  14.  
  15. class ClientService : public ACE_Event_Handler
  16. {
  17. public:
  18. ACE_SOCK_Stream &peer (void) { return this->sock_; }
  19.  
  20. int regist_this(void)
  21. {
  22. //注册读就绪回调函数
  23. return this->reactor ()->register_handler(this,ACE_Event_Handler::READ_MASK);
  24. }
  25.  
  26. virtual ACE_HANDLE get_handle (void) const { return this->sock_.get_handle (); }
  27.  
  28. virtual int handle_input (ACE_HANDLE fd )
  29. {
  30. int rev = peer().recv(buf,sizeof(buf));
  31. if(rev<=0)
  32. return -1;
  33. buf[rev] = '\0';
  34. printf("recv:%s",buf);
  35.  
  36. return 0;
  37. }
  38.  
  39. // 释放相应资源
  40. virtual int handle_close (ACE_HANDLE,ACE_Reactor_Mask mask)
  41. {
  42. if (mask == ACE_Event_Handler::WRITE_MASK)
  43. return 0;
  44. mask = ACE_Event_Handler::ALL_EVENTS_MASK |
  45. ACE_Event_Handler::DONT_CALL;
  46. this->reactor ()->remove_handler (this,mask);
  47. this->sock_.close ();
  48. delete this; //socket出错时,将自动删除该客户端,释放相应资源
  49. return 0;
  50. }
  51.  
  52. protected:
  53. char buf[100];
  54. ACE_SOCK_Stream sock_;
  55. };
  56.  
  57.  
  58. class ClientAcceptor : public ACE_Event_Handler
  59. {
  60. public:
  61. virtual ~ClientAcceptor (){this->handle_close (ACE_INVALID_HANDLE,0);}
  62.  
  63. int start_listen (const ACE_INET_Addr &listen_addr)
  64. {
  65. if (this->acceptor_.open (listen_addr,1) == -1)
  66. {
  67. printf("open port fail\n");
  68. return -1;
  69. }
  70. //注册接受连接回调事件
  71. return this->reactor ()->register_handler(this,ACE_Event_Handler::ACCEPT_MASK);
  72. }
  73.  
  74. virtual ACE_HANDLE get_handle (void) const
  75. { return this->acceptor_.get_handle (); }
  76.  
  77. virtual int handle_input (ACE_HANDLE fd )
  78. {
  79. ClientService *client = new ClientService();
  80. auto_ptr<ClientService> p (client);
  81.  
  82. if (this->acceptor_.accept (client->peer ()) == -1)
  83. {
  84. printf("accept client fail\n");
  85. return -1;
  86. }
  87. p.release ();
  88. client->reactor (this->reactor ());
  89. if (client->regist_this () == -1)
  90. client->handle_close (ACE_INVALID_HANDLE,0);
  91. return 0;
  92. }
  93.  
  94. virtual int handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask)
  95. {
  96. if (this->acceptor_.get_handle () != ACE_INVALID_HANDLE)
  97. {
  98. ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK |
  99. ACE_Event_Handler::DONT_CALL;
  100. this->reactor ()->remove_handler (this,m);
  101. this->acceptor_.close ();
  102. }
  103. return 0;
  104. }
  105.  
  106. protected:
  107. ACE_SOCK_Acceptor acceptor_;
  108. };
  109.  
  110.  
  111.  
  112. int main1(int argc,char *argv[])
  113. {
  114. ACE_INET_Addr addr(4567,"127.0.0.1");
  115. ClientAcceptor server;
  116. server.reactor(ACE_Reactor::instance());
  117. server.start_listen(addr);
  118.  
  119. while(true)
  120. {
  121. ACE_Reactor::instance()->handle_events();
  122. }
  123.  
  124. return 0;
  125. }
  126.  
  127. ////////////////////////////////////////////////
  128. #define MAX_BUFF_SIZE 1024
  129. #define LISTEN_PORT 4567
  130. #define SERVER_IP ACE_LOCALHOST
  131.  
  132. class ClientHandler : public ACE_Event_Handler
  133. {
  134. public:
  135. friend class ServerAcceptor;
  136. public:
  137. ClientHandler(){}
  138. ~ClientHandler()
  139. {
  140. sock_stream.close();
  141. ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
  142. }
  143. int send_some(const void *buff,int bytes)
  144. {
  145. return sock_stream.send(buff,bytes);
  146. }
  147. ACE_SOCK_Stream& GetStream(){return sock_stream;} //给accept提供接口绑定数据通道
  148. public:
  149. virtual int handle_input(ACE_HANDLE fd); //I/O触发事件后调用
  150. virtual ACE_HANDLE get_handle(void) const {return sock_stream.get_handle();} //不重载需要手动将handle传入ACE_Reactor
  151. private:
  152. ACE_INET_Addr Cli_addr;
  153. ACE_SOCK_Stream sock_stream;
  154. };
  155.  
  156. int ClientHandler::handle_input(ACE_HANDLE fd)
  157. {
  158. char strBuffer[MAX_BUFF_SIZE];
  159. int byte = sock_stream.recv(strBuffer,MAX_BUFF_SIZE); //可读数据
  160. if (-1 == byte)
  161. {
  162. ACE_DEBUG((LM_INFO,ACE_TEXT("receive data Failed\n")));
  163. }
  164. else if(0 == byte)
  165. {
  166. sock_stream.close();
  167. ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
  168. ACE_DEBUG((LM_INFO,ACE_TEXT("client closed!\n")));
  169. }
  170. else
  171. {
  172. ACE_DEBUG((LM_INFO,ACE_TEXT("receive:%s\n"),strBuffer));
  173. sock_stream.send(strBuffer,strlen(strBuffer)+1);
  174. }
  175. return 0;
  176. }
  177.  
  178. // ServerAcceptor
  179. class ServerAcceptor : public ACE_Event_Handler
  180. {
  181. public:
  182. ServerAcceptor(int port,char* ip);
  183. ~ServerAcceptor();
  184. virtual int handle_input(ACE_HANDLE fd); // ACE框架回调
  185. virtual ACE_HANDLE get_handle(void) const {return Svr_aceept.get_handle();}
  186. private:
  187. ACE_INET_Addr Svr_addr;
  188. ACE_SOCK_Acceptor Svr_aceept;
  189. std::list<ClientHandler*> m_streamPool; //stream pool
  190. };
  191.  
  192. ServerAcceptor::ServerAcceptor(int port,char* ip):Svr_addr(port,ip)
  193. {
  194. if (-1 == Svr_aceept.open(Svr_addr,1))
  195. {
  196. ACE_DEBUG((LM_ERROR,ACE_TEXT("accept open Failed\n")));
  197. Svr_aceept.close();
  198. }
  199. ACE_DEBUG((LM_ERROR,ACE_TEXT("accept open success\n")));
  200. }
  201.  
  202. ServerAcceptor::~ServerAcceptor()
  203. {
  204. ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::ACCEPT_MASK);
  205. Svr_aceept.close();
  206.  
  207. std::list<ClientHandler*>::iterator it;
  208. for (it = m_streamPool.begin();it != m_streamPool.end();++it)
  209. {
  210. if (NULL != (*it))
  211. {
  212. delete (*it);
  213. }
  214. }
  215. }
  216. #include "ace/SOCK_SEQPACK_Association.h"
  217. int ServerAcceptor::handle_input(ACE_HANDLE fd )
  218. {
  219. ClientHandler *stream = new ClientHandler(); //产生新通道
  220. if (NULL != stream)
  221. {
  222. m_streamPool.push_back(stream);
  223. }
  224. if (Svr_aceept.accept(stream->GetStream()) == -1) //绑定通道
  225. {
  226. printf("accept client fail\n");
  227. return -1;
  228. }
  229. ACE_Reactor::instance()->register_handler(stream,ACE_Event_Handler::READ_MASK); //通道注册到ACE_Reactor
  230.  
  231. ACE_INET_Addr raddr;
  232. stream->GetStream().get_remote_addr(raddr);
  233. ACE_DEBUG ((LM_DEBUG,ACE_TEXT ( "client:%s %d\n" ),raddr.get_port_number()));
  234.  
  235. /*ACE_INET_Addr addr;
  236. ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(fd);
  237. size_t addr_size=sizeof ACE_INET_Addr;
  238. ass.get_remote_addrs(&addr,addr_size);
  239. ACE_OS::printf("fd:%d ip:%d port:%d\n",(int)fd,addr.get_ip_address(),addr.get_port_number());*/
  240. //ACE_DEBUG((LM_ERROR,ACE_TEXT("User connect success!\n")));
  241. return 0;
  242. }
  243.  
  244. #include <ace/OS.h>
  245. #include <ace/Task.h>
  246.  
  247. class TTcpNetThread : public ACE_Task_Base
  248. {
  249. public:
  250. /// 运行
  251. int open();
  252.  
  253. /// 停止运行
  254. int close();
  255. protected:
  256. /// 线程函数
  257. virtual int svc();
  258. };
  259.  
  260. int TTcpNetThread::open() { return this->activate(); }
  261.  
  262. int TTcpNetThread::close()
  263. {
  264. ACE_Reactor::instance()->end_reactor_event_loop(); // 终止ACE_Proactor循环
  265.  
  266. this->wait(); // 等待清理现场
  267.  
  268. return 0;
  269. }
  270.  
  271. int TTcpNetThread::svc()
  272. {
  273. ACE_Reactor::instance()->run_reactor_event_loop();
  274.  
  275. ACE_DEBUG((LM_DEBUG,ACE_TEXT("Network fin\n")));
  276.  
  277. return 0;
  278. }
  279.  
  280. #define CLIENT_THREAD_NUM 4
  281.  
  282. int main(int argc,char *argv[])
  283. {
  284. ServerAcceptor server(LISTEN_PORT,(char *)SERVER_IP);
  285.  
  286. //listen port注册到ACE_Reactor
  287. ACE_Reactor::instance()->register_handler(&server,ACE_Event_Handler::ACCEPT_MASK);
  288.  
  289. TTcpNetThread netThread[CLIENT_THREAD_NUM];
  290.  
  291. for(int i = 0; i < CLIENT_THREAD_NUM; i++)
  292. {
  293. netThread[i].open();
  294. }
  295. while (getchar())
  296. {
  297. ACE_OS::sleep(1);
  298. }
  299.  
  300. //进入消息循环,有I/O事件回调handle_input
  301. //ACE_Reactor::instance()->run_reactor_event_loop();
  302.  
  303. return 0;
  304. }

猜你在找的React相关文章