如何解决ktor websocket flow api如何工作?
我正在使用ktor通过websocket进行服务器端开发。
文档向我们展示了使用传入渠道的示例:
for (frame in incoming.mapNotNull { it as? Frame.Text }) {
// some
}
但是mapNotNull
被标记为不赞成Flow
。我应该如何使用该API,可能会有什么问题?例如,Flow
是冷流。这意味着将在每个collect
上调用生产函数。它如何在websocket的上下文中工作。它会在第二次collect
通话时重新打开,还是可能在下一个collect
之后发送一次旧邮件?如何收集N
邮件,然后停止收集,然后再次收集?
预先感谢:)
解决方法
我应该如何使用此API?可能有什么问题?
我正在使用的以及在文档中某个示例中看到的是在consumeAsFlow()
上调用的ReceiveChannel
方法。这是整个代码段:
webSocket("/websocket") { //this: DefaultWebSocketServerSession
incoming
.consumeAsFlow()
.map { receive(it) }
.collect()
}
这种方法还没有发现重大问题。您应该意识到的一件事(但对于非流方法也是如此)是,如果您将自己扔进流中,那么它将破坏WebSocket连接,这通常不是您想要执行的操作。可能值得考虑将整个内容包装在try-catch
中。
会在第二次催收呼叫中重新打开它,还是可能在下一次催收后将旧邮件发送一次?
您甚至在开始使用流中的消息之前就打开了websocket。您可以看到在webSocket() {}
内部,您处于DefaultWebSocketServerSession
的上下文中。这是您的连接管理。在流内部,您只是在消息到达时(建立连接之后)一个接一个地接收消息。如果连接断开,那么您就没钱了。必须先重新建立它,然后才能处理您的消息。该建立位是通过Route.webSocket()
方法完成的。我建议您看看它的Javadoc。
如果您希望在关闭连接后进行一些清理,则可以添加一个finally
块,如下所示:
webSocket("/chat") {
try {
incoming
.consumeAsFlow()
.map { receive(it,client) }
.collect()
} finally {
// cleanup
}
}
简而言之:collect
每条收到的消息被调用一次。如果没有连接(或连接断开),则不会调用collect
。
如何收集N条消息,然后停止收集,然后再次收集?
这是什么用例?我认为您不应该随心所欲地这样做。您当然可以从流中take(n)
个项目,但是您将无法再从中获取更多内容。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。