如何解决Java RabbitMQ 连接已关闭
我需要将消息推送到外部rabbitmq。我的 java 配置成功声明要推送的队列,但是每次尝试推送时,我都会遇到下一个异常:
web_1 | com.rabbitmq.client.AlreadyClosedException: connection is already closed due to clean connection shutdown; protocol method: #method<connection.close>(reply-code=200,reply-text=OK,class-id=0,method-id=0)
web_1 | at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
web_1 | at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
web_1 | at com.rabbitmq.client.impl.ChannelN.basicpublish(ChannelN.java:710)
web_1 | at com.rabbitmq.client.impl.ChannelN.basicpublish(ChannelN.java:685)
web_1 | at com.rabbitmq.client.impl.ChannelN.basicpublish(ChannelN.java:675)
web_1 | at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicpublish(AutorecoveringChannel.java:207)
web_1 | at com.ruddi.logiweb.service.impl.MQServiceImpl.send(MQServiceImpl.java:33)
web_1 | at com.ruddi.logiweb.controller.DriverController.addDriver(DriverController.java:105)
web_1 | at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
web_1 | at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
web_1 | at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
web_1 | at java.base/java.lang.reflect.Method.invoke(Method.java:566)
web_1 | at org.springframework.web.method.support.invocableHandlerMethod.doInvoke(invocableHandlerMethod.java:197)
web_1 | at org.springframework.web.method.support.invocableHandlerMethod.invokeForRequest(invocableHandlerMethod.java:141)
web_1 | at org.springframework.web.servlet.mvc.method.annotation.ServletinvocableHandlerMethod.invokeAndHandle(ServletinvocableHandlerMethod.java:106)
web_1 | at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894)
web_1 | at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
web_1 | at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
web_1 | at org.springframework.web.servlet.dispatcherServlet.dodispatch(dispatcherServlet.java:1060)
web_1 | at org.springframework.web.servlet.dispatcherServlet.doService(dispatcherServlet.java:962)
web_1 | at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
web_1 | at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
web_1 | at javax.servlet.http.HttpServlet.service(HttpServlet.java:652)
web_1 | at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
web_1 | at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1 | at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:320)
web_1 | at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126)
web_1 | at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:118)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:158)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.authentication.logout.logoutFilter.doFilter(logoutFilter.java:116)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.header.HeaderWriterFilter.doHeadersAfter(HeaderWriterFilter.java:92)
web_1 | at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:77)
web_1 | at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
web_1 | at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215)
web_1 | at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178)
web_1 | at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:358)
web_1 | at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:271)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1 | at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
web_1 | at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
web_1 | at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
web_1 | at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
web_1 | at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
web_1 | at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:687)
web_1 | at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
web_1 | at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)
web_1 | at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
web_1 | at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
web_1 | at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)
web_1 | at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707)
web_1 | at org.apache.tomcat.util.net.socketProcessorBase.run(SocketProcessorBase.java:49)
web_1 | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
web_1 | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
web_1 | at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
web_1 | at java.base/java.lang.Thread.run(Thread.java:829)
豆子:
@Bean("mqChannel")
public Channel channel() {
ConnectionFactory factory = new ConnectionFactory();
String uri = "amqp://sxtswmgm:dh1N5aBEUnam53urt2VMrF9HSi7IDWAf@stingray.rmq.cloudamqp.com/sxtswmgm";
try {
factory.setUri(uri);
}
catch (Exception e) {
e.printstacktrace();
}
// factory.setHost("rabbitmq");
// factory.setPort(5672);
// factory.setUsername("guest");
// factory.setPassword("guest");
Channel channel = null;
try (Connection connection = factory.newConnection()){
channel = connection.createChannel();
channel.queueDeclare("logiweb",false,null);
}
catch(Exception e) {
log.info("EXCEPTION IN BEAN");
e.printstacktrace();
}
return channel;
}
发送消息服务类:
@Service
@Slf4j
public class MQServiceImpl implements MQService {
Channel channel;
@Autowired
public MQServiceImpl(
@Qualifier("mqChannel") Channel channel) {
this.channel = channel;
}
/**
* sending message to mq
* @param message message to send
*/
@Override
public void send(String message) {
log.info("send(message) method was called");
try {
channel.basicpublish("","logiweb",null,message.getBytes());
} catch (Exception e) {
e.printstacktrace();
}
}
}
顺便说一下,当我在本地启动 rabbitmq 时,它运行良好。当我使用容器化rabbitmq或外部cloudamqp时出现这个问题
所以,再一次,当我运行我的应用程序时,我可以在 RabbitMQ 仪表板中看到新队列(它已声明),但每次推送消息的尝试都以异常结束。
解决方法
我正在努力理解这些代码是如何组合在一起的,但这部分在我看来绝对是错误的:
try (Connection connection = factory.newConnection()){
channel = connection.createChannel();
channel.queueDeclare("logiweb",false,null);
}
当 try
块完成时,connection
资源将自动关闭。但是 Connection.close()
的 javadoc 声明:
"使用 AMQP.REPLY_SUCCESS 关闭代码和消息“OK”关闭此连接及其所有通道。等待所有关闭操作完成。"
因此,您创建的 Channel
将在 channel()
方法返回之前关闭。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。