在处理失败的情况下,如何将STOMP消息重新传递给使用者?

如何解决在处理失败的情况下,如何将STOMP消息重新传递给使用者?

高级体系结构

JMS(生产者/消费者) Artemis(STOMP) Websocket经纪人中继服务 STOMP-over-Websocket-client(生产者/消费者)

一些观察

  1. 在STOMP使用者中,使用 client-individual ack订阅,无论我是 NACK 还是 ACK ,消息都将被丢弃阿耳emi弥斯。我希望将邮件重新发送给相同或任何其他消费者。有办法实现吗?

  2. 在JMS使用者中,如果在Artemis上接收到消息的使用者掉线了,则不会传递持久消息。我的期望是,一旦消费者服务再次恢复正常运行,便会传递持久的消息。

class StompSessionHandlerImpl implements StompSessionHandler {
    @Override
    public void afterConnected(StompSession session,StompHeaders connectedHeaders) {
        session.setAutoReceipt(Boolean.FALSE);
        StompHeaders headers1 = new StompHeaders();
        headers1.setDestination("/queue/msg");
        headers1.add("durable-subscription-name",messagingUtil.getServiceSubscriptionChannel());
        headers1.add("Authorization","Bearer ".concat(token));
        headers1.setAck("client-individual");
        session.subscribe(headers1,this);

    }

    @Override
    public void handleException(StompSession session,StompCommand command,StompHeaders headers,byte[] payload,Throwable exception) {
        session.acknowledge(Objects.requireNonNull(headers.getMessageId()),false);
    }

    @Override
    public void handleTransportError(StompSession session,Throwable exception) {
        synchronized (StompSessionHandlerImpl.msgSenderLock) {
            if (exception instanceof ConnectionLostException && !getStompSession().isConnected()) {
                initStompSession();
            }
        }
    }

    @Override
    public Type getPayloadType(StompHeaders headers) {
        return COMessage.class;
    }

    @Override
    public void handleFrame(StompHeaders headers,Object payload) {
        if (payload == null) return;
        COMessage msg = (COMessage) payload;
     try {
        stompMessagingService.handleReceivedMessages(msg);
        self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(),true);
       } catch (Exception e) {
           self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(),false);
       }

    }


    @PreDestroy
    public void cleanUp() {
        self.stompMessagingService.getStompSession().disconnect();
    }

}
class WebSocketConfig extends WebSocketMessagingAutoConfiguration {
    @Bean
    public WebSocketStompClient stompClient() {
        WebSocketClient simpleWebSocketClient = new StandardWebSocketClient();
        List<Transport> transports = List.of(new WebSocketTransport(simpleWebSocketClient));
        SockJsClient sockJsClient = new SockJsClient(transports);
        WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS,false);
        converter.setObjectMapper(objectMapper);
        stompClient.setMessageConverter(converter);
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10000);
        scheduler.initialize();
        stompClient.setTaskScheduler(scheduler);
        stompClient.setDefaultHeartbeat(new long[]{20000,20000});
        stompClient.setReceiptTimeLimit(Integer.MAX_VALUE);
        ContainerProvider.getWebSocketContainer().setDefaultMaxTextMessageBufferSize(Integer.MAX_VALUE);
        return stompClient;
    }
}
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private String host;

    private String password;

    private String user;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/queue","/topic","/exchange")
                .setRelayHost(host)
                .setClientLogin(user)
                .setClientPasscode(password)
                .setSystemHeartbeatSendInterval(20000)
                .setSystemLogin(user)
                .setSystemPasscode(password)
                .setUserDestinationBroadcast("/topic/unresolved-user")
                .setUserRegistryBroadcast("/topic/log-user-registry");
        config.setApplicationDestinationPrefixes("/device");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS().setWebSocketEnabled(Boolean.TRUE);
        registry.setErrorHandler(new StompSubProtocolErrorHandler());
    }

    @Bean
    public DefaultSimpUserRegistry getDefaultSimpRegistry() {
        return new DefaultSimpUserRegistry();
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        registry.setMessageSizeLimit(Integer.MAX_VALUE);
        registry.setSendBufferSizeLimit(Integer.MAX_VALUE);
        registry.setTimeToFirstMessage(300000);
        registry.setSendTimeLimit(300000);
        registry.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
            @Override
            public WebSocketHandler decorate(WebSocketHandler webSocketHandler) {
                return new EmaWebSocketHandlerDecorator(webSocketHandler);
            }
        });

    }

}
class ArtemisConfig extends ArtemisAutoConfiguration {

    @Bean("mqConnectionFactory")
    public ConnectionFactory senderActiveMQConnectionFactory() {

        ActiveMQConnectionFactory connectionFactory =
               new ActiveMQConnectionFactory("tcp://".concat(host.concat(":").concat(port)));
        connectionFactory.setUser(user);
        connectionFactory.setPassword(password);
        connectionFactory.setConnectionTTL(-1L);
        connectionFactory.setClientID(clientID);
        connectionFactory.setEnableSharedClientID(true);
        connectionFactory.setPreAcknowledge(Boolean.FALSE);
        return connectionFactory;
    }

    @Bean("mqCachingConnectionFactory")
    @Primary
    public ConnectionFactory cachingConnectionFactory() {
        return new CachingConnectionFactory(senderActiveMQConnectionFactory());
    }

    @Bean("jmsTemplate")
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
        jmsTemplate.setMessageConverter(jsonMessageConverter);
        jmsTemplate.setSessionAcknowledgeMode(ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
        jmsTemplate.setMessageIdEnabled(Boolean.TRUE);
        jmsTemplate.setTimeToLive(Integer.MAX_VALUE); // TODO : review
        return jmsTemplate;
    }

    @PreDestroy
    public void cleanUp() {
        if (connection.isStarted()) {
            try {
                connection.close();
            } catch (JMSException e) {
                log.error("Failed to close the JMS connection {0}",e);
            }
        }
    }

}

解决方法

使用ActiveMQ Artemis时,STOMP ACK框架会告诉代理该消息已被成功使用,因此应将其从队列中删除。一个STOMP NACK帧告诉代理该消息已成功使用 ,因此代理将丢弃该消息。 STOMP规范未指定此处的确切行为。它只说:

NACKACK相反。它用于告诉服务器客户端没有使用该消息。然后,服务器可以将消息发送到其他客户端,将其丢弃,或将其放入死信队列。确切的行为是特定于服务器的。

NACK具有与ACK相同的标头:id(必填)和transaction(可选)。

NACK适用于一条消息(如果订阅的确认模式为client-individual)或之前发送且尚未确认或未确认的所有消息(如果订阅的确认模式为{{1} }。

如果您希望重新发送该消息,则既不确认也不拒绝该消息,并且在关闭消费者的连接时,该消息将重新放置在队列中,以传递给另一个(或相同)客户端。

将来,我希望这种行为是可配置的。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)&gt; insert overwrite table dwd_trade_cart_add_inc &gt; select data.id, &gt; data.user_id, &gt; data.course_id, &gt; date_format(
错误1 hive (edu)&gt; insert into huanhuan values(1,&#39;haoge&#39;); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive&gt; show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 &lt;configuration&gt; &lt;property&gt; &lt;name&gt;yarn.nodemanager.res