如何解决在处理失败的情况下,如何将STOMP消息重新传递给使用者?
高级体系结构
JMS(生产者/消费者) Artemis(STOMP) Websocket经纪人中继服务 STOMP-over-Websocket-client(生产者/消费者)
一些观察
-
在STOMP使用者中,使用 client-individual ack订阅,无论我是 NACK 还是 ACK ,消息都将被丢弃阿耳emi弥斯。我希望将邮件重新发送给相同或任何其他消费者。有办法实现吗?
-
在JMS使用者中,如果在Artemis上接收到消息的使用者掉线了,则不会传递持久消息。我的期望是,一旦消费者服务再次恢复正常运行,便会传递持久的消息。
class StompSessionHandlerImpl implements StompSessionHandler {
@Override
public void afterConnected(StompSession session,StompHeaders connectedHeaders) {
session.setAutoReceipt(Boolean.FALSE);
StompHeaders headers1 = new StompHeaders();
headers1.setDestination("/queue/msg");
headers1.add("durable-subscription-name",messagingUtil.getServiceSubscriptionChannel());
headers1.add("Authorization","Bearer ".concat(token));
headers1.setAck("client-individual");
session.subscribe(headers1,this);
}
@Override
public void handleException(StompSession session,StompCommand command,StompHeaders headers,byte[] payload,Throwable exception) {
session.acknowledge(Objects.requireNonNull(headers.getMessageId()),false);
}
@Override
public void handleTransportError(StompSession session,Throwable exception) {
synchronized (StompSessionHandlerImpl.msgSenderLock) {
if (exception instanceof ConnectionLostException && !getStompSession().isConnected()) {
initStompSession();
}
}
}
@Override
public Type getPayloadType(StompHeaders headers) {
return COMessage.class;
}
@Override
public void handleFrame(StompHeaders headers,Object payload) {
if (payload == null) return;
COMessage msg = (COMessage) payload;
try {
stompMessagingService.handleReceivedMessages(msg);
self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(),true);
} catch (Exception e) {
self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(),false);
}
}
@PreDestroy
public void cleanUp() {
self.stompMessagingService.getStompSession().disconnect();
}
}
class WebSocketConfig extends WebSocketMessagingAutoConfiguration {
@Bean
public WebSocketStompClient stompClient() {
WebSocketClient simpleWebSocketClient = new StandardWebSocketClient();
List<Transport> transports = List.of(new WebSocketTransport(simpleWebSocketClient));
SockJsClient sockJsClient = new SockJsClient(transports);
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS,false);
converter.setObjectMapper(objectMapper);
stompClient.setMessageConverter(converter);
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10000);
scheduler.initialize();
stompClient.setTaskScheduler(scheduler);
stompClient.setDefaultHeartbeat(new long[]{20000,20000});
stompClient.setReceiptTimeLimit(Integer.MAX_VALUE);
ContainerProvider.getWebSocketContainer().setDefaultMaxTextMessageBufferSize(Integer.MAX_VALUE);
return stompClient;
}
}
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private String host;
private String password;
private String user;
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/queue","/topic","/exchange")
.setRelayHost(host)
.setClientLogin(user)
.setClientPasscode(password)
.setSystemHeartbeatSendInterval(20000)
.setSystemLogin(user)
.setSystemPasscode(password)
.setUserDestinationBroadcast("/topic/unresolved-user")
.setUserRegistryBroadcast("/topic/log-user-registry");
config.setApplicationDestinationPrefixes("/device");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS().setWebSocketEnabled(Boolean.TRUE);
registry.setErrorHandler(new StompSubProtocolErrorHandler());
}
@Bean
public DefaultSimpUserRegistry getDefaultSimpRegistry() {
return new DefaultSimpUserRegistry();
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
registry.setMessageSizeLimit(Integer.MAX_VALUE);
registry.setSendBufferSizeLimit(Integer.MAX_VALUE);
registry.setTimeToFirstMessage(300000);
registry.setSendTimeLimit(300000);
registry.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
@Override
public WebSocketHandler decorate(WebSocketHandler webSocketHandler) {
return new EmaWebSocketHandlerDecorator(webSocketHandler);
}
});
}
}
class ArtemisConfig extends ArtemisAutoConfiguration {
@Bean("mqConnectionFactory")
public ConnectionFactory senderActiveMQConnectionFactory() {
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("tcp://".concat(host.concat(":").concat(port)));
connectionFactory.setUser(user);
connectionFactory.setPassword(password);
connectionFactory.setConnectionTTL(-1L);
connectionFactory.setClientID(clientID);
connectionFactory.setEnableSharedClientID(true);
connectionFactory.setPreAcknowledge(Boolean.FALSE);
return connectionFactory;
}
@Bean("mqCachingConnectionFactory")
@Primary
public ConnectionFactory cachingConnectionFactory() {
return new CachingConnectionFactory(senderActiveMQConnectionFactory());
}
@Bean("jmsTemplate")
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
jmsTemplate.setMessageConverter(jsonMessageConverter);
jmsTemplate.setSessionAcknowledgeMode(ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
jmsTemplate.setMessageIdEnabled(Boolean.TRUE);
jmsTemplate.setTimeToLive(Integer.MAX_VALUE); // TODO : review
return jmsTemplate;
}
@PreDestroy
public void cleanUp() {
if (connection.isStarted()) {
try {
connection.close();
} catch (JMSException e) {
log.error("Failed to close the JMS connection {0}",e);
}
}
}
}
解决方法
使用ActiveMQ Artemis时,STOMP ACK
框架会告诉代理该消息已被成功使用,因此应将其从队列中删除。一个STOMP NACK
帧告诉代理该消息已成功使用 ,因此代理将丢弃该消息。 STOMP规范未指定此处的确切行为。它只说:
NACK
与ACK
相反。它用于告诉服务器客户端没有使用该消息。然后,服务器可以将消息发送到其他客户端,将其丢弃,或将其放入死信队列。确切的行为是特定于服务器的。
NACK
具有与ACK
相同的标头:id
(必填)和transaction
(可选)。NACK适用于一条消息(如果订阅的确认模式为
client-individual
)或之前发送且尚未确认或未确认的所有消息(如果订阅的确认模式为{{1} }。
如果您希望重新发送该消息,则既不确认也不拒绝该消息,并且在关闭消费者的连接时,该消息将重新放置在队列中,以传递给另一个(或相同)客户端。
将来,我希望这种行为是可配置的。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。