如何解决如何在客户端断开连接的情况下流式传输 fs2.Queue 而不会丢失项目
我需要将 fs2.Queue
- 一些事件流式传输到单个 http 客户端。
import cats.effect._
import org.http4s._
import org.http4s.dsl.io._
import org.http4s.server.Router
import org.http4s.server.blaze.BlazeServerBuilder
import org.http4s.syntax.kleisli._
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
object StreamingHttp4sFs2Server2 extends App {
implicit val timer: Timer[IO] = IO.timer(global)
implicit val cs: ContextShift[IO] = IO.contextShift(global)
(for {
queue <- fs2.concurrent.Queue.unbounded[IO,Long]
_ <- IO {
fs2.Stream.awakeEvery[IO](1.seconds).map(_.toSeconds)
.through(queue.enqueue)
.compile.drain
.unsafeRunAsyncAndForget()
}
_ <- BlazeServerBuilder[IO](ExecutionContext.global)
.bindHttp(8080,"localhost")
.withHttpApp(Router("/" -> HttpRoutes.of[IO] {
case GET -> Root => Ok(queue.dequeue.map(_.toString + "\n"))
}).orNotFound)
.serve
.compile
.drain
} yield ()).unsafeRunAsyncAndForget()
}
有时客户端断开连接,一些事件丢失并显示错误 Error writing body
。
您可以使用 curl 重现问题:
curl --no-buffer -X GET http://localhost:8080/
1
2
3
4
5
^C //hit ctrl+c,curl --no-buffer -X GET http://localhost:8080/
8
9
10
如何在客户端断开连接时不丢失事件的情况下实现某些东西?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。