微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spring Batch 使用 CompositeItemWriter 和 CompositeItemProcessor

如何解决Spring Batch 使用 CompositeItemWriter 和 CompositeItemProcessor

使用 Spring Batch,我必须在两个不同的表中写入,但使用相同的 ItemReader。

我不知道如何使用一个 ItemReader 和一个 CompositeItemWriter。

这是作业配置:

public class JobConfiguration {

    @Autowired
    @SuppressWarnings("squid:S3305")
    private ItemReaderSurveillance itemReaderSurveillance;

    @Autowired
    @SuppressWarnings("squid:S3305")
    private ItemWriterSurveillance itemWriterSurveillance;

    @Autowired
    @SuppressWarnings("squid:S3305")
    private ItemWriterSurveillanceEcheance itemWriterSurveillanceEcheance;

    @Autowired
    @SuppressWarnings("squid:S3305")
    private CompositeitemprocessorSurveillance compositeitemprocessor;

    @Bean(name = "importSurveillanceJob")
    public Job job(JobBuilderFactory jobs) {

        return jobs.get("importSurveillanceStep")
                .listener(jobListener)
                .start(steptaskletCreationRepertoireReport())
                .next(steptaskletCreationRepertoireArchive())
                .next(stepSurveillanceReadProcessWrite())
                .next(stepZipFile())
                .build();
    }

    @Bean
    protected Step stepSurveillanceReadProcessWrite() {
        return stepBuilderFactory.get("stepSurveillanceReadProcessWrite")
                .<SurveillanceLineFile,CompositeResultSurveillance>chunk(Integer.valueOf(commitInterval))
                .reader(itemReaderSurveillance)
                .processor(compositeitemprocessor)
                .writer(compositeItemWriter())
                .faultTolerant()
                .retryLimit(0)
                .build();
    }

    @Bean
    public CompositeItemWriter<CompositeResultSurveillance> compositeItemWriter(){
        CompositeItemWriter compositeItemWriter = new CompositeItemWriter();
        compositeItemWriter.setDelegates(Arrays.asList(itemWriterSurveillance,itemWriterSurveillanceEcheance));
        return compositeItemWriter;
    }
}

两个 ItemWriter:

/**
 * Step-scoped writer for {@link FoaSurveillance} items. It also implements
 * {@link StepExecutionListener} so that it can capture the running
 * {@link StepExecution} and read its {@link ExecutionContext} while writing.
 */
@Slf4j
@StepScope
@Component
public class ItemWriterSurveillance implements ItemWriter<FoaSurveillance>,StepExecutionListener {

    String fileName;
    JobExecution mJobExecution;
    StepExecution stepExecution;

    @Autowired
    private FoaSurveillanceDao foaSurveillanceDao;

    /**
     * Writes one chunk of items.
     *
     * @param foaSurveillances items of the current chunk
     * @throws IllegalStateException if {@link #beforeStep(StepExecution)} has not been
     *         called, i.e. this writer was not registered as a step listener
     */
    @Override
    public void write(List<? extends FoaSurveillance> foaSurveillances) {
        // stepExecution is only assigned in beforeStep; fail with a clear message instead
        // of a bare NullPointerException when the listener callback was never wired.
        if (stepExecution == null) {
            throw new IllegalStateException("ItemWriterSurveillance.beforeStep was not called; "
                    + "register this writer as a StepExecutionListener on the step");
        }

        ExecutionContext executionContext = stepExecution.getExecutionContext();

        // Process data
    }

    /**
     * Remembers the step execution and its parent job execution for later use.
     *
     * @param stepExecution the step that is about to start
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        mJobExecution = stepExecution.getJobExecution();
        this.stepExecution = stepExecution;
    }

    /**
     * @param stepExecution the step that just finished
     * @return always {@link ExitStatus#COMPLETED}
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return ExitStatus.COMPLETED;
    }
}

/**
 * Step-scoped writer for {@link FoaSurveillanceEcheance} items. It also implements
 * {@link StepExecutionListener} so that it can capture the running
 * {@link StepExecution} and read its {@link ExecutionContext} while writing.
 */
@Slf4j
@StepScope
@Component
public class ItemWriterSurveillanceEcheance implements ItemWriter<FoaSurveillanceEcheance>,StepExecutionListener {

    String fileName;
    JobExecution mJobExecution;
    StepExecution stepExecution;

    @Autowired
    private FoaSurveillanceEcheanceDao foaSurveillanceEcheanceDao;

    /**
     * Writes one chunk of items.
     *
     * @param foaSurveillanceEcheances items of the current chunk
     * @throws IllegalStateException if {@link #beforeStep(StepExecution)} has not been
     *         called, i.e. this writer was not registered as a step listener
     */
    @Override
    public void write(List<? extends FoaSurveillanceEcheance> foaSurveillanceEcheances) {
        // stepExecution is only assigned in beforeStep; fail with a clear message instead
        // of a bare NullPointerException when the listener callback was never wired.
        if (stepExecution == null) {
            throw new IllegalStateException("ItemWriterSurveillanceEcheance.beforeStep was not called; "
                    + "register this writer as a StepExecutionListener on the step");
        }

        ExecutionContext executionContext = stepExecution.getExecutionContext();
        // Process data
    }

    /**
     * Remembers the step execution and its parent job execution for later use.
     *
     * @param stepExecution the step that is about to start
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        mJobExecution = stepExecution.getJobExecution();
        this.stepExecution = stepExecution;
    }

    /**
     * @param stepExecution the step that just finished
     * @return always {@link ExitStatus#COMPLETED}
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return ExitStatus.COMPLETED;
    }
}

组合两个 ItemProcessor 的处理器:

/**
 * Processor that runs two delegate processors on the same input line and bundles
 * their outputs into a single {@link CompositeResultSurveillance}.
 *
 * <p>The delegates are plain injected beans, not processors known to the step, so the
 * step never notifies them directly. This class therefore forwards its own
 * {@code beforeStep} callback to them; without that, a delegate that stores the
 * {@link StepExecution} in {@code beforeStep} would still see {@code null} in
 * {@code process}.
 */
@Slf4j
@Component
public class CompositeitemprocessorSurveillance implements ItemProcessor<SurveillanceLineFile,CompositeResultSurveillance>,StepExecutionListener {

    private StepExecution stepExecution;

    @Autowired
    itemprocessorSurveillance itemprocessorSurveillance;

    @Autowired
    itemprocessorSurveillanceEcheance itemprocessorSurveillanceEcheance;

    /**
     * Applies both delegates to the given line.
     *
     * @param surveillanceLineFile the input line
     * @return a result holding the output of each delegate
     * @throws Exception if either delegate fails
     */
    @Override
    public CompositeResultSurveillance process(SurveillanceLineFile surveillanceLineFile) throws Exception {
        CompositeResultSurveillance compositeResultSurveillance = new CompositeResultSurveillance();
        compositeResultSurveillance.setFoaSurveillance(itemprocessorSurveillance.process(surveillanceLineFile));
        compositeResultSurveillance.setFoaSurveillanceEcheance(itemprocessorSurveillanceEcheance.process(surveillanceLineFile));
        return compositeResultSurveillance;
    }

    /**
     * Stores the step execution and forwards the callback to each delegate.
     *
     * @param stepExecution the step that is about to start
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
        forwardBeforeStep(itemprocessorSurveillance, stepExecution);
        forwardBeforeStep(itemprocessorSurveillanceEcheance, stepExecution);
    }

    /**
     * @param stepExecution the step that just finished
     * @return {@code null}, which contributes no exit status of its own
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }

    /** Invokes {@code beforeStep} on the delegate if it is a step execution listener. */
    private static void forwardBeforeStep(Object delegate, StepExecution stepExecution) {
        if (delegate instanceof StepExecutionListener) {
            ((StepExecutionListener) delegate).beforeStep(stepExecution);
        }
    }
}

itemprocessorSurveillance :

/**
 * Processor turning a {@code SurveillanceLineFile} into a {@code FoaSurveillance}.
 * It also implements {@code StepExecutionListener} in order to capture the current
 * {@code StepExecution}.
 *
 * NOTE(review): in this excerpt the class has no closing brace and {@code process}
 * has no return statement; the remainder of the implementation is not visible here.
 */
@Slf4j
@Component
public class itemprocessorSurveillance implements itemprocessor<SurveillanceLineFile,FoaSurveillance>,StepExecutionListener {

    String fileName;

    // Assigned only in beforeStep; remains null if that callback is never invoked.
    private StepExecution stepExecution;

    /**
     * Stores the given step execution for later use in {@code process}.
     *
     * @param stepExecution the step that is about to start
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    /**
     * @param stepExecution the step that just finished
     * @return always {@code ExitStatus.COMPLETED}
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return ExitStatus.COMPLETED;
    }

    /**
     * Processes one input line.
     *
     * @param surveillanceLineFile the input line
     * @throws Exception declared by the method signature
     */
    @Override
    public FoaSurveillance process(SurveillanceLineFile surveillanceLineFile) throws Exception {
        // Dereferences stepExecution: throws NullPointerException when beforeStep has not run.
        ExecutionContext executionContext = stepExecution.getExecutionContext();
        
        // Process Data
    }

以及处理器返回的 CompositeResult :

/**
 * Mutable holder pairing a {@link FoaSurveillance} with a
 * {@link FoaSurveillanceEcheance}. Accessors for both members are generated by Lombok.
 */
public class CompositeResultSurveillance {

    /** The surveillance part of the result. */
    @Getter
    @Setter
    private FoaSurveillance foaSurveillance;

    /** The surveillance-echeance part of the result. */
    @Getter
    @Setter
    private FoaSurveillanceEcheance foaSurveillanceEcheance;
}

现在我在 itemprocessorSurveillance 上有一个 NPE,因为 stepExecution 在 process 方法上为 null。

我不知道出了什么问题。有什么帮助吗?

解决方法

这是因为您的 ItemProcessorSurveillance 实现了两个接口:ItemProcessor 和 StepExecutionListener,但在步骤中仅被注册为 ItemProcessor。它还应当被注册为侦听器,这样框架才会在步骤开始前调用 beforeStep 来设置 stepExecution 字段。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。