微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

ActiveMQ 未从 KahaDB 文件存储传送消息

如何解决ActiveMQ 未从 KahaDB 文件存储传送消息

我们的 Spring 5 应用程序配置为使用 ActiveMQ 并持久化消息。我们决定使用 KahaDB 文件存储。

为了测试消息持久性,我注释掉了 MessageListener bean 并将消息发送到队列(通过 jmstemplate)并验证消息已写入 kahadb 数据日志文件。 取消对 MessageListener bean 的注释并重新启动代理后,消息不会传递到侦听器。 我无法弄清楚为什么消息在代理重启时没有传递,任何帮助将不胜感激。

下面是将 KahaDb 持久化添加到 ActiveMQ 配置的代码

   @Bean(initMethod = "start",destroyMethod = "stop")
   public brokerService brokerServiceConfig() throws Exception {
        brokerService brokerService = new brokerService();
        brokerService.addConnector("vm://localhost");
        brokerService.setbrokerName("order-broker");
        PersistenceAdapter kahaDbAdapter = new KahaDBPersistenceAdapter();
        File kahaDir = new File("/home/test");
        kahaDbAdapter.setDirectory(kahaDir);
        brokerService.setPersistenceAdapter(kahaDbAdapter);
        brokerService.setPersistent(true);
   }

    @Bean
    public jmstemplate jmstemplate(){
        jmstemplate template = new jmstemplate();
        template.setConnectionFactory(connectionFactory());
        template.setExplicitQosEnabled(true);
        template.setDeliveryMode(DeliveryMode.PERSISTENT);
        return template;
    }
    
    @Bean
    @DependsOn({"brokerService"})
    private static ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setbrokerURL("vm://my-broker?create=false");
        
        List<String> trustedPackageList = new ArrayList<>(activeMQConnectionFactory.getTrustedPackages());
        trustedPackageList.add("com.mypackages");
        activeMQConnectionFactory.setTrustedPackages(trustedPackageList);
        CachingConnectionFactory connFactory = new CachingConnectionFactory();
        activeMQConnectionFactory.setcopyMessageOnSend(false);
        activeMQConnectionFactory.setUseAsyncSend(true);
        connFactory.setTargetConnectionFactory(activeMQConnectionFactory);
        return connFactory;
    }

    @Bean
    public DefaultMessageListenerContainer messageListenerContainer() {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setConnectionFactory(connectionFactory()); 
        dmlc.setSessionTransacted(false);
        dmlc.setSessionAckNowledgeMode(Session.AUTO_ACKNowLEDGE);   
        dmlc.setDestinationName(TEST_QUEUE);
        dmlc.setMaxConcurrentConsumers(25); 
        dmlc.setMessageListener(msgListener()); 
        return container;
    }       
    
    @Bean
    public TestMsgListener msgListener() {
        return new TestMsgListener(); // This is a Message Driven POJO.
    }

    // MessageProducer code -
    @Autowired
    private jmstemplate jTemplate;
    
    @Autowired
    private ActiveMQQueue testQueue;

    public void sendMessage() {
        try {
            MySerializedobject obj = <code to create new object>;
            jTemplate.convertAndSend(this.testQueue,obj);
        }catch(Throwable e) {
        }
    }

    // MessageListener code -
    public class TestMsgListener implements MessageListener {

    @Autowired
    public MessageConverter converter;

    public final void onMessage(Message message) {
        try {
             MySerializedobject obj = 
             (MySerializedobject)converter.fromMessage(message));

        } catch (Throwable e) {
          // log error.
        }
    }
 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。