微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

计算增量偏移量 Kafka Java

如何解决计算增量偏移量 Kafka Java

一个 spring 项目中,我使用了 Kafka,现在我想创建一个以“TopicName”和“GroupeId”为参数的方法 并计算“主题分区的最后偏移量”和“组消耗的偏移量”之间的差异

对于 lastOffsets 我明白了 现在我需要获取消耗的偏移量来计算差异

public ResponseEntity<Offsets> deltaoffsets (@RequestParam( name = "groupId") String groupId,@RequestParam( name = "topic") String topic) {
        Map<String,Object> properties = (Map) kafkaLocalConsumerConfig.get("kafkaLocalConsumerConfig");
        properties.put("group.id",groupId);
        properties.put("enable.auto.commit","true");
        List<TopicPartition> partition=new ArrayList<>();

        KafkaConsumer<String,RefentialToReload> kafkaLocalConsumer = new KafkaConsumer<>(properties);

        Map<String,List<PartitionInfo>> topics = kafkaLocalConsumer.listTopics();
        List<PartitionInfo> partitionInfos = topics.get(topic);

        if (partitionInfos == null) {
            log.warn("Partition information was not found for topic");
        }

    else {
            for (PartitionInfo partitionInfo : partitionInfos) {
                TopicPartition topicPartition = new TopicPartition(topic,partitionInfo.partition());
                partition.add(topicPartition);
                log.info("partition assigned to kafkaLocalConsumer");
            }
        }
        //get lastOffsets of the topicPartition
        Map<TopicPartition,Long> OffsetsTopicpartition = kafkaLocalConsumer.endOffsets(kafkaLocalConsumer.assignment());
        //here i need to get consumed offsets 
}

解决方法

beginningOffsets() 是第一个偏移量,而不是最后一个。

您可以使用 AdminClient - 这是一个显示当前和结束偏移量的示例...

@Bean
public ApplicationRunner runner(KafkaAdmin admin,ConsumerFactory<String,String> cf) throws Exception {
    return args -> {
    try (
            AdminClient client = AdminClient.create(admin.getConfig());
            Consumer<String,String> consumer = cf.createConsumer("group","clientId","");
        ) {
        Collection<ConsumerGroupListing> groups = client.listConsumerGroups()
                .all()
                .get(10,TimeUnit.SECONDS);
        groups.forEach(group -> {
            Map<TopicPartition,OffsetAndMetadata> map = null;
            try {
                map = client.listConsumerGroupOffsets(group.groupId())
                        .partitionsToOffsetAndMetadata()
                        .get(10,TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
            catch (ExecutionException e) {
                e.printStackTrace();
            }
            catch (TimeoutException e) {
                e.printStackTrace();
            }
            Map<TopicPartition,Long> endOffsets = consumer.endOffsets(map.keySet());
            map.forEach((tp,off) -> {
                System.out.println("group: " + group + " tp: " + tp
                        + " current offset: " + off.offset()
                        + " end offset: " + endOffsets.get(tp));
            });
        });
    }
    };
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。