如何解决无法在 nodejs 中使用 STOMP 协议使用 ActiveMQ 优先级消息
我有一个应用程序向队列发送消息,另一个应用程序订阅队列并处理它。我希望 OTP 消息的优先级高于其他消息,因此我尝试使用 ActiveMQ 消息优先级来实现这一点。
这是使用stompit库的nodejs中使用STOMP协议进行ActiveMQ连接的代码:
const serverPrimary = {
host: keys.activeMQ.host,port: keys.activeMQ.port,ssl: ssl,connectHeaders: {
host: '/',login: keys.activeMQ.username,passcode: keys.activeMQ.password,'heart-beat': '5000,5000',},}
connManager = new stompit.ConnectFailover(
[serverPrimary,serverFailover],reconnectOptions,)
connManager.on('error',function (e) {
const connectArgs = e.connectArgs
const address = connectArgs.host + ':' + connectArgs.port
logger.error({ error: e,customMessage: address })
})
channelPool = new stompit.ChannelPool(connManager)
发送消息代码
const pushMessagetoAMQ = (queue,message) => {
const queues = Object.values(activeMQ.queues)
if (!queues.includes(queue)) {
_mqLog(mqLogMessages.unkNownQueue + queue)
return
}
//Priority header is set
const header = {
destination: queue,priority: 7
}
//If message is not a string
if (typeof message !== 'string') message = JSON.stringify(message)
//Logging message before sending
_mqLog(
mqLogMessages.sending,{ service: services.amq },{ header: header,message: message },)
//Sending message to amq
_sendMessagetoAMQ(header,message,error => {
if (error) {
_mqError(error,mqLogMessages.sendingError,{ service: services.amq })
}
})
}
const _sendMessagetoAMQ = (headers,body,callback) => {
channelPool.channel((error,channel) => {
if (error) {
callback(error)
return
}
channel.send(headers,callback)
})
}
const amqSubscribe = (queue,callback,ack = 'client-individual') => {
log({ customMessage: 'Subscribing to ' + queue })
const queues = Object.values(activeMQ.queues)
if (!queues.includes(queue)) {
return
}
channelPool.channel((error,channel) => {
let header = {
destination: queue,ack: ack,'activemq.prefetchSize': 1,}
//Check for error
if (error) {
_mqError(error,mqLogMessages.baseError,header)
} else {
channel.subscribe(
header,_synchronisedHandler((error,next) => {
//Check for error
if (error) {
_mqError(error,mqLogMessages.subscriptionError,header)
next()
} else {
//Read message
message.readString('utf-8',function (error,body) {
if (error) {
_mqError(error,mqLogMessages.readError,header)
next()
} else {
//Message read successfully call callback
callback(body,() => {
//AckNowledgment callback
channel.ack(message)
next()
})
}
})
}
}),)
}
})
}
Activemq.xml
<policyEntries>
<policyEntry queue=">" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1" />
.......
我尝试推送具有不同优先级的不同消息,并在所有消息都推送到队列后打开第二个应用程序(即订阅消息的应用程序)。但是,消息的执行顺序与发送的顺序相同。优先级没有改变任何东西。有什么我想念的吗? 我是否必须在消费者端添加一些东西才能工作?
解决方法
默认情况下,在 ActiveMQ“Classic”(由 Amazon MQ 使用)中禁用对优先级的支持。正如 documentation 所述:
...支持[消息优先级]默认禁用,因此需要通过xml配置使用每个目标策略启用它...
您需要在 prioritizedMessages="true"
中为您的队列设置 policyEntry
,例如:
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" prioritizedMessages="true"/>
...
需要明确的是,这是在 activemq.xml
中的代理(即不是客户端)上配置的,它适用于所有类型的客户端。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。