如何解决NodeJS @ azure / service-bus:如何更新消息锁
由于我可能长时间处理消息,而消息已超出最大锁定时间。我想知道NodeJS中更新消息锁的语法。
let { ServiceBusClient,ReceiveMode } = require('@azure/service-bus'),serviceBusClient = ServiceBusClient.createFromConnectionString(SERVICE_BUS_SECRET),queueClient = serviceBusClient.createQueueClient(XXX_QUEUE),receiver = queueClient.createReceiver(ReceiveMode.peekLock),errorHandler = function() {
...
},messageHandler = function(message) {
// want to renew message lock here
}
receiver.registerMessageHandler(messageHandler,errorHandler);
解决方法
使用方法registerMessageHandler
时,我们可以提供选项maxMessageAutoRenewLockDurationInSeconds
。这意味着SDK会自动更新消息锁定之前的最大持续时间(以秒为单位)。但是请注意,一旦消息解决或用户提供的onMessage处理程序完成其执行,此自动更新就会停止。有关更多详细信息,请参阅here和here
此外,如果您要手动续订锁,请将maxMessageAutoRenewLockDurationInSeconds
设置为零。
例如
import {
SendableMessageInfo,ServiceBusClient,OnMessage,OnError,delay,ReceiveMode,ServiceBusMessage,MessagingError,} from "@azure/service-bus";
const connectionString =
"";
const queueName = "myqueue";
let receivedMessage: ServiceBusMessage;
let elapsedTime = 0;
const interval = 1000 * 10;
const testDurationInMilliseconds = 1000 * 60;
async function sendMessage(): Promise<void> {
const ns = ServiceBusClient.createFromConnectionString(connectionString);
const client = ns.createQueueClient(queueName);
try {
const sender = client.createSender();
const message: SendableMessageInfo = {
messageId: "test",body: "test",label: `test`,};
console.log("send");
await sender.send(message);
await sender.close();
} finally {
await client.close();
await ns.close();
}
}
async function receiveMessage(): Promise<void> {
const ns = ServiceBusClient.createFromConnectionString(connectionString);
const client = ns.createQueueClient(queueName);
try {
const receiver = client.createReceiver(ReceiveMode.peekLock);
const receiverPromise = new Promise((resolve,_reject) => {
const onMessageHandler: OnMessage = async (brokeredMessage) => {
receivedMessage = brokeredMessage;
console.log("Received message: ",receivedMessage.messageId);
const startTime = Date.now();
while (elapsedTime < testDurationInMilliseconds) {
// simulate the user making an async call that takes time.
await delay(interval);
const data = await receiver.renewMessageLock(receivedMessage);
elapsedTime = Date.now() - startTime;
// log how long we've executed.
console.log(`still executing after ${elapsedTime}`);
console.log(data.toJSON());
console.log("\n");
}
await brokeredMessage.complete();
console.log("Completed message: ",receivedMessage.messageId);
};
const onErrorHandler: OnError = (err) => {
if ((err as MessagingError).retryable === true) {
console.log(
"Receiver will be recreated. A recoverable error occurred:",err
);
resolve();
} else {
console.log("Error occurred: ",err);
}
};
receiver.registerMessageHandler(onMessageHandler,onErrorHandler,{
autoComplete: false,maxMessageAutoRenewLockDurationInSeconds: 0,});
});
await receiverPromise;
await receiver.close();
} finally {
await client.close();
await ns.close();
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。