微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何向使用 Azure 服务总线 Java SDK 的发布者/订阅者客户端添加重试选项重试策略?

如何解决如何向使用 Azure 服务总线 Java SDK 的发布者/订阅者客户端添加重试选项重试策略?

我正在测试 Azure 服务总线发布者/订阅者客户端的重试选项,因为在突然连接失败后,客户端将不会重试发送或接收消息。

以下是发布者客户端 sendMessage() 方法代码,我已将订阅的最大传送计数设置为 1000。客户端仍然使用认的 retryPolicy 值,但我看不到它像 amqpRetryOptions 中给出的那样重试。

static void sendMessage() {
        // create Retry Options for the Service Bus client
        AmqpRetryOptions amqpRetryOptions = new AmqpRetryOptions();
        amqpRetryOptions.setDelay(Duration.ofSeconds(1));
        amqpRetryOptions.setMaxRetries(120);
        amqpRetryOptions.setMaxDelay(Duration.ofMinutes(5));
        amqpRetryOptions.setMode(AmqpRetryMode.EXPONENTIAL);
        amqpRetryOptions.setTryTimeout(Duration.ofSeconds(5));

        // create a Service Bus Sender client for the queue
        ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .retryOptions(amqpRetryOptions)
                .sender()
                .topicName(topicName)
                .buildClient();

        // send one message to the topic
        senderClient.sendMessage(new ServiceBusMessage("Hello,World! "));
        System.out.println("Sent a single message to the topic");
    }

我的方法错了吗?

  • 如果是,标准的方式是什么?
  • 如果不是,如何处理重试机制?

如果不行怎么办

解决方法

我能够使用 ServiceBusSenderAsyncClient 获得重试机制。此外,我可以捕获异常以检查原因是否是暂时的。

static void sendMessage() {
    // create Retry Options for the Service Bus client
    AmqpRetryOptions amqpRetryOptions = new AmqpRetryOptions();
    amqpRetryOptions.setDelay(Duration.ofSeconds(1));
    amqpRetryOptions.setMaxRetries(5);
    amqpRetryOptions.setMaxDelay(Duration.ofSeconds(15));
    amqpRetryOptions.setMode(AmqpRetryMode.EXPONENTIAL);
    amqpRetryOptions.setTryTimeout(Duration.ofSeconds(5));

   // instantiate a client that will be used to call the service
   ServiceBusSenderAsyncClient serviceBusSenderAsyncClient = new ServiceBusClientBuilder()
       .connectionString(connectionString)
       .retryOptions(amqpRetryOptions)
       .sender()
       .topicName(topicName)
       .buildAsyncClient();

    // create a message
    ServiceBusMessage serviceBusMessage = new ServiceBusMessage("Hello,World!\n")

    // send the message to the topic
    serviceBusSenderAsyncClient.sendMessage(serviceBusMessage).subscribe(
        unused -> System.out.println("Message sent successfully"),error -> {
            ServiceBusException serviceBusException = (ServiceBusException) error;
            System.out.println(serviceBusException.isTransient());
        },() -> {
            System.out.println("Message sent successfully");
        }
    );
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?