微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

无法在 nodejs 中使用 STOMP 协议使用 ActiveMQ 优先级消息

如何解决无法在 nodejs 中使用 STOMP 协议使用 ActiveMQ 优先级消息

我有一个应用程序向队列发送消息,另一个应用程序订阅队列并处理它。我希望 OTP 消息的优先级高于其他消息,因此我尝试使用 ActiveMQ 消息优先级来实现这一点。

这是使用stompit库的nodejs中使用STOMP协议进行ActiveMQ连接的代码

const serverPrimary = {
  host: keys.activeMQ.host,port: keys.activeMQ.port,ssl: ssl,connectHeaders: {
    host: '/',login: keys.activeMQ.username,passcode: keys.activeMQ.password,'heart-beat': '5000,5000',},}
connManager = new stompit.ConnectFailover(
  [serverPrimary,serverFailover],reconnectOptions,)
connManager.on('error',function (e) {
  const connectArgs = e.connectArgs
  const address = connectArgs.host + ':' + connectArgs.port
  logger.error({ error: e,customMessage: address })
})

    channelPool = new stompit.ChannelPool(connManager)

发送消息代码

const pushMessagetoAMQ = (queue,message) => {
      const queues = Object.values(activeMQ.queues)
      if (!queues.includes(queue)) {
        _mqLog(mqLogMessages.unkNownQueue + queue)
        return
      }
      //Priority header is set
      const header = {
        destination: queue,priority: 7
      }
      //If message is not a string
      if (typeof message !== 'string') message = JSON.stringify(message)
      //Logging message before sending
      _mqLog(
        mqLogMessages.sending,{ service: services.amq },{ header: header,message: message },)
      //Sending message to amq
      _sendMessagetoAMQ(header,message,error => {
        if (error) {
          _mqError(error,mqLogMessages.sendingError,{ service: services.amq })
        }
      })
    }

const _sendMessagetoAMQ = (headers,body,callback) => {
      channelPool.channel((error,channel) => {
        if (error) {
          callback(error)
          return
        }
        channel.send(headers,callback)
      })
    }

这是在第二个应用程序中订阅队列的代码

const amqSubscribe = (queue,callback,ack = 'client-individual') => {
  log({ customMessage: 'Subscribing to ' + queue })
  const queues = Object.values(activeMQ.queues)
  if (!queues.includes(queue)) {
    return
  }
  channelPool.channel((error,channel) => {
    let header = {
      destination: queue,ack: ack,'activemq.prefetchSize': 1,}
    //Check for error
    if (error) {
      _mqError(error,mqLogMessages.baseError,header)
    } else {
      channel.subscribe(
        header,_synchronisedHandler((error,next) => {
          //Check for error
          if (error) {
            _mqError(error,mqLogMessages.subscriptionError,header)
            next()
          } else {
            //Read message
            message.readString('utf-8',function (error,body) {
              if (error) {
                _mqError(error,mqLogMessages.readError,header)
                next()
              } else {
                //Message read successfully call callback
                callback(body,() => {
                  //AckNowledgment callback
                  channel.ack(message)
                  next()
                })
              }
            })
          }
        }),)
    }
  })
}

Activemq.xml

<policyEntries>
            <policyEntry queue=">" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1" />
.......

我尝试推送具有不同优先级的不同消息,并在所有消息都推送到队列后打开第二个应用程序(即订阅消息的应用程序)。但是,消息的执行顺序与发送的顺序相同。优先级没有改变任何东西。有什么我想念的吗? 我是否必须在消费者端添加一些东西才能工作?

解决方法

默认情况下,在 ActiveMQ“Classic”(由 Amazon MQ 使用)中禁用对优先级的支持。正如 documentation 所述:

...支持[消息优先级]默认禁用,因此需要通过xml配置使用每个目标策略启用它...

您需要在 prioritizedMessages="true" 中为您的队列设置 policyEntry,例如:

 <destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">" prioritizedMessages="true"/>
      ...

需要明确的是,这是在 activemq.xml 中的代理(即不是客户端)上配置的,它适用于所有类型的客户端。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。