微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何为每个用户获取 Akka Http Websocket 连接的终止开关?

如何解决如何为每个用户获取 Akka Http Websocket 连接的终止开关?

我是 Akka 和 Scala 的新手,我自学了用 websockets 做一个小项目。最终目标很简单,做一个基本的聊天服务器,在某个网页上发布+订阅消息。

事实上,在仔细阅读他们的文档后,我已经找到了与我的目标相关的页面,即 thisthis

使用动态连接(又名 MergeHub 和 broadcastHub)和 Flow.fromSinkAndSource() 方法,我能够实现我想要的一个非常基本的示例。我们甚至可以使用我在下面显示的 akka 文档中的示例来获得终止开关。代码如下:

private lazy val connHub: Flow[Message,Message,UniqueKillSwitch] = {
    val (sink,source) = MergeHub.source[Message].toMat(broadcastHub.sink[Message])(Keep.both).run()
    Flow.fromSinkAndSourceCoupled(sink,source).joinMat(KillSwitches.singleBidi[Message,Message])(Keep.right)
  }

但是,我现在看到了一个问题。以上将返回一个将由 Akka 的 websocket 指令使用的 Flow:akka.http.scaladsl.server.Directives.handleWebSocketMessages(/* FLOW GOES HERE */) 这意味着只要我为它提供处理程序,akka 代码本身就会为我实现这个流程。

但是假设我想通过 KillSwitch 任意终止一个用户的连接(可能是因为他们的会话在我的应用程序上已过期)。虽然用户的 websocket 将通过上述处理程序添加,但由于我的代码不会明确实现该流程,因此我无法访问 KillSwitch。因此,我无法终止连接,只有用户离开网页时才能终止。

令我感到奇怪的是,文档会提到终止开关方法而没有显示我将如何使用 websocket api 获得一个方法

任何人都可以建议我如何获得每个连接的终止开关的解决方案吗?我对这应该如何工作有根本的误解吗?

提前致谢。

解决方法

我很高兴地说,经过大量时间、研究和编码,我对这个问题有了答案。为了做到这一点,我不得不在 Akka Gitter 和 Lightbend 论坛上发帖。请参阅我得到的惊人答案 there 以了解对问题的一些看法和一些解决方案。我会在这里总结一下。

为了从我使用的代码中获取 UniqueKillSwitch,我需要在返回的 Flow 上使用 mapMaterializeValue() 方法。这是我现在用来将 Flow 返回到 handleWebSocketMessages 指令的代码:

// note - state will not be updated properly if cancellation events come through from the client side as user->killswitch mapping may still remain in concurrent map even if the connections are closed
    Flow.fromSinkAndSourceCoupled(mergeHubSink,broadcastHubSource)
      .joinMat(KillSwitches.singleBidi[Message,Message])(Keep.right)
      .mapMaterializedValue { killSwitch =>
        connections.put(user,killSwitch) // add kill switch in side effect once value is ready from materialization
        NotUsed.notUsed()
      }

以上代码位于我创建的聊天室类中,该类可以访问合并中心和广播中心具体化的接收器和源。它还可以访问并发哈希映射,该哈希映射将终止开关持久化给用户。通过这种方式,我们现在可以通过地图查询来访问 Kill Switch。从那里,您可以调用 switch.shutdown() 从服务器端终止用户的连接。

我的主要问题是我最初认为即使我没有控制实现,我也可以直接获得开关。这似乎不可能。当您知道需要您的 Flow 的调用者不关心物化值(又名终止开关)时,我建议使用此方法。

请参考我链接的答案,了解更多场景和处理此问题的方法。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。