如何解决通过Doobie 0.9.0可以正确使用Monix 3.2.2
我想将Monix Observable与Doobie(fs2)流一起使用,但似乎无法使其正常工作。没有流媒体,我的测试应用程序可以正常退出,但是在使用流媒体之后,我的TaskApp似乎挂起了关机状态,无法弄清原因。
这是重现该问题的最小示例:
package example
import java.util.concurrent.Executors
import doobie.implicits._
import cats.effect.{Blocker,ContextShift,ExitCode,Resource}
import doobie.hikari.HikariTransactor
import monix.eval.{Task,TaskApp}
import com.typesafe.scalalogging.StrictLogging
import fs2.interop.reactivestreams._
import monix.reactive.Observable
import scala.concurrent.ExecutionContext
object Hello extends TaskApp with StrictLogging {
private def resources()(implicit contextShift: ContextShift[Task]): Resource[Task,Resources] = {
for {
transactor <- Database.transactor("org.postgresql.Driver","jdbc:postgresql://localhost/fubar","fubar","fubar")
} yield Resources(transactor)
}
def run(args: List[String]): Task[ExitCode] = resources().use(task)
.flatMap(_ => Task { println("All Done!") })
.flatMap(_ => Task(ExitCode.Success))
def task(resources: Resources): Task[Unit] = {
val publisher =
sql"""select id from message;"""
.query[(Long)]
.stream
.transact(resources.transactor)
.toUnicastPublisher()
Observable.fromReactivePublisher(publisher)
.foreachL(id => logger.info(id.toString))
}
}
case class Resources(transactor: HikariTransactor[Task])
object Database {
val ecBlocking = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))
def transactor(dbDriver: String,dbUrl: String,dbUser: String,dbPassword: String)(implicit contextShift: ContextShift[Task]): Resource[Task,HikariTransactor[Task]] = {
HikariTransactor.newHikariTransactor[Task](dbDriver,dbUrl,dbUser,dbPassword,ecBlocking,Blocker.liftExecutionContext(ecBlocking))
}
}
根据Monix文档:https://monix.io/docs/current/reactive/observable.html#fs2
,我已将fs2流转换为可观察到的Monix。我是否需要以某种方式关闭fs2流或Observable才能使应用程序干净退出? 感谢所有可以正常使用的提示或如何正确调试的提示。
解决方法
问题是ExecutionContext
需要关闭。参见作者的答案here。
可以看到正确用法in the documentation。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。