微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

当一个主题有多个主题时,如何处理来自 Kafka使用 Apache Beam的 Avro 输入?

如何解决当一个主题有多个主题时,如何处理来自 Kafka使用 Apache Beam的 Avro 输入?

为了使用 KafkaIO 通过 Apache Beam 处理 Avro 编码的消息,需要传递一个 ConfluentSchemaRegistryDeserializerProvider 实例作为值解串器。

一个典型的例子是这样的:

PCollection<KafkaRecord<Long,GenericRecord>> input = pipeline
  .apply(KafkaIO.<Long,GenericRecord>read()
     .withBootstrapServers("kafka-broker:9092")
     .withTopic("my_topic")
     .withKeyDeserializer(LongDeserializer.class)
     .withValueDeserializer(
         ConfluentSchemaRegistryDeserializerProvider.of("http://my-local-schema-registry:8081","my_subject"))

但是,我想使用的一些 Kafka 主题有多个不同的主题(事件类型)(出于排序原因)。因此,我无法提前提供一个固定的主题名称。如何解决这个困境?

(我的目标是最终使用 BigQueryIO 将这些事件推送到云端。)

解决方法

您可以多次阅读,每个主题一次,然后Flatten

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。