微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

org.apache.kafka.common.errors.TimeoutException:60000 毫秒后元数据中不存在主题 mouTopic 与融合卡夫卡

如何解决org.apache.kafka.common.errors.TimeoutException:60000 毫秒后元数据中不存在主题 mouTopic 与融合卡夫卡

我正在使用 Confluent Kafka,并且所需的主题已经存在。向 Kafka 发送数据时出现以下异常。

org.springframework.kafka.KafkaException:发送失败;嵌套异常是 org.apache.kafka.common.errors.TimeoutException:60000 毫秒后元数据中不存在主题 moutopic。

Below  is my config class:

@SpringBootApplication
@EnableConfigurationProperties
@EnableKafka
public class Application {

    public static void main(final String[] args) {

        SpringApplication.run(Application.class,args);
    }

    @Value("${spring.kafka.properties.bootstrap.servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String,String> producerFactory() {
        Map<String,Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONfig,bootstrapServers);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONfig,StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONfig,StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String,String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

以下是gradle依赖项:

    dependencies {

        implementation 'org.springframework.boot:spring-boot-starter-actuator'
        implementation 'org.apache.kafka:kafka-streams'
        implementation 'org.springframework.kafka:spring-kafka'
        implementation 'org.apache.kafka:kafka-clients'
        compile "org.springframework.boot:spring-boot-autoconfigure:$springBootVersion"
        compile group: 'com.graphql-java-kickstart',name: 'graphql-spring-boot-starter',version: '8.1.0'
        compile group: 'com.graphql-java-kickstart',name: 'graphql-java-tools',version: '6.3.0'
        compile "com.graphql-java:graphiql-spring-boot-starter:5.0.2"
        compile "commons-lang:commons-lang:2.6"
        compile "commons-collections:commons-collections:3.2"
        compile "org.apache.commons:commons-lang3:3.7"
        compile group: 'org.springframework.boot',name: 'spring-boot-starter-validation',version: '2.3.5.RELEASE'
        compile "ch.qos.logback:logback-core:1.1.11"
        compile "ch.qos.logback:logback-access:1.2.3"
        compile "net.logstash.logback:logstash-logback-encoder:4.8"
        compile 'com.graphql-java:graphql-java-extended-scalars:1.0'
        compile group: 'org.mongodb',name: 'mongo-java-driver',version: '3.12.0'
        compile group: 'com.graphql-java-kickstart',name: 'playground-spring-boot-starter',version: '5.10.0'
        compile group: 'org.apache.poi',name: 'poi-ooxml',version: '3.17'
        compile group: 'com.fasterxml.jackson.core',name: 'jackson-databind',version: '2.10.0'
        compile group: 'com.fasterxml.jackson.core',name: 'jackson-annotations',version: '2.9.4'
        runtimeOnly 'com.graphql-java-kickstart:altair-spring-boot-starter:8.1.0'
        runtimeOnly 'com.graphql-java-kickstart:voyager-spring-boot-starter:8.1.0'
        testImplementation 'com.graphql-java-kickstart:graphql-spring-boot-starter-test:8.1.0'
        testCompile "org.springframework.boot:spring-boot-starter-test:$springBootVersion"
        testCompile "io.rest-assured:json-schema-validator:$restAssuredVersion"
    }

我的休息端点:

 @GetMapping("/sendOfferDataToKafka")
    public String allocateOffers() {
     Message<String> message = MessageBuilder
     .withPayload("data")
     .setHeader(KafkaHeaders.TOPIC,"moutopic")
     .setHeader(KafkaHeaders.MESSAGE_KEY,"999")
     .setHeader(KafkaHeaders.PARTITION_ID,0)
     .build();

      System.err.print("message" + message.toString());

      kafkaTemplate.send(message);

      return message.toString();
 }

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。