微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spring DefaultMessageListenerContainer和ActiveMQ

我已将Spring DefaultMessageListenerContainer配置为消耗来自队列的消息的ActiveMQ使用者.我们称之为“Test.Queue”
我将此代码部署在4台不同的计算机上,并将所有计算机配置到同一个ActiveMQ实例,以处理来自相同“Test.Queue”队列的消息.

所有4台机器都启动并运行后,我将最大消费者大小设置为20,我看到消费者数量与队列数相比为80(4 *最大消费者大小= 80)

生成并发送到队列的消息变得很高时,一切都很好.

当有1000个消息时,在80个消费者中,让我们说其中一个被卡住它会冻结Active MQ以停止向其他消费者发送消息.

所有消息都永远停留在ActiveMQ中.

由于我有4台机器,最多有80名消费者,我不知道哪个消费者没有承认.

我停下来重新启动所有4台机器,当我停止那些卡住了坏消费者的机器时,消息再次开始流动.

我不知道如何配置DefaultMessageListenerContainer以放弃坏消费者并立即发信号通知ActiveMQ以开始发送消息.

即使没有Spring,我也可以创建场景,如下所示:

>我最多生成了5000条消息并将它们发送到“Test.Queue”队列
>我创建了2个消费者(消费者A,B)和一个消费者B.
onMessage()方法,我把线程睡了很长时间(
Thread.sleep(Long.MAX_VALUE))具有类似于当前时间%13为0的条件然后将线程置于休眠状态.
>这两个消费者.
>去了Active MQ,发现队列有2个消费者.
> A和B都在处理消息
>在某个时间点,消费者B的onMessage()被调用,并且当当前时间%13的条件为0时,它使线程处于休眠状态.
>消费者B被卡住,无法向经纪人承认
>我回到Active MQ Web控制台,仍然将消费者视为2,但没有消息出列.
>现在我创建了另一个消费者C并运行它来消费.
>只有ActiveMQ中的消费者数量从2增加到3.
>但消费者C并没有消费任何东西,因为经纪人没有发送任何持有这些消息的消息,因为它仍然在等待消费者B承认它.
>我还注意到消费者A没有消费任何东西
>我去杀死消费者B,现在所有的消息都被消耗掉了.

假设A,B,C是由Spring的使用DefaultMessageListenerContainer管理,我该如何调整弹簧使用DefaultMessageListenerContainer采取坏消费掉池(在我的情况消费者B)后未能确认为x秒,承认经纪人立刻让经纪人没有永久保留消息.

谢谢你的时间.

感谢我能解决这个问题.

最佳答案
这里有几个选项可供尝试……

>将队列预取设置为0,以促进更好地分配给消费者,并减少特定消费者的“卡住”消息.见http://activemq.apache.org/what-is-the-prefetch-limit-for.html
>在连接上设置“?useKeepAlive = false& wireFormat.maxInactivityDuration = 20000”以在指定的非活动时间后超时
>再次设置队列策略“slowConsumerStrategy-> abortSlowConsumer”…以使缓慢的消费者超时

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐