微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spring Integration - 生产者队列容量限制

如何解决Spring Integration - 生产者队列容量限制

Spring 集成 - 生产者队列容量限制

我们正在使用远程分区和 MessageChannelPartitionHandler 将分区消息发送到队列(ActiveMQ),以便工作人员进行挑选和处理。 该作业有大量数据要处理,许多分区消息正在发布到队列,并且来自 replyChannel 的响应聚合器因消息超时而失败,因为无法在给定时间内处理所有消息。 我们还尝试通过使用队列容量 来限制发布到队列的消息,这导致服务器崩溃,并由于保存所有这些分区消息的内存问题而生成堆转储 在内存中。

我们想控制 StepExecution split 本身的创建,这样就不会发生内存问题。 示例案例是大约 4k 分区消息被发布到队列,整个作业需要大约 3 小时。

我们可以控制向 QueueChannel 发布消息吗?

<bean id="senExtractMemberMasterPartitionHandler"   class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
        <property name="messagingOperations" ref="senExtractMemberMasterPartitionMsgTemplate" />
        <property name="replyChannel" ref="senExtractProcessingMasteraggregatedChannel" />
        <property name="stepName" value="senExtractGeneratePrintRequestWorkerStep" />
        <property name="gridSize" value="500" />
</bean>
<bean id="senExtractMemberMasterPartitionMsgTemplate" class="org.springframework.integration.core.MessagingTemplate">
    <property name="defaultChannel" ref="senExtractProcessingMasterRequestChannel" />
    <property name="receiveTimeout" value="18000000" />
</bean>

<integration:channel id="senExtractProcessingMasteraggregatedChannel" >
    <integration:queue />
    <integration:interceptors>
        <integration:wire-tap channel="masterLoggingChannel" />
    </integration:interceptors>
</integration:channel>


<int-jms:outbound-gateway
    id="senExtractMasterOutGateway" 
    connection-factory="masterJMSConnectionFactory"
    correlation-key="JMSCorrelationID"
    request-channel="senExtractProcessingMasterRequestChannel"
    request-destination-name="senExtractRequestQueue" 
    reply-channel="senExtractProcessingMasterReplyChannel"
    reply-destination-name="senExtractReplyQueue" 
    async="true"
    auto-startup="true"
    reply-timeout="18000000" 
    receive-timeout="6000">
    <integration:poller ref="masterPoller"/>
    <int-jms:reply-listener />  
</int-jms:outbound-gateway>

解决方法

该作业有大量数据要处理,许多分区消息正在发布到队列中,并且来自 replyChannel 的响应聚合器因消息超时而失败,因为无法在给定时间内处理所有消息。

您需要增加超时时间或添加更多工作人员。 MessageChannelPartitionHandler 的 Javadoc 对此很清楚:

The receive timeout needs to be set realistically in the MessagingTemplate
and the aggregator,so that there is a good chance of all work being done.

我们想控制 StepExecution 拆分本身的创建

Spring Batch 为此提供了 StepExecutionSplitter 接口。如果默认的 (SimpleStepExecutionSplitter) 不适合您的需要,您可以为您的分区步骤提供自定义实现。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?