接上篇 Kafka新版消费者API示例(一)https://blog.csdn.net/Simon_09010817/article/details/83748974
kafka手动提交策略提供了更加灵活的管理方式,在某些场景我们需要对消费偏移量有更精准的管理。以保证消息不被重复消费以及消息不丢失。
Kafka提供两种手动提交方式:
1.异步提交(commitAsync):
异步模式下,提交失败也不会尝试提交。消费者线程不会被阻塞,因为异步操作,可能在提交偏移量操作结果未返回时就开始下一次拉取操作。
2.同步提交(CommitSync):
同步模式下,提交失败时一直尝试提交,直到遇到无法重试才结束。同步方式下,消费者线程在拉取消息时会被阻塞,直到偏移量提交操作成功或者在提交过程中发生错误。
实现手动提交前需要在创建消费者时关闭自动提交,设置enable.auto.commit=false。
由于异步提交不会等消费偏移量提交成功后再拉取下一次消息,因此异步提交提供了一个偏移量提交回调方法commitAsync(OffsetCommitCallback callback)。提交偏移量完成之后会回调OffsetCommitCallback接口的onComplete()方法
示例代码:
- package com.simon.kafka.consumer.newconsumer;
- import org.apache.kafka.clients.consumer.*;
- import org.apache.kafka.common.TopicPartition;
- import java.util.*;
- /**
- * Created by Simon on 2018/11/5.
- */
- public class KafkaConsumerAsync {
- public static void main(String[] args) throws InterruptedException {
- // 1、准备配置文件
- String kafkas = "192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094";
- Properties props = new Properties();
- //kafka连接信息
- props.put("bootstrap.servers",kafkas);
- //消费者组id
- props.put("group.id", "test_group");
- //是否自动提交offset
- props.put("enable.auto.commit", "false");
- //在没有offset的情况下采取的拉取策略
- props.put("auto.offset.reset", "none");
- //自动提交时间间隔
- props.put("auto.commit.interval.ms", "1000");
- //设置一次fetch请求取得的数据最大为1k
- props.put("fetch.max.bytes", "1024");
- //key反序列化
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- //value反序列化
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- String topic = "test";
- // 2、创建KafkaConsumer
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- // 3、订阅数据,不给定监听器
- consumer.subscribe(Collections.singleton(topic));
- try{
- //最少处理100条
- int minCommitSize = 100;
- //定义计数器
- int icount = 0;
- // 4、获取数据
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- for (ConsumerRecord<String, String> record : records) {
- System.out.printf("topic = %s,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),record.offset(), record.key(), record.value());
- icount++;
- }
- Thread.sleep(5000);
- //在业务逻辑处理成功后提交offset
- if(icount >= minCommitSize){
- //满足最少消费100条,再进行异步提交
- consumer.commitAsync(new OffsetCommitCallback() {
- @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
- if(exception == null){
- System.out.println("commit success");
- }else {
- //提交失败,对应处理
- System.out.println("commit Failed");
- }
- }
- });
- //计数器归零
- icount = 0 ;
- }
- }
- }catch (Exception e){
- e.printstacktrace();
- }finally {
- //关闭连接
- consumer.close();
- }
- }
- }
以时间戳查询消息:
Kafka在0.10.1.1版本上增加了时间戳索引文件。Kafka消费者API提供了一个offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)方法,参数为一个map对象,key为待查询的分区,value为待查询的时间戳。会返回一个大于等于该事件戳的第一条消息对应的偏移量和时间戳。若待查询分区不存在,会一直阻塞。
示例:
将kafka-client的maven依赖改为1.0.0 。在0.10.0.1中无法引入OffsetAndTimestamp类
- <!--引入kafka-clients-->
- <!--<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>0.10.0.1</version>
- </dependency>-->
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>1.0.0</version>
- </dependency>
代码:
- package com.simon.kafka.consumer.newconsumer;
- import org.apache.kafka.clients.consumer.*;
- import org.apache.kafka.common.TopicPartition;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Properties;
- /**
- * Created by Simon on 2018/11/5.
- */
- public class KafkaConsumerTimestamps {
- public static void main(String[] args) throws InterruptedException {
- // 1、准备配置文件
- String kafkas = "192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094";
- Properties props = new Properties();
- //kafka连接信息
- props.put("bootstrap.servers",kafkas);
- //消费者组id
- props.put("group.id", "test_group");
- //客户端id
- props.put("client.id", "test_group");
- //是否自动提交offset
- props.put("enable.auto.commit", "true");
- //在没有offset的情况下采取的拉取策略
- props.put("auto.offset.reset", "none");
- //自动提交时间间隔
- props.put("auto.commit.interval.ms", "1000");
- //设置一次fetch请求取得的数据最大为1k
- props.put("fetch.max.bytes", "1024");
- //key反序列化
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- //value反序列化
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- String topic = "test";
- // 2、创建KafkaConsumer
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- // 3、订阅主题
- TopicPartition topicPartition = new TopicPartition(topic,0);
- consumer.assign(Collections.singleton(topicPartition));
- try{
- Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
- // 设置查询12 小时之前消息的偏移量
- timestampsToSearch.put(topicPartition, (System.currentTimeMillis() - 12 * 3600 * 1000));
- // 会返回时间大于等于查找时间的第一个偏移量
- Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes(timestampsToSearch);
- OffsetAndTimestamp offsetTimestamp = null;
- // 用for 轮询,当然由于本例是查询的一个分区,因此也可以用if 处理
- for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetMap.entrySet()) {
- // 若查询时间大于时间戳索引文件中最大记录索引时间,
- // 此时value 为空,即待查询时间点之后没有新消息生成
- offsetTimestamp = entry.getValue();
- if (null != offsetTimestamp) {
- // 重置消费起始偏移量
- consumer.seek(topicPartition, entry.getValue().offset());
- }
- }
- while (true) {
- //4.轮询拉取消息
- ConsumerRecords<String, String> records = consumer.poll(100);
- for (ConsumerRecord<String, String> record : records) {
- System.out.printf("topic = %s,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),record.offset(), record.key(), record.value());
- }
- }
- }catch (Exception e){
- e.printstacktrace();
- }finally {
- //关闭连接
- consumer.close();
- }
- }
- }
由于集群环境已选型为kafka0.10.0.1,本次无法按指定时间戳拉取,报错信息为不支持当前broker版本。
速度控制:
应用场景中我们可能需要暂停某些分区消费,先消费其他分区,当达到某个条件再恢复该分区消费。
1.pause(Collection<TopicPartition> partitions):暂停某些分区在拉取操作时返回数据给客户端
- //无返回值
- consumer.pause(Collections.singleton(topicPartition));
2.resume(Collection<TopicPartition> partitions):恢复某些分区向客户端返回数据
- //无返回值
- consumer.resume(Collections.singleton(topicPartition));
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。