正确的设计在akka-讯息传递

如何解决正确的设计在akka-讯息传递

我将尝试为您解答其中一些问题。我不会为所有问题提供具体的答案,但希望我能指导您正确的方向。

对于初学者,您将需要改变将请求传达给进行图书搜索的3个参与者的方式。ScatterGatherFirstCompletedRouter在这里使用a 可能不是正确的方法。该路由器将仅等待其中一个路由的响应(第一个响应),因此您的结果集将不完整,因为它将不包含其他2条路由的结果。还有一个BroadcastRouter,但由于它只能处理tell (!)而不能满足您的需求ask (?)。做你想做的事,一种选择是将请求发送到每个RECEIPIENT,得到Futures的答复,然后将它们组合成一个聚合Future使用Future.sequence。一个简化的示例可能如下所示:

case class SearchBooks(title:String)
case class Book(id:Long, title:String)

class BookSearcher extends Actor{

  def receive = {
    case req:SearchBooks =>
      val routees:List[ActorRef] = ...//Lookup routees here
      implicit val timeout = Timeout(10 seconds)
      implicit val ec = context.system.dispatcher

      val futures = routees.map(routee => (routee ? req).mapTo[List[Book]])
      val fut = Future.sequence(futures)

      val caller = sender //Important to not close over sender
      fut onComplete{
        case Success(books) => caller ! books.flatten

        case Failure(ex) => caller ! Status.Failure(ex)
      }
  }
}

现在,这不是我们的最终代码,而是您的样本尝试执行的操作的近似值。在此示例中,如果任何一条下游路由发生故障/超时,我们将陷入困境,Failure呼叫者也将失败。如果它们全部成功,则调用者将获得汇总的Book对象列表。

现在进入您的问题。首先,您询问是否在超时时间内没有从其中一个路由得到答复,是否应该再次向所有参与者发送请求。这个问题的答案实际上取决于您。您是要让另一端的用户看到部分结果(即3个参与者中2个参与者的结果),还是总是每次都必须是全部结果?如果答案是肯定的,则可以调整发送到路由的代码,如下所示:

val futures = routees.map(routee => (routee ? req).mapTo[List[Book]].recover{
  case ex =>
    //probably log something here
    List()
})

使用此代码,如果任何路由超时或由于任何原因而失败,则将以“ Book”的空白列表代替响应而不是失败。现在,如果您无法获得部分结果,则可以再次发送整个请求,但是您必须记住,另一端可能有某人在等待他们的书结果,而他们不想永远等待。

对于第二个问题,您问超时是否过早怎么办?您选择的超时值将完全取决于您,但是最有可能应该基于两个因素。第一个因素将来自测试搜索的通话时间。为了安全起见,请平均找出需要多长时间,并根据该值选择一个稍有缓冲的值。第二个因素是另一端的某人愿意等待结果的时间。您可以在超时方面非常保守,将其设置为60秒只是为了安全起见,但是如果另一端确实有人在等待结果,他们愿意等待多长时间?我宁愿收到失败响应,指出我应该重试而不是永远等待。因此,考虑到这两个因素,

对于问题3,您询问如果删除该消息会发生什么。在这种情况下,我猜想接收该消息的人的未来只会超时,因为它不会得到响应,因为接收方参与者永远不会收到要响应的消息。Akka不是JMS;它没有确认模式,在该模式下,如果收件人没有收到并确认消息,则可以多次重发消息。

另外,从您的示例中可以看到,我同意不Future使用来阻止聚合Await。我更喜欢使用非阻塞回调。阻塞接收功能并不理想,因为该Actor实例将停止处理其邮箱,直到该阻塞操作完成。通过使用非阻塞回调,您可以释放该实例以返回到处理其邮箱的权限,并允许结果的处理只是在中执行的另一项工作ExecutionContext,与处理其邮箱的参与者分离。

现在,如果您真的想在网络不可靠时不浪费通信,可以考虑使用Akka 2.2中的“可靠代理”。如果您不想走这条路线,则可以通过ping定期向路线发送类型消息来自行滚动。如果没有及时响应,则将其标记为已关闭并且不发送消息,直到获得可靠的信息(在很短的时间内)ping由此看来,每个路由就像一个FSM。如果您绝对需要这种行为,那么这两种方法都可以使用,但是您需要记住,这些解决方案会增加复杂性,并且仅在您绝对需要这种行为时才应采用。如果您正在开发银行软件,并且您绝对需要保证交付的语义,因为否则将导致严重的财务隐患,因此请务必采用这种方法。请谨慎判断您是否需要这样的东西,因为我敢打赌90%的时间都不需要。在您的模型中,可能只有等待另一端的呼叫者是等待您可能已经知道的事情不会成功的人。通过在actor中使用非阻塞回调,它不会因为某些事情可能需要很长时间而停止。它’ 的已移至下一条消息。如果您决定在失败时重新提交,则也需要小心。您不想淹没接收方的参与者邮箱。如果您决定重新发送,则将其设置为固定次数。

如果需要这些保证的语义,另一种可能的方法是研究Akka的Clustering Model。如果将下游路由群集在一起,并且其中一台服务器出现故障,则所有流量都将被路由到仍处于运行状态的节点,直到该其他节点恢复为止。

解决方法

我已经浏览了一些有关如何以及为何akka无法保证消息传递的帖子。该文档,这个讨论和小组其他讨论做解释它做好。

我对akka来说还很陌生,希望了解适合表壳的设计。例如说我有3个不同的角色,都在不同的机器上。一个负责食谱,另一个负责历史,最后一个负责技术书籍。

我在另一台机器上有个主要演员。假设对主角有一个查询,以搜索是否有可用的书。主参与者将请求发送到3个远程参与者,并期望结果。所以我这样做:

  val scatter = system.actorOf(
        Props[SearchActor].withRouter(ScatterGatherFirstCompletedRouter(
              routees=someRoutees,within = 10 seconds)),"router")
  implicit val timeout = Timeout(10 seconds)
  val futureResult = scatter ?  Text("Concurrency in Practice")
  //      What should I do here?.
  //val result = Await.result(futureResult,timeout.duration) line(a)

简而言之,我已经向所有3个远程参与者发送了请求,并期望在10秒内得到结果。

应该采取什么行动?

  1. 假设我在10秒内未收到结果,是否应该再次向所有人发送新请求?
  2. 如果within上述时间过早了怎么办。但是我不知道可能要花多少时间。
  3. 如果within时间足够但消息丢失了怎么办。

如果我没有及时得到答复,within然后重新发送请求。像这样的东西,它仍然是异步的:

futureResult onComplete{
  case Success(i) => println("Result "+i)
  case Failure(e) => //send again
}

但是在查询太多的情况下,调用时是否会有太多线程并且笨重?如果我取消注释line(a),它将变为同步,并且在负载下可能表现不佳。

假设我在10秒钟内没有收到回复。如果within时间还为时过早,那么它将再次发生大量无用的计算。如果消息丢失了,那么10将浪费数秒的宝贵时间。如果说我知道消息已经传递,我可能会等待更长的时间而不会
怀疑。

人们如何解决此类问题?ACK?但是然后我必须将状态存储在所有查询的参与者中。这一定是平常的事情,我正在寻找正确的设计。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)> insert overwrite table dwd_trade_cart_add_inc > select data.id, > data.user_id, > data.course_id, > date_format(
错误1 hive (edu)> insert into huanhuan values(1,'haoge'); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive> show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 <configuration> <property> <name>yarn.nodemanager.res