如何解决ActiveMQ Artemis:从 smallrye-reactive-messaging (AMQP) 以编程方式创建队列
我的 Quarkus 微服务使用 smallrye 反应式消息传递库中的 AMQP 连接器向从 vromero/activemq-artemis:2.16.0-alpine
Docker 映像运行的 ActiveMQ Artemis 代理生成消息。 reactive messaging library 文档提到了使用动态地址名称的可能性。我在我的 REST 资源中使用以下 (Kotlin) 代码:
@Inject
@Channel("task-finished")
lateinit var taskFinishedEmitter: MutinyEmitter<String>
@POST
@Produces(MediaType.TEXT_PLAIN)
fun doSomethingAndInform(@RestForm customerId: String): Uni<String> {
// leaving out the actual messageText computation...
val messageText: String = "DUMMY MESSAGE"
val metadata: OutgoingAmqpMetadata = OutgoingAmqpMetadata.builder()
.withDurable(true)
.withCorrelationId(customerId)
.withAddress("anycast://my-custom-address")
.build()
val message: Message<String> = Message.of(messageText,{
logger.info("message acked")
CompletableFuture.completedFuture(null)
},{
logger.info("message nacked: {}",it.message)
CompletableFuture.completedFuture(null)
}
)
taskFinishedEmitter.send(message.addMetadata(metadata))
return Uni.createFrom().item("DONE")
}
连接器在 application.properties
中定义:
amqp-host=localhost
amqp-port=5672
amqp-username=adm
amqp-password=***
mp.messaging.outgoing.task-finished.connector=smallrye-amqp
ActiveMQ Artemis 确实动态创建了 my-custom-address
地址,但它不会创建任何绑定到它的队列,并且消息最终被取消路由。
broker.xml
部分包含 core
配置文件
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;anycastPrefix=anycast://</acceptor>
<address-setting match="#">
<auto-create-queues>true</auto-create-queues>
</address-settings>
我尝试将队列名称与地址一起传递
.withAddress("anycast://my-custom-address::my-queue")
但它没有任何区别。
以编程方式创建的队列和传递给它的消息缺少什么?另外,为什么 Artemis 在消息丢失(未路由)时会确认该消息?
解决方法
默认情况下,ActiveMQ Artemis 会将 AMQP 客户端发送的消息视为多播(即发布/订阅)。多播的语义规定发布到代理的每条消息都将分派给每个订阅者。然而,由于没有订阅者,消息被简单地丢弃(即取消路由)。
由于您使用 anycast://
作为地址前缀,因此您应该使用 acceptor
参数在 broker.xml
中的“amqp”anycastPrefix
上配置该前缀,例如:>
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;anycastPrefix=anycast://</acceptor>
您还可以更改用于自动创建资源的默认路由类型,例如:
<address-setting match="my-custom-address">
<default-address-routing-type>ANYCAST</default-address-routing-type>
<default-queue-routing-type>ANYCAST</default-queue-routing-type>
</address-settings>
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。