如何解决如何使用 Java 将错误消息移动到 Azure 死信队列主题 - 订阅?
我需要将我的消息从 azure 主题订阅发送到死信队列,以防在读取和处理来自主题的消息时出现任何错误。所以我尝试测试将消息直接推送到 DLQ。
我的示例代码如下
static void sendMessage()
{
// create a Service Bus Sender client for the queue
ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
.connectionString(connectionString)
.sender()
.topicName(topicName)
.buildClient();
// send one message to the topic
senderClient.sendMessage(new ServiceBusMessage("Hello,World!"));
}
static void resceiveAsync() {
ServiceBusReceiverAsyncclient receiver = new ServiceBusClientBuilder()
.connectionString(connectionString)
.receiver()
.topicName(topicName)
.subscriptionName(subName)
.buildAsyncclient();
// receive() operation continuously fetches messages until the subscription is disposed.
// The stream is infinite,and completes when the subscription or receiver is closed.
disposable subscription = receiver.receiveMessages().subscribe(message -> {
System.out.printf("Id: %s%n",message.getMessageId());
System.out.printf("Contents: %s%n",message.getBody().toString());
},error -> {
System.err.println("Error occurred while receiving messages: " + error);
},() -> {
System.out.println("Finished receiving messages.");
});
// Continue application processing. When you are finished receiving messages,dispose of the subscription.
subscription.dispose();
// When you are done using the receiver,dispose of it.
receiver.close();
}
我尝试获取死信队列路径
String dlq = EntityNameHelper.formatDeadLetterPath(topicName);
我得到了死信队列的路径,例如 = "mytopic/$deadletterqueue"
但是在将路径作为主题名称传递时它不起作用。它抛出一个实体主题未找到异常。
有没有人可以给我建议
参考: How to move error message to Azure dead letter queue using Java?
How to push the failure messages to Azure service bus Dead Letter Queue in Spring Boot Java?
解决方法
您可能知道,如果在处理过程中抛出异常,并且超过最大传递计数,则消息将自动移至死信队列。如果您想显式地将消息移动到 DLQ,您也可以这样做。一个常见的情况是,如果您知道消息因其内容而永远不会成功。
您不能直接向 DLQ 发送新消息,因为那样的话系统中就会有两条消息。您需要对父实体调用特殊操作。此外,<topic path>/$deadletterqueue
不起作用,因为这将是所有订阅的 DLQ。正确的实体路径是这样构建的:
<queue path>/$deadletterqueue
<topic path>/Subscriptions/<subscription path>/$deadletterqueue
此示例代码适用于队列,但您应该能够很容易地将其调整到主题:
// register the RegisterMessageHandler callback
receiver.registerMessageHandler(
new IMessageHandler() {
// callback invoked when the message handler loop has obtained a message
public CompletableFuture<Void> onMessageAsync(IMessage message) {
// receives message is passed to callback
if (message.getLabel() != null &&
message.getContentType() != null &&
message.getLabel().contentEquals("Scientist") &&
message.getContentType().contentEquals("application/json")) {
// ...
} else {
return receiver.deadLetterAsync(message.getLockToken());
}
return receiver.completeAsync(message.getLockToken());
}
// callback invoked when the message handler has an exception to report
public void notifyException(Throwable throwable,ExceptionPhase exceptionPhase) {
System.out.printf(exceptionPhase + "-" + throwable.getMessage());
}
},// 1 concurrent call,messages are auto-completed,auto-renew duration
new MessageHandlerOptions(1,false,Duration.ofMinutes(1)),executorService);
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。