微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

按ID将RabbitMQ消息分组以同步处理它们

如何解决按ID将RabbitMQ消息分组以同步处理它们

我正在开发具有两个应用程序的解决方案:仅一个发布者和仅onde消费者。该客户需要处理以下消息:

2020-09-30T19:22:41 MSG1) {id:'user1.123',msg: '{ANYJSON}'}
2020-09-30T19:22:42 MSG2) {id:'user2.345',msg: '{ANYJSON}'}
2020-09-30T19:22:43 MSG3) {id:'user3.877',msg: '{ANYJSON}'}
2020-09-30T19:22:43 MSG4) {id:'user1.123',msg: '{ANYJSON}'}
2020-09-30T19:22:43 MSG5) {id:'user1.123',msg: '{ANYJSON}'}

每条消息需要处理5-10秒(数据库查询,4个HTTP请求,数据库插入e等),我为每个消息创建一个池(https://threads.js.org/usage-pool),因为它需要重复命令(数据库查询,4个http请求,数据库插入e等)供许多用户使用。这样每个POOL / MSG可以同时运行。

这些IDS由发布者动态生成,并且每次都会更改。基本上,我需要按ID对消息进行“分组”,并且仅当没有消息正在同时处理此ID时才处理消息。例如:

  • MSG 1(2020-09-30T19:22:41)应该立即处理
  • MSG 2(2020-09-30T19:22:42)应该立即处理
  • MSG 3(2020-09-30T19:22:43)应该立即处理
  • MSG 4(2020-09-30T19:22:43)应该等待MSG 1被处理
  • MSG 5(2020-09-30T19:22:43)应该等待MSG 4被处理

假设消费者1分钟后收到:

2020-09-30T19:23:41 MSG6) {id:'user1.123',msg: '{ANYJSON}'}
2020-09-30T19:23:42 MSG7) {id:'user2.345',msg: '{ANYJSON}'}
  • MSG 6(2020-09-30T19:23:41)应该立即处理,因为MSG5已被处理。
  • MSG 7(2020-09-30T19:23:41)应该立即处理,因为MSG2已被处理。

enter image description here

在366行,我知道一个池何时完成,因此是时候向另一个MSG询问相同的ID。

那么,如何实现这个按ID“分组”的队列?

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。