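How to resolve org.apache.kafka.common.errors.TimeoutException: Topic mouTopic not present in metadata after 60000 ms with Confluent Kafka
I am using Confluent Kafka, and the required topic already exists. When I send data to Kafka,
I get the following exception.
org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic moutopic not present in metadata after 60000 ms.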
Below is my config class:
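import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@SpringBootApplication
@EnableConfigurationProperties
@EnableKafka
public class Application {

    public static void main(final String[] args) {
        SpringApplication.run(Application.class, args);
    }

    // Broker list read from the application properties.
    @Value("${spring.kafka.properties.bootstrap.servers}")
    private String bootstrapServers;

    // Producer factory built by hand: only the bootstrap servers and the
    // two String serializers are set here.
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}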
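One thing that may be worth checking (an assumption on my side, not a confirmed cause): the producer factory above copies only the bootstrap servers and the serializers into its map, so any other client settings kept under spring.kafka.properties.* would not reach the producer. A secured Confluent cluster typically needs settings such as security.protocol and sasl.mechanism. Below is a minimal JDK-only sketch of merging such pass-through settings into the map. The class name ProducerPropsSketch, the method buildProducerProps, and the broker address are hypothetical; the string keys are the values behind the ProducerConfig constants used above.

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerPropsSketch {

    // String values behind ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
    // KEY_SERIALIZER_CLASS_CONFIG and VALUE_SERIALIZER_CLASS_CONFIG.
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    static final String KEY_SERIALIZER = "key.serializer";
    static final String VALUE_SERIALIZER = "value.serializer";
    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";

    // Hypothetical helper: start from the pass-through client settings
    // (for example security settings), then add the values the factory
    // sets explicitly, so the explicit values win on a key clash.
    static Map<String, Object> buildProducerProps(String bootstrapServers,
                                                  Map<String, String> passThrough) {
        Map<String, Object> props = new HashMap<>(passThrough);
        props.put(BOOTSTRAP_SERVERS, bootstrapServers);
        props.put(KEY_SERIALIZER, STRING_SERIALIZER);
        props.put(VALUE_SERIALIZER, STRING_SERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Map<String, String> passThrough = new HashMap<>();
        passThrough.put("security.protocol", "SASL_SSL");
        passThrough.put("sasl.mechanism", "PLAIN");

        Map<String, Object> props =
                buildProducerProps("broker.example:9092", passThrough);
        System.out.println(props);
    }
}
```

A map built this way could be handed to new DefaultKafkaProducerFactory<>(...) in place of configProps. Whether missing security settings are actually behind the timeout in this setup is not something I have verified.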
Below are the Gradle dependencies:
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.apache.kafka:kafka-streams'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.apache.kafka:kafka-clients'
    compile "org.springframework.boot:spring-boot-autoconfigure:$springBootVersion"
    compile group: 'com.graphql-java-kickstart', name: 'graphql-spring-boot-starter', version: '8.1.0'
    compile group: 'com.graphql-java-kickstart', name: 'graphql-java-tools', version: '6.3.0'
    compile "com.graphql-java:graphiql-spring-boot-starter:5.0.2"
    compile "commons-lang:commons-lang:2.6"
    compile "commons-collections:commons-collections:3.2"
    compile "org.apache.commons:commons-lang3:3.7"
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-validation', version: '2.3.5.RELEASE'
    compile "ch.qos.logback:logback-core:1.1.11"
    compile "ch.qos.logback:logback-access:1.2.3"
    compile "net.logstash.logback:logstash-logback-encoder:4.8"
    compile 'com.graphql-java:graphql-java-extended-scalars:1.0'
    compile group: 'org.mongodb', name: 'mongo-java-driver', version: '3.12.0'
    compile group: 'com.graphql-java-kickstart', name: 'playground-spring-boot-starter', version: '5.10.0'
    compile group: 'org.apache.poi', name: 'poi-ooxml', version: '3.17'
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.10.0'
    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.4'
    runtimeOnly 'com.graphql-java-kickstart:altair-spring-boot-starter:8.1.0'
    runtimeOnly 'com.graphql-java-kickstart:voyager-spring-boot-starter:8.1.0'
    testImplementation 'com.graphql-java-kickstart:graphql-spring-boot-starter-test:8.1.0'
    testCompile "org.springframework.boot:spring-boot-starter-test:$springBootVersion"
    testCompile "io.rest-assured:json-schema-validator:$restAssuredVersion"
}
My REST endpoint:
@GetMapping("/sendOfferDataToKafka")
public String allocateOffers() {
    // Build a message addressed to topic "moutopic", key "999", partition 0.
    Message<String> message = MessageBuilder
            .withPayload("data")
            .setHeader(KafkaHeaders.TOPIC, "moutopic")
            .setHeader(KafkaHeaders.MESSAGE_KEY, "999")
            .setHeader(KafkaHeaders.PARTITION_ID, 0)
            .build();
    System.err.print("message" + message.toString());
    kafkaTemplate.send(message);
    return message.toString();
}