文章目录
Springboot 版本: 2.7.0
超出队列限制后会发生什么?
- 丢弃旧消息:如果没有配置关联死信队列,则丢弃最老的消息。
- 将旧消息路由到死信队列:如果配置有关联的死信队列,则将最老的消息路由到死信队列。
- 拒绝新消息入队:如果默认行为不满足需求,可以通过参数 overflow 进行修改。
怎么设置队列长度?
有两种方式:
- 服务端通过policy设置
- 客户端在队列声明时使用队列的可选参数进行配置
如果服务端和客户端都做了设置,那么以二者中的小值为准。
服务端通过policy设置
命令行配置
配置命令:
rabbitmqctl set_policy my-pol "^myQueue$" '{"max-length":5, "max-length-bytes":1048576, "overflow":"reject-publish"}' --apply-to queues --vhost my_vhost
- name: my-pol
- pattern: ^myQueue$
- deFinition:
- max-length: 5; 最多包含5个消息
- max-length-bytes:1048576; 最多包含1MiB的消息数据
- overflow:reject-publish; 超出限制后直接拒绝新的消息入队
配置结果:
管理页面配置
填写内容和命令行是一样的,其结果也是一样的。
客户端申明队列时配置
@Bean
public Queue queue() {
// 常规队列与死信交换机的绑定关系
Map<String, Object> queueParams = new HashMap<>(2);
//设置队列长度为5
queueParams.put("x-max-length", 5);
queueParams.put("max-length-bytes", 1048576);
return new Queue(QUEUE_NAME, true, false, false, queueParams);
}
代码实践
只限制消息长度(丢弃旧消息)
配置文件
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
virtual-host: my_vhost
# 消息确认(ACK)
publisher-confirm-type: correlated #correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
listener:
type: simple
simple:
default-requeue-rejected: false
ackNowledge-mode: MANUAL
配置类
定义交换机、队列以及他们之间的绑定关系,并开启生产者消息确认。
@Slf4j
@Configuration
public class RabbitConfiguration {
public final static String TOPIC_EXCHANGE = "myExchange";
public final static String QUEUE_NAME = "myQueue";
@Bean
public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonConverter());
template.setExchange(TOPIC_EXCHANGE);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息:{}发送成功", correlationData.getId());
} else {
log.error("消息:{}发送失败,失败原因为:{}", correlationData.getId(), cause);
}
});
template.setMandatory(true);
template.setReturnsCallback(returned -> {
log.error("消息:{}路由失败, 失败原因为:{}", returned.getMessage().toString(), returned.getReplyText());
});
return template;
}
// 申明一个常规的交换机
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE, true, false);
}
// 申明一个常规使用的队列
@Bean
public Queue queue() {
// 常规队列与死信交换机的绑定关系
Map<String, Object> queueParams = new HashMap<>(2);
//设置队列长度为5
queueParams.put("x-max-length", 5);
queueParams.put("x-max-length-bytes", 1048576);
return new Queue(QUEUE_NAME, true, false, false, queueParams);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");
}
@Bean
public Jackson2JsonMessageConverter jsonConverter() {
return new Jackson2JsonMessageConverter();
}
}
生产者
连续发布10个消息。
@Component
public class Publisher {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(){
for (int i = 0; i < 10; i++) {
User user = new User("kleven", 18, i+1);
rabbitTemplate.convertAndSend("my.test.message", user, new CorrelationData(user.getId().toString()));
}
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class User implements Serializable {
private static final long serialVersionUID = -5079682733940745661L;
private String name;
private Integer age;
private Integer id;
}
测试类
@RunWith(springrunner.class)
@SpringBoottest(classes = {App.class})
public class QueueLengthTest {
@Autowired
private Publisher publisher;
@Test
public void testSend(){
publisher.send();
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printstacktrace();
}
}
}
测试结果
10个消息都发布成功,但是队列中只有后5个消息,前5个消息被丢弃。
限制消息长度,并配置死信队列(将旧消息路由到死信队列)
配置类
@Slf4j
@Configuration
public class RabbitConfiguration {
public final static String TOPIC_EXCHANGE = "myExchange";
public final static String QUEUE_NAME = "myQueue";
public final static String DEAD_EXCHANGE = "myDeadExchange";
public final static String DEAD_QUEUE = "myDeadQueue";
@Bean
public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonConverter());
template.setExchange(TOPIC_EXCHANGE);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息:{}发送成功", correlationData.getId());
} else {
log.error("消息:{}发送失败,失败原因为:{}", correlationData.getId(), cause);
}
});
template.setMandatory(true);
template.setReturnsCallback(returned -> {
log.error("消息:{}路由失败, 失败原因为:{}", returned.getMessage().toString(), returned.getReplyText());
});
return template;
}
// 申明一个常规的交换机
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE, true, false);
}
// 申明一个常规使用的队列
@Bean
public Queue queue() {
// 常规队列与死信交换机的绑定关系
Map<String, Object> queueParams = new HashMap<>(4);
queueParams.put("x-dead-letter-exchange", DEAD_EXCHANGE);
queueParams.put("x-dead-letter-routing-key","my.dead.letter");
//设置队列长度为5
queueParams.put("x-max-length", 5);
queueParams.put("x-max-length-bytes", 1048576);
return new Queue(QUEUE_NAME, true, false, false, queueParams);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");
}
@Bean
public Jackson2JsonMessageConverter jsonConverter() {
return new Jackson2JsonMessageConverter();
}
// 申明一个死信交换机
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(DEAD_EXCHANGE, true, false);
}
// 申明一个死信队列
@Bean
public Queue deadQueue() {
return new Queue(DEAD_QUEUE);
}
// 绑定死信交换机和死信队列
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("my.dead.letter");
}
}
测试结果
10个消息均发布成功,前5个旧的消息进入死信队列,后5个消息在常规业务队列。
限制消息长度,并配置 overflow (拒绝新消息入队)
修改配置类增加overflow配置,增加一个消费者,其他保持不变。
配置类
@Bean
public Queue queue() {
// 常规队列与死信交换机的绑定关系
Map<String, Object> queueParams = new HashMap<>(5);
queueParams.put("x-dead-letter-exchange", DEAD_EXCHANGE);
queueParams.put("x-dead-letter-routing-key","my.dead.letter");
//设置队列长度为5
queueParams.put("x-max-length", 5);
queueParams.put("x-max-length-bytes", 1048576);
queueParams.put("x-overflow", "reject-publish");
return new Queue(QUEUE_NAME, true, false, false, queueParams);
}
消费者
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = "myQueue", messageConverter = "jsonConverter")
public void normalConsumer(@Payload User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException, InterruptedException {
// 假设消费者消费一条消息需要2s
Thread.sleep(2_1000);
log.info("正常消费者消费 -> {}", user);
channel.basicAck(deliveryTag, false);
}
}
测试结果
前6个消息发送成功;后4个消息因为被拒绝所以发送失败。
为什么限制的长度是5却有6个消息发送成功呢?
原因是队列长度(及所占字节数)限制只针对Ready状态的消息,有上图可知,因为我们这次加了一个消费者,其正在消费一个消息但还没有确认,所以有一个消息的状态是Unacked。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。