如何解决错误:回调已被调用使用异步瀑布并尝试永远运行程序时
第二次调用代码时出现此错误。 尝试侦听队列并发布回队列。第一次有效,但第二次无效。在 done() 上得到这个错误。 我通过一个单独的程序向 Q 发布消息。它工作得很好。 我希望 program1 中的这个 processMessage() 永远运行并收听 Q 中的消息。
我通过单独的程序向 Q 发布了第二条消息。它在被调用的程序 mq.js 中失败并出现以下错误:
Error: Callback was already called.
at /Users/vkbalakr/sports/clip-mc/services/base/node_modules/async/dist/async.js:966:32
at .../mq.js:221:18
调用program1:
let client;
function processMessage() {
let steps=[]
steps.push((done)=> {
config = "..."
return done(null,config);
})
steps.push((config,done)=> {
let subscribetoQueue = "xyz"
mq.subscribetoMq(subscribetoQueue,null,function (err,data,client) {
if (err) {
return done(err);
}
this.client = client;
return done(null,data);
});
})
steps.push((data,done) => {
const publishToTopic = "zxy";
mq.publishOnMq(publishToTopic,client,data) {
if (err) {
return done(err);
}
return done(null,data);
});
});
async.waterfall(steps,result) {
if (err) {
logger.error("Final Error caught:",err);
} else {
logger.debug("Final Success with result:",result);
}
});
}
processMessage()
调用的程序 mq.js:
let client;
function subscribetoMq(subscribetoTopic,cb) {
const steps = [];
// Connect client - STOMP broker supports STOMP over WebSockets.
steps.push((done) => {
// See https://stomp-js.github.io/api-docs/latest/ for API documentation
client = new StompJs.Client({....})
client.onConnect = function (frame) {
return done(); // Good,it's active
};
client.onWebSocketError = function (frame) {
};
client.onStompError = function (frame) {
};
client.activate();
});
steps.push((done)=> {
const subscription = client.subscribe(subscribetoTopic,(message) => {
const body = message.body;
if (body !== null && body !== '' && body !== -0) {
jsonBody = JSON.parse(message.body);
const jsonBodyParsed = JSON.parse(jsonBody);
const validationStatus = validateSchema(jsonBodyParsed);//retruna true or false
if (!validationStatus.code) {
return done(new Error(validationStatus.errMsg));
}
message.ack();
return done(null,jsonBodyParsed,client); // **<== Fails here on 2nd call ????**
} else {
return done();
}
},headers);
});
async.waterfall(steps,(err,result,client) => {
if (err) {
return cb(new Error("Error Subscribing"));
}
return cb(null,client);
});
}
function publishOnMq(publishToTopic,cb) {
const steps = [];
this.client = client;
steps.push((done) => {
if (client !== null) {
client = new StompJs.Client({
brokerURL,connectHeaders: {
login: brokerLogin,passcode: brokerPasscode,},debug(str) {
},webSocketFactory() {
return new WebSocket(brokerURL,this.stompVersions.protocolVersions(),{ agent: agent });
},reconnectDelay: 5000,heartbeatIncoming: 20000,heartbeatOutgoing: 20000,});
client.onConnect = function (frame) {
return done(); // Good,it's active
};
client.onWebSocketError = function (frame) {
};
client.onStompError = function (frame) {
};
client.activate();
} else {
return done();
}
});
// Publish messages
steps.push((done) => {
client.publish({
destination: publishToTopic,headers: { 'content-type': 'application/json' },skipContentLengthHeader: true,body: JSON.stringify(data),});
client.deactivate();
done(null,{ result: 'published' });
});
async.waterfall(steps,result) => {
if (err) {
return cb(new Error("Error Publishing"));
}
return cb(null,result);
});
}
TIA,
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。