微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

java – 无法让ActiveMQ重新发送我的消息

我有一个Java编写的单线程ActiveMQ使用者.我所要做的就是从队列中接收()一个消息,尝试将其发送到Web服务,如果成功则确认()它.如果Web服务调用失败,我希望消息保留在队列中并在超时后重新发送.

它或多或少都在工作,除了重发部分:每次重新启动我的消费者时,它会为每个仍然在队列中的消息收到一条消息,但是在发送它们之后,消息永远不会被重新发送.

我的代码看起来像:

public boolean init() throws JMSException,FileNotFoundException,IOException {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);
    RedeliveryPolicy policy = new RedeliveryPolicy();
    policy.setinitialRedeliveryDelay(500);
    policy.setBackOffMultiplier(2);
    policy.setUseExponentialBackOff(true);

    connectionFactory.setRedeliveryPolicy(policy);
    connectionFactory.setUseRetroactiveConsumer(true); // ????
    Connection connection = connectionFactory.createConnection();

    connection.setExceptionListener(this);
    connection.start();

    session = connection.createSession(transacted,ActiveMQSession.INDIVIDUAL_ACKNowLEDGE);
    destination = session.createQueue(subject); //???

    consumer = session.createConsumer(destination);
    //consumer.setMessageListener(this); // message listener had same behavIoUr

}

private void process() {
    while(true) {
        System.out.println("Waiting...");
        try {
            Message message = consumer.receive();
            onMessage(message);
        } catch (JMSException e) {
            e.printstacktrace();
        }
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printstacktrace();
        }
    }
}

@Override
public void onMessage(Message message) {
    System.out.println("onMessage");
    messagesReceived++;

    if (message instanceof TextMessage) {
        try {
            TextMessage txtMsg = (TextMessage) message;
            String msg = txtMsg.getText();

            if(!client.sendMessage(msg)) {
                System.out.println("Webservice call Failed. Keeping message");
                //message.
            } else {
                message.ackNowledge();
            }

            if (transacted) {
                if ((messagesReceived % batch) == 0) {
                    System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived);
                    session.commit();
                }
            }
        } catch (JMSException e) {
            e.printstacktrace();
        }
    }
}

我目前没有使用交易(也许我应该这样做?).

我确定我错过了一些简单的东西,很快就会拍打我的额头,但我似乎无法弄清楚这是怎么回事.谢谢!

编辑:我自己也不能回答这个问题:

好的,经过一些实验,事实证明交易是实现这一目标的唯一方法.这是新代码

public boolean init() throws JMSException,url);
    RedeliveryPolicy policy = new RedeliveryPolicy();
    policy.setinitialRedeliveryDelay(1000L);
    policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);

    connectionFactory.setRedeliveryPolicy(policy);
    connectionFactory.setUseRetroactiveConsumer(true);
    Connection connection = connectionFactory.createConnection();

    connection.setExceptionListener(this);
    connection.start();

    session = connection.createSession(transacted,ActiveMQSession.CLIENT_ACKNowLEDGE);
    destination = session.createQueue(subject);

    consumer = session.createConsumer(destination);
}

@Override
public void onMessage(Message message) {
    System.out.println("onMessage");
    messagesReceived++;

    if (message instanceof TextMessage) {
        try {
            TextMessage txtMsg = (TextMessage) message;
            String msg = txtMsg.getText();

            if(client.sendMessage(msg)) {
                if(transacted) {
                    System.out.println("Call succeeded - committing message");
                    session.commit();
                }
                //message.ackNowledge();
            } else {
                if(transacted) {
                    System.out.println("Webservice call Failed. Rolling back message");
                    session.rollback();
                }
            }

        } catch (JMSException e) {
            e.printstacktrace();
        }
    }
}

现在,重新传送策略中指定的消息每1000毫秒重新发送一次.

希望这有助于其他人!

原文地址:https://www.jb51.cc/java/129501.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐