Spring Batch 多进程重载,每个进程下有多个线程

如何解决Spring Batch 多进程重载,每个进程下有多个线程

我有一个场景,我需要大约 50-60 个不同的进程同时运行并执行一项任务。

每个进程都必须使用 sql 查询通过传递值获取要在后续任务中运行的数据来从数据库获取数据。 select col_1,col_2,col_3 from table_1 where col_1 = :Process_1;

 @Bean
    public Job partitioningJob() throws Exception {
        return jobBuilderFactory.get("parallelJob")
                .incrementer(new RunIdIncrementer())
                .flow(masterStep())
                .end()
                .build();
    }

    @Bean
    public Step masterStep() throws Exception {
        //How to fetch data from configuration and pass all values in partitioner one by one.
        // Can we give the name for every process so that it is helpful in logs and monitoring.
        return stepBuilderFactory.get("masterStep")
                .partitioner(slaveStep())
                .partitioner("partition",partitioner())
                .gridSize(10)
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    @Bean
    public Partitioner partitioner() throws Exception {
        //Hit DB with sql query and fetch the data.

    }

    @Bean
    public Step slaveStep() throws Exception {
        return stepBuilderFactory.get("slaveStep")
                .<Map<String,String>,Map<String,String>>chunk(1)
                .processtask()
                .build();
    }

由于我们在 Apache Camel 中有 Aggregator 和 parallelProcessing,Spring Batch 是否有任何类似的功能可以完成相同的工作?

我是 Spring Batch 的新手,目前正在探索它是否可以处理卷。 因为这将是一个 24*7 全天候运行的高负载应用程序,并且每个进程都需要并发运行,其中每个线程都应该能够支持进程内的多个线程。

有没有办法监视这些进程,以便它无论如何都会终止,我应该能够重新启动该特定进程? 请帮忙解决这个问题。

解决方法

请找出以上问题的答案。

  1. parallelProcessing - 本地和远程分区支持并行处理,可以处理大量数据,因为我们目前每天处理 200 到 300 百万个数据。

  2. 它是否可以处理大量数据 - 是的,它可以处理大量数据并且已经得到充分证明。

  3. 每个进程都需要并发运行,其中每个线程都应该能够支持进程内的多个线程 - Spring 批处理将根据您的 ThreadPool 进行处理。确保根据系统资源配置池。

  4. 有没有办法监控这些进程以使其终止 - 是的。分区的每个并行过程是一个步骤,您可以在BATCH_STEP_EXECUTION中进行监控并了解所有详细信息

  5. 应该能够重新启动该特定进程 - 是的,这是一个内置功能,可以从失败的步骤重新启动。大量作业我们总是使用容错,以便稍后处理拒绝。这也是内置功能。

下面的示例项目

https://github.com/ngecom/springBatchLocalParition/tree/master

添加了数据库 - H2 并在资源文件夹中创建可用的表。我们总是更喜欢使用数据源池,池大小会大于您的线程池大小。

示例项目总结

  1. 从表“customer”中读取并划分为步骤分区
  2. 每一步分区写入新表“new_customer”
  3. JobConfiguration.java 方法名称“taskExecutor()”中可用的线程池配置
  4. slaveStep() 中可用的块大小。
  5. 您可以根据并行步骤计算内存大小并配置为 VM 最大内存。

查询帮你根据上面的问题执行后分析

SELECT * FROM NEW_CUSTOMER;   
SELECT * FROM BATCH_JOB_EXECUTION bje;
SELECT * FROM BATCH_STEP_EXECUTION bse WHERE JOB_EXECUTION_ID=2; 
SELECT * FROM BATCH_STEP_EXECUTION_CONTEXT bsec WHERE STEP_EXECUTION_ID=4; 

如果你想改成MYSQL,添加下面的数据源

spring.datasource.hikari.minimum-idle=5 
spring.datasource.hikari.maximum-pool-size=100
spring.datasource.hikari.idle-timeout=600000 
spring.datasource.hikari.max-lifetime=1800000 
spring.datasource.hikari.auto-commit=true 
spring.datasource.hikari.poolName=SpringBoot-HikariCP
spring.datasource.url=jdbc:mysql://localhost:3306/ngecomdev
spring.datasource.username=ngecom
spring.datasource.password=ngbilling

请始终参考下面的guthub URL。你会从中得到很多想法。

https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?