微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spring rabbitmq 消息排序不再起作用

如何解决Spring rabbitmq 消息排序不再起作用

我正在处理消息排序问题,不久前修复了它,现在修复不再起作用。

只是为了概述,我有以下环境:

enter image description here

订单在 tcpAdapter 和消息接收者之间的某个地方丢失了。

我使用以下方法修复了这个问题:

  1. 在生产者方面 - 使用发布者确认和返回
  rabbitmq:
    publisher-confirms: true
    publisher-returns: true
  1. 在消费者方面 - 强制执行单线程执行器: 我在这里找到的想法:RabbitMQ - Message order of delivery,为此我使用了后处理器。
@Component
public class RabbitConnectionFactoryPostProcessor implements BeanPostProcessor {
  @Override
  public Object postProcessAfterInitialization(Object bean,String beanName) throws BeansException {
    if (bean instanceof CachingConnectionFactory) {
      ((CachingConnectionFactory) bean).setExecutor(Executors.newSingleThreadExecutor());
    }
    return bean;
  }
}

现在,在一些 master-pom 更新之后(我们不控制 master pom,它在项目级别),修复突然不再起作用了。检查差异后,我没有看到spring-rabbit或spring-amqp有任何变化,我不明白为什么会有影响。


如果您需要具体示例,这里有更多详细信息:

  1. 制作人。

TCP Server 向 tcpAdapter 应用程序发送消息,该应用程序使用 spring-integration 流从 TCP 获取消息并将其发送到 rabbitmq。

这是执行此操作的代码(inboundAdapterClient 我没有在此处发布,因为我认为这不重要):

  @Bean
  public IntegrationFlow tcpToRabbitFlowClient() {
    return IntegrationFlows.from(inboundAdapterClient())      
        .transform(tcpToRabbitTransformer)     
        .channel(TCP_ADAPTER_SOURCE);
        .get();
  }

tcpAdapter 应用程序以正确的顺序从 TCP 接收消息,但是 tcpAdapter rabbitmq 堆栈不会每次都以正确的顺序发送它们(80% 的时间正常,20% 的顺序错误

这是 spring boot yml 配置(仅相关信息):

spring:
  rabbitmq:
    publisher-confirms: true
    publisher-returns: true
  cloud:
    stream:
      bindings:
        tcpAdapterSource:
          binder: rabbit
          content-type: application/json
          destination: tcpadapter.messagereceiver
  1. 消费者。

消息接收者强制执行单线程执行器,并配置如下。

这里是spring boot yml配置(仅相关信息)

spring:
  cloud:     
    stream:
      bindings:
        fromTcpAdapter:
          binder: rabbit
          content-type: application/json
          destination: tcpadapter.messagereceiver
      rabbit:
        default:
          producer:
            exchangeDurable: false
            exchangeAutoDelete: true
          consumer:
            exchangeDurable: false
            exchangeAutoDelete: true

注意:只有一个生产者和一个消费者。

来自 pom 的一些版本,也许有帮助:

      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot</artifactId>
        <version>2.2.4.RELEASE</version>
      </dependency>
      <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-amqp</artifactId>
        <version>2.2.3.RELEASE</version>
      </dependency>
      <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.2.3.RELEASE</version>
      </dependency>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <version>3.0.1.RELEASE</version>
      </dependency>

解决方法

通过删除 yml 配置并使用显式 bean 声明和工厂配置解决,如下所述。唯一的问题是性能缓慢,但发布商确认这是预期的。

所以这确实是生产者的问题。

  @Bean
  public CachingConnectionFactory connectionFactory() {
    com.rabbitmq.client.ConnectionFactory connectionFactoryClient = new com.rabbitmq.client.ConnectionFactory();
    connectionFactoryClient.setUsername(username);
    connectionFactoryClient.setPassword(password);
    connectionFactoryClient.setHost(hostname);
    connectionFactoryClient.setVirtualHost(vhost);
    return new CachingConnectionFactory(connectionFactoryClient);
  }

  @Bean("rabbitTemplateAdapter")
  @Primary
  public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
    connectionFactory.setPublisherConfirmType(CORRELATED);
    connectionFactory.setPublisherReturns(true);
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setConfirmCallback((correlationData,ack,cause)
            -> log.debug("correlationData({}),ack({}),cause ({})",correlationData,cause));
    rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey)
            -> log.debug("exchange({}),route({}),replyCode({}),replyText({}),message:{}",routingKey,message));
    return rabbitTemplate;
  }

用于发送消息:

rabbitTemplateAdapter.invoke(t -> {
      t.convertAndSend(
              exchange,DESTINATION,jsonMessage.getPayload(),m -> {outboundMapper().fromHeadersToRequest(jsonMessage.getHeaders(),m.getMessageProperties());
                return m;
              });
      t.waitForConfirmsOrDie(10_000);
      return true;
    });

我使用 spring rabbit 和 amqp 版本做到了这一点:

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>2.2.3.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-amqp</artifactId>
  <version>2.2.3.RELEASE</version>
</dependency>

Spring amqp 文档帮助很大,使用的技术称为“范围操作”: https://docs.spring.io/spring-amqp/docs/2.2.7.RELEASE/reference/html/#scoped-operations

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。