微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在Arrow + Reactor Monad理解中异步执行

如何解决如何在Arrow + Reactor Monad理解中异步执行

在下面的代码中,每个helloX()方法都是异步运行的(这是一个在单独线程中运行的延迟Mono),请参见下面的完整代码):

    override fun helloEverybody(): Kind<ForMonoK,String> {
        return MonoK.monad().fx.monad {
            val j = !helloJoey()
            val j2 = !helloJohn()
            val j3 = !helloMary()
            "$j and $j2 and $j3"
        }.fix()
    }

但是在日志中,我看到它们是秘密运行的:

14:10:46.983 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
14:10:47.084 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJoey()
14:10:49.087 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJoey() - ready
14:10:49.090 [elastic-3] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJohn()
14:10:54.091 [elastic-3] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJohn() - ready
14:10:54.092 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloMary()
14:10:59.095 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloMary() - ready
hello Joey and hello John and hello Mary

如何使它们并行执行,并在所有结果完成后将所有结果汇总到monad理解中?

使用main方法()的完整代码

class HelloServiceImpl : HelloService<ForMonoK> {

    private val logger = LoggerFactory.getLogger(javaClass)

    override fun helloEverybody(): Kind<ForMonoK,String> {
        return MonoK.monad().fx.monad {
            val j = !helloJoey()
            val j2 = !helloJohn()
            val j3 = !helloMary()
            "$j and $j2 and $j3"
        }.fix()
    }

    override fun helloJoey(): Kind<ForMonoK,String> {
        return Mono.defer {
            logger.info("helloJoey()")
            sleep(2000)
            logger.info("helloJoey() - ready")
            Mono.just("hello Joey")
        }.subscribeOn(Schedulers.elastic()).k()
    }

    override fun helloJohn(): Kind<ForMonoK,String> {
        return Mono.defer {
            logger.info("helloJohn()")
            sleep(5000)
            logger.info("helloJohn() - ready")
            Mono.just("hello John")
        }.subscribeOn(Schedulers.elastic()).k()
    }

    override fun helloMary(): Kind<ForMonoK,String> {
        return Mono.defer {
            logger.info("helloMary()")
            sleep(5000)
            logger.info("helloMary() - ready")
            Mono.just("hello Mary")
        }.subscribeOn(Schedulers.elastic()).k()
    }

}

fun main() {
    val countDownLatch = CountDownLatch(1)
    HelloServiceImpl().helloEverybody().fix().mono.subscribe {
        println(it)
        countDownLatch.countDown()
    }
    countDownLatch.await()
}

更新

我调整了方法,将顺序操作与并行操作相结合:

    override fun helloEverybody(): Kind<ForMonoK,String> {
        return MonoK.async().fx.async {
            val j = helloJoey().bind()
            val j2= dispatchers.IO
                    .parMapN(helloJohn(),helloMary()){ it1,it2 -> "$it1 and $it2" }
            "$j and $j2"
        }
    }

不幸的是parMapN不能与ForMonoK一起使用:

Type inference Failed: fun <A,B,C,D> CoroutineContext.parMapN(fa: Kind<ForIO,A>,fb: Kind<ForIO,B>,fc: Kind<ForIO,C>,f: (A,C) -> D): IO<D>
cannot be applied to
receiver: Coroutinedispatcher  arguments: (Kind<ForMonoK,String>,Kind<ForMonoK,(String,String,String) -> String)

想法?

解决方法

flatMapmap一样,没有线程语义或并行性。您所追求的是parMapparTraverse,它们并行运行多个MonoK

这时fx块变得不必要,因为它是为顺序操作而设计的。您可以将两者混合搭配。

MonoK.async().fx.async {

  val result = 
    Dispatchers.IO
     .parMap(helloJoey(),helloMary()) { joe,mary -> ... }
     .bind()

  otherThing(result).bind()

}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。