如何解决如何在Arrow + Reactor Monad理解中异步执行
在下面的代码中,每个helloX()
方法都是异步运行的(这是一个在单独线程中运行的延迟Mono),请参见下面的完整代码):
override fun helloEverybody(): Kind<ForMonoK,String> {
return MonoK.monad().fx.monad {
val j = !helloJoey()
val j2 = !helloJohn()
val j3 = !helloMary()
"$j and $j2 and $j3"
}.fix()
}
但是在日志中,我看到它们是秘密运行的:
14:10:46.983 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
14:10:47.084 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJoey()
14:10:49.087 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJoey() - ready
14:10:49.090 [elastic-3] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJohn()
14:10:54.091 [elastic-3] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJohn() - ready
14:10:54.092 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloMary()
14:10:59.095 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloMary() - ready
hello Joey and hello John and hello Mary
如何使它们并行执行,并在所有结果完成后将所有结果汇总到monad理解中?
class HelloServiceImpl : HelloService<ForMonoK> {
private val logger = LoggerFactory.getLogger(javaClass)
override fun helloEverybody(): Kind<ForMonoK,String> {
return MonoK.monad().fx.monad {
val j = !helloJoey()
val j2 = !helloJohn()
val j3 = !helloMary()
"$j and $j2 and $j3"
}.fix()
}
override fun helloJoey(): Kind<ForMonoK,String> {
return Mono.defer {
logger.info("helloJoey()")
sleep(2000)
logger.info("helloJoey() - ready")
Mono.just("hello Joey")
}.subscribeOn(Schedulers.elastic()).k()
}
override fun helloJohn(): Kind<ForMonoK,String> {
return Mono.defer {
logger.info("helloJohn()")
sleep(5000)
logger.info("helloJohn() - ready")
Mono.just("hello John")
}.subscribeOn(Schedulers.elastic()).k()
}
override fun helloMary(): Kind<ForMonoK,String> {
return Mono.defer {
logger.info("helloMary()")
sleep(5000)
logger.info("helloMary() - ready")
Mono.just("hello Mary")
}.subscribeOn(Schedulers.elastic()).k()
}
}
fun main() {
val countDownLatch = CountDownLatch(1)
HelloServiceImpl().helloEverybody().fix().mono.subscribe {
println(it)
countDownLatch.countDown()
}
countDownLatch.await()
}
更新
我调整了方法,将顺序操作与并行操作相结合:
override fun helloEverybody(): Kind<ForMonoK,String> {
return MonoK.async().fx.async {
val j = helloJoey().bind()
val j2= dispatchers.IO
.parMapN(helloJohn(),helloMary()){ it1,it2 -> "$it1 and $it2" }
"$j and $j2"
}
}
不幸的是parMapN不能与ForMonoK一起使用:
Type inference Failed: fun <A,B,C,D> CoroutineContext.parMapN(fa: Kind<ForIO,A>,fb: Kind<ForIO,B>,fc: Kind<ForIO,C>,f: (A,C) -> D): IO<D>
cannot be applied to
receiver: Coroutinedispatcher arguments: (Kind<ForMonoK,String>,Kind<ForMonoK,(String,String,String) -> String)
想法?
解决方法
flatMap
与map
一样,没有线程语义或并行性。您所追求的是parMap
和parTraverse
,它们并行运行多个MonoK
。
这时fx
块变得不必要,因为它是为顺序操作而设计的。您可以将两者混合搭配。
MonoK.async().fx.async {
val result =
Dispatchers.IO
.parMap(helloJoey(),helloMary()) { joe,mary -> ... }
.bind()
otherThing(result).bind()
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。