微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Java RabbitMQ 连接已关闭

如何解决Java RabbitMQ 连接已关闭

我需要将消息推送到外部rabbitmq。我的 java 配置成功声明要推送的队列,但是每次尝试推送时,我都会遇到下一个异常:

web_1       | com.rabbitmq.client.AlreadyClosedException: connection is already closed due to clean connection shutdown; protocol method: #method<connection.close>(reply-code=200,reply-text=OK,class-id=0,method-id=0)
web_1       |   at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
web_1       |   at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
web_1       |   at com.rabbitmq.client.impl.ChannelN.basicpublish(ChannelN.java:710)
web_1       |   at com.rabbitmq.client.impl.ChannelN.basicpublish(ChannelN.java:685)
web_1       |   at com.rabbitmq.client.impl.ChannelN.basicpublish(ChannelN.java:675)
web_1       |   at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicpublish(AutorecoveringChannel.java:207)
web_1       |   at com.ruddi.logiweb.service.impl.MQServiceImpl.send(MQServiceImpl.java:33)
web_1       |   at com.ruddi.logiweb.controller.DriverController.addDriver(DriverController.java:105)
web_1       |   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
web_1       |   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
web_1       |   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
web_1       |   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
web_1       |   at org.springframework.web.method.support.invocableHandlerMethod.doInvoke(invocableHandlerMethod.java:197)
web_1       |   at org.springframework.web.method.support.invocableHandlerMethod.invokeForRequest(invocableHandlerMethod.java:141)
web_1       |   at org.springframework.web.servlet.mvc.method.annotation.ServletinvocableHandlerMethod.invokeAndHandle(ServletinvocableHandlerMethod.java:106)
web_1       |   at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894)
web_1       |   at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
web_1       |   at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
web_1       |   at org.springframework.web.servlet.dispatcherServlet.dodispatch(dispatcherServlet.java:1060)
web_1       |   at org.springframework.web.servlet.dispatcherServlet.doService(dispatcherServlet.java:962)
web_1       |   at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
web_1       |   at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
web_1       |   at javax.servlet.http.HttpServlet.service(HttpServlet.java:652)
web_1       |   at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
web_1       |   at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1       |   at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:320)
web_1       |   at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126)
web_1       |   at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:118)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:158)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.authentication.logout.logoutFilter.doFilter(logoutFilter.java:116)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.header.HeaderWriterFilter.doHeadersAfter(HeaderWriterFilter.java:92)
web_1       |   at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:77)
web_1       |   at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
web_1       |   at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215)
web_1       |   at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178)
web_1       |   at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:358)
web_1       |   at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:271)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1       |   at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
web_1       |   at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
web_1       |   at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
web_1       |   at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
web_1       |   at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
web_1       |   at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:687)
web_1       |   at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
web_1       |   at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)
web_1       |   at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
web_1       |   at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
web_1       |   at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)
web_1       |   at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707)
web_1       |   at org.apache.tomcat.util.net.socketProcessorBase.run(SocketProcessorBase.java:49)
web_1       |   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
web_1       |   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
web_1       |   at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
web_1       |   at java.base/java.lang.Thread.run(Thread.java:829)

豆子:

@Bean("mqChannel")
    public Channel channel() {
        ConnectionFactory factory = new ConnectionFactory();

        String uri = "amqp://sxtswmgm:dh1N5aBEUnam53urt2VMrF9HSi7IDWAf@stingray.rmq.cloudamqp.com/sxtswmgm";

        try {
            factory.setUri(uri);
        }
        catch (Exception e) {
            e.printstacktrace();
        }
//        factory.setHost("rabbitmq");
//        factory.setPort(5672);
//        factory.setUsername("guest");
//        factory.setPassword("guest");

        Channel channel = null;

        try (Connection connection = factory.newConnection()){
            channel = connection.createChannel();
            channel.queueDeclare("logiweb",false,null);
        }
        catch(Exception e) {
            log.info("EXCEPTION IN BEAN");
            e.printstacktrace();
        }

        return channel;
    }

发送消息服务类:

@Service
@Slf4j
public class MQServiceImpl implements MQService {
    Channel channel;

    @Autowired
    public MQServiceImpl(
            @Qualifier("mqChannel") Channel channel) {
        this.channel = channel;
    }

    /**
     * sending message to mq
     * @param message message to send
     */
    @Override
    public void send(String message) {
        log.info("send(message) method was called");
        try {
            channel.basicpublish("","logiweb",null,message.getBytes());
        } catch (Exception e) {
            e.printstacktrace();
        }
    }
}

顺便说一下,当我在本地启动 rabbitmq 时,它运行良好。当我使用容器化rabbitmq或外部cloudamqp时出现这个问题

所以,再一次,当我运行我的应用程序时,我可以在 RabbitMQ 仪表板中看到新队列(它已声明),但每次推送消息的尝试都以异常结束。

解决方法

我正在努力理解这些代码是如何组合在一起的,但这部分在我看来绝对是错误的:

    try (Connection connection = factory.newConnection()){
        channel = connection.createChannel();
        channel.queueDeclare("logiweb",false,null);
    }

try 块完成时,connection 资源将自动关闭。但是 Connection.close()javadoc 声明:

"使用 AMQP.REPLY_SUCCESS 关闭代码和消息“OK”关闭此连接及其所有通道。等待所有关闭操作完成。"

因此,您创建的 Channel 将在 channel() 方法返回之前关闭。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?