如何解决Cats Effect:用于非阻塞 IO 的线程池?
来自本教程 https://github.com/slouc/concurrency-in-scala-with-ce#threading 异步操作分为 3 组,需要显着不同的线程池来运行:
非阻塞异步操作:
具有非常低的线程数量(甚至可能只有一个),具有非常高的优先级的有界池。这些线程基本上大部分时间都处于空闲状态,并不断轮询是否有新的异步 IO 通知。这些线程用于处理请求的时间直接映射到应用程序延迟,因此除了接收通知并将它们转发到应用程序的其余部分之外,在此池中没有其他工作完成非常重要。 线程数量非常少(甚至可能只有一个)的有界池,具有非常高的优先级。这些线程基本上大部分时间都处于空闲状态,并不断轮询是否有新的异步 IO 通知。这些线程处理请求所花费的时间直接映射到应用程序延迟中,因此除了接收通知并将它们转发给应用程序的其余部分之外,在此池中没有其他工作完成非常重要。
阻塞异步操作:
无界缓存池。无界,因为阻塞操作可以(并且将会)阻塞一个线程一段时间,并且我们希望在此期间能够为其他 I/O 请求提供服务。缓存是因为我们可能会因创建过多线程而耗尽内存,因此重用现有线程很重要。
占用大量 cpu 的操作:
固定池,其中线程数等于 cpu 内核数。这很简单。回到过去,“黄金法则”是线程数 = cpu 内核数 + 1,但“+1”来自这样一个事实,即总是为 I/O 保留一个额外线程(如上所述,我们现在有单独的池)。
在我的 Cats Effect 应用程序中,我使用基于 Scala Future 的 ReactiveMongo 库来访问 MongoDB,它在与 MongoDB 交谈时不会阻塞线程,例如执行非阻塞 IO。
它需要执行上下文。
Cats 效果提供默认执行上下文 IOApp.executionContext
我的问题是:我应该为非阻塞 io 使用哪个执行上下文?
IOApp.executionContext
?
但是,来自 IOApp.executionContext
文档:
为应用提供默认的 ExecutionContext。
JVM 顶部的默认设置是基于可用 cpu 的可用数量(请参阅 PoolUtils)懒惰地构建为固定线程池。
似乎这个执行上下文属于我上面列出的第三组 - cpu-heavy operations (Fixed pool in which number of threads equals the number of cpu cores.)
,
这让我觉得 IOApp.executionContext 不是非阻塞 IO 的好选择。
我是对的吗,我应该为非阻塞 IO 创建一个带有固定线程池(1 个或 2 个线程)的单独上下文(因此它将属于我上面列出的第一组 - Non-blocking asynchronous operations: Bounded pool with a very low number of threads (maybe even just one),with a very high priority.
)?
或者 IOApp.executionContext
是为 cpu 绑定和非阻塞 IO 操作而设计的?
我用来将 Scala Future 转换为 F 的函数并且除了执行上下文:
def scalaFuturetoF[F[_]: Async,A](
future: => Future[A]
)(implicit ec: ExecutionContext): F[A] =
Async[F].async { cb =>
future.onComplete {
case Success(value) => cb(Right(value))
case Failure(exception) => cb(Left(exception))
}
}
解决方法
在 Cats Effect 3 中,每个 IOApp
都有一个 Runtime
:
final class IORuntime private[effect] (
val compute: ExecutionContext,private[effect] val blocking: ExecutionContext,val scheduler: Scheduler,val shutdown: () => Unit,val config: IORuntimeConfig,private[effect] val fiberErrorCbs: FiberErrorHashtable = new FiberErrorHashtable(16)
)
除了测试或教育示例外,您几乎总是希望保留默认值,而不是随意声明自己的运行时。
在您的 IOApp
中,您可以通过以下方式访问 compute
池:
runtime.compute
如果要执行阻塞操作,则可以使用 blocking
构造:
blocking(IO(println("foo!"))) >> IO.unit
通过这种方式,您告诉 CE3 运行时此操作可能会阻塞,因此应将其分派到专用池。见here。
CE2 怎么样?嗯,它有类似的机制,但它们非常笨重,也包含不少惊喜。例如,阻塞调用是使用 Blocker
调度的,然后必须以某种方式凭空召唤或贯穿整个应用程序,线程池定义是使用笨拙的 ContextShift
完成的。如果您在这件事上有任何选择,我强烈建议您在 migrating to CE3 上投入一些精力。
很好,但是反应式 Mongo 呢?
ReactiveMongo 使用 Netty(基于 Java NIO API)。并且 Netty 有自己的线程池。这在 Netty 5 中有所改变(参见 here),但 ReactiveMongo 似乎仍在 Netty 4 上(参见 here)。
但是,您所询问的 ExecutionContext
是将执行回调的线程池。这可以是您的计算池。
让我们看一些代码。首先,你的翻译方法。我刚刚将 async
更改为 async_
,因为我使用的是 CE3,并且我添加了线程打印行:
def scalaFutureToF[F[_]: Async,A](future: => Future[A])(implicit ec: ExecutionContext): F[A] =
Async[F].async_ { cb =>
future.onComplete {
case Success(value) => {
println(s"Inside Callback: [${Thread.currentThread.getName}]")
cb(Right(value))
}
case Failure(exception) => cb(Left(exception))
}
}
现在让我们假设我们有两个执行上下文 - 一个来自我们的 IOApp
,另一个将代表 ReactiveMongo 用于运行 Future
的任何内容。这是虚构的 ReactiveMongo 之一:
val reactiveMongoContext: ExecutionContext =
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
另一个就是runtime.compute
。
现在让我们像这样定义 Future
:
def myFuture: Future[Unit] = Future {
println(s"Inside Future: [${Thread.currentThread.getName}]")
}(reactiveMongoContext)
注意我们如何通过将 Future
传递给它来假装这个 reactiveMongoContext
在 ReactiveMongo 中运行。
最后,让我们运行应用程序:
override def run: IO[Unit] = {
val myContext: ExecutionContext = runtime.compute
scalaFutureToF(myFuture)(implicitly[Async[IO]],myContext)
}
输出如下:
未来内部:[pool-1-thread-1]
内部回调:[io-compute-6]
我们提供给 scalaFutureToF
的执行上下文只是运行回调。 Future 本身在我们的单独的线程池上运行,它代表 ReactiveMongo 的池。实际上,您无法控制这个池,因为它来自 ReactiveMongo。
额外信息
顺便说一下,如果您不使用类型类层次结构 (F
),而是直接使用 IO
值,那么您可以使用这种简化方法:
def scalaFutureToIo[A](future: => Future[A]): IO[A] =
IO.fromFuture(IO(future))
看看它是如何甚至不需要您传递 ExecutionContext
- 它只是使用您的计算池。或者更具体地说,它使用任何定义为 def executionContext: F[ExecutionContext]
的 Async[IO]
,结果证明它是计算池。让我们检查一下:
override def run: IO[Unit] = {
IO.executionContext.map(ec => println(ec == runtime.compute))
}
// prints true
最后但并非最不重要的:
如果我们真的有办法指定 ReactiveMongo 的底层 Netty 应该使用哪个线程池,那么是的,在这种情况下,我们绝对应该使用单独的线程池。我们永远不应该将我们的 runtime.compute
池提供给其他运行时。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。