微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Cats Effect:用于非阻塞 IO 的线程池?

如何解决Cats Effect:用于非阻塞 IO 的线程池?

来自本教程 https://github.com/slouc/concurrency-in-scala-with-ce#threading 异步操作分为 3 组,需要显着不同的线程池来运行:

非阻塞异步操作:

具有非常低的线程数量(甚至可能只有一个),具有非常高的优先级的有界池。这些线程基本上大部分时间都处于空闲状态,并不断轮询是否有新的异步 IO 通知。这些线程用于处理请求的时间直接映射到应用程序延迟,因此除了接收通知并将它们转发到应用程序的其余部分之外,在此池中没有其他工作完成非常重要。 线程数量非常少(甚至可能只有一个)的有界池,具有非常高的优先级。这些线程基本上大部分时间都处于空闲状态,并不断轮询是否有新的异步 IO 通知。这些线程处理请求所花费的时间直接映射到应用程序延迟中,因此除了接收通知并将它们转发给应用程序的其余部分之外,在此池中没有其他工作完成非常重要。

阻塞异步操作:

无界缓存池。无界,因为阻塞操作可以(并且将会)阻塞一个线程一段时间,并且我们希望在此期间能够为其他 I/O 请求提供服务。缓存是因为我们可能会因创建过多线程而耗尽内存,因此重用现有线程很重要。

占用大量 cpu 的操作:

固定池,其中线程数等于 cpu 内核数。这很简单。回到过去,“黄金法则”是线程数 = cpu 内核数 + 1,但“+1”来自这样一个事实,即总是为 I/O 保留一个额外线程(如上所述,我们现在有单独的池)。

在我的 Cats Effect 应用程序中,我使用基于 Scala Future 的 ReactiveMongo 库来访问 MongoDB,它在与 MongoDB 交谈时不会阻塞线程,例如执行非阻塞 IO。

它需要执行上下文。 Cats 效果提供认执行上下文 IOApp.executionContext

我的问题是:我应该为非阻塞 io 使用哪个执行上下文?

IOApp.executionContext

但是,来自 IOApp.executionContext 文档:

为应用提供认的 ExecutionContext。

JVM 顶部的认设置是基于可用 cpu 的可用数量(请参阅 PoolUtils)懒惰地构建为固定线程池。

似乎这个执行上下文属于我上面列出的第三组 - cpu-heavy operations (Fixed pool in which number of threads equals the number of cpu cores.), 这让我觉得 IOApp.executionContext 不是非阻塞 IO 的好选择。

我是对的吗,我应该为非阻塞 IO 创建一个带有固定线程池(1 个或 2 个线程)的单独上下文(因此它将属于我上面列出的第一组 - Non-blocking asynchronous operations: Bounded pool with a very low number of threads (maybe even just one),with a very high priority.)?

或者 IOApp.executionContext 是为 cpu 绑定和非阻塞 IO 操作而设计的?

我用来将 Scala Future 转换为 F 的函数并且除了执行上下文:

def scalaFuturetoF[F[_]: Async,A](
      future: => Future[A]
  )(implicit ec: ExecutionContext): F[A] =
    Async[F].async { cb =>
      future.onComplete {
        case Success(value)     => cb(Right(value))
        case Failure(exception) => cb(Left(exception))
      }
    }

解决方法

在 Cats Effect 3 中,每个 IOApp 都有一个 Runtime

final class IORuntime private[effect] (
  val compute: ExecutionContext,private[effect] val blocking: ExecutionContext,val scheduler: Scheduler,val shutdown: () => Unit,val config: IORuntimeConfig,private[effect] val fiberErrorCbs: FiberErrorHashtable = new FiberErrorHashtable(16)
)

除了测试或教育示例外,您几乎总是希望保留默认值,而不是随意声明自己的运行时。

在您的 IOApp 中,您可以通过以下方式访问 compute 池:

runtime.compute

如果要执行阻塞操作,则可以使用 blocking 构造:

blocking(IO(println("foo!"))) >> IO.unit

通过这种方式,您告诉 CE3 运行时此操作可能会阻塞,因此应将其分派到专用池。见here

CE2 怎么样?嗯,它有类似的机制,但它们非常笨重,也包含不少惊喜。例如,阻塞调用是使用 Blocker 调度的,然后必须以某种方式凭空召唤或贯穿整个应用程序,线程池定义是使用笨拙的 ContextShift 完成的。如果您在这件事上有任何选择,我强烈建议您在 migrating to CE3 上投入一些精力。

很好,但是反应式 Mongo 呢?

ReactiveMongo 使用 Netty(基于 Java NIO API)。并且 Netty 有自己的线程池。这在 Netty 5 中有所改变(参见 here),但 ReactiveMongo 似乎仍在 Netty 4 上(参见 here)。

但是,您所询问的 ExecutionContext 是将执行回调的线程池。这可以是您的计算池。

让我们看一些代码。首先,你的翻译方法。我刚刚将 async 更改为 async_,因为我使用的是 CE3,并且我添加了线程打印行:

def scalaFutureToF[F[_]: Async,A](future: => Future[A])(implicit ec: ExecutionContext): F[A] =
  Async[F].async_ { cb =>
    future.onComplete {
      case Success(value)     => {
        println(s"Inside Callback: [${Thread.currentThread.getName}]")
        cb(Right(value))
      }
      case Failure(exception) => cb(Left(exception))
    }
  }

现在让我们假设我们有两个执行上下文 - 一个来自我们的 IOApp,另一个将代表 ReactiveMongo 用于运行 Future 的任何内容。这是虚构的 ReactiveMongo 之一:

val reactiveMongoContext: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))

另一个就是runtime.compute

现在让我们像这样定义 Future

def myFuture: Future[Unit] = Future {
  println(s"Inside Future: [${Thread.currentThread.getName}]")
}(reactiveMongoContext)

注意我们如何通过将 Future 传递给它来假装这个 reactiveMongoContext 在 ReactiveMongo 中运行。

最后,让我们运行应用程序:

override def run: IO[Unit] = {
  val myContext: ExecutionContext = runtime.compute
  scalaFutureToF(myFuture)(implicitly[Async[IO]],myContext)
}

输出如下:

未来内部:[pool-1-thread-1]
内部回调:[io-compute-6]

我们提供给 scalaFutureToF 的执行上下文只是运行回调。 Future 本身在我们的单独的线程池上运行,它代表 ReactiveMongo 的池。实际上,您无法控制这个池,因为它来自 ReactiveMongo。

额外信息

顺便说一下,如果您不使用类型类层次结构 (F),而是直接使用 IO 值,那么您可以使用这种简化方法:

def scalaFutureToIo[A](future: => Future[A]): IO[A] =
  IO.fromFuture(IO(future))

看看它是如何甚至不需要您传递 ExecutionContext - 它只是使用您的计算池。或者更具体地说,它使用任何定义为 def executionContext: F[ExecutionContext]Async[IO],结果证明它是计算池。让我们检查一下:

override def run: IO[Unit] = {
  IO.executionContext.map(ec => println(ec == runtime.compute))
}
// prints true

最后但并非最不重要的:

如果我们真的有办法指定 ReactiveMongo 的底层 Netty 应该使用哪个线程池,那么是的,在这种情况下,我们绝对应该使用单独的线程池。我们永远不应该将我们的 runtime.compute 池提供给其他运行时。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。