如何解决是否可以在立即返回 HTTP 响应的同时使用 cat.effect.IO 和 FS2 Streams 将值异步推送到 Kafka?
我试图解决的问题可以描述如下:
- k8s 调度程序每周一早上调用我的 Finagle/Finch 应用程序的 HTTP 端点
- HTTP 端点将处理请求并返回
IO[Int]
(立即) - 在生成 HTTP 响应之前,应该启动另一个函数的执行,因此它会异步运行
- 异步进程应该使用来自使用 Doobie 生成的 Postgres RDB 的数据流
- 应该在恒定内存中操作该流,并且应该将数据推送到 Kafka 主题中
val myHTTPEndpoint: Endpoint[IO,Int] =
get("k8s-trigger") { () =>
keyService
.processKeys(LocalDateTime.now)
.map(Ok)
}
class KeyService {
def processKeys(endDateTime: LocalDateTime): IO[Int] =
for {
numberOfKeys <- reportCardRepository.countReportCardKeys(endDateTime) // Doobie Repo
_ <- IO.shift *> enqueueKeys(endDateTime,numberOfKeys,keysAccumulator = 0) // To be executed asynchronously
} yield numberOfKeys
def enqueueKeys(endDateTime: LocalDateTime,keysCounter: Int,keysAccumulator: Int): IO[Int] = ???
所以本质上我想从数据库中获取要处理的键的数量,触发 enqueueKeys()
的异步执行,这将获取 numberOfKeys 作为输入参数,并将这个数字按顺序返回给 HTTP 端点之后立即构建响应,无需等待 enqueueKeys()
完成其计算。
我想知道是否可以将 enqueueKeys()
作为批处理异步执行,该批处理应该发生在另一个线程池 - 另一个 ContextShift。使用 cats.effect.Async 或使用 FS2 是否可行?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。