对ACE反应器Reactor模式的示例程序分析

前端之家收集整理的这篇文章主要介绍了对ACE反应器Reactor模式的示例程序分析,前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

ACE 6.0.0 提供了一个 Reactor 示例程序,下面对其进行分析(分析内容写在代码的注释里)。

Reactors_Test.cpp文件(在 \ACE-6.0.0\ACE_wrappers\tests\ 目录下):

  1. // $Id: Reactors_Test.cpp 91671 2010-09-08 18:39:23Z johnnyw $
  2.  
  3. // ============================================================================
  4. //
  5. // = LIBRARY
  6. // tests
  7. //
  8. // = FILENAME
  9. // Reactors_Test.cpp
  10. //
  11. // = DESCRIPTION
  12. // This is a test that performs a torture test of multiple
  13. // <ACE_Reactors> and <ACE_Tasks> in the same process.
  14. //
  15. // = AUTHOR
  16. // Prashant Jain <pjain@cs.wustl.edu>,// Detlef Becker <Detlef.Becker@med.siemens.de>,and
  17. // Douglas C. Schmidt <schmidt@cs.wustl.edu>
  18. //
  19. // ============================================================================
  20.  
  21. #include "test_config.h"
  22. #include "ace/Task.h"
  23. #include "ace/Reactor.h"
  24. #include "ace/Atomic_Op.h"
  25. #include "ace/Recursive_Thread_Mutex.h"
  26.  
  27.  
  28.  
  29. #if defined (ACE_HAS_THREADS)
  30.  
  31. ACE_Thread_Manager *thr_mgr; // Set in run_main() to ACE_Thread_Manager::instance ().
  32.  
  33. static const int MAX_TASKS = 20; // Length of each Test_Task array created in run_main().
  34.  
  35. class Test_Task : public ACE_Task<ACE_MT_SYNCH>
  36. // = TITLE
  37. // Exercise the tasks: svc() notifies the Reactor the task was opened with, and handle_input() counts the callbacks.
  38. {
  39. public:
  40. // = Initialization and termination methods.
  41. Test_Task (void); // Increments task_count_ while holding recursive_lock.
  42. ~Test_Task (void); // Asserts that task_count_ has dropped back to 0.
  43.  
  44. //FUZZ: disable check_for_lack_ACE_OS
  45. // = Task hooks.
  46. virtual int open (void *args = 0); // args is cast to ACE_Reactor * and stored; then activate() is called.
  47. virtual int close (u_long flags = 0); // Decrements task_count_; flags is unused.
  48. virtual int svc (void); // Calls notify() on the Reactor up to ACE_MAX_ITERATIONS times.
  49. //FUZZ: enable check_for_lack_ACE_OS
  50.  
  51. // = Event Handler hooks.
  52. virtual int handle_input (ACE_HANDLE handle); // Counts one handled notification; always returns -1.
  53. virtual int handle_close (ACE_HANDLE fd,ACE_Reactor_Mask close_mask); // No-op; always returns 0.
  54. private:
  55. size_t handled_;
  56. // Number of iterations handled (incremented once per handle_input() call).
  57.  
  58. static int task_count_;
  59. // Number of tasks running (one counter shared by all Test_Task instances).
  60. };
  61.  
  62. // Static data member initialization.
  63. int Test_Task::task_count_ = 0;
  64. // One count per task (two arrays of MAX_TASKS); decremented in handle_input().
  65. static ACE_Atomic_Op<ACE_Thread_Mutex,int> done_count = MAX_TASKS * 2;
  66.  
  67.  
  68. // Guards task_count_ in the Test_Task constructor, destructor and close().
  69. static ACE_Recursive_Thread_Mutex recursive_lock;
  70.  
  71. Test_Task::Test_Task (void) // Constructor: zeroes handled_ and counts this instance in task_count_.
  72. : handled_ (0)
  73. {
  74. ACE_GUARD (ACE_Recursive_Thread_Mutex,ace_mon,recursive_lock);
  75. // task_count_ is static, so it is only modified while recursive_lock is held.
  76. Test_Task::task_count_++;
  77.  
  78. ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) TT+ Test_Task::task_count_ = %d\n"),Test_Task::task_count_));
  79. }
  80.  
  81. Test_Task::~Test_Task (void) // Destructor: logs task_count_ and asserts that it is back to 0.
  82. {
  83. ACE_GUARD (ACE_Recursive_Thread_Mutex,ace_mon,recursive_lock); // ACE_GUARD takes (mutex type, guard object name, lock).
  84.  
  85. ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) TT- Test_Task::task_count_ = %d\n"),Test_Task::task_count_));
  86.  
  87. ACE_ASSERT (Test_Task::task_count_ == 0);
  88. }
  89.  
  90. int
  91. Test_Task::open (void *args) // args: the ACE_Reactor * this task will notify from svc().
  92. {
  93. this->reactor (reinterpret_cast<ACE_Reactor *> (args));
  94. return this->activate (THR_NEW_LWP); // svc() (below) only runs automatically once activate() has been called.
  95. }
  96.  
  97. int
  98. Test_Task::close (u_long) // Task hook: removes this task from task_count_. Returns 0, or -1 if the lock cannot be acquired.
  99. {
  100. ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,ace_mon,recursive_lock,-1); // (mutex type, guard object name, lock, value returned on failure)
  101.  
  102. Test_Task::task_count_--;
  103.  
  104. ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) close Test_Task::task_count_ = %d\n"),Test_Task::task_count_));
  105.  
  106. ACE_ASSERT (Test_Task::task_count_ >= 0);
  107.  
  108. return 0;
  109. }
  110.  
  111. int
  112. Test_Task::svc (void) // Runs automatically in the task's own thread after open() has called activate(). Returns 0, or -1 if notify() fails for a reason other than a timeout.
  113. {
  114. ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) svc\n")));
  115.  
  116. for (size_t i = 0; i < ACE_MAX_ITERATIONS; i++)
  117. {
  118. ACE_OS::thr_yield ();
  119.  
  120. // Only wait up to 10 milliseconds to notify the Reactor.
  121. ACE_Time_Value timeout (0,10 * 1000);
  122.  
  123. // notify() sends a notification to the Reactor; handle_input() and handle_close() (below) are called back when the Reactor processes it in worker().
  124. if (this->reactor ()->notify (this,ACE_Event_Handler::READ_MASK,&timeout) == -1)
  125. {
  126. if (errno == ETIME)
  127. ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) %p\n"),ACE_TEXT ("notify() timed out")));
  128. else
  129. ACE_ERROR_RETURN ((LM_ERROR,ACE_TEXT ("(%t) %p\n"),ACE_TEXT ("notify")),-1);
  130. }
  131. }
  132.  
  133. return 0;
  134. }
  135.  
  136. int
  137. Test_Task::handle_close (ACE_HANDLE,ACE_Reactor_Mask) // Event handler hook: ignores both arguments and always returns 0.
  138. {
  139. return 0;
  140. }
  141.  
  142. int
  143. Test_Task::handle_input (ACE_HANDLE) // Event handler hook: counts one handled notification.
  144. {
  145. this->handled_++;
  146. // Once ACE_MAX_ITERATIONS notifications have been handled, count this task as done.
  147. if (this->handled_ == ACE_MAX_ITERATIONS)
  148. {
  149. done_count--;
  150. ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) handle_input,handled_ = %d,done_count = %d\n"),this->handled_,done_count.value ()));
  151. }
  152.  
  153. ACE_OS::thr_yield ();
  154. return -1; // NOTE(review): -1 presumably makes the Reactor invoke handle_close() — confirm against the ACE documentation.
  155. }
  156.  
  157. static void *
  158. worker (void *args) // Thread entry point: args is the ACE_Reactor * whose event loop this thread runs. Always returns 0.
  159. {
  160. ACE_Reactor *reactor = reinterpret_cast<ACE_Reactor *> (args);
  161.  
  162. // Make this thread the owner of the Reactor's event loop.
  163. reactor->owner (ACE_Thread::self ());
  164.  
  165. // Use a timeout to inform the Reactor when to shutdown.
  166. ACE_Time_Value timeout (4);
  167.  
  168. for (;;)
  169. switch (reactor->handle_events (timeout)) // Loop processing the notifications sent by the tasks (above); this is where handle_input() and handle_close() are called back.
  170. {
  171. case -1:
  172. ACE_ERROR_RETURN ((LM_ERROR,ACE_TEXT ("(%t) %p\n"),ACE_TEXT ("reactor")),0);
  173. /* NOTREACHED */
  174. case 0:
  175. ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) Reactor shutdown\n")));
  176. return 0;
  177. }
  178.  
  179. ACE_NOTREACHED (return 0);
  180. }
  181.  
  182. #endif /* ACE_HAS_THREADS */
  183.  
  184. int
  185. run_main (int,ACE_TCHAR *[]) // Test entry point: opens 2 * MAX_TASKS tasks, spawns one worker thread per Reactor and waits for all threads. Returns 0, or -1 if spawn() or wait() fails.
  186. {
  187. ACE_START_TEST (ACE_TEXT ("Reactors_Test"));
  188.  
  189. #if defined (ACE_HAS_THREADS)
  190. ACE_ASSERT (ACE_LOG_MSG->op_status () != -1);
  191.  
  192. thr_mgr = ACE_Thread_Manager::instance ();
  193.  
  194. ACE_Reactor reactor;
  195. ACE_ASSERT (ACE_LOG_MSG->op_status () != -1);
  196.  
  197. Test_Task tt1[MAX_TASKS];
  198. Test_Task tt2[MAX_TASKS];
  199.  
  200. // Activate all of the Tasks.
  201.  
  202. for (int i = 0; i < MAX_TASKS; i++)
  203. {
  204. tt1[i].open (ACE_Reactor::instance ()); // Associate the task with the singleton Reactor.
  205. tt2[i].open (&reactor); // Associate the task with the local Reactor.
  206. }
  207.  
  208. // One thread watches the singleton Reactor, the other watches the local one.
  209. // Spawn two threads each running a different reactor.
  210.  
  211. if (ACE_Thread_Manager::instance ()->spawn
  212. (ACE_THR_FUNC (worker),(void *) ACE_Reactor::instance (),THR_BOUND | THR_DETACHED) == -1)
  213. ACE_ERROR_RETURN ((LM_ERROR,ACE_TEXT ("%p\n"),ACE_TEXT ("spawn")),-1);
  214.  
  215. else if (ACE_Thread_Manager::instance ()->spawn
  216. (ACE_THR_FUNC (worker),(void *) &reactor,THR_BOUND | THR_DETACHED) == -1)
  217. ACE_ERROR_RETURN ((LM_ERROR,ACE_TEXT ("%p\n"),ACE_TEXT ("spawn")),-1);
  218. // Wait for all threads to finish.
  219. if (ACE_Thread_Manager::instance ()->wait () == -1)
  220. ACE_ERROR_RETURN ((LM_ERROR,ACE_TEXT ("%p\n"),ACE_TEXT ("wait")),-1);
  221.  
  222. ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) all threads are finished\n")));
  223.  
  224. #else
  225. ACE_ERROR ((LM_INFO,ACE_TEXT ("threads not supported on this platform\n")));
  226. #endif /* ACE_HAS_THREADS */
  227. ACE_END_TEST;
  228. return 0;
  229. }


在run_main函数里写上下面输出语句,看看 ACE_Reactor::instance() 和 &reactor 是否相等。

// Pointers are printed with %p and a void * cast; passing a pointer to %d is undefined behaviour.
printf("ACE_Reactor::instance() = %p,&reactor = %p,reactor.instance() = %p\n",(void *) ACE_Reactor::instance(),(void *) &reactor,(void *) reactor.instance());

运行发现:ACE_Reactor::instance() 和 reactor.instance() 相等,但与 &reactor 不相等。

查看ACE源代码,得知应该是下面原因。

ACE_Reactor 类里有 reactor_ 静态成员:

/// Pointer to a process-wide ACE_Reactor singleton.(指向一个进程内的ACE_Reactor单例)
static ACE_Reactor *reactor_;

但ACE_Reactor的构造函数里并没有对 reactor_ 如下赋值:
reactor_ = this;

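所以 ACE_Reactor reactor; 语句只是构造了一个独立的 Reactor 对象,并不会把 &reactor 赋给 reactor_;ACE_Reactor::instance() 和 reactor.instance() 调用的是同一个静态函数,返回的都是 reactor_ 指向的进程级单例,因此二者相等,而与 &reactor 不相等。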

猜你在找的React相关文章