为什么使用消息队列?
以用户下单购买商品的行为举例,在使用微服务架构时,我们需要调用多个服务。传统的调用方式是同步调用,这会存在一定的性能问题
使用消息队列可以实现异步的通信方式,相比于同步的通信⽅式,异步的⽅式可以让上游快速成功,极大提高系统的吞吐量。在分布式系统中,通过下游多个服务的分布式事务的保障,也能保障业务执行之后的最终⼀致性
Kafka 概述
1. 介绍
Kafka 是⼀个分布式的、⽀持分区的(partition)、多副本的 (replica),基于 zookeeper 协调的分布式消息系统,它最大的特性就是可以实时处理大量数据以满足各类需求场景:
- 日志收集:使用 Kafka 收集各种服务的日志,并通过 kafka 以统一接口服务的方式开放给各种 consumer,例如 hadoop、Hbase、Solr 等
- 消息系统:解耦和生产者和消费者、缓存消息等
- 用户活动跟踪:Kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘
- 运营指标:Kafka 也经常用来记录运营监控数据,包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告
2. 相关术语
名称 | 解释 |
---|---|
broker | 消息中间件处理节点,⼀个 Kafka 节点就是⼀个 broker,⼀个或者多个 broker 可以组成⼀个 Kafka 集群 |
Topic | Kafka 根据 topic 对消息进行归类,发布到 Kafka 集群的每条消息都需要指定⼀个 topic |
Producer | 消息生产者,向 broker 发送消息的客户端 |
Consumer | 消息消费者,从 broker 读取消息的客户端 |
ConsumerGroup | 每个 Consumer 属于⼀个特定的 Consumer Group,⼀条消息可以被多个不同的 Consumer Group 消费,但是⼀个 Consumer Group 中只能有⼀个 Consumer 能够消费该消息 |
Partition | 物理上的概念,⼀个 topic 可以分为多个 partition,每个 partition 内部消息是有序的 |
3. 安装
安装 Kafka 之前需要先安装 JDK 和 Zookeeper,在官网下载 Kafka 安装包:http://kafka.apache.org/downloads,直接解压即可
需要修改配置文件,进⼊到 config 目录内,修改 server.properties
# broker.id 属性在 kafka 集群中必须唯一
broker.id= 0
# kafka 部署的机器 ip 和提供服务的端口号
listeners=PLAINTEXT://192.168.65.60:9092
# kafka 的消息存储文件
log.dir=/usr/local/data/kafka-logs
# kafka 连接 zookeeper 的地址
zookeeper.connect= 192.168.65.60:2181
server.properties 核心配置详解:
Property | Default | Description |
---|---|---|
broker.id | 0 | 每个 broker 都可以用⼀个唯⼀的非负整数 id 进行标识,作为 broker 的 名字 |
log.dirs | /tmp/kafka-logs | kafka 存放数据的路径,这个路径并不是唯⼀的,可以是多个,路径之间只需要使⽤逗号分隔即可;每当创建新 partition 时,都会选择在包含最少 partitions 的路径下进行 |
listeners | PLAINTEXT://192.168.65.60:9092 | server 接受客户端连接的端⼝,ip 配置 kafka 本机 ip 即可 |
zookeeper.connect | localhost:2181 | zooKeeper 连接字符串的格式为:hostname:port,此处 hostname 和 port 分别是 ZooKeeper 集群中某个节点的 host 和 port;zookeeper 如果是集群,连接⽅式为 hostname1:port1,hostname2:port2,hostname3:port3 |
log.retention.hours | 168 | 每个日志文件删除之前保存的时间,默认数据保存时间对所有 topic 都⼀样 |
num.partitions | 1 | 创建 topic 的默认分区数 |
default.replication.factor | 1 | ⾃动创建 topic 的默认副本数量,建议设置为⼤于等于 2 |
min.insync.replicas | 1 | 当 producer 设置 acks 为 -1 时,min.insync.replicas 指定 replicas 的最小数目(必须确认每⼀个 repica 的写数据都是成功的),如果这个数目没有达到,producer 发送消息会产生异常 |
delete.topic.enable | false | 是否允许删除主题 |
进入到 bin 目录下,使用命令来启动
./kafka-server-start.sh -daemon../config/server.properties
验证是否启动成功:进入到 zk 中的节点看 id 是 0 的 broker 有没有存在(上线)
ls /brokers/ids/
实现消息的生产和消费
1. 主题 Topic
topic 可以实现消息的分类,不同消费者订阅不同的 topic
执行以下命令创建名为 test
的 topic,这个 topic 只有一个 partition,并且备份因子也设置为 1
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 1 --partitions 1 --topic test
查看当前 kafka 内有哪些 topic
./kafka-topics.sh --list --zookeeper 172.16.253.35:2181
2. 发送消息
把消息发送给 broker 中的某个 topic,打开⼀个 kafka 发送消息的客户端,然后开始⽤客户端向 kafka 服务器发送消息
kafka 自带了一个 producer 命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到 kafka 集群中。在默认情况下,每一个行会被当做成一个独立的消息
./kafka-console-producer.sh --broker-list 172.16.253.38:9092 --topic test
3. 消费消息
对于 consumer,kafka 同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息。使用 kafka 的消费者客户端,从指定 kafka 服务器的指定 topic 中消费消息
方式一:从最后一条消息的 偏移量+1 开始消费
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --topic test
方式二:从头开始消费
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --from-beginning --topic test
消息的发送方会把消息发送到 broker 中,broker 会存储消息,消息是按照发送的顺序进行存储。因此消费者在消费消息时可以指明主题中消息的偏移量。默认情况下,是从最后一个消息的下一个偏移量开始消费
4. 单播消息
一个消费组里只有一个消费者能消费到某一个 topic 中的消息,可以创建多个消费者,这些消费者在同一个消费组中
./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup --topic test
5. 多播消息
在一些业务场景中需要让一条消息被多个消费者消费,那么就可以使用多播模式。kafka 实现多播,只需要让不同的消费者处于不同的消费组即可
./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup1 --topic test
./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup2 --topic test
6. 查看消费组及信息
# 查看当前主题下有哪些消费组
./kafka-consumer-groups.sh --bootstrap-server 10.31.167.10:9092 --list
# 查看消费组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server 172.16.253.38:9092 --describe --group testGroup
- Currennt-offset:当前消费组的已消费偏移量
- Log-end-offset:主题对应分区消息的结束偏移量(HW)
- Lag:当前消费组未消费的消息数
7. 其他细节
-
生产者将消息发送给 broker,broker 会将消息保存在本地的日志文件中
/usr/local/kafka/data/kafka-logs/主题-分区/00000000.log
-
消息的保存是有序的,通过 offset 偏移量来描述消息的有序性
-
消费者消费消息时也是通过 offset 来描述当前要消费的那条消息的位置
主题与分区
主题 Topic 在 kafka 中是⼀个逻辑概念,kafka 通过 topic 将消息进行分类。不同的 topic 会被订阅该 topic 的消费者消费。但是有⼀个问题,如果说这个 topic 的消息非常多,消息是会被保存到 log 日志文件中的,这会出现文件过大的问题,因此,kafka 提出了 Partition 分区的概念
通过 partition 将⼀个 topic 中的消息分区来存储,这样的好处有多个:
为⼀个主题创建多个分区
./kafka-topics.sh --create --zookeeper localhost:2181 --partitions 2 --topic test1
通以下命令查看 topic 的分区信息
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1
分区的作用:
- 可以分布式存储
- 可以并行写
了解了 Partition,再补充一个 Kafka 细节:在消息日志文件中,kafka 内部创建了 __consumer_offsets 主题包含了 50 个分区。这个主题用来存放消费者某个主题的偏移量,每个消费者会把消费的主题的偏移量自主上报给 kafka 中的默认主题:consumer_offsets。因此 kafka 为了提升这个主题的并发性,默认设置了 50 个分区
- 提交到哪个分区:通过 hash 函数:
hash(consumerGroupId) % __consumer_offsets 主题的分区数
- 提交到该主题中的内容是:key 是 consumerGroupId + topic + 分区号,value 就是当前 offset 的值
- 文件中保存的消息,默认保存七天,七天到后消息会被删除
Kafka 集群
1. 搭建
创建三个 server.properties 文件
# 0 1 2
broker.id=2
# 9092 9093 9094
listeners=PLAINTEXT://192.168.65.60:9094
# kafka-logs kafka-logs-1 kafka-logs-2
log.dir=/usr/local/data/kafka-logs-2
通过命令启动三台 broker
./kafka-server-start.sh -daemon../config/server0.properties
./kafka-server-start.sh -daemon../config/server1.properties
./kafka-server-start.sh -daemon../config/server2.properties
搭建完后通过查看 zk 中的 /brokers/ids 看是否启动成功
2. 副本
下面的命令,在创建主题时,除了指明了主题的分区数以外,还指明了副本数,分别是:一个主题,两个分区、三个副本
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic
通过查看主题信息,其中的关键数据:
-
replicas:当前副本所存在的 broker 节点
-
leader:副本里的概念
-
follower:leader 处理所有针对这个 partition 的读写请求,而 follower 被动复制 leader,不提供读写(主要是为了保证多副本数据与消费的一致性),如果 leader 所在的 broker 挂掉,那么就会进行新 leader 的选举
3. broker、主题、分区、副本
Kafka 集群中由多个 broker 组成,⼀个 broker 存放⼀个 topic 的不同 partition 以及它们的副本
4. 集群消息的发送
./kafka-console-producer.sh --broker-list 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --topic my-replicated-topic
5. 集群消息的消费
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --from-beginning --topic my-replicated-topic
6. 分区消费组消费者的细节
- ⼀个 partition 只能被⼀个消费组中的⼀个消费者消费,目的是为了保证消费的顺序性,但是多个 partion 的多个消费者的消费顺序性是得不到保证的
- 一个消费者可以消费多个 partition,如果消费者挂了,那么会触发rebalance机制,由其他消费者来消费该分区
- 消费组中消费者的数量不能比一个 topic 中的 partition 数量多,否则多出来的消费者消费不到消息
Java 中使用 Kafka
1. 生产者
1.1 引入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
1.2 生产者发送消息
/**
* 消息的发送方
*/
public class MyProducer {
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.设置参数
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONfig, "10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094");
// 把发送的 key 从字符串序列化为字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONfig, StringSerializer.class.getName());
// 把发送消息 value 从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONfig, StringSerializer.class.getName());
// 2.创建⽣产消息的客户端,传⼊参数
Producer<String, String> producer = new KafkaProducer<String, String>(props);
// 3.创建消息
// key: 作⽤是决定了往哪个分区上发
// value: 具体要发送的消息内容
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "mykeyvalue", "hellokafka");
// 4.发送消息,得到消息发送的元数据并输出
RecordMetadata Metadata = producer.send(producerRecord).get();
System.out.println("同步⽅式发送消息结果:" + "topic-" + Metadata.topic() + "|partition-" + Metadata.partition() + "|offset-" + Metadata.offset());
}
}
1.3 发送消息到指定分区
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0 , order.getorderId().toString(), JSON.toJSONString(order));
如果未指定分区,则会通过业务 Key 的 hash 运算,得出要发送的分区,公式为:hash(key)%partitionNum
1.4 同步发送消息
⽣产者同步发消息,在收到 kafka 的 ack 告知发送成功之前将⼀直处于阻塞状态
// 等待消息发送成功的同步阻塞方法
RecordMetadata Metadata = producer.send(producerRecord).get();
System.out.println("同步方式发送消息结果:" + "topic-" +Metadata.topic() + "|partition-"+ Metadata.partition() + "|offset-" +Metadata.offset());
1.5 异步发送消息
异步发送,⽣产者发送完消息后就可以执⾏之后的业务,broker 在收到消息后异步调用生产者提供的 callback 回调方法
// 指定发送分区
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0 , order.getorderId().toString(),JSON.toJSONString(order));
// 异步回调方式发送消息
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata Metadata, Exception exception) {
if (exception != null) {
System.err.println("发送消息失败:" +
exception.getStackTrace());
}
if (Metadata != null) {
System.out.println("异步方式发送消息结果:" + "topic-" +Metadata.topic() + "|partition-"+ Metadata.partition() + "|offset-" + Metadata.offset());
}
}
});
1.6 生产者中的 ack 的配置
在同步发送的前提下,生产者在获得集群返回的 ack 之前会⼀直阻塞,那么集群什么时候返回 ack 呢?此时 ack 有三个配置:
- acks = 0:表示 producer 不需要等待任何 broker 确认收到消息的回复,就可以继续发送下一条消息,性能最高,但最容易丢消息
- acks = 1:至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失
- acks = -1 或 all:需要等待
min.insync.replicas
(默认为 1 ,推荐配置大于等于2)这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证,一般是金融级别,或跟钱打交道的场景才会使用这种配置
props.put(ProducerConfig.ACKS_CONfig, "1");
// 发送失败,默认会重试三次,每次间隔 100ms
props.put(ProducerConfig.RETRIES_CONfig, 3);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONfig, 100)
1.7 消息发送的缓冲区
-
kafka 默认会创建⼀个消息缓冲区,用来存放要发送的消息,缓冲区是 32m
props.put(ProducerConfig.BUFFER_MEMORY_CONfig, 33554432);
-
kafka 本地线程会在缓冲区中⼀次拉 16k 的数据,发送到 broker
props.put(ProducerConfig.BATCH_SIZE_CONfig, 16384);
-
如果线程拉不到 16k 的数据,间隔 10ms 也会将已拉到的数据发到 broker
props.put(ProducerConfig.LINGER_MS_CONfig, 10);
2. 消费者
2.1 消费消息
public class MySimpleConsumer {
private final static String TOPIC_NAME = "my-replicated-topic";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig, "172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094");
// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONfig, CONSUMER_GROUP_NAME);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig, StringDeserializer.class.getName());
// 1.创建⼀个消费者的客户端
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// 2.消费者订阅主题列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
/*
* 3. poll() API 是拉取消息的⻓轮询
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// 4.打印消息
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
}
}
}
}
2.2 自动提交和手动提交 offset
无论是自动提交还是手动提交,都需要把所属的 消费组 + 消费的某个主题 + 消费的某个分区 + 消费的偏移量
提交到集群的 _consumer_offsets 主题里面
-
自动提交:消费者 poll 消息下来以后自动提交 offset
// 是否自动提交 offset,默认就是 true props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONfig, "true"); // 自动提交 offset 的间隔时间 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONfig, "1000");
注意:如果消费者还没消费完 poll 下来的消息就自动提交了偏移量,此时消费者挂了,于是下⼀个消费者会从已提交的 offset 的下⼀个位置开始消费消息,之前未被消费的消息就丢失掉了
-
手动提交:需要把自动提交的配置改成 false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONfig, "false");
手动提交又分成了两种:
-
手动同步提交
在消费完消息后调用同步提交的方法,当集群返回 ack 前⼀直阻塞,返回 ack 后表示提交成功,执行之后的逻辑
while (true) { /* * poll() API 是拉取消息的⻓轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value()); } // 所有的消息已消费完 if (records.count() > 0) { // 有消息 // ⼿动同步提交 offset, 当前线程会阻塞直到 offset 提交成功 // ⼀般使⽤同步提交, 因为提交之后⼀般也没有什么逻辑代码了 consumer.commitSync(); // ====阻塞=== 提交成功 } }
-
手动异步提交
在消息消费完后提交,不需要等到集群 ack,直接执行之后的逻辑,可以设置⼀个回调方法,供集群调用
while (true) { /* * poll() API 是拉取消息的⻓轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } // 所有的消息已消费完 if (records.count() > 0) { // 手动异步提交 offset,当前线程提交 offset 不会阻塞,可以继续处理后⾯的程序逻辑 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { System.err.println("Commit Failed for " + offsets); System.err.println("Commit Failed exception: " + exception.getStackTrace()); } } }); } }
-
2.3 长轮询 poll 消息
消费者建立与 broker 之间的长连接,开始 poll 消息,默认⼀次 poll 五百条消息
// ⼀次 poll 最⼤拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONfig, 500)
可以根据消费速度的快慢来设置,如果两次 poll 的时间超出了 30s 的时间间隔,kafka 会认为其消费能力过弱,将其踢出消费组,将分区分配给其他消费者
代码中设置了长轮询的时间是 1000 毫秒
while (true) {
/*
* poll() API 是拉取消息的⻓轮询
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
}
}
- 如果⼀次 poll 到 500 条,就直接执行 for 循环
- 如果这⼀次没有 poll 到 500 条,且时间在1秒内,那么长轮询继续 poll,要么到 500 条,要么到 1s
- 如果多次 poll 都没达到 500 条,且 1 秒时间到了,那么直接执行 for 循环
2.4 健康状态检查
消费者每隔 1s 向 Kafka 集群发送心跳,集群发现如果有超过 10s 没有续约的消费者,将被踢出消费组,触发该消费组的 rebalance 机制,将该分区交给消费组里的其他消费者进行消费
// consumer 给 broker 发送⼼跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONfig, 1000);
// kafka 如果超过 10 秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONfig, 10 * 1000)
2.5 指定分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
2.6 消息回溯消费
也即从头开始消费消息
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,
0)));
2.7 指定偏移量消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
2.8 从指定时间点消费
根据时间,去所有的 partition 中确定该时间对应的 offset,然后去所有的 partition 中找到该 offset 之后的消息开始消费
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
// 从一小时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
if (key == null || value == null) {
continue;
}
// 根据消费⾥的 timestamp 确定 offset
Long offset = value.offset();
System.out.println("partition-" + key.partition() + "|offset-" + offset);
if (value != null) {
consumer.assign(Arrays.asList(key));
consumer.seek(key, offset);
}
}
2.9 新消费组的消费 offset 规则
新消费组中的消费者在启动以后,默认会从当前分区的最后⼀条消息的 offset+1 开始消费(消费新消息),可以通过以下的设置,让新的消费者第⼀次从头开始消费,之后开始消费新消息(最后消费的位置的偏移量 +1)
- Latest:默认的,消费新消息
- earliest:第⼀次从头开始消费,之后开始消费新消息(最后消费的位置的偏移量 +1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONfig, "earliest");
SpringBoot 中使用 Kafka
1. 引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2. 编写配置文件
server:
port: 8080
spring:
kafka:
bootstrap-servers: 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094
producer: # ⽣产者
retries: 3 # 设置⼤于0的值,则客户端会将发送失败的记录重新发送
batch-size: 16384
buffer-memory: 33554432
acks: 1
# 指定消息key和消息体的编解码⽅式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 500
listener:
# 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每⼀批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于TIME时提交
# TIME
# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量⼤于等于COUNT时提交
# COUNT
# TIME | COUNT 有⼀个条件满⾜时提交
# COUNT_TIME
# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤AckNowledgment.ackNowledge()后提交
# MANUAL
# 手动调⽤AckNowledgment.ackNowledge()后⽴即提交,⼀般使⽤这种
# MANUAL_IMMEDIATE
ack-mode: MANUAL_IMMEDIATE
redis:
host: 172.16.253.21
3. 编写生产者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/msg")
public class MyKafkaController {
private final static String TOPIC_NAME = "my-replicated-topic";
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@RequestMapping("/send")
public String sendMessage(){
kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!");
return "send success!";
}
}
4. 编写消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.AckNowledgment;
import org.springframework.stereotype.Component;
@Component
public class MyConsumer {
@KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")
public void listenGroup(ConsumerRecord<String, String> record, AckNowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
// 手动提交offset
ack.ackNowledge();
}
}
配置消费主题、分区和偏移量
@KafkaListener(groupId = "testGroup", topicPartitions = {
@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))},concurrency = "3") // concurrency 就是同组下的消费者个数,就是并发消费数,建议⼩于等于分区总数
public void listenGroupPro(ConsumerRecord<String, String> record, AckNowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
// 手动提交offset
ack.ackNowledge();
}
Kafka 集群 Controller、Rebalance、HW
1. Controller
Kafka 集群中的 broker 在 zookeeper 中创建临时序号节点,序号最小的节点(最先创建的节点)将作为集群的 controller,负责管理整个集群中的所有分区和副本的状态:
- 当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本,选举的规则是从 isr 集合中最左边获取
- 当集群中有 broker 新增或减少,controller 会同步信息给其他 broker
- 当集群中有分区新增或减少,controller 会同步信息给其他 broker
2. Rebalance
如果消费者没有指明分区消费,那么当消费组里消费者和分区的关系发生变化,就会触发 rebalance 机制,重新调整消费者该消费哪个分区
在触发 rebalance 机制之前,消费者消费哪个分区有三种分配策略:
- range:通过公式来计算某个消费者消费哪个分区,公式为:前面的消费者是
(分区总数/消费者数量)+1
,之后的消费者是分区总数/消费者数量
- 轮询:大家轮着来
- sticky:粘合策略,如果需要 rebalance,会在之前已分配的基础上调整,不会改变之前的分配情况。如果这个策略没有开,那么就要全部重新分配,所以建议开启
3. HW 和 LEO
LEO 是某个副本最后消息的消息位置(log-end-offset),HW 是已完成同步的位置。消息在写入 broker 时,且每个 broker 完成这条消息的同步后,HW 才会变化。在这之前,消费者是消费不到这条消息的,在同步完成之后,HW 更新之后,消费者才能消费到这条消息,这样的目的是防止消息的丢失
Kafka 线上问题优化
1. 防止消息丢失
生产者:
- 使用同步发送
- 把 ack 设成 1 或者 all,并且设置同步的分区数 >= 2
消费者:
- 把自动提交改成手动提交
2. 防止重复消费
如果生产者发送完消息后,却因为网络抖动,没有收到 ack,但实际上 broker 已经收到了。此时生产者会进行重试,于是 broker 就会收到多条相同的消息,而造成消费者的重复消费
解决方案:
3. 保证消息的顺序消费
生产者:使用同步发送,ack 设置成非 0 的值
消费者:主题只能设置⼀个分区,消费组中只能有⼀个消费者
4. 解决消息积压
所谓消息积压,就是消息的消费者的消费速度远赶不上生产者的生产消息的速度,导致 kafka 中有大量的数据没有被消费。随着没有被消费的数据堆积越多,消费者寻址的性能会越来越差,最后导致整个 kafka 对外提供的服务的性能很差,从而造成其他服务也访问速度变慢,造成服务雪崩
解决方案:
- 在这个消费者中,使用多线程,充分利用机器的性能消费消息
- 通过业务的架构设计,提升业务层面消费的性能
- 创建多个消费组,多个消费者,部署到其他机器上,⼀起消费,提高消费者的消费速度
- 创建⼀个消费者,该消费者在 kafka 另建⼀个主题,配上多个分区,多个分区再配上多个消费者。该消费者将 poll 下来的消息,不进行消费,直接转发到新建的主题上。此时,新的主题的多个分区的多个消费者就开始⼀起消费了
5. 实现延时队列
假设一个应用场景:订单创建后,超过 30 分钟没有支付,则需要取消订单,这种场景可以通过延时队列来实现,实现方案如下:
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。