ruby-on-rails – Rails:如何监听/拉取服务或队列?

前端之家收集整理的这篇文章主要介绍了ruby-on-rails – Rails:如何监听/拉取服务或队列?前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
大多数Rails应用程序的工作方式是它们等待来自客户端的请求然后发挥其魔力.
但是,如果我想使用Rails应用程序作为微服务架构的一部分(例如)与一些非常好的通信(Serivce A将事件发送到Kafka或RabbitMQ队列和服务B – 我的Rails应用程序 – 应该监听此队列),如何调整/启动Rails应用程序以立即收听队列并由来自那里的事件触发? (这意味着初始触发器不是来自客户端,而是来自应用程序本身.)

谢谢你的建议!

解决方法

我只是在我的应用程序中设置RabbitMQ消息传递,并将在第二天左右实现解耦(多个,分布式)应用程序.我发现 this文章非常有帮助(和 RabbitMQ tutorials一样).以下所有代码均适用于RabbitMQ,并假设您已在本地计算机上启动并运行RabbitMQ服务器.

这是我到目前为止所做的 – 这对我有用:

  1. #Gemfile
  2. gem 'bunny'
  3. gem 'sneakers'

我有一个发送到队列的发布者:

  1. # app/agents/messaging/publisher.rb
  2. module Messaging
  3. class Publisher
  4. class << self
  5.  
  6. def publish(args)
  7. connection = Bunny.new
  8. connection.start
  9. channel = connection.create_channel
  10. queue_name = "#{args.keys.first.to_s.pluralize}_queue"
  11. queue = channel.queue(queue_name,durable: true)
  12. channel.default_exchange.publish(args[args.keys.first].to_json,:routing_key => queue.name)
  13. puts "in #{self}.#{__method__},[x] Sent #{args}!"
  14. connection.close
  15. end
  16.  
  17. end
  18. end
  19. end

我用的是这样的:

  1. Messaging::Publisher.publish(event: {... event details...})

然后我有了’倾听者’:

  1. # app/agents/messaging/events_queue_receiver.rb
  2. require_dependency "#{Rails.root.join('app','agents','messaging','events_agent')}"
  3.  
  4. module Messaging
  5. class EventsQueueReceiver
  6. include Sneakers::Worker
  7. from_queue :events_queue,env: nil
  8.  
  9. def work(msg)
  10. logger.info msg
  11. response = Messaging::EventsAgent.distribute(JSON.parse(msg).with_indifferent_access)
  12. ack! if response[:success]
  13. end
  14.  
  15. end
  16. end

‘listener’将消息发送到Messaging :: EventsAgent.distribute,如下所示:

  1. # app/agents/messaging/events_agent.rb
  2. require_dependency #{Rails.root.join('app','fsm','state_assignment_agent')}"
  3.  
  4. module Messaging
  5. class EventsAgent
  6. EVENT_HANDLERS = {
  7. enroll_in_program: ["FSM::StateAssignmentAgent"]
  8. }
  9. class << self
  10.  
  11. def publish(event)
  12. Messaging::Publisher.publish(event: event)
  13. end
  14.  
  15. def distribute(event)
  16. puts "in #{self}.#{__method__},message"
  17. if event[:handler]
  18. puts "in #{self}.#{__method__},event[:handler: #{event[:handler}"
  19. event[:handler].constantize.handle_event(event)
  20. else
  21. event_name = event[:event_name].to_sym
  22. EVENT_HANDLERS[event_name].each do |handler|
  23. event[:handler] = handler
  24. publish(event)
  25. end
  26. end
  27. return {success: true}
  28. end
  29.  
  30. end
  31. end
  32. end

按照Codetunes的说明,我有:

  1. # Rakefile
  2. # Add your own tasks in files placed in lib/tasks ending in .rake,# for example lib/tasks/capistrano.rake,and they will automatically be available to Rake.
  3.  
  4. require File.expand_path('../config/application',__FILE__)
  5.  
  6. require 'sneakers/tasks'
  7. Rails.application.load_tasks

和:

  1. # app/config/sneakers.rb
  2. Sneakers.configure({})
  3. Sneakers.logger.level = Logger::INFO # the default DEBUG is too noisy

我打开两个控制台窗口.在第一个,我说(让我的听众运行):

  1. $WORKERS=Messaging::EventsQueueReceiver rake sneakers:run
  2. ... a bunch of start up info
  3. 2016-03-18T14:16:42Z p-5877 t-14d03e INFO: Heartbeat interval used (in seconds): 2
  4. 2016-03-18T14:16:42Z p-5899 t-14d03e INFO: Heartbeat interval used (in seconds): 2
  5. 2016-03-18T14:16:42Z p-5922 t-14d03e INFO: Heartbeat interval used (in seconds): 2
  6. 2016-03-18T14:16:42Z p-5944 t-14d03e INFO: Heartbeat interval used (in seconds): 2

在第二个,我说:

  1. $rails s --sandBox
  2. 2.1.2 :001 > Messaging::Publisher.publish({:event=>{:event_name=>"enroll_in_program",:program_system_name=>"aha_chh",:person_id=>1}})
  3. in Messaging::Publisher.publish,[x] Sent {:event=>{:event_name=>"enroll_in_program",:person_id=>1}}!
  4. => :closed

然后,回到我的第一个窗口,我看到:

  1. 2016-03-18T14:17:44Z p-5877 t-19nfxy INFO: {"event_name":"enroll_in_program","program_system_name":"aha_chh","person_id":1}
  2. in Messaging::EventsAgent.distribute,message
  3. in Messaging::EventsAgent.distribute,event[:handler]: FSM::StateAssignmentAgent

在我的RabbitMQ服务器中,我看到:

这是一个非常小的设置,我相信在接下来的几天里我会学到很多东西.

祝好运!

猜你在找的Ruby相关文章