如何解决按ID将RabbitMQ消息分组以同步处理它们
我正在开发具有两个应用程序的解决方案:仅一个发布者和仅onde消费者。该客户需要处理以下消息:
2020-09-30T19:22:41 MSG1) {id:'user1.123',msg: '{ANYJSON}'}
2020-09-30T19:22:42 MSG2) {id:'user2.345',msg: '{ANYJSON}'}
2020-09-30T19:22:43 MSG3) {id:'user3.877',msg: '{ANYJSON}'}
2020-09-30T19:22:43 MSG4) {id:'user1.123',msg: '{ANYJSON}'}
2020-09-30T19:22:43 MSG5) {id:'user1.123',msg: '{ANYJSON}'}
每条消息需要处理5-10秒(数据库查询,4个HTTP请求,数据库插入e等),我为每个消息创建一个池(https://threads.js.org/usage-pool),因为它需要重复命令(数据库查询,4个http请求,数据库插入e等)供许多用户使用。这样每个POOL / MSG可以同时运行。
这些IDS由发布者动态生成,并且每次都会更改。基本上,我需要按ID对消息进行“分组”,并且仅当没有消息正在同时处理此ID时才处理消息。例如:
-
MSG 1
(2020-09-30T19:22:41)应该立即处理 -
MSG 2
(2020-09-30T19:22:42)应该立即处理 -
MSG 3
(2020-09-30T19:22:43)应该立即处理 -
MSG 4
(2020-09-30T19:22:43)应该等待MSG 1
被处理 -
MSG 5
(2020-09-30T19:22:43)应该等待MSG 4
被处理
假设消费者1分钟后收到:
2020-09-30T19:23:41 MSG6) {id:'user1.123',msg: '{ANYJSON}'}
2020-09-30T19:23:42 MSG7) {id:'user2.345',msg: '{ANYJSON}'}
-
MSG 6
(2020-09-30T19:23:41)应该立即处理,因为MSG5
已被处理。 -
MSG 7
(2020-09-30T19:23:41)应该立即处理,因为MSG2
已被处理。
在366行,我知道一个池何时完成,因此是时候向另一个MSG询问相同的ID。
那么,如何实现这个按ID“分组”的队列?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。