微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何将 KAFKA 的属性外部化为不同的类并将其调用到主类?

如何解决如何将 KAFKA 的属性外部化为不同的类并将其调用到主类?

我一直在尝试使用抽象 OOP 来外部化属性代码,这样我就可以将它调用到生产者类,但我似乎无法调用它。这是代码,任何帮助将不胜感激。

public class Producer{

    private static final Logger logger = LogManager.getLogger(Producer.class);
    public static void main(String[] args) {
        logger.info("Creating Kafka Producer...");

        KafkaProducer<Integer,String> producer = new KafkaProducer<>(PropConfigs().prodProps());

        logger.info("Start sending messages...");

        for (int i = 1; i <= AppConfigs.numEvents; i++) {
            producer.send(new ProducerRecord<>(AppConfigs.topicName,i,"Message " + i + " Test"),new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata,Exception e) {
                    if(e == null){
                        logger.info("\nReceived Metadata" + " Topic:" + recordMetadata.topic() + " Partition: " + recordMetadata.partition() + " Offset: " + recordMetadata.offset() + " Time: " + recordMetadata.timestamp() + "\n");
                    } else {
                        logger.error("Error",e);
                    }
                }
            });
        }

        logger.info("Finished - Closing Kafka Producer.");
        producer.flush();
        producer.close();

    }
}

我想将所有 props.setProperty 转移到不同的类,然后将其调用到生产者类。这就是我想做的:

package org.timothy.producer.common;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class PropConfigs {

    public static Properties prodProps(){
        Properties props = new Properties();
        props.setProperty(ProducerConfig.CLIENT_ID_CONfig,AppConfigs.applicationID);
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONfig,AppConfigs.bootstrapServers);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONfig,IntegerSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONfig,StringSerializer.class.getName());
        props.setProperty(ProducerConfig.ACKS_CONfig,"all");
        props.setProperty(ProducerConfig.RETRIES_CONfig,"3");
        props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");

        return new Properties(props);
    }

}

如何将其调用到主类?或者也许将其应用于: Properties props = new Properties(); KafkaProducer producer = new KafkaProducer(props);

解决方法

 KafkaProducer<Integer,String> producer = new KafkaProducer<>((new PropConfigs()).prodProps()); 

或者将它们设为静态...

 KafkaProducer<Integer,String> producer = new KafkaProducer<>(PropConfigs.prodProps()); 

https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/Producer.java

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。