如何解决Java 中延迟最低的 Rabbit MQ 扇出设置
我正在使用 Java 中的 RabbitMQ 尝试查看在将消息发布到扇出交换并拥有几个消费者时我可以预期的延迟,所有消费者都在同一台机器上运行。
我测量延迟的方法是在调用 channel.basicPublish()
之前简单地在消息中编码一个时间戳然后当我在另一端收到 msg 时,我使用另一个时间戳并计算 2 之间的差异。现在我看到延迟在 2 到 4 毫秒范围内。
我想配置我的交换和队列以提供最低的可靠性保证,并希望最大限度地减少过程中的延迟(无持久性、无确认、无保证交付等)。我很难找到所有这些设置。特别是在没有确认或持久性的情况下。
有人可以帮助进行设置以尽可能减少延迟吗?总的来说,有人会知道在同一台机器上与消费者和生产者打交道时我可以实现的最佳延迟吗?能达到 100us 以下吗?
这是我目前的代码:
import com.rabbitmq.client.*;
import java.time.Instant;
class SPMCPublisher2 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
long count = 0;
while (true) {
final Instant now = Instant.now();
final long now_ts = now.toEpochMilli() * 1000 + now.getNano()/1000;
final String message = "msg #" + (++count) + "," + now_ts;
channel.basicPublish(EXCHANGE_NAME,"",new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.build(),message.getBytes("UTF-8"));
System.out.println(" --> '" + message + "'");
Thread.sleep(2000);
}
}
}
}
class SPMCConsumer11 {
private static final String EXCHANGE_NAME = "logs";
private static final String QUEUE_NAME = "my_queue";
private static final boolean AUTO_ACK = true;
private static final boolean NOT_DURABLE = false;
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//on-durable,exclusive,autodelete queue with a generated name
channel.queueDeclareNoWait(QUEUE_NAME,NOT_DURABLE,true,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
DeliverCallback deliverCallback = (consumerTag,delivery) -> {
final String message = new String(delivery.getBody(),"UTF-8");
final Instant now = Instant.now();
final long now_ts = now.toEpochMilli() * 1000 + now.getNano()/1000;
final String[] parts = message.split(",");
final long latency_us = now_ts - Long.parseLong(parts[1]);
System.out.println(" <-- '" + message + "',latency: " + latency_us + " us");
};
channel.basicConsume(QUEUE_NAME,AUTO_ACK,deliverCallback,consumerTag -> { });
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。