如何从事件驱动架构中错过的集成或通知事件中恢复?

如何解决如何从事件驱动架构中错过的集成或通知事件中恢复?

情况如下。共有三种服务,一种服务是事件源,并使用事件总线(如 Azure 服务总线或 ActiveMQ)向其他两种服务(订阅者)发布集成或通知事件(发件箱模式)。

Pub sub outbox pattern on event sourced microservice

此设计灵感来自.NET microservices - Architecture e-book - Subscribing to events

我想知道如果这些事件之一由于错误而无法传递或者事件处理只是没有正确实现会发生什么。

  • 如果出现应用程序错误,我应该信任我的消息总线吗?
    • 这是死信队列的用例吗?
  • 在重新发布事件时,是否应该将所有消息重新发布到所有主题,还是只能重新发布一个子集?
    • 服务重新发布事件是否应该能够访问发布者和订阅者数据库以了解消息偏移量?
    • 或者订阅微服务是否应该能够读取发件箱?

解决方法

只是为了补充@xander 的出色回答,我相信您可能对您的事件总线使用了不合适的技术。您应该会发现 Azure Event HubsApache Kafka 更适合事件发布/订阅架构。与旧的服务总线方法相比,专用事件总线技术的优势包括:

  • 每条事件消息只有一个副本(而 Azure 服务总线或 RabbitMQ 为每个订阅者制作每条消息的深层副本)
  • 任何订阅者消费后都不会删除消息。相反,消息会在指定的时间段内保留在主题上(在 Kafka 的情况下可以是不确定的)。
  • 每个订阅者(消费者组)将能够跟踪其提交的偏移量。这允许订阅者在丢失消息时重新连接和回滚,独立于发布者和其他订阅者(即隔离)。
  • 新消费者可以在消息发布后订阅,并且仍然可以接收所有可用消息(即倒回可用事件的开始)

考虑到这一点:

如果出现应用程序错误,我应该信任我的消息总线吗?

是的,因为 xander 提供的原因。一旦发布者确认事件总线已接受该事件,发布者的工作就完成了,不应再次发送相同的事件。

Nitpicky,但由于您处于发布订阅架构(即 0..N 个订阅者)中,无论使用何种技术,您都应该将总线称为事件总线(而不是消息总线)。

这是死信队列的用例吗?

死信队列通常是点对点队列或服务总线交付架构的产物,即其中有一条命令消息(事务性地)用于单个或可能有限数量的接收者。在发布-订阅事件总线拓扑中,期望发布者监控所有订阅者的交付对发布者来说是不公平的。

相反,订阅者应承担弹性交付的责任。在 Azure 事件中心和 Apache Kafka 等技术中,每个消费者组的事件都是唯一编号的,因此订阅者可以通过监控消息偏移量来收到丢失消息的警报。

在重新发布事件时,是否应该将所有消息重新发布到所有主题,还是可以只重新发布一个子集?

不,事件发布者不应重新发布事件,因为这会破坏所有观察者订阅者的事件链。请记住,每个发布的事件可能有 N 个订阅者,其中一些可能在您的组织之外/在您的控制之外。事件应被视为在某个时间点发生的“事实”。事件发布者不应该关心事件的订阅者是 0 还是 100。由每个订阅者决定如何解释事件消息。

例如不同类型的订阅者可以对事件执行以下任何操作:

  • 只需记录事件以进行分析
  • 将事件转换为命令(或 Actor 模型消息)并作为特定于订阅者的事务处理
  • 将事件传递到规则引擎以推理更广泛的事件流,例如如果特定客户执行异常大量的交易,则触发反欺诈措施

因此您可以看到,为了一个 flakey 订阅者的利益重新发布事件会破坏其他订阅者的数据流。

服务重新发布事件是否应该能够访问发布者和订阅者数据库以了解消息偏移量?

正如 xander 所说,系统和微服务不应该共享数据库。但是,系统可以公开 API(RESTful、gRPC 等)

事件总线本身应该跟踪哪个订阅者读取了哪个偏移量(即每个消费者组、每个主题和每个分区)。每个订户将能够监视和更改其偏移量,例如以防事件丢失并需要重新处理。 (同样,一旦确认事件已被总线接收到,生产者不应重新发布事件)

或者订阅微服务是否应该能够读取发件箱?

事件驱动的企业架构至少有两种常用方法:

  • “最少信息”事件,例如Customer Y has purchased Product Z。在这种情况下,许多订阅者会发现事件中包含的信息不足以完成下游工作流,并且需要丰富事件数据,通常是通过调用靠近发布者的 API 来检索其余数据他们需要。这种方法具有安全优势(因为 API 可以验证对更多数据的请求),但会导致 API 的 I/O 负载较高。
  • “深度图”事件,其中每条事件消息都包含任何订阅者希望需要的所有信息(这在未来证明非常困难!)。尽管事件消息的大小会变大,但它确实节省了大量触发 I/O,因为订阅者不需要从生产者那里执行进一步的充实。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)> insert overwrite table dwd_trade_cart_add_inc > select data.id, > data.user_id, > data.course_id, > date_format(
错误1 hive (edu)> insert into huanhuan values(1,'haoge'); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive> show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 <configuration> <property> <name>yarn.nodemanager.res