How to read nested txt files with Spring Integration File
@EnableBinding(Source.class)
@Configuration
@EnableIntegrationManagement
public class FileSourceConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(FileSourceConfig.class);

    private FileSourceProperties properties;
    Source source;

    public FileSourceConfig(FileSourceProperties properties, Source source) {
        this.properties = properties;
        this.source = source;
    }

    @Bean
    public DynamicRegexPatternFilter getFilter() {
        return new DynamicRegexPatternFilter();
    }

    @Bean
    public MessageChannel linesChannel() {
        return new DirectChannel();
    }

    /* To poll the file for every given TimeUnit.SECONDS */
    @Bean(name = { "defaultPoller", PollerMetadata.DEFAULT_POLLER })
    public PollerMetadata defaultPoller() {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(properties.getPollPeriod(), TimeUnit.SECONDS));
        return pollerMetadata;
    }

    @Bean
    public IntegrationFlow fileInboundChannelFlow() {
        FileInboundChannelAdapterSpec messageSourceSpec = Files
                .inboundAdapter(Paths.get(this.properties.getDirectory()).toFile());
        messageSourceSpec = messageSourceSpec.filter(getFilter());
        //messageSourceSpec.regexFilter(this.properties.getFilenameRegex());
        messageSourceSpec.preventDuplicates(this.properties.isPreventDuplicates());
        //Setting random UUID as messagekey
        IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(messageSourceSpec)
                .split(new FileSplitter(true, true))
                .enrichHeaders(h -> h.headerExpression(KafkaHeaders.MESSAGE_KEY,
                        "T(java.util.UUID).randomUUID().toString()"));
        return flowBuilder
                .<Object, Class<?>>route(Object::getClass, m -> m
                        .channelMapping(FileSplitter.FileMarker.class, "markers.input")
                        .channelMapping(String.class, "lines.input"))
                .get();
    }

    @Bean
    public IntegrationFlow lines() {
        return f -> f.headerFilter("file_originalFile").channel(source.output());
    }

    @Bean
    public IntegrationFlow logErrors() {
        return f -> f.log(LoggingHandler.Level.ERROR, "error",
                m -> "Error in sending message :" + m.getPayload());
    }

    @Bean
    public IntegrationFlow markers() {
        return f -> f.log()
                .<FileSplitter.FileMarker>filter(m -> m.getMark().equals(FileSplitter.FileMarker.Mark.END))
                .handle(m -> m.getHeaders(), e -> e.id("archive").advice(afteradvice()));
    }
}
Can someone suggest how to read the files inbound/a/a.txt and inbound/b/b.txt?
Please find the filter code below:
public class DynamicRegexPatternFilter extends AbstractFileListFilter<File> {

    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicRegexPatternFilter.class);

    @Autowired
    private FileSourceProperties properties;

    @Override
    public boolean accept(File file) {
        // Month ranges for the four quarters: 00-03, 04-06, 07-09, 10-12
        String[] quarterRange = {"[0][0-3]", "[0][4-6]", "[0][7-9]", "[1][0-2]"};
        String fileNameRegex = this.properties.getFilenameRegex();
        //logic here
        // Accept the file only if its name (without the path) fully matches the regex
        return Pattern.compile(fileNameRegex)
                .matcher(file.getName())
                .matches();
    }
}
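Solution
See RecursiveDirectoryScanner. By default, FileReadingMessageSource comes with a DefaultDirectoryScanner, which only lists the files directly inside the configured directory, so files in subdirectories such as inbound/a are never picked up. You just need to configure your messageSourceSpec with a RecursiveDirectoryScanner: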
FileInboundChannelAdapterSpec messageSourceSpec =
Files.inboundAdapter(Paths.get(this.properties.getDirectory()).toFile())
.scanner(new RecursiveDirectoryScanner());
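To make the idea behind a recursive scan concrete, here is a minimal sketch that uses only the JDK, with no Spring classes. It is not how RecursiveDirectoryScanner is implemented; the class name RecursiveScanSketch, the method findMatching, and the temporary directory layout are all made up for illustration. It walks a directory tree and keeps the regular files whose name fully matches a regex, the same check the DynamicRegexPatternFilter above performs on file.getName().

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration class; not part of Spring Integration.
public class RecursiveScanSketch {

    /** Walks root recursively and returns the regular files whose name fully matches the regex. */
    static List<Path> findMatching(Path root, String fileNameRegex) throws IOException {
        Pattern pattern = Pattern.compile(fileNameRegex); // compiled once per scan
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                    .filter(Files::isRegularFile) // skip the directories themselves
                    .filter(p -> pattern.matcher(p.getFileName().toString()).matches())
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway layout resembling inbound/a/a.txt and inbound/b/b.txt
        Path root = Files.createTempDirectory("inbound");
        Files.createDirectories(root.resolve("a"));
        Files.createDirectories(root.resolve("b"));
        Files.write(root.resolve("a").resolve("a.txt"), "line 1".getBytes(StandardCharsets.UTF_8));
        Files.write(root.resolve("b").resolve("b.txt"), "line 2".getBytes(StandardCharsets.UTF_8));
        Files.write(root.resolve("b").resolve("ignore.csv"), "x,y".getBytes(StandardCharsets.UTF_8));

        // Only the two .txt files in the subdirectories are listed
        for (Path p : findMatching(root, ".*\\.txt")) {
            System.out.println(root.relativize(p));
        }
    }
}
```

In the Spring Integration setup, the question's DynamicRegexPatternFilter plays the role of the name check. As far as I know, recent Spring Integration versions expect the filter to be set on the scanner itself (via setFilter on the DirectoryScanner) rather than on the adapter spec when a custom scanner is supplied, so it is worth checking the reference documentation for your version.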