在ACE 6.0.0 里提供了一个Reactor示例程序,对其分析一下(在代码的注释里)。
Reactors_Test.cpp文件(在 \ACE-6.0.0\ACE_wrappers\tests\ 目录下):
- // $Id: Reactors_Test.cpp 91671 2010-09-08 18:39:23Z johnnyw $
- // ============================================================================
- //
- // = LIBRARY
- // tests
- //
- // = FILENAME
- // Reactors_Test.cpp
- //
- // = DESCRIPTION
- // This is a test that performs a torture test of multiple
- // <ACE_Reactors> and <ACE_Tasks> in the same process.
- //
- // = AUTHOR
- // Prashant Jain <pjain@cs.wustl.edu>,
- // Detlef Becker <Detlef.Becker@med.siemens.de>, and
- // Douglas C. Schmidt <schmidt@cs.wustl.edu>
- //
- // ============================================================================
- #include "test_config.h"
- #include "ace/Task.h"
- #include "ace/Reactor.h"
- #include "ace/Atomic_Op.h"
- #include "ace/Recursive_Thread_Mutex.h"
- #if defined (ACE_HAS_THREADS)
- ACE_Thread_Manager *thr_mgr; // Assigned in run_main from ACE_Thread_Manager::instance ().
- static const int MAX_TASKS = 20; // Length of each of the two Test_Task arrays created in run_main.
- class Test_Task : public ACE_Task<ACE_MT_SYNCH>
- // = TITLE
- // Task that notifies its reactor from svc () and counts the notifications dispatched back to handle_input ().
- {
- public:
- // = Initialization and termination methods.
- Test_Task (void); // Increments task_count_.
- ~Test_Task (void); // Asserts that task_count_ is back to 0.
- //FUZZ: disable check_for_lack_ACE_OS
- // = Task hooks.
- virtual int open (void *args = 0); // args is cast to ACE_Reactor * and stored, then the task is activated.
- virtual int close (u_long flags = 0); // Decrements task_count_; flags is unused.
- virtual int svc (void); // Sends up to ACE_MAX_ITERATIONS notifications to the stored reactor.
- //FUZZ: enable check_for_lack_ACE_OS
- // = Event Handler hooks.
- virtual int handle_input (ACE_HANDLE handle); // Counts one handled notification; always returns -1.
- virtual int handle_close (ACE_HANDLE fd,ACE_Reactor_Mask close_mask); // Does nothing; always returns 0.
- private:
- size_t handled_;
- // Number of notifications handled so far by handle_input ().
- static int task_count_;
- // Number of tasks constructed and not yet closed.
- };
- // Static data member initialization.
- int Test_Task::task_count_ = 0;
- static ACE_Atomic_Op<ACE_Thread_Mutex,int> done_count = MAX_TASKS * 2; // One count per task in the two MAX_TASKS-sized arrays; decremented in handle_input ().
- static ACE_Recursive_Thread_Mutex recursive_lock; // Taken by the Test_Task constructor, destructor and close () around task_count_ accesses.
- // Construct a task that has handled no notifications yet and add it to
- // the shared task counter.
- Test_Task::Test_Task (void)
-   : handled_ (0)
- {
-   // Hold recursive_lock while the counter is bumped and logged.
-   ACE_GUARD (ACE_Recursive_Thread_Mutex, ace_mon, recursive_lock);
-   ++Test_Task::task_count_;
-   ACE_DEBUG ((LM_DEBUG,
-               ACE_TEXT ("(%t) TT+ Test_Task::task_count_ = %d\n"),
-               Test_Task::task_count_));
- }
- // Destroy the task.  Logs the shared task counter and asserts that it
- // has already dropped back to zero (close () is what decrements it).
- Test_Task::~Test_Task (void)
- {
-   // ACE_GUARD needs three arguments: the lock type, the name of the
-   // guard object it declares (ace_mon), and the lock to acquire.
-   ACE_GUARD (ACE_Recursive_Thread_Mutex, ace_mon, recursive_lock);
-   ACE_DEBUG ((LM_DEBUG,
-               ACE_TEXT ("(%t) TT- Test_Task::task_count_ = %d\n"),
-               Test_Task::task_count_));
-   ACE_ASSERT (Test_Task::task_count_ == 0);
- }
- // Task hook: store the reactor passed through <args> and activate the
- // task.  Returns the result of activate ().
- int
- Test_Task::open (void *args)
- {
-   ACE_Reactor *const target = reinterpret_cast<ACE_Reactor *> (args);
-   this->reactor (target);
-   // svc () is only run automatically once activate () has been called.
-   const int status = this->activate (THR_NEW_LWP);
-   return status;
- }
- // Task hook: remove this task from the shared task counter.
- // Returns 0, or -1 if recursive_lock could not be acquired.
- int
- Test_Task::close (u_long)
- {
-   // ACE_GUARD_RETURN needs four arguments: the lock type, the name of
-   // the guard object it declares (ace_mon), the lock, and the value to
-   // return when the lock cannot be acquired.
-   ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon, recursive_lock, -1);
-   Test_Task::task_count_--;
-   ACE_DEBUG ((LM_DEBUG,
-               ACE_TEXT ("(%t) close Test_Task::task_count_ = %d\n"),
-               Test_Task::task_count_));
-   ACE_ASSERT (Test_Task::task_count_ >= 0);
-   return 0;
- }
- // Task hook, run automatically after activate () has been called in
- // open ().  Sends ACE_MAX_ITERATIONS READ_MASK notifications for this
- // handler to the stored reactor.  Returns 0 when all iterations were
- // attempted, or -1 as soon as notify () fails for a reason other than
- // a timeout.
- int
- Test_Task::svc (void)
- {
-   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) svc\n")));
-   for (size_t i = 0; i < ACE_MAX_ITERATIONS; i++)
-     {
-       ACE_OS::thr_yield ();
-       // Only wait up to 10 milliseconds to notify the Reactor.
-       ACE_Time_Value timeout (0, 10 * 1000);
-       // The reactor calls back handle_input () (and then handle_close ())
-       // on this handler when it processes the notification in worker ().
-       if (this->reactor ()->notify (this, ACE_Event_Handler::READ_MASK, &timeout) == -1)
-         {
-           if (errno == ETIME)
-             ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("notify() timed out")));
-           else
-             // Use the same "(%t) %p\n" format as the timeout branch so the
-             // thread id, errno text and a trailing newline are logged.
-             ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("notify")), -1);
-         }
-     }
-   return 0;
- }
- int
- Test_Task::handle_close (ACE_HANDLE,ACE_Reactor_Mask) // Event handler close hook; both arguments are ignored.
- {
- return 0; // No work is done here; the hook always returns 0.
- }
- // Event handler hook, called back for each notification sent from
- // svc ().  Counts the notification and, once ACE_MAX_ITERATIONS of them
- // have been handled, decrements the global done_count.  Always
- // returns -1.
- int
- Test_Task::handle_input (ACE_HANDLE)
- {
-   this->handled_++;
-   if (this->handled_ == ACE_MAX_ITERATIONS)
-     {
-       done_count--;
-       // handled_ is a size_t; cast it so the argument really is the int
-       // that the "%d" conversion reads.
-       ACE_DEBUG ((LM_DEBUG,
-                   ACE_TEXT ("(%t) handle_input,handled_ = %d,done_count = %d\n"),
-                   static_cast<int> (this->handled_),
-                   done_count.value ()));
-     }
-   ACE_OS::thr_yield ();
-   // NOTE(review): by ACE convention a -1 return asks the reactor to call
-   // handle_close () on this handler -- confirm against the ACE docs.
-   return -1;
- }
- // Thread entry point: runs the event loop of the ACE_Reactor passed
- // through <args>.  Always returns 0; the loop ends when handle_events ()
- // returns -1 (logged as an error) or 0 (logged as "Reactor shutdown").
- static void *
- worker (void *args)
- {
-   ACE_Reactor *reactor = reinterpret_cast<ACE_Reactor *> (args);
-   // Make this thread the owner of the Reactor's event loop.
-   reactor->owner (ACE_Thread::self ());
-   // Use a timeout to inform the Reactor when to shutdown.
-   ACE_Time_Value timeout (4);
-   // Keep dispatching the notifications sent by the tasks' svc (); each
-   // dispatch calls back the task's handle_input () / handle_close ().
-   // Any return value other than -1 or 0 simply loops again.
-   for (;;)
-     switch (reactor->handle_events (timeout))
-       {
-       case -1:
-         // "(%t) %p\n" logs the thread id, the errno text and a newline.
-         ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("reactor")), 0);
-         /* NOTREACHED */
-       case 0:
-         ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Reactor shutdown\n")));
-         return 0;
-       }
-   ACE_NOTREACHED (return 0);
- }
- #endif /* ACE_HAS_THREADS */
- // Test entry point.  With thread support it creates two arrays of
- // MAX_TASKS tasks, attaches one array to the singleton reactor and the
- // other to a local reactor, spawns one worker thread per reactor and
- // waits for all threads.  Returns 0 on success and -1 if spawning or
- // waiting fails.  Without thread support it only logs a message.
- int
- run_main (int, ACE_TCHAR *[])
- {
-   ACE_START_TEST (ACE_TEXT ("Reactors_Test"));
- #if defined (ACE_HAS_THREADS)
-   ACE_ASSERT (ACE_LOG_MSG->op_status () != -1);
-   thr_mgr = ACE_Thread_Manager::instance ();
-   ACE_Reactor reactor;
-   ACE_ASSERT (ACE_LOG_MSG->op_status () != -1);
-   Test_Task tt1[MAX_TASKS];
-   Test_Task tt2[MAX_TASKS];
-   // Activate all of the Tasks.
-   for (int i = 0; i < MAX_TASKS; i++)
-     {
-       tt1[i].open (ACE_Reactor::instance ()); // Attach to the singleton reactor.
-       tt2[i].open (&reactor);                 // Attach to the local reactor.
-     }
-   // Spawn two threads each running a different reactor.
-   if (ACE_Thread_Manager::instance ()->spawn
-       (ACE_THR_FUNC (worker),
-        (void *) ACE_Reactor::instance (),
-        THR_BOUND | THR_DETACHED) == -1)
-     ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn")), -1);
-   // The second worker gets the same flags and the same failure handling
-   // as the first one.
-   else if (ACE_Thread_Manager::instance ()->spawn
-            (ACE_THR_FUNC (worker),
-             (void *) &reactor,
-             THR_BOUND | THR_DETACHED) == -1)
-     ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn")), -1);
-   // Wait for all threads to finish.
-   if (ACE_Thread_Manager::instance ()->wait () == -1)
-     ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("wait")), -1);
-   ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) all threads are finished\n")));
- #else
-   ACE_ERROR ((LM_INFO, ACE_TEXT ("threads not supported on this platform\n")));
- #endif /* ACE_HAS_THREADS */
-   ACE_END_TEST;
-   return 0;
- }
在run_main函数里写上下面输出语句,看看 ACE_Reactor::instance() 和 &reactor 是否相等。
// Pointers must be printed with %p (as void *); %d is undefined for pointers and truncates them on 64-bit platforms.
printf("ACE_Reactor::instance() = %p,&reactor = %p,reactor.instance() = %p\n",static_cast<void *>(ACE_Reactor::instance()),static_cast<void *>(&reactor),static_cast<void *>(reactor.instance()));
运行发现:ACE_Reactor::instance() 和 reactor.instance() 相等,但与 &reactor 不相等。
查看ACE源代码,得知应该是下面原因。
ACE_Reactor 类里有 reactor_ 静态成员:
/// Pointer to a process-wide ACE_Reactor singleton.(指向一个进程内的ACE_Reactor单例)
static ACE_Reactor *reactor_;
但ACE_Reactor的构造函数里并没有对 reactor_ 如下赋值:
reactor_ = this;
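所以 ACE_Reactor reactor; 语句只是构造了一个独立的局部 Reactor 对象,并不会把 &reactor 设置为进程级单例。而 instance() 是静态成员函数,reactor.instance() 与 ACE_Reactor::instance() 实际上是同一个调用,返回的都是 reactor_ 所指向的单例对象,因此二者相等,却与 &reactor 不相等。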