微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在确保每个实体FIFO的同时并行处理消息?

如何解决如何在确保每个实体FIFO的同时并行处理消息?

| 假设您的系统中有一个实体,例如“ Person”,并且您想处理修改各种Person实体的事件。重要的是: 同一个人的事件以FIFO顺序处理 多个Person事件流由不同的线程/进程并行处理 我们有一个使用共享数据库和锁来解决此问题的实现。线程竞争为一个获取锁,然后在获取锁后按顺序处理事件。我们希望移至消息队列以避免轮询和锁定,我们认为这将减少数据库负载并简化使用者代码的实现。 我已经对ActiveMQ,RabbitMQ和hornetq进行了一些研究,但是我没有找到一种明显的实现方法。 ActiveMQ支持使用者订阅通配符,但是我看不到一种将每个队列的并发限制为1的方法。如果可以,那么解决方案将很简单: 不知何故告诉代理,以/ queue / person开头的所有队列允许并发1。 发布者使用队列名称中的“人员ID”将事件写入队列。例如:/queue/person.20 消费者使用通配订阅队列:/ queue / person。 每个消费者将收到针对不同人员队列的消息。如果所有人队列都在使用中,则某些消费者可能会闲置,没关系 处理完一条消息后,使用者将发送一个ACK,告知代理已完成该消息,并允许将该Person队列的另一条消息发送给另一使用者(可能是同一条) ActiveMQ即将来临:您可以进行通配订阅并启用“专用使用者”,但是这种组合导致单个使用者接收发送到所有匹配队列的所有消息,从而将所有人员的并发性降低为1。我觉得我缺少明显的东西。 问题: 是否可以通过任何主要的消息队列实现来实现上述方法?我们对选择持开放态度。唯一的要求是它必须在Linux上运行。 有没有其他方法可以解决我未考虑的一般问题? 谢谢!     

解决方法

看起来JMSXGroupID是我正在寻找的东西。从ActiveMQ文档: http://activemq.apache.org/message-groups.html 他们使用股票价格的示例用例正是我所追求的。我唯一关心的是如果单个消费者死亡,会发生什么。希望经纪人能够检测到该情况,并选择另一个与该组ID关联的消费者。     ,解决此问题(如果我没问题的话)的一种通用方法是为Person引入一些唯一属性(例如,Person的数据库级ID),然后将该属性的哈希值用作FIFO队列的索引以将该Person放入。 由于该属性的哈希值可能非常庞大(您负担不起2 ^ 32个队列/线程),因此仅使用N个哈希值的最低有效位。 每个FIFO队列都应该有专门的工作人员来处理它-瞧,您的要求得到满足! 这种方法有一个缺点-您的人员必须具有分布均匀的ID,才能使所有队列的负载大致相等。如果不能保证,请考虑使用循环队列集并跟踪正在处理的人员,以确保对同一人员进行顺序处理。     ,如果您已经有一个允许共享锁的系统,为什么不为每个队列都具有一个锁,使用者在从队列中读取之前必须获得这些锁?     

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。