大多数Rails应用程序的工作方式是它们等待来自客户端的请求然后发挥其魔力.
但是,如果我想使用Rails应用程序作为微服务架构的一部分(例如)与一些非常好的通信(Serivce A将事件发送到Kafka或RabbitMQ队列和服务B – 我的Rails应用程序 – 应该监听此队列),如何调整/启动Rails应用程序以立即收听队列并由来自那里的事件触发? (这意味着初始触发器不是来自客户端,而是来自应用程序本身.)
但是,如果我想使用Rails应用程序作为微服务架构的一部分(例如)与一些非常好的通信(Serivce A将事件发送到Kafka或RabbitMQ队列和服务B – 我的Rails应用程序 – 应该监听此队列),如何调整/启动Rails应用程序以立即收听队列并由来自那里的事件触发? (这意味着初始触发器不是来自客户端,而是来自应用程序本身.)
谢谢你的建议!
解决方法
我只是在我的应用程序中设置RabbitMQ消息传递,并将在第二天左右实现解耦(多个,分布式)应用程序.我发现
this文章非常有帮助(和
RabbitMQ tutorials一样).以下所有代码均适用于RabbitMQ,并假设您已在本地计算机上启动并运行RabbitMQ服务器.
这是我到目前为止所做的 – 这对我有用:
- #Gemfile
- gem 'bunny'
- gem 'sneakers'
我有一个发送到队列的发布者:
- # app/agents/messaging/publisher.rb
- module Messaging
- class Publisher
- class << self
- def publish(args)
- connection = Bunny.new
- connection.start
- channel = connection.create_channel
- queue_name = "#{args.keys.first.to_s.pluralize}_queue"
- queue = channel.queue(queue_name,durable: true)
- channel.default_exchange.publish(args[args.keys.first].to_json,:routing_key => queue.name)
- puts "in #{self}.#{__method__},[x] Sent #{args}!"
- connection.close
- end
- end
- end
- end
我用的是这样的:
- Messaging::Publisher.publish(event: {... event details...})
然后我有了’倾听者’:
- # app/agents/messaging/events_queue_receiver.rb
- require_dependency "#{Rails.root.join('app','agents','messaging','events_agent')}"
- module Messaging
- class EventsQueueReceiver
- include Sneakers::Worker
- from_queue :events_queue,env: nil
- def work(msg)
- logger.info msg
- response = Messaging::EventsAgent.distribute(JSON.parse(msg).with_indifferent_access)
- ack! if response[:success]
- end
- end
- end
‘listener’将消息发送到Messaging :: EventsAgent.distribute,如下所示:
- # app/agents/messaging/events_agent.rb
- require_dependency #{Rails.root.join('app','fsm','state_assignment_agent')}"
- module Messaging
- class EventsAgent
- EVENT_HANDLERS = {
- enroll_in_program: ["FSM::StateAssignmentAgent"]
- }
- class << self
- def publish(event)
- Messaging::Publisher.publish(event: event)
- end
- def distribute(event)
- puts "in #{self}.#{__method__},message"
- if event[:handler]
- puts "in #{self}.#{__method__},event[:handler: #{event[:handler}"
- event[:handler].constantize.handle_event(event)
- else
- event_name = event[:event_name].to_sym
- EVENT_HANDLERS[event_name].each do |handler|
- event[:handler] = handler
- publish(event)
- end
- end
- return {success: true}
- end
- end
- end
- end
按照Codetunes的说明,我有:
- # Rakefile
- # Add your own tasks in files placed in lib/tasks ending in .rake,# for example lib/tasks/capistrano.rake,and they will automatically be available to Rake.
- require File.expand_path('../config/application',__FILE__)
- require 'sneakers/tasks'
- Rails.application.load_tasks
和:
- # app/config/sneakers.rb
- Sneakers.configure({})
- Sneakers.logger.level = Logger::INFO # the default DEBUG is too noisy
我打开两个控制台窗口.在第一个,我说(让我的听众运行):
- $WORKERS=Messaging::EventsQueueReceiver rake sneakers:run
- ... a bunch of start up info
- 2016-03-18T14:16:42Z p-5877 t-14d03e INFO: Heartbeat interval used (in seconds): 2
- 2016-03-18T14:16:42Z p-5899 t-14d03e INFO: Heartbeat interval used (in seconds): 2
- 2016-03-18T14:16:42Z p-5922 t-14d03e INFO: Heartbeat interval used (in seconds): 2
- 2016-03-18T14:16:42Z p-5944 t-14d03e INFO: Heartbeat interval used (in seconds): 2
在第二个,我说:
- $rails s --sandBox
- 2.1.2 :001 > Messaging::Publisher.publish({:event=>{:event_name=>"enroll_in_program",:program_system_name=>"aha_chh",:person_id=>1}})
- in Messaging::Publisher.publish,[x] Sent {:event=>{:event_name=>"enroll_in_program",:person_id=>1}}!
- => :closed
然后,回到我的第一个窗口,我看到:
- 2016-03-18T14:17:44Z p-5877 t-19nfxy INFO: {"event_name":"enroll_in_program","program_system_name":"aha_chh","person_id":1}
- in Messaging::EventsAgent.distribute,message
- in Messaging::EventsAgent.distribute,event[:handler]: FSM::StateAssignmentAgent
在我的RabbitMQ服务器中,我看到:
这是一个非常小的设置,我相信在接下来的几天里我会学到很多东西.
祝好运!