如何解决如何将 Doobie 生成的 FS2 Stream 发布到 Kafka
我想将一长串事件发布到 Kafka 中,使用 fs2.Stream 对应一个非常大的 DB 行列表,如果编译为 List,最终将导致 Out Of Memotry 错误。
所以,假设我有一个非常大的 UUID 键列表,其中包含数百万条记录:
def getKeyStream(timeRangeEnd: LocalDateTime): fs2.Stream[doobie.ConnectionIO,UUID]
并且我想使用这个发布者将一个事件发布到 Kafka 中,对应于 chunk 500 个键:
trait KeyPublisher {
def publish(event: ChunkOfKeys): IO[Long]
}
我想创建一个函数来将此流加入/发布到 Kafka 中:
def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime): IO[Unit] = {
getKeyStream(endDateTime)
.chunkN(500)
.evalMap(myChunk => ?????)
...
}
如何使用由 DB 发起的流,将其拆分为大小恒定的块,然后将每个块发布到 Kafka 中?
显然很难找到有关此主题的良好文档或示例。你能指出我正确的方向吗?
解决方法
由于您没有说明 ChunkOfKeys
是什么类型,我假设它类似于 Chunk[UUID]
def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime)(
xa: Transactor[IO],publisher: KeyPublisher
): IO[Unit] =
getKeyStream(endDateTime)
.transact(xa) // Convert the ConnectionIO stream to Stream[IO,UUID]
.chunkN(500) // into Stream[IO,Chunk[UUID]]
.evalMap(publisher.publish) // Into Stream[IO,Long]
.compile
.drain // An IO[Unit] that describes the whole process
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。