我一直在编写一个“检查器”系统,对各种服务,系统,数据库,文件等执行各种“检查”.“检查”本质上是通用的,可以是任何东西.所有检查都以他们通过或失败的通用格式天气报告,无论可能是什么.
它以模块化OO方式编写,因此开发人员可以简单地遵循框架并独立于其中一个编写检查.每个对象都包含一个共享的报告对象,在它们运行检查后,它们只需$self-> {‘reporting’} – > report(params).定义了参数,并假设开发人员适当报告.报告对象然后索引这些报告.我的主加载器脚本包含以下条目:
- my $reportingObject = new Checks::Reporting(params);
- my @checks;
- push @checks,new Checks::Check_One($reportingObject,params));
- push @checks,params));
- .
- .
- push @checks,new Checks::Check_N($reportingObject,params));
在完成检查并完成报告后,我一直在做:
- foreach my $check (@checks) {
- $check->run_stuff();
- }
- $reportingObject->finalize_report();
现在,由于这些检查完全独立(不用担心报告对象),因此它们可以并行运行.作为一项改进,我做了:
- my @threads;
- foreach my $check (@checks) {
- push @threads,async { $check->run_stuff(); }
- }
- foreach my $thread (@threads) {
- $thread->join;
- }
- #All threads are complete,and thus all checks are done
- $reportingObject->finalize_report();
正如我之前所说,开发人员将彼此独立地编写Checks.有些检查很简单,有些则不是.简单检查可能没有异步代码,但其他人可能需要在内部异步运行,例如
- sub do_check {
- my @threads;
- my @list = @{$self->{'list'}};
- foreach my $item (@list) {
- push @threads,async {
- #do_work_on_$item
- #return 1 or 0 for success or fail
- };
- foreach my $thread (@threads) {
- my $res = $thread->join;
- if($res == 1) {
- $self->{'reporting'}->report(params_here);
- }
- }
- }
- }
正如您所看到的,线程模型允许我以非常模糊的方式执行操作.每个“检查”无论它在自己的线程中独立运行.如果一个开发人员有异步操作,无论它是什么,他只是在自己的线程中独立完成.我想要一个类似于此的模型.
不幸的是,线程缓慢且效率低下.所有异步库都有特定的观察者,如IO等.我不想要任何特定的东西.我想要一个基于事件的模型,它允许我简单地启动异步任务,无论它们是什么,只需通知他们什么时候完成所以我可以继续前进.
希望这能解释它,你可以指出我正确的方向.
解决方法
这似乎非常适合老板工人模型:
>在项目开始时产生一些工人.确保他们都可以访问队列.
>根据需要排队多次检查.工作人员将检查队列出列,执行它们,并将结果排入输出队列.
>您的主线程查看输出线程的结果,并执行它想要的任何操作.
>在END区块加入工人
如果您希望将coderefs放入队列,您可能希望查看Thread::Queue::Any
.
这是一个完全可运行的例子:
- use strict; use feature 'say';
- use threads; use threads::shared; use Thread::Queue::Any;
- use constant NUM_THREADS => 5;
- local $Storable::Deparse = 1; local $Storable::Eval = 1; # needed to serialize code
- my $check_q = Thread::Queue::Any->new;
- my $result_q = Thread::Queue::Any->new;
- # start the workers
- {
- my $running :shared = NUM_THREADS;
- my @threads = map threads->new(\&worker,$check_q,$result_q,\$running),1..NUM_THREADS;
- END { $_->join for @threads }
- }
- # enqueue the checks
- $check_q->enqueue($_) for sub {1},sub{2},sub{"hi"},sub{ die };
- $check_q->enqueue(undef) for 1..NUM_THREADS; # end the queue
- while(defined( my $result = $result_q->dequeue )) {
- report($$result);
- }
- sub report {
- say shift // "Failed";
- }
- sub worker {
- my ($in,$out,$running_ref) = @_;
- while (defined( my $check = $in->dequeue )) {
- my $result = eval { $check->() };
- $out->enqueue(\$result);
- }
- # last thread closes the door
- lock $$running_ref;
- --$$running_ref || $out->enqueue(undef);
- }
这打印
- 1
- 2
- hi
- Failed
以略微随机的顺序.