
Camel does not remove threads after throwing an exception

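I'm facing the following problem: I have a Camel route that ends its execution by throwing an exception. The exception is caught in the camel-context.xml file, where I log 2 messages. The problem is that when I receive many events on this route (3K in 5 minutes), Camel does not close the threads, so I get:

java.lang.OutOfMemoryError: unable to create new native thread

The exception that stops the route:

"Exhausted after delivery attempt: 1 caught: org.apache.camel.CamelExecutionException:"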

Failed delivery for (MessageId: queue_comercial.tech.queue.patrimonialmutuactivos2salesforce_ID_c31189e4-c95d-11eb-86e0-02010a818cf6 on ExchangeId: ID-int-salesforce-events-48-jgv9b-1623268712584-0-4706). Exhausted after delivery attempt: 1 caught: org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[ID-int-salesforce-events-48-jgv9b-1623268712584-0-4706]
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[_routeRecibos     ] [_routeRecibos     ] [jms://queue://comercial.tech.queue.patrimonialmutuactivos2salesforce?cacheLeve] [        17]
[_routeRecibos     ] [_logRecibos1Header] [log                                                                           ] [         0]
[_routeRecibos     ] [_logRecibos1Body  ] [log                                                                           ] [         0]
[_routeRecibos     ] [_unmarshalRecibos ] [unmarshal[org.apache.camel.model.dataformat.JsonDataFormat@3ba5626a]          ] [         0]
[_routeRecibos     ] [_beanRecibosTransf] [bean[ref:entidadReciboToSalesforceTransformer method:transform]               ] [        15]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[ID-int-salesforce-events-48-jgv9b-1623268712584-0-4706]
    at org.apache.camel.util.ObjectHelper.wrapCamelExecutionException(ObjectHelper.java:1846) ~[camel-core-2.21.0.fuse-000112-redhat-3.jar!/:2.21.0.fuse-000112-redhat-3]
    at org.apache.camel.impl.DefaultExchange.setException(DefaultExchange.java:385) ~[camel-core-2.21.0.fuse-000112-redhat-3.jar!/:2.21.0.fuse-000112-redhat-3]
    at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:275) ~[camel-core-2.21.0.fuse-000112-redhat-3.jar!/:2.21.0.fuse-000112-redhat-3]
    at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:187) ~[camel-core-2.21.0.fuse-000112-redhat-3.jar!/:2.21.0.fuse-000112-redhat-3]
    at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:53) ~[camel-core-2.21.0.fuse-000112-redhat-3.jar!/:2.21.0.fuse-000112-redhat-3]
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:76) ~[camel-core-2.21.0.fuse-000112-redhat-3.jar!/:2.21.0.fuse-000112-redhat-3]
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548) ~[camel-core-2.21.0.fuse-000112-redhat-3.jar!/:2.21.0.fuse-000112-redhat-3]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) [camel-core-2.21.0.fuse-000112-redhat-3.jar!/:2.21.0.fuse-000112-redhat-3]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:138) [camel-core-2.21.0.fuse-000112-redhat-3.jar!/:2.21.0.fuse-000112-redhat-3]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:101) [camel-core-2.21.0.fuse-000112-redhat-3.jar!/:2.21.0.fuse-000112-redhat-3]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) [camel-core-2.21.0.fuse-000112-redhat-3.jar!/:2.21.0.fuse-000112-redhat-3]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97) [camel-core-2.21.0.fuse-000112-redhat-3.jar!/:2.21.0.fuse-000112-redhat-3]
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:113) [camel-jms-2.21.0.fuse-000112-redhat-3.jar!/:2.21.0.fuse-000112-redhat-3]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719) [spring-jms-4.3.17.RELEASE.jar!/:4.3.17.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679) [spring-jms-4.3.17.RELEASE.jar!/:4.3.17.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649) [spring-jms-4.3.17.RELEASE.jar!/:4.3.17.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317) [spring-jms-4.3.17.RELEASE.jar!/:4.3.17.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255) [spring-jms-4.3.17.RELEASE.jar!/:4.3.17.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1168) [spring-jms-4.3.17.RELEASE.jar!/:4.3.17.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1160) [spring-jms-4.3.17.RELEASE.jar!/:4.3.17.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1057) [spring-jms-4.3.17.RELEASE.jar!/:4.3.17.RELEASE]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_262]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_262]
Caused by: java.lang.OutOfMemoryError: unable to create new native thread

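This happens every time I restart the application, because many inbound events for this route are waiting to be processed. Does anyone know how to configure Camel so that it closes each thread after every exception?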
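One way to check which threads pile up before the error appears is to count the live JVM threads grouped by name. The following is a minimal diagnostic sketch, not part of the application described here: the class `ThreadCensus` and its methods are hypothetical names, and it relies only on the standard `Thread.getAllStackTraces()` call. It assumes that threads belonging to the same pool differ only by a trailing number in their name, which may not hold for every library.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical diagnostic helper: counts live JVM threads grouped by name prefix. */
public class ThreadCensus {

    /** Strips a trailing numeric suffix such as "-12" or "#3" so that pool members group together. */
    public static String prefixOf(String threadName) {
        return threadName.replaceAll("[-#\\s]*\\d+$", "");
    }

    /** Returns the number of live threads per name prefix, sorted by prefix. */
    public static Map<String, Integer> countByPrefix() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            counts.merge(prefixOf(t.getName()), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Print one line per group: thread count, then the shared name prefix.
        countByPrefix().forEach((prefix, n) -> System.out.println(n + "\t" + prefix));
    }
}
```

Calling `countByPrefix()` periodically from inside the application (or comparing its output with a `jstack` dump) would show whether the growing group belongs to the JMS listener, to Camel, or to a client library used inside the bean.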

My implementation:

The route:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:camel="http://camel.apache.org/schema/spring"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
    <routeContext id="patrimonialRoutes" xmlns="http://camel.apache.org/schema/spring">
        <route autoStartup="{{events.processors.ordenes.enabled:true}}" id="_routeOrdenes">
            <from id="_fromOrdenes" uri="jms:queue://{{events.queues.patrimonialvidaahorro2comercial}}?connectionFactory=#amqConnectionFactory&amp;receiveTimeout=60000&amp;transacted=true&amp;lazyCreateTransactionManager=false&amp;cacheLevelName=CACHE_CONSUMER&amp;errorHandlerLogStackTrace=false"/>
            <log id="_logordenes1Headers"
                logName="es.mma.comercial.sfdcevents.ordenes"
                loggingLevel="INFO" message="Recibida Orden patrimonial: HEADERS: ${headers}"/>
            <log id="_logordenes1Body"
                logName="es.mma.comercial.sfdcevents.ordenes"
                loggingLevel="INFO" message="Recibida Orden patrimonial: BODY: ${body}"/>
            <unmarshal id="_unmarshalOrdenes">
                <json library="Jackson" unmarshalTypeName="es.mma.comercial.sfdcevents.dto.EntidadOrden"/>
            </unmarshal>
            
            <!-- IN THIS BEAN IS WHERE THE EXCEPTION IS THROWN -->
            
            <bean id="_beanordenesTransformer1" method="transform" ref="entidadOrdenToSalesforceTransformer"/>
            ...
        </route>

The class that throws the exception:

private String processResponseConsultaHistoricopolizas(Response responseApiVidaAhorro) throws JsonProcessingException,JsonMappingException,IOException,JsonParseException,ApiVidaAhorroException {

    String idSolicitud = "";
    final String BLOQUE_TIPO_L = "L";
    mapper = new ObjectMapper();
    List<HistoricopolizasBloqueTipo> historicopolizasBloqueTipo = (List<HistoricopolizasBloqueTipo>) mapper.readValue(responseApiVidaAhorro.body().string(),List.class);
    
    log.debug("### processResponseConsultaHistoricopolizas: Respuesta consultaHistoricopolizas: " + toString(historicopolizasBloqueTipo));
    
    if ((historicopolizasBloqueTipo == null) || (historicopolizasBloqueTipo.size() == 0)) {
// HERE IS THE EXCEPTION

        throw new ApiVidaAhorroException(API_VIDA_AHORRO_RESPONSE_ERROR_EMPTY_BLOQUES);
    }

The Camel context that catches the exception:

<camel:camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <camel:routeContextRef ref="patrimonialRoutes"/>
    
    <dataFormats>
        <json id="serializer" library="Jackson" />
    </dataFormats>

    <!-- Retry strategy for calls to SF when sending DiH events: -->
    <redeliveryPolicyProfile id="commErrorsRedeliveryPolicy" useExponentialBackOff="true" 
        redeliveryDelay="${camel.component.salesforce.redelivery-policy.delay}" 
        backOffMultiplier="${camel.component.salesforce.redelivery-policy.backOff-multiplier}" 
        maximumRedeliveryDelay="${camel.component.salesforce.redelivery-policy.maximum-delay}" 
        maximumRedeliveries="-1" logRetryAttempted="true" retryAttemptedLogLevel="WARN"
        logRetryStackTrace="false"/>

    <!-- Error handling for incoming data: the exception is caught and logged. -->
    <onException>
        <exception>es.mma.comercial.sfdcevents.service.ApiVidaAhorroException</exception>

        <handled>
            <constant>true</constant>
        </handled>

        <log message="${exception.message}" loggingLevel="WARN" logName="es.mma.comercial.sfdcevents" />

        <!-- This header is only for the REST endpoints -->
        <setHeader headerName="Exchange.HTTP_RESPONSE_CODE">
            <constant>400</constant>
        </setHeader>

        <transform>
            <simple>${exception.message}</simple>
        </transform>
        <log message="Caused by message: '${property.srcmessage}'" loggingLevel="WARN" logName="es.mma.comercial.sfdcevents" />
    </onException>
