使用 Spring Batch 集成为 AWS S3 中的每个新文件启动 JobLaunchRequest

如何解决使用 Spring Batch 集成为 AWS S3 中的每个新文件启动 JobLaunchRequest

我正在关注文档:Spring Batch Integration 结合 Integration AWS 以汇集 AWS S3。

但是在某些情况下,每个文件的批处理执行不起作用。

AWS S3 池工作正常,所以当我放置一个文件或当我启动应用程序并且存储桶中有文件时,应用程序与本地目录同步:

    @Bean
    public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
        return new S3SessionFactory(pAmazonS3);
    }

    @Bean
    public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory ps3SessionFactory) {
        S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(ps3SessionFactory);
        synchronizer.setPreserveTimestamp(true);
        synchronizer.setDeleteRemoteFiles(false);
        synchronizer.setRemoteDirectory("remote-bucket");
        //synchronizer.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),"simpleMetadataStore"));
        return synchronizer;
    }

    @Bean
    @InboundChannelAdapter(value = IN_CHANNEL_NAME,poller = @Poller(fixedDelay = "30"))
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
            S3InboundFileSynchronizer ps3InboundFileSynchronizer) {
        S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource(ps3InboundFileSynchronizer);
        messageSource.setautocreateLocalDirectory(true);
        messageSource.setLocalDirectory(new FileSystemResource("files").getFile());
        //messageSource.setLocalFilter(new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),"fsSimpleMetadataStore"));
        return messageSource;
    }

    @Bean("s3filesChannel")
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }

我按照教程制作了FileMessagetoJobRequest 我不会把代码在这里,因为它和文档一样

所以我创建了 bean IntegrationFlow 和 FileMessagetoJobRequest:

    @Bean
    public IntegrationFlow integrationFlow(
            S3InboundFileSynchronizingMessageSource ps3InboundFileSynchronizingMessageSource) {
        return IntegrationFlows.from(ps3InboundFileSynchronizingMessageSource,c -> c.poller(Pollers.fixedrate(1000).maxMessagesPerPoll(1)))
                .transform(fileMessagetoJobRequest())
                .handle(jobLaunchingGateway())
                .log(LoggingHandler.Level.WARN,"headers.id + ': ' + payload")
                .get();
    }

    @Bean
    public FileMessagetoJobRequest fileMessagetoJobRequest() {
        FileMessagetoJobRequest fileMessagetoJobRequest = new FileMessagetoJobRequest();
        fileMessagetoJobRequest.setFileParameterName("input.file.name");
        fileMessagetoJobRequest.setJob(delimitedFileJob);
        return fileMessagetoJobRequest;
    }

所以在 JobLaunchingGateway 中,我认为是问题所在:

如果我是这样创建的:

    @Bean
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

        return jobLaunchingGateway;
    }

案例 1(应用程序启动时桶为空):

  • 我在 AWS S3 中上传一个文件
  • 池化工作并且文件出现在本地目录中;
  • 但是转换/作业没有被触发;

案例 2(当应用程序启动时,Bucket 已经有一个文件):

  • 作业已启动:
2021-01-12 13:32:34.451  INFO 1955 --- [ask-scheduler-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=arquivoDelimitadoJob]] launched with the following parameters: [{input.file.name=files/FILE1.csv}]
2021-01-12 13:32:34.524  INFO 1955 --- [ask-scheduler-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [delimitedFileJob]
  • 如果我在 S3 中添加第二个文件,则不会像情况 1 那样启动作业。

案例 3(Bucket 有多个文件):

  • 文件在本地目录中正确同步
  • 但该作业仅对最后一个文件执行一次。

所以在 docs 之后,我将网关更改为:

    @Bean
    @ServiceActivator(inputChannel = IN_CHANNEL_NAME,poller = @Poller(fixedrate="1000"))
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());

        //JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
        //jobLaunchingGateway.setoutputChannel(replyChannel());
        jobLaunchingGateway.setoutputChannel(s3FilesChannel());
        return jobLaunchingGateway;
    }

使用这个新的网关实现,如果我在 S3 中放置一个文件,应用程序会做出反应但没有转换并给出错误

Caused by: java.lang.IllegalArgumentException: The payload must be of type JobLaunchRequest. Object of class [java.io.File] must be an instance of class org.springframework.batch.integration.launch.JobLaunchRequest

如果存储桶中有两个文件(当应用程序启动时)FILE1.csv 和 FILE2.csv,则该作业会针对 FILE1.csv 正确运行,但会为 FILE2.csv 提供上述错误

实现这样的事情的正确方法是什么?

明确地说,我想在这个存储桶中接收数千个 csv 文件,使用 Spring Batch 读取和处理,但我还需要尽快从 S3 获取每个新文件

提前致谢。

解决方法

JobLaunchingGateway 确实只希望我们将 JobLaunchRequest 作为有效载荷。

既然你在 @InboundChannelAdapter(value = IN_CHANNEL_NAME,poller = @Poller(fixedDelay = "30")) bean 定义中有那个 S3InboundFileSynchronizingMessageSource,那么在 @ServiceActivator(inputChannel = IN_CHANNEL_NAME 之间没有 JobLaunchingGateway 转换器的情况下使用 FileMessageToJobRequest 真的是错误的.

您的 integrationFlow 对我来说看起来不错,但是您确实需要从 @InboundChannelAdapter bean 中删除那个 S3InboundFileSynchronizingMessageSource 并完全依赖于 c.poller() 配置。

另一种方法是离开 @InboundChannelAdapter,然后从 IntegrationFlow 开始 IN_CHANNEL_NAME,而不是 MessageSource

由于您有多个轮询器针对同一个 S3 源,而且两者都基于相同的本地目录,因此看到这么多意外情况并不奇怪。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?