如何解决如何向使用 Azure 服务总线 Java SDK 的发布者/订阅者客户端添加重试选项重试策略?
我正在测试 Azure 服务总线发布者/订阅者客户端的重试选项,因为在突然连接失败后,客户端将不会重试发送或接收消息。
以下是发布者客户端 sendMessage() 方法的代码,我已将订阅的最大传送计数设置为 1000。客户端仍然使用默认的 retryPolicy 值,但我看不到它像 amqpRetryOptions 中给出的那样重试。
static void sendMessage() {
// create Retry Options for the Service Bus client
AmqpRetryOptions amqpRetryOptions = new AmqpRetryOptions();
amqpRetryOptions.setDelay(Duration.ofSeconds(1));
amqpRetryOptions.setMaxRetries(120);
amqpRetryOptions.setMaxDelay(Duration.ofMinutes(5));
amqpRetryOptions.setMode(AmqpRetryMode.EXPONENTIAL);
amqpRetryOptions.setTryTimeout(Duration.ofSeconds(5));
// create a Service Bus Sender client for the queue
ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
.connectionString(connectionString)
.retryOptions(amqpRetryOptions)
.sender()
.topicName(topicName)
.buildClient();
// send one message to the topic
senderClient.sendMessage(new ServiceBusMessage("Hello,World! "));
System.out.println("Sent a single message to the topic");
}
我的方法错了吗?
- 如果是,标准的方式是什么?
- 如果不是,如何处理重试机制?
如果不行怎么办
解决方法
我能够使用 ServiceBusSenderAsyncClient
获得重试机制。此外,我可以捕获异常以检查原因是否是暂时的。
static void sendMessage() {
// create Retry Options for the Service Bus client
AmqpRetryOptions amqpRetryOptions = new AmqpRetryOptions();
amqpRetryOptions.setDelay(Duration.ofSeconds(1));
amqpRetryOptions.setMaxRetries(5);
amqpRetryOptions.setMaxDelay(Duration.ofSeconds(15));
amqpRetryOptions.setMode(AmqpRetryMode.EXPONENTIAL);
amqpRetryOptions.setTryTimeout(Duration.ofSeconds(5));
// instantiate a client that will be used to call the service
ServiceBusSenderAsyncClient serviceBusSenderAsyncClient = new ServiceBusClientBuilder()
.connectionString(connectionString)
.retryOptions(amqpRetryOptions)
.sender()
.topicName(topicName)
.buildAsyncClient();
// create a message
ServiceBusMessage serviceBusMessage = new ServiceBusMessage("Hello,World!\n")
// send the message to the topic
serviceBusSenderAsyncClient.sendMessage(serviceBusMessage).subscribe(
unused -> System.out.println("Message sent successfully"),error -> {
ServiceBusException serviceBusException = (ServiceBusException) error;
System.out.println(serviceBusException.isTransient());
},() -> {
System.out.println("Message sent successfully");
}
);
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。