微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何通过带有 Spring 集成的 RabbitMQ 接收 protobuf 消息?

如何解决如何通过带有 Spring 集成的 RabbitMQ 接收 protobuf 消息?

我尝试使用 Spring Integration 从 RabbitMQ 接收 protobuf 消息。

我的集成流程:

public class FacadeIntegrationFlowAdapter extends IntegrationFlowAdapter {

  @SuppressWarnings("rawtypes")
  private final Facade facade;
  private final FacadeProperties facadeProperties;

  @SuppressWarnings("unchecked")
  @Override
  protected IntegrationFlowDeFinition<?> buildFlow() {
    return from(facadeProperties.getQueueName())
        .handle(facade::getNewMessages);
  }
}

getNewMessages 方法

@Override
public ExchangeResponse getNewMessages(Message<ExchangeRequest> message) {
    ExchangeRequest request = message.getPayload();
    log.info("Receiving new message: " + request.toString());

这就是我将消息发送到队列的方式。使测试易于遵循是如此简单。

  ExchangeRequest request = ExchangeRequest.newBuilder()
      .addAllAuthors(List.of("author1","author2"))
      .addAllBooks(List.of("book1","book2"))
      .build();

  ConnectionFactory connectionFactory = new ConnectionFactory();
  connectionFactory.setUsername("user");
  connectionFactory.setPassword("password");
  connectionFactory.setHost("localhost");
  connectionFactory.setPort(24130);

  try {
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    var basicProperties = new AMQP.BasicProperties().builder()
        .contentType("application/x-protobuf")
        .type(request.getDescriptorForType().getFullName())
        .build();

    channel.basicpublish(
        "facade-exchange","facade-routing-key",basicProperties,request.toByteArray());
  } catch (IOException e) {

不幸的是,我不断收到异常:

com.google.protobuf.InvalidProtocolBufferException: Type of the Any message does not match the given class.

但是,当我将 getNewMessages 方法更改为以下内容时,一切似乎都很好。

  @Override
  public ExchangeResponse getNewMessages(Message message) {
    try {
      Any payload = (Any) message.getPayload();
      ByteString value = payload.getValue();
      ExchangeRequest request = ExchangeRequest.parseFrom(value);
      log.info("Receiving new message: " + request.toString());

我哪里出错了?发送!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。