微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

错误:回调已被调用使用异步瀑布并尝试永远运行程序时

如何解决错误:回调已被调用使用异步瀑布并尝试永远运行程序时

第二次调用代码时出现此错误。 尝试侦听队列并发布回队列。第一次有效,但第二次无效。在 done() 上得到这个错误。 我通过一个单独的程序向 Q 发布消息。它工作得很好。 我希望 program1 中的这个 processMessage() 永远运行并收听 Q 中的消息。

我通过单独的程序向 Q 发布了第二条消息。它在被调用的程序 mq.js 中失败并出现以下错误

Error: Callback was already called.
    at /Users/vkbalakr/sports/clip-mc/services/base/node_modules/async/dist/async.js:966:32
        
        at .../mq.js:221:18

调用program1

let client;
function processMessage() {
    let steps=[]
    steps.push((done)=> {
        config = "..."
        return done(null,config);
    })
    steps.push((config,done)=> {
        let subscribetoQueue = "xyz"
        mq.subscribetoMq(subscribetoQueue,null,function (err,data,client) {
          if (err) {
            return done(err);
          }
          this.client = client;
          return done(null,data);
        });
    })
    steps.push((data,done) => {
        const publishToTopic = "zxy";
        mq.publishOnMq(publishToTopic,client,data) {
          if (err) {
            return done(err);
          }
          return done(null,data);
        });
      });

      async.waterfall(steps,result) {
        if (err) {
          logger.error("Final Error caught:",err);
        } else {
          logger.debug("Final Success with result:",result);
        }
      });

}

processMessage() 

调用的程序 mq.js

let client;
function subscribetoMq(subscribetoTopic,cb) {
  const steps = [];

  // Connect client - STOMP broker supports STOMP over WebSockets.
  steps.push((done) => {
    // See https://stomp-js.github.io/api-docs/latest/ for API documentation
    client = new StompJs.Client({....})

    client.onConnect = function (frame) {
      return done(); // Good,it's active
    };

    client.onWebSocketError = function (frame) {
    };

    client.onStompError = function (frame) {
    };
    client.activate();
  });
    
    steps.push((done)=> {
    const subscription = client.subscribe(subscribetoTopic,(message) => {
        const body = message.body;
        if (body !== null && body !== '' && body !== -0) {
          jsonBody = JSON.parse(message.body);
          const jsonBodyParsed = JSON.parse(jsonBody);
          const validationStatus = validateSchema(jsonBodyParsed);//retruna true or false
          if (!validationStatus.code) {
            return done(new Error(validationStatus.errMsg));
          }
          message.ack();
          return done(null,jsonBodyParsed,client); // **<== Fails here on 2nd call ????**
        } else {
          return done();
        }
      },headers);
  });

  async.waterfall(steps,(err,result,client) => {
    if (err) {
      return cb(new Error("Error Subscribing"));
    }
    return cb(null,client);
  });
}

function publishOnMq(publishToTopic,cb) {
  const steps = [];
  this.client = client;
  steps.push((done) => {
    if (client !== null) {
      client = new StompJs.Client({
        brokerURL,connectHeaders: {
          login: brokerLogin,passcode: brokerPasscode,},debug(str) {
        },webSocketFactory() {
          return new WebSocket(brokerURL,this.stompVersions.protocolVersions(),{ agent: agent });
        },reconnectDelay: 5000,heartbeatIncoming: 20000,heartbeatOutgoing: 20000,});


      client.onConnect = function (frame) {
        return done(); // Good,it's active
      };

      client.onWebSocketError = function (frame) {
      };

      client.onStompError = function (frame) {
      };
      client.activate();
    } else {
      return done();
    }
  });

  // Publish messages
  steps.push((done) => {
    client.publish({
      destination: publishToTopic,headers: { 'content-type': 'application/json' },skipContentLengthHeader: true,body: JSON.stringify(data),});
    client.deactivate();
    done(null,{ result: 'published' });
  });

  async.waterfall(steps,result) => {
      if (err) {
        return cb(new Error("Error Publishing"));
      }
      return cb(null,result);
  });
}

TIA,

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。