RabbitMQ 的 Dapr 绑定无法使用 Dapr 发布/订阅示例

如何解决RabbitMQ 的 Dapr 绑定无法使用 Dapr 发布/订阅示例

我已经使用了 Dapr pub/sub How-to sample 并尝试更新它以使用 RabbitMQ

我已经从 dockerhub 下载了 Docker rabbitmq:3 镜像,它应该在 amqp://localhost:5672 上监听。

我为 RabbitMQ 创建了一个名为 rabbitmq.yaml 的新组件文件,并将其放置在 .dapr/components 目录中。我的 RabbitMQ 组件配置是:

apiVersion: dapr.io/v1alpha1
kind: Component
Metadata:
  name: my-rabbitmq
spec:
  type: pubsub.rabbitmq
version: v1
Metadata:
  - name: host
    value: amqp://localhost:5672
  - name: durable
    value: true # Optional. Default: "false"
  - name: deletedWhenUnused
   value: false # Optional. Default: "false"
  - name: ttlInSeconds
    value: 60
  - name: prefetchCount
    value: 0

我的订阅在同一 .dapr/components 目录下的 subscription-rabbitmq.yaml 中定义。它看起来如下:

apiVersion: dapr.io/v1alpha1
kind: Subscription
Metadata:
  name: rabbitmq-subscription
spec:
  topic: deathStarStatus
  route: /dsstatus
  pubsubname: my-rabbitmq
scopes:
- app2

当我使用 Redis Streams 组件和订阅运行示例时,节点应用程序会收到消息并显示在 dapr run 命令的输出中。

dapr publish --topic deathStarStatus --pubsub pubsub --data '{ "message": "This is a test" }'

但是当我向 RabbitMQ 发布消息时,它指出“事件已成功发布”,但节点应用程序未收到该消息。

dapr publish --topic deathStarStatus --pubsub my-rabbitmq --data '{ "message": "This is a test" }'

这是节点应用程序和运行它的命令:

dapr run --app-id app2 --app-port 3000 node app2.js

const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));

const port = 3000

// app.get('/dapr/subscribe',(req,res) => {
//     res.json([
//         {
//             pubsubname: "my-rabbitmq",//             topic: "deathStarStatus",//             route: "dsstatus"        
//         }
//     ]);
// })

app.post('/dsstatus',res) => {
    console.log(req.body);
    res.sendStatus(200);
});

app.listen(port,() => console.log(`consumer app listening on port ${port}!`))

我做错了什么?

解决方法

事实证明,RabbitMQ(Dapr 世界)中的“主题”实际上是一个 Exchange 而不是一个队列。

使用 RabbitMQ 订阅运行“app2”时,会创建一个带有前置 appid(例如 {appid}-{queueName})的队列,但未创建 Exchange。不确定这是设计使然还是我的特定配置。

我最终创建了一个名为“deathStarStatus”的 Exchange,并将该 Exchange 映射到我的名为“app2-deathStarStatus”的队列,一切正常。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?