微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

反应程序在将所有消息发送到 Kafka 之前提前退出

如何解决反应程序在将所有消息发送到 Kafka 之前提前退出

这是之前响应式 kafka 问题 (Issue while sending the Flux of data to the reactive kafka) 的后续问题。

我正在尝试使用反应式方法向 kafka 发送一些日志记录。这是使用反应式 kafka 发送消息的反应式代码

public class LogProducer {

    private final KafkaSender<String,String> sender;

    public LogProducer(String bootstrapServers) {

        Map<String,Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONfig,bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONfig,"log-producer");
        props.put(ProducerConfig.ACKS_CONfig,"all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONfig,StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONfig,StringSerializer.class);
        SenderOptions<String,String> senderOptions = SenderOptions.create(props);

        sender = KafkaSender.create(senderOptions);
    }

    public void sendMessages(String topic,Flux<Logs.Data> records) throws InterruptedException {
    
        AtomicInteger sentCount = new AtomicInteger(0);
        sender.send(records
        .map(record -> {
            LogRecord lrec = record.getRecords().get(0);
            String id = lrec.getId();
            Thread.sleep(0,5); // sleep for 5 ns
            return SenderRecord.create(new ProducerRecord<>(topic,id,lrec.toString()),id);
        })).doOnNext(res -> sentCount.incrementAndGet()).then()
        .doOnError(e -> {
            log.error("[FAIL]: Send to the topic: '{}' Failed. "
                    + e,topic);
        })
        .doOnSuccess(s -> {
            log.info("[SUCCESS]: {} records sent to the topic: '{}'",sentCount,topic);
        })
        .subscribe();
    }

}


public class ExecuteQuery implements Runnable {

    private LogProducer producer = new LogProducer("localhost:9092");

    @Override
    public void run() {
        Flux<Logs.Data> records = ...
        producer.sendMessages(kafkaTopic,records);
        .....
        .....
        // processing related to the messages sent
    }

}

因此,即使存在 Thread.sleep(0,5);,有时它也不会将所有消息发送到 kafka,并且程序会提前打印 SUCCESS 消息 (log.info("[SUCCESS]: {} records sent to the topic: '{}'",topic);)。有没有更具体的方法解决这个问题。例如,使用某种回调,让线程等待所有消息发送成功。

我有一个 spring 控制台应用程序并通过调度程序以固定速率运行 ExecuteQuery,就像这样

public class Main {

private scheduledexecutorservice scheduler = Executors.newSingleThreadScheduledExecutor();
private ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);

public static void main(String[] args) {
     QueryScheduler scheduledQuery = new QueryScheduler();
     scheduler.scheduleAtFixedrate(scheduledQuery,5,TimeUnit.MINUTES);
}

class QueryScheduler implements Runnable {

  @Override
  public void run() {
      // preprocessing related to time
      executor.execute(new ExecuteQuery());
      // postprocessing related to time
  }

}
}

解决方法

您的 Thread.sleep(0,5); // sleep for 5 ns 没有任何值可以阻止主线程,因此它会在需要时退出,而您的 ExecuteQuery 可能尚未完成其工作。

不清楚您如何启动您的应用程序,但我建议 Thread.sleep() 正好在主线程中进行阻塞。准确地说是在 public static void main(String[] args) { 方法实现中。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。