
How to solve "broker not available (loadMetadataForTopics)" in a kafka-node consumer

In my Node.js code I have an API used for request/response. First I make a request to http://localhost:3000/number1. I then start a consumer that reads messages from one partition of a Kafka topic, and among the consumed messages I try to find the one with id = number1. Afterwards I want to use that value to return a response to the user. So I created a consumer like the one below:

const kafka_node = require('kafka-node');

const options = {
    kafkaHost: 'kafka:9092'
};
const client_node = new kafka_node.KafkaClient(options);

const Consumer = kafka_node.Consumer;
const consumer_node = new Consumer(
  client_node,
  [
    { topic: 'receive.kafka.entities', partition: 0, offset: 0 }
  ],
  {
    autoCommit: false,
    fetchMaxWaitMs: 100,
    fromOffset: 'earliest',
    groupId: 'kafka-node-group',
    asyncpush: false,
  }
);


// `back2` (the requested id, e.g. "number1") is defined elsewhere in the route.
const read = (callback) => {
    let ret = "1";
    consumer_node.on('message', async function (message) {
        const parse1 = JSON.parse(message.value);
        const parse2 = JSON.parse(parse1.payload);
        const id = parse2.fullDocument.id;
        const lastOffset = message.highWaterOffset - 1;
        // check if there is a query
        if (lastOffset <= message.offset || ret !== "1") {
            return callback(ret);
        }
        else if (id === back2) {
            ret = parse2.fullDocument;
        }
    });
};

let error = {
    id: "The entity " + back2 + " not found"
};

read((data) => {
    consumer_node.close(true, function (message) {
        if (data != "1") {
            res.status(200).send(data);
        }
        else {
            res.status(404).send(error);
        }
    });
});

If I then make a second, consecutive request, I get this response:

{
    "message": "broker not available (loadMetadataForTopics)"
}

My docker-compose file 1 is as follows:

zookeeper:
      image: confluentinc/cp-zookeeper:5.4.1
      container_name: stellio-zookeeper
      ports:
        - 2181:2181
      environment:
        ZOOKEEPER_SERVER_ID: 1
        ZOOKEEPER_CLIENT_PORT: 2181
        ZOOKEEPER_TICK_TIME: 2000
      networks:
        - default
        - localnet

kafka:
      image: confluentinc/cp-enterprise-kafka:latest
      container_name: kafka
      ports:
        - 9092:9092
        - 9101:9101
      environment:
        KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_BROKER_ID: 1
        KAFKA_LOG4J_ROOT_LOGLEVEL: INFO
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
        KAFKA_JMX_PORT: 9101
        KAFKA_JMX_HOSTNAME: localhost
        CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:9092
        CONFLUENT_METRICS_ENABLE: 'true'
        CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' 
        KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081 
      depends_on:
        - zookeeper
      networks:
        - default
        - localnet
        - my-proxy-net-kafka

networks:
    default: # this network (app2)
        driver: bridge
    my-proxy-net-kafka:
        external:
            name: kafka_network

Docker-compose file 2:

app:
        container_name: docker-node
        hostname: docker-node
        restart: always
        build: .
        command: nodemon /usr/src/app/index.js
        networks:
            - default
            - proxynet-kafka
        ports:
            - '3000:3000'
        volumes:
            - .:/usr/src/app

networks:
    default:
        driver: bridge
    proxynet-kafka:
        name: kafka_network
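One detail worth double-checking in docker-compose file 1: the broker advertises an EXTERNAL listener on localhost:29092, but only ports 9092 and 9101 are published, and no KAFKA_LISTENERS entry binds the listeners. The fragment below is a sketch of one self-consistent layout, under the assumption that containers on kafka_network should connect as kafka:9092 and host clients as localhost:29092:

```yaml
# Sketch only: relevant kafka service keys, other settings unchanged.
kafka:
  ports:
    - 9092:9092
    - 29092:29092          # publish the EXTERNAL listener too
  environment:
    KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:29092
    KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
```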

Why does this happen? Can you help me solve it?

[If you need more information, feel free to ask me :)]
