如何解决Akka和背压
我看到了Akka Streams的背压概念,但是我的项目没有使用Akka Stream,我为Akka开发了另一种背压指示器概念,我想问一下它是否可行...
我计划使用邮箱队列的深度作为用作背压的标准....背后的逻辑,如果Akka能够跟上我生成消息的速度,则消息队列的深度应该浅一些。如果我发送太多消息而Akka无法跟上,那么消息队列中就会有越来越多的消息,因此我必须降低产生消息的速度。
这与Apache Cassandra的“飞行请求”中的想法基本相同...
Session.State state = session.getState();
for (Host host : state.getConnectedHosts()) {
Hostdistance distance = loadBalancingPolicy.distance(host);
int connections = state.getopenConnections(host);
int inFlightQueries = state.getInFlightQueries(host);
....
}
所以我从Akka使用的Akka reference.conf中找到了。
default-mailBox {
mailBox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailBox"
}
class NodeMessageQueue extends AbstractNodeQueue[Envelope] with MessageQueue with
UnboundedMessageQueueSemantics {
final def enqueue(receiver: ActorRef,handle: Envelope): Unit = add(handle)
final def dequeue(): Envelope = poll()
因此,我编写了一个AspectJ Aspect来拦截enqueue()和dequeue(),并递增和递减全局AtomicLong,可用来跟踪Akka Actor邮箱中等待消息的总数... >
所以我的逻辑是,如果Akka可以跟上我发送的邮件数量,则该数量应低于一些预配置的值...
因此,假设我有10万个参与者,并且他们在邮箱消息队列中有1000条消息,一切正常,但是,如果我看到消息队列中有1 000 000条消息正在等待,则这是降低消息生成速度的信号。 ...如果队列中有1000万条消息,则确定的信号将停止消息的产生...
我构建了一个原型,并且可以按预期运行。...但是在继续之前,我想在这里问一下,您是否发现该概念确实存在缺陷?
寻求答案...
解决方法
这可能可行,但是:
- 每次发送邮件时,单点争用(甚至是无锁的竞争)很可能会对性能产生重大影响
- 您希望放慢速度的特定阈值将在很大程度上取决于应用程序,并且可能会随着您更改应用程序而改变
- 取决于系统的详细信息(以及“停止产生消息”的确切含义),存在死锁的可能性(即,如果您需要不丢弃传入消息并且处理消息需要发送另一条消息(在Akka中,至少发送一条消息作为处理消息的一部分可能是最常见的事情),这意味着该参与者将停止从其邮箱中进行处理)。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。