如何解决akka 如何更新可变状态?
我阅读了 Akka 文档,但我不明白:
class MyActor extends Actor {
private var _state = 0
override def receive: Receive = {
case x: Int =>
if (x != _state) {
println(s"---------> fail: ${x} and ${_state}")
}
_state = x + 1
}
}
implicit val system = ActorSystem("my-system")
val ref = system.actorOf(Props[MyActor],"my-actor")
(0 to 10000).foreach { x =>
ref ! x
}
我有一个 _state
变量,它不是 @volatile
也不是 atomic
但同时 _state
总是正确的,如果我用 {{1} }-方法。
Akka 如何保护和更新 Actor 的内部状态?
解决方法
Akka 是 Actor 计算模型的实现。参与者模型做出的(可以说是the)关键保证之一是参与者一次只处理一条消息。仅仅凭借 _state
对参与者是私有的,您就可以获得至少与对象的每个方法都为 @synchronized
一样强的并发保证,另外还有 {{ 1}} 操作以非阻塞方式发送消息。
在幕后,粗略(在一些地方进行了简化,但粗略的笔触是准确的)概述了它的工作原理以及如何执行保证:
-
使用
!
Props
构造ActorSystem
的实例,将该实例的唯一 JVM 引用放置在MyActor
中(有人告诉我术语,以及 Akka 的深层内部结构是“地牢”,其灵感来自 Akka 的早期开发团队,该团队位于瑞典乌普萨拉以前是监狱的办公室),以及 {{1} } 与ActorCell
。与此同时(从技术上讲,这是在ActorCell
已经返回my-actor
之后发生的),它构造一个system.actorOf
以允许用户代码引用演员。 -
在
ActorRef
内部,调用ActorRef
方法并将结果ActorCell
(其类型同义词为receive
)保存在对应于演员行为的PartialFunction[Any,Unit]
。 -
Receive
上的ActorCell
操作(至少对于本地!
)确定哪个调度员负责该 actor 并将消息传递给调度员。调度程序然后将消息放入对应于ActorRef
的ActorRef
的邮箱中(这是以线程安全的方式完成的)。 -
如果当前没有调度任务来处理来自actor邮箱的消息,这样的任务会被排入调度程序的执行上下文以从
ActorCell
的邮箱中出列一些(可配置的)消息并在循环中一次一个地处理它们。在那个循环之后,如果有更多的消息要处理,另一个这样的任务将被排队。处理消息包括将其传递给存储在my-actor
的行为字段中的ActorCell
(此机制允许使用Receive
模式更改行为)。
最后一点提供了保证只有一个线程调用 ActorCell
逻辑的核心。
这是 Akka Actors 的经典模型。如果您只是在学习演员,那么您应该使用 Typed Actors,因为这是未来支持的模型。
对于类型化的actor,actor系统保存每个actor的状态,而不是actor本身。当参与者需要处理消息时,参与者系统会将当前状态传递给参与者。当actor 处理完消息后,actor 会将新状态返回给actor 系统。
类型化模型避免了所有同步问题,因为它不使用任何外部状态,它只使用传递给它的状态。并且它不会修改任何外部状态,它只是返回一个修改后的状态值。
如果您必须使用 Classic actor,那么您可以使用 context.become
而不是 var
来实现相同的模型。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。