微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

具有并发限制的并行Aff执行?

如何解决具有并发限制的并行Aff执行?

就Aff而言,并发进程限制实现并行执行的方法是什么?我相信 std libs 中没有方法,也没有找到一个很好的完整答案。

parSequenceWithLmit :: Array (Aff X) -> Int -> Aff (Array X)

Aff X 计算应该并行进行,但不能超过给定的 N 个并发计算。所以它开始 N cals,当一个完成时,下一个(左边的)开始。

解决方法

对于这种事情,一个好的机制是AVar,它是一个阻塞可变单元格。在概念上可以将其视为单元素阻塞队列。

首先AVar 可以是空的也可以是满的。您可以使用 empty 创建一个空的,然后您可以使用 put 用一个值“填充”它。这里有用的一点是,当您调用 put 并且 AVar 已经“满”时,put阻塞直到它再次为空。

第二,您可以使用 take 读取值,这将返回值,但同时将 AVar 留空。与 put 类似,如果 AVar 为空,take 将阻塞直到它已满。

那么你可以用它做什么如下:

  1. 创建单个 AVar
  2. 分叉 N 个进程,每个进程将 take 来自该 AVar 的值并对其进行处理,然后循环。永远。
  3. 有一个协调器进程,它将遍历整个工作序列并将 put 工作项迭代到 AVar 中。

当所有工作进程都忙时,orchestrator进程会再往AVar中推送另一个值,然后会尝试推送下一个,但是此时会被阻塞,因为AVar是已经满了。它将保持阻塞状态,直到其中一个工作进程完成其工作并调用 take 以获取下一个工作项,而将 AVar 留空。这将解除对协调器进程的阻塞,从而立即将下一个工作项推送到 AVar,依此类推。

这里缺少的一点是如何停止。如果工作进程只是无限循环,它们将永远不会退出。当协调器进程最终耗尽工作并停止填充 AVar 时,工作进程将永远阻塞在 take 调用上。不好。

所以要解决这个问题,有两种工作项目 - (1) 实际工作和 (2) 命令停止处理。然后让协调器进程首先推送所有工作项,一旦完成,推送 N 个命令停止。或者,您可以推送 N+1 个命令停止:这将保证协调器进程阻塞,直到最后一个工作线程完成。

将所有这些放在一起,这是一个演示程序:

module Main where

import Prelude

import Data.Array ((..))
import Data.Foldable (for_)
import Data.Int (toNumber)
import Effect (Effect)
import Effect.AVar (AVar)
import Effect.Aff (Aff,Milliseconds(..),delay,forkAff,launchAff_)
import Effect.Aff.AVar as AVar
import Effect.Class (liftEffect)
import Effect.Console (log)

data Work a = Work a | Done

process :: Int -> AVar (Work Int) -> Aff Unit
process myIndex v = do
  w <- AVar.take v
  case w of
    Done ->
      pure unit
    Work i -> do
      liftEffect $ log $ "Worker " <> show myIndex <> ": Processing " <> show i
      delay $ Milliseconds $ toNumber i
      liftEffect $ log $ "Worker " <> show myIndex <> ": Processed " <> show i
      process myIndex v

main :: Effect Unit
main = launchAff_ do
  var <- AVar.empty
  for_ (1..5) \idx -> forkAff $ process idx var

  let inputs = [100,200,300,400,1000,2000,101,102,103,104]
  for_ inputs \i -> AVar.put (Work i) var

  for_ (1..6) \_ -> AVar.put Done var

在这个程序中,我的工作项目只是数字,表示睡眠的毫秒数。我将其用作每个工作项目处理“昂贵”程度的模型。程序输出将是这样的:

Worker 1: Processing 100
Worker 2: Processing 200
Worker 3: Processing 300
Worker 4: Processing 300
Worker 5: Processing 400
Worker 1: Processed 100
Worker 1: Processing 1000
Worker 2: Processed 200
Worker 2: Processing 2000
Worker 3: Processed 300
Worker 3: Processing 101
Worker 4: Processed 300
Worker 4: Processing 102
Worker 5: Processed 400
Worker 5: Processing 103
Worker 3: Processed 101
Worker 3: Processing 104
Worker 4: Processed 102
Worker 5: Processed 103
Worker 3: Processed 104
Worker 1: Processed 1000
Worker 2: Processed 2000

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。