微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

JMSItemReader 根据块大小重新连接到 Solace

如何解决JMSItemReader 根据块大小重新连接到 Solace

我有连接到 Solace 队列的 Spring 批处理应用程序,只要消息在队列中或接收器超时,就会轮询记录。我正在使用基于块的处理但是我可以看到我的批处理应用程序重新连接到它试图处理的每个块的 Solace。就我的理解而言,与 Solace 的连接应该只发生一次,SolSession 应该根据定义的块大小开始/结束..我的理解是正确的还是预期的行为?

Spring 批处理应用程序日志

2021-04-01 11:54:17.033  INFO 11144 --- [           main] c.s.j.protocol.impl.TcpClientChannel     : Connecting to host 'orig=tcp://localhost:55555,scheme=tcp://,host=localhost,port=55555' (host 1 of 1,smfclient 1,attempt 1 of 1,this_host_attempt: 1 of 1)
2021-04-01 11:54:17.111  INFO 11144 --- [           main] c.s.j.protocol.impl.TcpClientChannel     : Connected to host 'orig=tcp://localhost:55555,port=55555' (smfclient 1)
2021-04-01 11:54:17.158  INFO 11144 --- [           main] com.solacesystems.jms.solSession         : SolSession started.
Received data of size 100
2021-04-01 11:54:24.122  INFO 11144 --- [           main] c.s.j.protocol.impl.TcpClientChannel     : Channel Closed (smfclient 1)
2021-04-01 11:54:24.122  INFO 11144 --- [           main] c.s.j.protocol.impl.TcpClientChannel     : Connecting to host 'orig=tcp://localhost:55555,smfclient 2,this_host_attempt: 1 of 1)
2021-04-01 11:54:24.187  INFO 11144 --- [           main] c.s.j.protocol.impl.TcpClientChannel     : Connected to host 'orig=tcp://localhost:55555,port=55555' (smfclient 2)
2021-04-01 11:54:24.219  INFO 11144 --- [           main] com.solacesystems.jms.solSession         : SolSession started.
Received data of size 100
2021-04-01 11:54:31.036  INFO 11144 --- [           main] c.s.j.protocol.impl.TcpClientChannel     : Channel Closed (smfclient 2)
2021-04-01 11:54:31.036  INFO 11144 --- [           main] c.s.j.protocol.impl.TcpClientChannel     : Connecting to host 'orig=tcp://tcp://localhost:55555,smfclient 3,this_host_attempt: 1 of 1)
2021-04-01 11:54:31.098  INFO 11144 --- [           main] c.s.j.protocol.impl.TcpClientChannel     : Connected to host 'orig=tcp://tcp://localhost:55555,port=55555' (smfclient 3)
2021-04-01 11:54:31.145  INFO 11144 --- [           main] com.solacesystems.jms.solSession         : SolSession started.

下面是我的 Spring 批处理配置类:

@EnableJms
@Configuration
@EnableBatchProcessing
public class SpringBatchJmsConfig {
    
    
    public static final Logger logger=LoggerFactory.getLogger(SpringBatchJmsConfig.class.getName());
    
    @Autowired
    private jmstemplate jmstemplate;
    
    @Autowired
    public JobBuilderFactory jobBuilFactory;
    
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    
    
    @Autowired
    public ConnectionFactory connectionFactory;
    
    @Autowired
    public CustomItemWriter writer;
    
    
    @Bean
    public DefaultJmsListenerContainerFactory cFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory=new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter());
        factory.setPubSubDomain(false);
        return factory;
        
    }
    
    @Bean
    public MessageConverter messageConverter() {
        MappingJackson2MessageConverter converter=new MappingJackson2MessageConverter();
        converter.settargettype(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }
    
    
    
    
    @Bean
    public JmsItemReader<Hcpsample1> hcpJmsItemReader(MessageConverter messageConverter){
        
        JmsItemReader<Hcpsample1> hcpJmsItemReader=new JmsItemReader();
        hcpJmsItemReader.setjmstemplate(jmstemplate);
        hcpJmsItemReader.setItemType(Hcpsample1.class);
        return hcpJmsItemReader;
    }
    
    
    @Bean
    public FlatFileItemWriter<Hcpsample1> hcpFlatFileItemWriter(){
        FlatFileItemWriter<Hcpsample1> hcpFlatFileItemWriter=new FlatFileItemWriter();
        hcpFlatFileItemWriter.setLineAggregator(hcp->hcp.toString());
        hcpFlatFileItemWriter.setLineseparator(System.lineseparator());
        hcpFlatFileItemWriter.setResource(new FileSystemResource("hcp.txt"));
        return hcpFlatFileItemWriter;
        
    }
    
    
    @Bean
    public Job readJmsAndWritetoFileJob() {
        
        return jobBuilFactory.get("readJmsAndWritetoFileJob").flow(step1()).end().build();
    }
    
    
    private Step step1() {
        return stepBuilderFactory.get("step1").<Hcpsample1,Hcpsample1>chunk(100).
                reader(hcpJmsItemReader(messageConverter())).writer(writer).build();
    }
    

}

CustomItemWriter

@Component
public class CustomItemWriter implements ItemWriter<Hcpsample1> {

    @Override
    public void write(List<? extends Hcpsample1> items) throws Exception {
        System.out.println("Received data of size " +items.size());
        
    }

}

application.properties

solace.jms.host=tcp://localhost:55555
solace.jms.msg-vpn=DevVPN
solace.jms.client-username=test
solace.jms.client-password=test
spring.jms.template.default-destination=SpringBatchTestQueue
spring.jms.template.receive-timeout=2s
logging.level.com.solacesystems=INFO

请注意,我将 Spring 批处理与 solace-spring-boot-starter 一起用于 Solace 自动配置

解决方法

JmsItemreader 不会为每个块重新打开 JMS 连接。它将为整个作业使用相同的连接。如果超时后 item reader 返回 null,则作业将完成,此时连接关闭。

是的..我的用例是每小时运行一次此作业并处理所有消息

在这种情况下,您看到每个计划运行(即新作业执行)的新连接是正常的。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。