In an earlier post I showed how to fetch the latest Kafka offsets with Scala, but in most cases we need to fetch the latest offsets from Java instead. The wrapper class below does exactly that using the SimpleConsumer API.
GetoffsetShellWrap
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;

public class GetoffsetShellWrap {

    private static Logger log = LoggerFactory.getLogger(GetoffsetShellWrap.class);

    private String topic;
    private int port;
    private String host;
    private int time;

    public GetoffsetShellWrap(String topic, int port, String host, int time) {
        this.topic = topic;
        this.port = port;
        this.host = host;
        this.time = time;
    }
    public Map<String, String> getEveryPartitionMaxOffset() {
        // 1. Fetch all partitions of the topic plus each partition's metadata => Map<partitionId, PartitionMetadata>
        TreeMap<Integer, PartitionMetadata> partitionIdAndMeta = findTopicEveryPartition();
        Map<String, String> map = new HashMap<String, String>();
        for (Entry<Integer, PartitionMetadata> entry : partitionIdAndMeta.entrySet()) {
            int leaderPartitionId = entry.getKey();
            // 2. Use each partition's metadata to find the host of its leader broker
            String leadBroker = entry.getValue().leader().host();
            String clientName = "Client_" + topic + "_" + leaderPartitionId;
            SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
            // 3. Ask the leader broker for the partition's latest offset
            long readOffset = getLastOffset(consumer, topic, leaderPartitionId, clientName);
            map.put(String.valueOf(leaderPartitionId), String.valueOf(readOffset));
            if (consumer != null)
                consumer.close();
        }
        return map;
    }

    private TreeMap<Integer, PartitionMetadata> findTopicEveryPartition() {
        TreeMap<Integer, PartitionMetadata> map = new TreeMap<Integer, PartitionMetadata>();
        SimpleConsumer consumer = null;
        try {
            consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "leaderLookup" + new Date().getTime());
            List<String> topics = Collections.singletonList(topic);
            TopicMetadataRequest req = new TopicMetadataRequest(topics);
            kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
            List<TopicMetadata> metadata = resp.topicsMetadata();
            if (metadata != null && !metadata.isEmpty()) {
                TopicMetadata item = metadata.get(0);
                for (PartitionMetadata part : item.partitionsMetadata()) {
                    map.put(part.partitionId(), part);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (consumer != null)
                consumer.close();
        }
        return map;
    }

    private long getLastOffset(SimpleConsumer consumer, String topic, int leaderPartitionId, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, leaderPartitionId);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(time, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            log.error("Error fetching offset data from the broker. Reason: " + response.errorCode(topic, leaderPartitionId));
            return 0;
        }
        long[] offsets = response.offsets(topic, leaderPartitionId);
        return offsets[0];
    }
}
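Note that SimpleConsumer belongs to the old Scala client and has been removed from recent Kafka releases. If you are on kafka-clients 0.10.1 or later, the modern consumer exposes endOffsets, which does the whole lookup in one call. A minimal sketch, assuming the same broker host and topic as the test below (the class name and group id are placeholders of mine):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class EndOffsetsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop-01:9092"); // same broker host as the test below, port assumed
        props.put("group.id", "offset-lookup");           // placeholder group id, only needed to build the consumer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
            // Build one TopicPartition per partition of the topic
            List<TopicPartition> partitions = consumer.partitionsFor("2017-11-6-test").stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
            // endOffsets returns the log-end offset (i.e. the latest offset) for each partition
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            endOffsets.forEach((tp, offset) -> System.out.println(tp.partition() + "---" + offset));
        }
    }
}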
GetoffsetShellWrapJavaTest

import java.util.Map;

public class GetoffsetShellWrapJavaTest {
    public static void main(String[] args) {
        int port = 9092;
        String topic = "2017-11-6-test";
        int time = -1; // -1 asks for the latest offset
        GetoffsetShellWrap offsetSearch = new GetoffsetShellWrap(topic, port, "hadoop-01", time);
        Map<String, String> map = offsetSearch.getEveryPartitionMaxOffset();
        for (String key : map.keySet()) {
            System.out.println(key + "---" + map.get(key));
        }
    }
}
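One note on the time argument: in the old offset request API, -1 requests the latest offset and -2 the earliest. The old client also exposes these values as constants, so the magic number above can be spelled out, for example:

// kafka.api.OffsetRequest.LatestTime() == -1, EarliestTime() == -2
int time = (int) kafka.api.OffsetRequest.LatestTime();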
Output:
0---16096
1---15930
2---16099