Spring异步通信(三)Kafka
Kafka设计为集群运行,实现很强的可扩展性。
通过将主题在集群的所有实例上进行分区(partition),它能够具有更强的弹性。
RabbitMQ 主要处理Exchange中的队列
Kafka仅使用主题实现消息的发布/订阅
kafka主题会复制到集群的所有代理商。集群中的每个节点都会担任一个或多个主题的首领(leader),负责该主题的数据并将其复制到集群中的其他节点上。
更进一步讲,每个主题可以划分为多个分区。在这种情况下,集群中的每个节点是某个主题一个或多个分区的首领,但并不是整个主题的首领。主题的责任会在所有节点间进行拆分。
依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
默认监听9092端口
集群多个Kafka服务器:
spring:
kafka:
bootstrap-servers:
- kafka.tacocloud.com:9092
- kafka.tacocloud.com:9093
- kafka.tacocloud.com:9094
二、 通过KafkaTemplate 发送消息
KafkaTemplate 与 RabbitTemplate 和 jmstemplate 模板相似,但同时差异很大,在发送消息的时候,这一点很明显!!!
public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, data);
return this.doSend(producerRecord);
}
public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, key, data);
return this.doSend(producerRecord);
}
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, partition, key, data);
return this.doSend(producerRecord);
}
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, partition, timestamp, key, data);
return this.doSend(producerRecord);
}
public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
return this.doSend(record);
}
public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
if (!producerRecord.headers().iterator().hasNext()) {
byte[] correlationId = (byte[])message.getHeaders().get("kafka_correlationId", byte[].class);
if (correlationId != null) {
producerRecord.headers().add("kafka_correlationId", correlationId);
}
}
return this.doSend(producerRecord);
}
public ListenableFuture<SendResult<K, V>> sendDefault(V data) {
return this.send(this.defaultTopic, data);
}
public ListenableFuture<SendResult<K, V>> sendDefault(K key, V data) {
return this.send(this.defaultTopic, key, data);
}
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data) {
return this.send(this.defaultTopic, partition, key, data);
}
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data) {
return this.send(this.defaultTopic, partition, timestamp, key, data);
}
KafkaTemplate通过泛型类型化,在发送信息的时候直接处理领域类型。所以不需要类型消息转换器。
Kafka 发送消息使用如下参数设置消息进行发送:
其中有一个ProducerRecord对象,只是一个简单对象,把上述参数放到对象中
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
还可以发送Message对象
例子:发送信息
public interface OrderMessagingService {
void sendOrder(Order order);
}
@Service
public class KafkaOrderMessagingService
implements OrderMessagingService {
private KafkaTemplate<String, Order> kafkaTemplate;
@Autowired
public KafkaOrderMessagingService(
KafkaTemplate<String, Order> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void sendOrder(Order order) {
kafkaTemplate.send("tacocloud.orders.topic", order);
}
}
三、 编写Kafka监听器
跟RabbitMQ 和 JMS 还有不同的是 它没有提供接受信息的方法
意味着 消费来自Kafka主题的消息只有一种办法就是编写消息监听器。
例子:标明当有消息抵达名为tacocloud.orders.topic的主题时候,该方法就会调用。我们只需把Order(载荷)对象传递给handle() 如果想获得其他信息如下ConsumerRecord 或 Message对象
@Profile("kafka-listener")
@Component
@Slf4j
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, ConsumerRecord<String, Order> record) {
log.info("Received from partition {} with timestamp {}",
record.partition(), record.timestamp());
ui.displayOrder(order);
}
//
// Alternate implementation
//
// @KafkaListener(topics="tacocloud.orders.topic")
// public void handle(Order order, Message<Order> message) {
// MessageHeaders headers = message.getHeaders();
// log.info("Received from partition {} with timestamp {}",
// headers.get(KafkaHeaders.RECEIVED_PARTITION_ID),
// headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
// ui.displayOrder(order);
// }
}
学习Spring 实战第五代
异步消息在要通信的应用程序之间提供了一个中间层,这样能够实现更松散的耦合和更强的可扩展性。
谢谢大家观看,我将铭记于心。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。