目前,我有一切与pop一起工作(每个消费者每5ms弹出一次).这似乎是不合需要的,因为它转换为每秒600次popm请求到rabbitmq.
我很乐意,如果有一个pop命令会像订阅一样,但只有一条消息. (进程会阻塞,等待来自rabbitMQ连接的输入,通过类似于Kernel.select的东西)
我试图欺骗AMQP gem来做这样的事情,但它不起作用:在队列为空并且不再向消费者发送消息之前,我似乎无法取消订阅.其他取消订阅的方法我担心会失去信息.
consume_1.rb:
require "amqp" EventMachine.run do puts "connecting..." connection = AMQP.connect(:host => "localhost",:user => "guest",:pass => "guest",:vhost => "/") puts "Connected to AMQP broker" channel = AMQP::Channel.new(connection) queue = channel.queue("tasks",:auto_delete => true) exchange = AMQP::Exchange.default(channel) queue.subscribe do |payload| puts "Received a message: #{payload}." queue.unsubscribe { puts "unbound" } sleep 3 end end
consumer_many.rb:
require "amqp" # Imagine the command is something cpu - intensive like image processing. command = "sleep 0.1" EventMachine.run do puts "connecting..." connection = AMQP.connect(:host => "localhost",:auto_delete => true) exchange = AMQP::Exchange.default(channel) queue.subscribe do |payload| puts "Received a message: #{payload}." end end
producer.rb:
require "amqp" i = 0 EventMachine.run do connection = AMQP.connect(:host => "localhost",:auto_delete => true) exchange = AMQP::Exchange.default(channel) EM.add_periodic_timer(1) do msg = "Message #{i}" i+=1 puts "~ publishing #{msg}" end end
我将启动consume_many.rb和producer.rb.消息将按预期流动.
当我启动consume_1.rb时,它会获取所有其他消息(如预期的那样).但它永远不会取消订阅,因为它永远不会完成处理它的所有消息……所以就这样了.
我如何让consume_1.rb订阅队列,获取单个消息,然后将自己从负载均衡器环中取出,这样它就可以完成它的工作,而不会丢失可能在队列中的任何其他待处理作业否则会被安排发送到这个过程?
蒂姆
解决方法
在您的消费者中:
channel = AMQP::Channel.new(connection,:prefetch => 1)
然后使用您的订阅块,执行:
queue.subscribe(:ack => true) do |queue_header,payload| puts "Received a message: #{payload}." # Do long running work here # AckNowledge message queue_header.ack end
这是什么告诉RabbitMQ一次只发送你的消费者1消息,而不是发送另一条消息,直到你在长时间运行一个长时间运行的任务后调用队列头上的ack.
我可能需要对此进行纠正,但我相信直接交换会更适合这项任务.
原文地址:https://www.jb51.cc/ruby/270533.html
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。