How to fix an ActiveMQ Artemis cluster that does not redistribute messages after one instance crashes
I have an Artemis cluster in Kubernetes with 3 master/slave pairs:
activemq-artemis-master-0 1/1 Running
activemq-artemis-master-1 1/1 Running
activemq-artemis-master-2 1/1 Running
activemq-artemis-slave-0 0/1 Running
activemq-artemis-slave-1 0/1 Running
activemq-artemis-slave-2 0/1 Running
I am using Spring Boot's JmsListener to consume messages sent to a wildcard queue, as shown below.
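import java.util.concurrent.TimeUnit;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
@Log4j2
public class QueueListener {
    @Autowired
    private ListenerControl listenerControl;

    // Consumes from the wildcard queue; the sleep simulates processing time.
    @JmsListener(id = "queueListener0", destination = "QUEUE.service2.*.*.*.notification")
    public void add(String message, @Header("sentBy") String sentBy, @Header("sentFrom") String sentFrom, @Header("sentAt") Long sentAt) throws InterruptedException {
        log.info("---QUEUE[notification]: message={},sentBy={},sentFrom={},sentAt={}", message, sentBy, sentFrom, sentAt);
        TimeUnit.MILLISECONDS.sleep(listenerControl.getDuration());
    }
}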
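20 messages were sent to the queue, and master-1 was the delivering node. After 5 messages had been consumed, I killed the master-1 node to simulate a crash. I saw slave-1 start running and then yield back to master-1 once Kubernetes had respawned it. The listener threw a JMSException saying the connection was lost and tried to reconnect. I then saw it connect successfully to master-0 (the queue was created there and its consumer count was > 0). However, the queue on master-0 was empty, while the same queue on master-1 still held 15 messages and had no consumer attached. I waited for a while, but the 15 messages were never delivered. I do not know why redistribution did not start.
The attributes of the wildcard queue on master-1 after it came back online following the crash are shown below (I manually replaced the value of the accesstoken field because it contains sensitive information):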
Attribute Value
Acknowledge attempts 0
Address QUEUE.service2.*.*.*.notification
Configuration managed false
Consumer count 0
Consumers before dispatch 0
Dead letter address DLQ
Delay before dispatch -1
Delivering count 0
Delivering size 0
Durable true
Durable delivering count 0
Durable delivering size 0
Durable message count 15
Durable persistent size 47705
Durable scheduled count 0
Durable scheduled size 0
Enabled true
Exclusive false
Expiry address ExpiryQueue
Filter
First message age 523996
First message as json [{"JMSType":"service2","address":"QUEUE.service2.tech-drive2.188100000059.thai.notification","messageID":68026,"sentAt":1621957145988,"accesstoken":"REMOVED","type":3,"priority":4,"userID":"ID:56c7b509-bd6f-11eb-a348-de0dacf99072","_AMQ_GROUP_ID":"tech-drive2-188100000059-thai","sentBy":"user@email.com","durable":true,"JMSReplyTo":"queue://QUEUE.service2.tech-drive2.188100000059.thai.notification","__AMQ_CID":"e4469ea3-bd62-11eb-a348-de0dacf99072","sentFrom":"service2","originalDestination":"QUEUE.service2.tech-drive2.188100000059.thai.notification","_AMQ_ROUTING_TYPE":1,"JMSCorrelationID":"c329c733-1170-440a-9080-992a009d87a9","expiration":0,"timestamp":1621957145988}]
First message timestamp 1621957145988
Group buckets -1
Group count 0
Group first key
Group rebalance false
Group rebalance pause dispatch false
Id 119
Last value false
Last value key
Max consumers -1
Message count 15
Messages acknowledged 0
Messages added 15
Messages expired 0
Messages killed 0
Name QUEUE.service2.*.*.*.notification
Object Name org.apache.activemq.artemis:broker="activemq-artemis-master-1",component=addresses,address="QUEUE.service2.\*.\*.\*.notification",subcomponent=queues,routing-type="anycast",queue="QUEUE.service2.\*.\*.\*.notification"
Paused false
Persistent size 47705
Prepared transaction message count 0
Purge on no consumers false
Retroactive resource false
Ring size -1
Routing type ANYCAST
Scheduled count 0
Scheduled size 0
Temporary false
User f7bcdaed-8c0c-4bb5-ad03-ec06382cb557
The attributes of the same queue on master-0:
Attribute Value
Acknowledge attempts 0
Address QUEUE.service2.*.*.*.notification
Configuration managed false
Consumer count 3
Consumers before dispatch 0
Dead letter address DLQ
Delay before dispatch -1
Delivering count 0
Delivering size 0
Durable true
Durable delivering count 0
Durable delivering size 0
Durable message count 0
Durable persistent size 0
Durable scheduled count 0
Durable scheduled size 0
Enabled true
Exclusive false
Expiry address ExpiryQueue
Filter
First message age
First message as json [{}]
First message timestamp
Group buckets -1
Group count 0
Group first key
Group rebalance false
Group rebalance pause dispatch false
Id 119
Last value false
Last value key
Max consumers -1
Message count 0
Messages acknowledged 0
Messages added 0
Messages expired 0
Messages killed 0
Name QUEUE.service2.*.*.*.notification
Object Name org.apache.activemq.artemis:broker="activemq-artemis-master-0",queue="QUEUE.service2.\*.\*.\*.notification"
Paused false
Persistent size 0
Prepared transaction message count 0
Purge on no consumers false
Retroactive resource false
Ring size -1
Routing type ANYCAST
Scheduled count 0
Scheduled size 0
Temporary false
User f7bcdaed-8c0c-4bb5-ad03-ec06382cb557
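The Artemis version in use is 2.17.0. Here is the cluster configuration from broker.xml on master-0. The other brokers use the same configuration, except that connector-ref is changed to match each broker: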
<?xml version="1.0"?>
<configuration xmlns="urn:activemq" xmlns:xi="http://www.w3.org/2001/XInclude" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xsi:schemaLocation="urn:activemq:core ">
<name>activemq-artemis-master-0</name>
<persistence-enabled>true</persistence-enabled>
<journal-type>ASYNCIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<journal-buffer-timeout>100000</journal-buffer-timeout>
<journal-max-io>4096</journal-max-io>
<disk-scan-period>5000</disk-scan-period>
<max-disk-usage>90</max-disk-usage>
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>2244000</page-sync-timeout>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,hornetq,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=hornetq,STOMP;useEpoll=true</acceptor>
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redistribution-delay>60000</redistribution-delay>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ"/>
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue"/>
</anycast>
</address>
</addresses>
<cluster-user>clusterUser</cluster-user>
<cluster-password>aShortclusterPassword</cluster-password>
<connectors>
<connector name="activemq-artemis-master-0">tcp://activemq-artemis-master-0.activemq-artemis-master.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-slave-0">tcp://activemq-artemis-slave-0.activemq-artemis-slave.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-master-1">tcp://activemq-artemis-master-1.activemq-artemis-master.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-slave-1">tcp://activemq-artemis-slave-1.activemq-artemis-slave.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-master-2">tcp://activemq-artemis-master-2.activemq-artemis-master.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-slave-2">tcp://activemq-artemis-slave-2.activemq-artemis-slave.svc.cluster.local:61616</connector>
</connectors>
<cluster-connections>
<cluster-connection name="activemq-artemis">
<connector-ref>activemq-artemis-master-0</connector-ref>
<retry-interval>500</retry-interval>
<retry-interval-multiplier>1.1</retry-interval-multiplier>
<max-retry-interval>5000</max-retry-interval>
<initial-connect-attempts>-1</initial-connect-attempts>
<reconnect-attempts>-1</reconnect-attempts>
<message-load-balancing>ON_DEMAND</message-load-balancing>
<max-hops>1</max-hops>
<!-- scale-down>true</scale-down -->
<static-connectors>
<connector-ref>activemq-artemis-master-0</connector-ref>
<connector-ref>activemq-artemis-slave-0</connector-ref>
<connector-ref>activemq-artemis-master-1</connector-ref>
<connector-ref>activemq-artemis-slave-1</connector-ref>
<connector-ref>activemq-artemis-master-2</connector-ref>
<connector-ref>activemq-artemis-slave-2</connector-ref>
</static-connectors>
</cluster-connection>
</cluster-connections>
<ha-policy>
<replication>
<master>
<group-name>activemq-artemis-0</group-name>
<quorum-vote-wait>12</quorum-vote-wait>
<vote-on-replication-failure>true</vote-on-replication-failure>
<!--we need this for auto failback-->
<check-for-live-server>true</check-for-live-server>
</master>
</replication>
</ha-policy>
</core>
<core xmlns="urn:activemq:core">
<jmx-management-enabled>true</jmx-management-enabled>
</core>
</configuration>
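From another answer on Stack Overflow, I understand that my high-availability topology is redundant, and I plan to remove the slaves. However, I do not think the slaves are the reason message redistribution is not working. Am I missing some configuration for handling an Artemis node crash?
Update 1: As Justin suggested, I tried a cluster of 2 Artemis nodes without HA.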
activemq-artemis-master-0 1/1 Running 0 27m
activemq-artemis-master-1 1/1 Running 0 74s
Below is the broker.xml for the 2 Artemis nodes. The only differences between them are the node name and journal-buffer-timeout:
<?xml version="1.0"?>
<configuration xmlns="urn:activemq" xmlns:xi="http://www.w3.org/2001/XInclude" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xsi:schemaLocation="urn:activemq:core ">
<name>activemq-artemis-master-0</name>
<persistence-enabled>true</persistence-enabled>
<journal-type>ASYNCIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<journal-buffer-timeout>100000</journal-buffer-timeout>
<journal-max-io>4096</journal-max-io>
<disk-scan-period>5000</disk-scan-period>
<max-disk-usage>90</max-disk-usage>
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>2244000</page-sync-timeout>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,STOMP;useEpoll=true</acceptor>
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<cluster-user>ClusterUser</cluster-user>
<cluster-password>longClusterPassword</cluster-password>
<connectors>
<connector name="activemq-artemis-master-0">tcp://activemq-artemis-master-0.activemq-artemis-master.ncp-stack-testing.svc.cluster.local:61616</connector>
<connector name="activemq-artemis-master-1">tcp://activemq-artemis-master-1.activemq-artemis-master.ncp-stack-testing.svc.cluster.local:61616</connector>
</connectors>
<cluster-connections>
<cluster-connection name="activemq-artemis">
<connector-ref>activemq-artemis-master-0</connector-ref>
<retry-interval>500</retry-interval>
<retry-interval-multiplier>1.1</retry-interval-multiplier>
<max-retry-interval>5000</max-retry-interval>
<initial-connect-attempts>-1</initial-connect-attempts>
<reconnect-attempts>-1</reconnect-attempts>
<use-duplicate-detection>true</use-duplicate-detection>
<message-load-balancing>ON_DEMAND</message-load-balancing>
<max-hops>1</max-hops>
<static-connectors>
<connector-ref>activemq-artemis-master-0</connector-ref>
<connector-ref>activemq-artemis-master-1</connector-ref>
</static-connectors>
</cluster-connection>
</cluster-connections>
<address-settings>
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redistribution-delay>60000</redistribution-delay>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ"/>
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue"/>
</anycast>
</address>
</addresses>
</core>
<core xmlns="urn:activemq:core">
<jmx-management-enabled>true</jmx-management-enabled>
</core>
</configuration>
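With this setup I still got the same result: after the Artemis node crashed and came back, the remaining messages were not moved to the other node.
Update 2: I tried using a non-wildcard queue as Justin suggested, but still got the same behavior. One difference I noticed is that with a non-wildcard queue the consumer count is only 1, whereas with the wildcard queue it is 3. Here are the attributes of the old queue after the crash: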
Acknowledge attempts 0
Address QUEUE.service2.tech-drive2.188100000059.thai.notification
Configuration managed false
Consumer count 0
Consumers before dispatch 0
Dead letter address DLQ
Delay before dispatch -1
Delivering count 0
Delivering size 0
Durable true
Durable delivering count 0
Durable delivering size 0
Durable message count 15
Durable persistent size 102245
Durable scheduled count 0
Durable scheduled size 0
Enabled true
Exclusive false
Expiry address ExpiryQueue
Filter
First message age 840031
First message as json [{"JMSType":"service2","messageID":8739,"sentAt":1621969900922,"accesstoken":"DONOTdisPLAY","userID":"ID:09502dc0-bd8d-11eb-b75c-c6609f1332c9","__AMQ_CID":"c292b418-bd8b-11eb-b75c-c6609f1332c9","JMSCorrelationID":"90b783d0-d9cc-4188-9c9e-3453786b2105","timestamp":1621969900922}]
First message timestamp 1621969900922
Group buckets -1
Group count 0
Group first key
Group rebalance false
Group rebalance pause dispatch false
Id 606
Last value false
Last value key
Max consumers -1
Message count 15
Messages acknowledged 0
Messages added 15
Messages expired 0
Messages killed 0
Name QUEUE.service2.tech-drive2.188100000059.thai.notification
Object Name org.apache.activemq.artemis:broker="activemq-artemis-master-0",address="QUEUE.service2.tech-drive2.188100000059.thai.notification",queue="QUEUE.service2.tech-drive2.188100000059.thai.notification"
Paused false
Persistent size 102245
Prepared transaction message count 0
Purge on no consumers false
Retroactive resource false
Ring size -1
Routing type ANYCAST
Scheduled count 0
Scheduled size 0
Temporary false
User 6e25e08b-9587-40a3-b7e9-146360539258
Here are the attributes of the new queue:
Attribute Value
Acknowledge attempts 0
Address QUEUE.service2.tech-drive2.188100000059.thai.notification
Configuration managed false
Consumer count 1
Consumers before dispatch 0
Dead letter address DLQ
Delay before dispatch -1
Delivering count 0
Delivering size 0
Durable true
Durable delivering count 0
Durable delivering size 0
Durable message count 0
Durable persistent size 0
Durable scheduled count 0
Durable scheduled size 0
Enabled true
Exclusive false
Expiry address ExpiryQueue
Filter
First message age
First message as json [{}]
First message timestamp
Group buckets -1
Group count 0
Group first key
Group rebalance false
Group rebalance pause dispatch false
Id 866
Last value false
Last value key
Max consumers -1
Message count 0
Messages acknowledged 0
Messages added 0
Messages expired 0
Messages killed 0
Name QUEUE.service2.tech-drive2.188100000059.thai.notification
Object Name org.apache.activemq.artemis:broker="activemq-artemis-master-1",queue="QUEUE.service2.tech-drive2.188100000059.thai.notification"
Paused false
Persistent size 0
Prepared transaction message count 0
Purge on no consumers false
Retroactive resource false
Ring size -1
Routing type ANYCAST
Scheduled count 0
Scheduled size 0
Temporary false
User 6e25e08b-9587-40a3-b7e9-146360539258
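Solution
I took your simplified 2-node configuration, used a non-wildcard queue with a redistribution-delay of 0, and reproduced the behavior you are seeing on my local machine (i.e. without Kubernetes). I believe I see why this happens, but to understand the current behavior you first need to understand how redistribution works.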
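In a cluster, every time a consumer is created, the node on which it is created notifies every other node in the cluster about that consumer. If other nodes have messages in their corresponding queue but no consumers, they redistribute their messages to the node that has the consumer (assuming message-load-balancing is ON_DEMAND and redistribution-delay is >= 0).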
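In your case, however, the node holding the messages is actually down when the consumer is created on the other node, so it never receives the notification about that consumer. As a result, once that node restarts it does not know about the other consumer and does not redistribute its messages.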
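The missed notification can be illustrated with a toy model. This is only a sketch of the reasoning above, not Artemis code: the `Node` class, `attachConsumer`, and `wouldRedistribute` are hypothetical names invented for illustration, and the real broker logic is considerably more involved.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RedistributionSketch {

    /** Toy stand-in for a broker; NOT an Artemis class. */
    static final class Node {
        final String name;
        boolean up = true;
        int queuedMessages = 0;
        int localConsumers = 0;
        // Remote nodes this node has been told have a consumer on the queue.
        final Set<String> knownRemoteConsumers = new HashSet<>();

        Node(String name) {
            this.name = name;
        }

        /** Needs ON_DEMAND, a delay >= 0, messages, no local consumer, and a known remote consumer. */
        boolean wouldRedistribute(boolean onDemand, long redistributionDelayMs) {
            return up && onDemand && redistributionDelayMs >= 0
                    && queuedMessages > 0 && localConsumers == 0
                    && !knownRemoteConsumers.isEmpty();
        }
    }

    /** A consumer attaches to target; only nodes that are up at that moment hear about it. */
    static void attachConsumer(Node target, List<Node> cluster) {
        target.localConsumers++;
        for (Node other : cluster) {
            if (other != target && other.up) {
                other.knownRemoteConsumers.add(target.name);
            }
        }
    }

    public static void main(String[] args) {
        Node master0 = new Node("master-0");
        Node master1 = new Node("master-1");
        List<Node> cluster = List.of(master0, master1);

        master1.queuedMessages = 15;      // messages left behind on master-1
        master1.up = false;               // master-1 crashes
        attachConsumer(master0, cluster); // client reconnects to master-0 in the meantime
        master1.up = true;                // master-1 restarts, having missed the notification
        System.out.println("crash case: " + master1.wouldRedistribute(true, 60000));

        // Contrast: the consumer attaches while the node holding messages is up.
        Node a = new Node("a");
        Node b = new Node("b");
        b.queuedMessages = 15;
        attachConsumer(a, List.of(a, b));
        System.out.println("normal case: " + b.wouldRedistribute(true, 60000));
    }
}
```

In this model the crash case prints `false` and the normal case prints `true`: the configuration is identical in both, and the only difference is whether the node holding the messages was up when the consumer was announced.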
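I see you have opened ARTEMIS-3321 to enhance the broker to handle this situation. However, that will take time to develop and release (assuming the change is approved). In the meantime, my recommendation is to configure client reconnection, which is discussed in the documentation, e.g.:
tcp://127.0.0.1:61616?reconnectAttempts=30
Given the default retryInterval of 2000 milliseconds, this gives the broker to which the client was originally connected 1 minute to come back up before the client gives up trying to reconnect and throws an exception, at which point the application can completely re-initialize its connection, as it does now.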
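The 1-minute figure follows from multiplying the number of attempts by the retry interval. A minimal sketch of that arithmetic, assuming the interval stays constant between attempts (i.e. no retry-interval multiplier is applied); `ReconnectWindow`, `reconnectWindow`, and `brokerUrl` are hypothetical helper names, not part of any Artemis API:

```java
import java.time.Duration;

public class ReconnectWindow {

    /** Time the client keeps retrying, assuming a constant retry interval. */
    static Duration reconnectWindow(int reconnectAttempts, long retryIntervalMs) {
        return Duration.ofMillis(retryIntervalMs).multipliedBy(reconnectAttempts);
    }

    /** Builds a broker URL carrying reconnectAttempts; assumes no other query parameters. */
    static String brokerUrl(String hostAndPort, int reconnectAttempts) {
        return "tcp://" + hostAndPort + "?reconnectAttempts=" + reconnectAttempts;
    }

    public static void main(String[] args) {
        int attempts = 30;
        long retryIntervalMs = 2000; // default retryInterval cited in the answer
        System.out.println(brokerUrl("127.0.0.1:61616", attempts));
        System.out.println(reconnectWindow(attempts, retryIntervalMs).getSeconds() + " s");
    }
}
```

If the broker in your environment typically takes longer than this window to restart, raising reconnectAttempts widens it proportionally.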
Since you are using Spring Boot, be sure to use version 2.5.0, as it contains this change, which allows you to specify a broker URL rather than just a host and port.
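Finally, keep in mind that shutting the node down gracefully will short-circuit the client's reconnect and trigger your application to re-initialize the connection, which is not what we want here. Be sure to terminate the node ungracefully (i.e. kill it rather than stop it cleanly).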