多线程 – “任何”代码的Perl异步任务,无论它是什么?

前端之家收集整理的这篇文章主要介绍了多线程 – “任何”代码的Perl异步任务,无论它是什么?前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我一直在编写一个“检查器”系统,对各种服务,系统,数据库,文件等执行各种“检查”.“检查”本质上是通用的,可以是任何东西.所有检查都以他们通过或失败的通用格式天气报告,无论可能是什么.

它以模块化OO方式编写,因此开发人员可以简单地遵循框架并独立于其中一个编写检查.每个对象都包含一个共享的报告对象,在它们运行检查后,它们只需$self-> {‘reporting’} – > report(params).定义了参数,并假设开发人员适当报告.报告对象然后索引这些报告.我的主加载器脚本包含以下条目:

  1. my $reportingObject = new Checks::Reporting(params);
  2. my @checks;
  3.  
  4. push @checks,new Checks::Check_One($reportingObject,params));
  5. push @checks,params));
  6. .
  7. .
  8. push @checks,new Checks::Check_N($reportingObject,params));

在完成检查并完成报告后,我一直在做:

  1. foreach my $check (@checks) {
  2. $check->run_stuff();
  3. }
  4.  
  5. $reportingObject->finalize_report();

现在,由于这些检查完全独立(不用担心报告对象),因此它们可以并行运行.作为一项改进,我做了:

  1. my @threads;
  2. foreach my $check (@checks) {
  3. push @threads,async { $check->run_stuff(); }
  4. }
  5.  
  6. foreach my $thread (@threads) {
  7. $thread->join;
  8. }
  9.  
  10. #All threads are complete,and thus all checks are done
  11. $reportingObject->finalize_report();

正如我之前所说,开发人员将彼此独立地编写Checks.有些检查很简单,有些则不是.简单检查可能没有异步代码,但其他人可能需要在内部异步运行,例如

  1. sub do_check {
  2. my @threads;
  3. my @list = @{$self->{'list'}};
  4.  
  5. foreach my $item (@list) {
  6. push @threads,async {
  7. #do_work_on_$item
  8. #return 1 or 0 for success or fail
  9. };
  10. foreach my $thread (@threads) {
  11. my $res = $thread->join;
  12. if($res == 1) {
  13. $self->{'reporting'}->report(params_here);
  14. }
  15. }
  16. }
  17. }

正如您所看到的,线程模型允许我以非常模糊的方式执行操作.每个“检查”无论它在自己的线程中独立运行.如果一个开发人员有异步操作,无论它是什么,他只是在自己的线程中独立完成.我想要一个类似于此的模型.

不幸的是,线程缓慢且效率低下.所有异步库都有特定的观察者,如IO等.我不想要任何特定的东西.我想要一个基于事件的模型,它允许我简单地启动异步任务,无论它们是什么,只需通知他们什么时候完成所以我可以继续前进.

希望这能解释它,你可以指出我正确的方向.

解决方法

这似乎非常适合老板工人模型:

>在项目开始时产生一些工人.确保他们都可以访问队列.
>根据需要排队多次检查.工作人员将检查队列出列,执行它们,并将结果排入输出队列.
>您的主线程查看输出线程的结果,并执行它想要的任何操作.
>在END区块加入工人

如果您希望将coderefs放入队列,您可能希望查看Thread::Queue::Any.

这是一个完全可运行的例子:

  1. use strict; use feature 'say';
  2. use threads; use threads::shared; use Thread::Queue::Any;
  3. use constant NUM_THREADS => 5;
  4. local $Storable::Deparse = 1; local $Storable::Eval = 1; # needed to serialize code
  5.  
  6. my $check_q = Thread::Queue::Any->new;
  7. my $result_q = Thread::Queue::Any->new;
  8.  
  9. # start the workers
  10. {
  11. my $running :shared = NUM_THREADS;
  12. my @threads = map threads->new(\&worker,$check_q,$result_q,\$running),1..NUM_THREADS;
  13.  
  14. END { $_->join for @threads }
  15. }
  16.  
  17. # enqueue the checks
  18. $check_q->enqueue($_) for sub {1},sub{2},sub{"hi"},sub{ die };
  19. $check_q->enqueue(undef) for 1..NUM_THREADS; # end the queue
  20.  
  21. while(defined( my $result = $result_q->dequeue )) {
  22. report($$result);
  23. }
  24.  
  25. sub report {
  26. say shift // "Failed";
  27. }
  28.  
  29. sub worker {
  30. my ($in,$out,$running_ref) = @_;
  31. while (defined( my $check = $in->dequeue )) {
  32. my $result = eval { $check->() };
  33. $out->enqueue(\$result);
  34. }
  35.  
  36. # last thread closes the door
  37. lock $$running_ref;
  38. --$$running_ref || $out->enqueue(undef);
  39. }

这打印

  1. 1
  2. 2
  3. hi
  4. Failed

以略微随机的顺序.

猜你在找的Java相关文章