如何解决如何通过带有 Spring 集成的 RabbitMQ 接收 protobuf 消息?
我尝试使用 Spring Integration 从 RabbitMQ 接收 protobuf 消息。
我的集成流程:
public class FacadeIntegrationFlowAdapter extends IntegrationFlowAdapter {
@SuppressWarnings("rawtypes")
private final Facade facade;
private final FacadeProperties facadeProperties;
@SuppressWarnings("unchecked")
@Override
protected IntegrationFlowDeFinition<?> buildFlow() {
return from(facadeProperties.getQueueName())
.handle(facade::getNewMessages);
}
}
getNewMessages
方法:
@Override
public ExchangeResponse getNewMessages(Message<ExchangeRequest> message) {
ExchangeRequest request = message.getPayload();
log.info("Receiving new message: " + request.toString());
这就是我将消息发送到队列的方式。使测试易于遵循是如此简单。
ExchangeRequest request = ExchangeRequest.newBuilder()
.addAllAuthors(List.of("author1","author2"))
.addAllBooks(List.of("book1","book2"))
.build();
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername("user");
connectionFactory.setPassword("password");
connectionFactory.setHost("localhost");
connectionFactory.setPort(24130);
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
var basicProperties = new AMQP.BasicProperties().builder()
.contentType("application/x-protobuf")
.type(request.getDescriptorForType().getFullName())
.build();
channel.basicpublish(
"facade-exchange","facade-routing-key",basicProperties,request.toByteArray());
} catch (IOException e) {
不幸的是,我不断收到异常:
com.google.protobuf.InvalidProtocolBufferException: Type of the Any message does not match the given class.
但是,当我将 getNewMessages
方法更改为以下内容时,一切似乎都很好。
@Override
public ExchangeResponse getNewMessages(Message message) {
try {
Any payload = (Any) message.getPayload();
ByteString value = payload.getValue();
ExchangeRequest request = ExchangeRequest.parseFrom(value);
log.info("Receiving new message: " + request.toString());
我哪里出错了?发送!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。