微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

重置偏移量和寻找最新偏移量的无限循环

如何解决重置偏移量和寻找最新偏移量的无限循环

我正在尝试执行一个简单的 Spark 结构化流应用程序,它目前对从本地 Kafka 集群中提取并写入本地文件系统没有太大的期望。代码如下:

    private static final String TARGET_PATH = "orchestration/target/myfolder/";

    private static final String BOOTSTRAP_SERVER = "localhost:9092";
    private static final String KAFKA_TOPIC = "twitter_aapl2";

    public static void main(String[] args) throws TimeoutException,StreamingQueryException {

        SparkSession spark = SparkSession.builder().master("local[*]").appName("spark app").getorCreate();

        Dataset<Row> df = spark.readStream().format("kafka")
                .option("kafka.bootstrap.servers",BOOTSTRAP_SERVER)
                .option("subscribe",KAFKA_TOPIC)
                .load();

        StreamingQuery query = df.writeStream()
                .outputMode("append")
                .format("parquet")
                .option("path",TARGET_PATH + "data/")
                .option("checkpointLocation",TARGET_PATH + "checkpoints/")
                .start();
        query.awaitTermination();


但是在执行时,我得到以下输出并且我的数据并没有真正得到处理。

21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1,groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Resetting offset for partition twitter_aapl2-0 to offset 128.
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1,groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Seeking to LATEST offset of partition twitter_aapl2-0
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1,groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Resetting offset for partition twitter_aapl2-0 to offset 128.

我该如何解决这个问题?

解决方法

事实证明,如果人们不是从头开始而是从最新偏移量读取主题,则这种查找和重置行为是非常理想的。然后,管道仅读取在运行时发送到 Kafka 主题的新数据,并且由于没有发送新数据,因此会出现寻找(新数据)和重置(到最新偏移量)的无限循环。

底线,只需从头读取或发送新数据即可解决问题。

,

我不得不通过设置日志配置来避免这种情况:

log4j.logger.org.apache.kafka.clients.consumer.internals.SubscriptionState=WARN

尽管这是预期的行为,但似乎没有必要每隔几毫秒记录一次“寻求最新偏移量”消息。它隐藏了日志文件中的所有其他应用程序日志。当从一直不是很活跃的主题中消费时,这个问题变得更加令人担忧。如果只是在 DEBUG 级别而不是 INFO 级别会更好。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。