如何解决JMSItemReader 根据块大小重新连接到 Solace
我有连接到 Solace 队列的 Spring 批处理应用程序,只要消息在队列中或接收器超时,就会轮询记录。我正在使用基于块的处理但是我可以看到我的批处理应用程序重新连接到它试图处理的每个块的 Solace。就我的理解而言,与 Solace 的连接应该只发生一次,SolSession 应该根据定义的块大小开始/结束..我的理解是正确的还是预期的行为?
Spring 批处理应用程序日志
2021-04-01 11:54:17.033 INFO 11144 --- [ main] c.s.j.protocol.impl.TcpClientChannel : Connecting to host 'orig=tcp://localhost:55555,scheme=tcp://,host=localhost,port=55555' (host 1 of 1,smfclient 1,attempt 1 of 1,this_host_attempt: 1 of 1)
2021-04-01 11:54:17.111 INFO 11144 --- [ main] c.s.j.protocol.impl.TcpClientChannel : Connected to host 'orig=tcp://localhost:55555,port=55555' (smfclient 1)
2021-04-01 11:54:17.158 INFO 11144 --- [ main] com.solacesystems.jms.solSession : SolSession started.
Received data of size 100
2021-04-01 11:54:24.122 INFO 11144 --- [ main] c.s.j.protocol.impl.TcpClientChannel : Channel Closed (smfclient 1)
2021-04-01 11:54:24.122 INFO 11144 --- [ main] c.s.j.protocol.impl.TcpClientChannel : Connecting to host 'orig=tcp://localhost:55555,smfclient 2,this_host_attempt: 1 of 1)
2021-04-01 11:54:24.187 INFO 11144 --- [ main] c.s.j.protocol.impl.TcpClientChannel : Connected to host 'orig=tcp://localhost:55555,port=55555' (smfclient 2)
2021-04-01 11:54:24.219 INFO 11144 --- [ main] com.solacesystems.jms.solSession : SolSession started.
Received data of size 100
2021-04-01 11:54:31.036 INFO 11144 --- [ main] c.s.j.protocol.impl.TcpClientChannel : Channel Closed (smfclient 2)
2021-04-01 11:54:31.036 INFO 11144 --- [ main] c.s.j.protocol.impl.TcpClientChannel : Connecting to host 'orig=tcp://tcp://localhost:55555,smfclient 3,this_host_attempt: 1 of 1)
2021-04-01 11:54:31.098 INFO 11144 --- [ main] c.s.j.protocol.impl.TcpClientChannel : Connected to host 'orig=tcp://tcp://localhost:55555,port=55555' (smfclient 3)
2021-04-01 11:54:31.145 INFO 11144 --- [ main] com.solacesystems.jms.solSession : SolSession started.
下面是我的 Spring 批处理配置类:
@EnableJms
@Configuration
@EnableBatchProcessing
public class SpringBatchJmsConfig {
public static final Logger logger=LoggerFactory.getLogger(SpringBatchJmsConfig.class.getName());
@Autowired
private jmstemplate jmstemplate;
@Autowired
public JobBuilderFactory jobBuilFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
public ConnectionFactory connectionFactory;
@Autowired
public CustomItemWriter writer;
@Bean
public DefaultJmsListenerContainerFactory cFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory=new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(messageConverter());
factory.setPubSubDomain(false);
return factory;
}
@Bean
public MessageConverter messageConverter() {
MappingJackson2MessageConverter converter=new MappingJackson2MessageConverter();
converter.settargettype(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
@Bean
public JmsItemReader<Hcpsample1> hcpJmsItemReader(MessageConverter messageConverter){
JmsItemReader<Hcpsample1> hcpJmsItemReader=new JmsItemReader();
hcpJmsItemReader.setjmstemplate(jmstemplate);
hcpJmsItemReader.setItemType(Hcpsample1.class);
return hcpJmsItemReader;
}
@Bean
public FlatFileItemWriter<Hcpsample1> hcpFlatFileItemWriter(){
FlatFileItemWriter<Hcpsample1> hcpFlatFileItemWriter=new FlatFileItemWriter();
hcpFlatFileItemWriter.setLineAggregator(hcp->hcp.toString());
hcpFlatFileItemWriter.setLineseparator(System.lineseparator());
hcpFlatFileItemWriter.setResource(new FileSystemResource("hcp.txt"));
return hcpFlatFileItemWriter;
}
@Bean
public Job readJmsAndWritetoFileJob() {
return jobBuilFactory.get("readJmsAndWritetoFileJob").flow(step1()).end().build();
}
private Step step1() {
return stepBuilderFactory.get("step1").<Hcpsample1,Hcpsample1>chunk(100).
reader(hcpJmsItemReader(messageConverter())).writer(writer).build();
}
}
CustomItemWriter
:
@Component
public class CustomItemWriter implements ItemWriter<Hcpsample1> {
@Override
public void write(List<? extends Hcpsample1> items) throws Exception {
System.out.println("Received data of size " +items.size());
}
}
application.properties
:
solace.jms.host=tcp://localhost:55555
solace.jms.msg-vpn=DevVPN
solace.jms.client-username=test
solace.jms.client-password=test
spring.jms.template.default-destination=SpringBatchTestQueue
spring.jms.template.receive-timeout=2s
logging.level.com.solacesystems=INFO
请注意,我将 Spring 批处理与 solace-spring-boot-starter
一起用于 Solace 自动配置。
解决方法
JmsItemreader 不会为每个块重新打开 JMS 连接。它将为整个作业使用相同的连接。如果超时后 item reader 返回 null,则作业将完成,此时连接关闭。
是的..我的用例是每小时运行一次此作业并处理所有消息
在这种情况下,您看到每个计划运行(即新作业执行)的新连接是正常的。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。