微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spring异步通信三Kafka

Spring异步通信(三)Kafka

Kafka设计为集群运行,实现很强的可扩展性。
通过将主题在集群的所有实例上进行分区(partition),它能够具有更强的弹性。
RabbitMQ 主要处理Exchange中的队列
Kafka仅使用主题实现消息的发布/订阅
kafka主题会复制到集群的所有代理商。集群中的每个节点都会担任一个或多个主题的首领(leader),负责该主题的数据并将其复制到集群中的其他节点上。
更进一步讲,每个主题可以划分为多个分区。在这种情况下,集群中的每个节点是某个主题一个或多个分区的首领,但并不是整个主题的首领。主题的责任会在所有节点间进行拆分。

Kafka集群是由多个代理组成的,每个代理作为主题分区的首领

Kafka集群是由多个代理组成的,每个代理作为主题分区的首领
一、 搭建kafka环境

依赖

	<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

认监听9092端口
集群多个Kafka服务器:

spring:
	kafka:	
		bootstrap-servers:
			- kafka.tacocloud.com:9092
			- kafka.tacocloud.com:9093
			- kafka.tacocloud.com:9094

二、 通过KafkaTemplate 发送消息
KafkaTemplate 与 RabbitTemplate 和 jmstemplate 模板相似,但同时差异很大,在发送消息的时候,这一点很明显!!!

public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, data);
        return this.doSend(producerRecord);
    }

    public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, key, data);
        return this.doSend(producerRecord);
    }

    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, partition, key, data);
        return this.doSend(producerRecord);
    }

    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, partition, timestamp, key, data);
        return this.doSend(producerRecord);
    }

    public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
        return this.doSend(record);
    }

    public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
        ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
        if (!producerRecord.headers().iterator().hasNext()) {
            byte[] correlationId = (byte[])message.getHeaders().get("kafka_correlationId", byte[].class);
            if (correlationId != null) {
                producerRecord.headers().add("kafka_correlationId", correlationId);
            }
        }

        return this.doSend(producerRecord);
    }
 	public ListenableFuture<SendResult<K, V>> sendDefault(V data) {
        return this.send(this.defaultTopic, data);
    }

    public ListenableFuture<SendResult<K, V>> sendDefault(K key, V data) {
        return this.send(this.defaultTopic, key, data);
    }

    public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data) {
        return this.send(this.defaultTopic, partition, key, data);
    }

    public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data) {
        return this.send(this.defaultTopic, partition, timestamp, key, data);
    }

KafkaTemplate通过泛型类型化,在发送信息的时候直接处理领域类型。所以不需要类型消息转换器。
Kafka 发送消息使用如下参数设置消息进行发送:

  • 消息要发送到的主题(send方法的必选参数)
  • 主题要写入的分区(可选)
  • 记录上要发送的key(可选)
  • 时间戳(可选,认为System.currentTimeMillis())
  • 载荷(必选)

其中有一个ProducerRecord对象,只是一个简单对象,把上述参数放到对象中

 private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;

还可以发送Message对象

例子:发送信息

public interface OrderMessagingService {

void sendOrder(Order order);

}

@Service
public class KafkaOrderMessagingService
                               implements OrderMessagingService {

private KafkaTemplate<String, Order> kafkaTemplate;

@Autowired
public KafkaOrderMessagingService(
       KafkaTemplate<String, Order> kafkaTemplate) {
 this.kafkaTemplate = kafkaTemplate;
}

@Override
public void sendOrder(Order order) {
 kafkaTemplate.send("tacocloud.orders.topic", order);
}

}

三、 编写Kafka监听器
跟RabbitMQ 和 JMS 还有不同的是 它没有提供接受信息的方法
意味着 消费来自Kafka主题的消息只有一种办法就是编写消息监听器。

例子:标明当有消息抵达名为tacocloud.orders.topic的主题时候,该方法就会调用。我们只需把Order(载荷)对象传递给handle() 如果想获得其他信息如下ConsumerRecord 或 Message对象

@Profile("kafka-listener")
@Component
@Slf4j
public class OrderListener {
  
  private KitchenUI ui;

  @Autowired
  public OrderListener(KitchenUI ui) {
    this.ui = ui;
  }

  @KafkaListener(topics="tacocloud.orders.topic")
  public void handle(Order order, ConsumerRecord<String, Order> record) {
    log.info("Received from partition {} with timestamp {}",
        record.partition(), record.timestamp());
    
    ui.displayOrder(order);
  }
  
//
// Alternate implementation
//
//  @KafkaListener(topics="tacocloud.orders.topic")
//  public void handle(Order order, Message<Order> message) {
//    MessageHeaders headers = message.getHeaders();
//    log.info("Received from partition {} with timestamp {}",
//        headers.get(KafkaHeaders.RECEIVED_PARTITION_ID),
//        headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
//    ui.displayOrder(order);
//  }
  
}

学习Spring 实战第五代

异步消息在要通信的应用程序之间提供了一个中间层,这样能够实现更松散的耦合和更强的可扩展性。
谢谢大家观看,我将铭记于心。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐