Activiti Job Executor problem with async serviceTasks in Activiti >= 5.17


Consider the following process diagram (the image is not available here; its BPMN source follows).

MyProcess.bpmn

<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/test">
  <process id="myProcess" name="My process" isExecutable="true">
    <startEvent id="startevent1" name="Start"></startEvent>
    <userTask id="evl" name="Evaluation"></userTask>
    <boundaryEvent id="timer_event_autocomplete" name="Timer" attachedToRef="evl" cancelActivity="false">
      <timerEventDefinition>
        <timeDate>PT2S</timeDate>
      </timerEventDefinition>
    </boundaryEvent>
    <serviceTask id="timer_service" name="Timed Autocomplete" activiti:async="true" activiti:class="com.example.service.TimerService"></serviceTask>
    <serviceTask id="store_docs_service" name="Store Documents" activiti:async="true" activiti:class="com.example.service.StoreDocsService"></serviceTask>
    <sequenceFlow id="flow1" sourceRef="startevent1" targetRef="evl"></sequenceFlow>
    <sequenceFlow id="flow2" sourceRef="timer_event_autocomplete" targetRef="timer_service"></sequenceFlow>
    <sequenceFlow id="flow3" sourceRef="evl" targetRef="store_docs_service"></sequenceFlow>
    <sequenceFlow id="flow4" sourceRef="store_docs_service" targetRef="endevent1"></sequenceFlow>
    <endEvent id="endevent1" name="End"></endEvent>
  </process>

</definitions>

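To describe it in words: there is a user task (Evaluation) with a timer attached to it (configured to fire after 2 seconds). When the timer fires, the asynchronous service task Timed Autocomplete, through its Java delegate TimerService, tries to complete the user task (Evaluation). Once the user task (Evaluation) is completed, the flow moves on to another asynchronous service task (Store Documents), which invokes its Java delegate StoreDocsService, and the process ends.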
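The timer value `PT2S` in MyProcess.bpmn is written as an ISO 8601 duration. As a quick sanity check of what that string denotes, independent of Activiti, the following sketch parses it with the JDK's `java.time.Duration`. The class name `TimerValueCheck` is hypothetical, and this is not a claim about how the engine itself evaluates timers.

```java
import java.time.Duration;

/** Hypothetical helper, not part of Activiti: inspects ISO 8601 duration strings. */
public final class TimerValueCheck {

    private TimerValueCheck() {
    }

    /** Parses an ISO 8601 duration such as "PT2S" and returns its length in milliseconds. */
    public static long toMillis(String isoDuration) {
        return Duration.parse(isoDuration).toMillis();
    }

    public static void main(String[] args) {
        // The value used by the boundary timer in MyProcess.bpmn.
        System.out.println("PT2S = " + toMillis("PT2S") + " ms");
        // A longer duration, for comparison.
        System.out.println("PT1M30S = " + toMillis("PT1M30S") + " ms");
    }
}
```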

TimerService.java

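import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.delegate.JavaDelegate;
import org.activiti.engine.task.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimerService implements JavaDelegate {
    Logger LOGGER = LoggerFactory.getLogger(TimerService.class);

    @Override
    public void execute(DelegateExecution execution) throws Exception {
        LOGGER.info("*** Executing Timer autocomplete ***");
        // Note: this queries all active tasks in the engine, not only those of the current
        // process instance; singleResult() works here because only one instance is running.
        Task task = execution.getEngineServices().getTaskService().createTaskQuery().active().singleResult();
        execution.getEngineServices().getTaskService().complete(task.getId());
        LOGGER.info("*** Task: {} autocompleted by timer ***",task.getId());
    }
}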

StoreDocsService.java

public class StoreDocsService implements JavaDelegate {
    Logger LOGGER = LoggerFactory.getLogger(StoreDocsService.class);

    @Override
    public void execute(DelegateExecution execution) throws Exception {
        LOGGER.info("*** Executing Store Documents ***");
    }
}

App.java

public class App
{
    public static void main( String[] args ) throws Exception
    {

//        DefaultAsyncJobExecutor demoAsyncJobExecutor = new DefaultAsyncJobExecutor();
//        demoAsyncJobExecutor.setCorePoolSize(10);
//        demoAsyncJobExecutor.setMaxPoolSize(50);
//        demoAsyncJobExecutor.setKeepAliveTime(10000);
//        demoAsyncJobExecutor.setMaxAsyncJobsDuePerAcquisition(50);

        ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
                .setJdbcUrl("jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000")
                .setJdbcUsername("sa")
                .setJdbcPassword("")
                .setJdbcDriver("org.h2.Driver")
                .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
//                .setAsyncExecutorActivate(true)
//                .setAsyncExecutorEnabled(true)
//                .setAsyncExecutor(demoAsyncJobExecutor)
                .setJobExecutorActivate(true)
                ;
        ProcessEngine processEngine = cfg.buildProcessEngine();
        String pName = processEngine.getName();
        String ver = ProcessEngine.VERSION;
        System.out.println("ProcessEngine [" + pName + "] Version: [" + ver + "]");

        RepositoryService repositoryService = processEngine.getRepositoryService();
        Deployment deployment = repositoryService.createDeployment()
                .addClassPathResource("MyProcess.bpmn").deploy();
        ProcessDefinition processDefinition = repositoryService.createProcessDefinitionQuery()
                .deploymentId(deployment.getId()).singleResult();
        System.out.println(
                "Found process definition ["
                        + processDefinition.getName() + "] with id ["
                        + processDefinition.getId() + "]");

        final Map<String,Object> variables = new HashMap<String,Object>();
        final RuntimeService runtimeService = processEngine.getRuntimeService();

        ProcessInstance id = runtimeService.startProcessInstanceByKey("myProcess",variables);
        System.out.println("Started Process Id: "+id.getId());
        try {
            final TaskService taskService = processEngine.getTaskService();
//            List<Task> tasks = taskService.createTaskQuery().active().list();
//            while (!tasks.isEmpty()) {
//                Task task = tasks.get(0);
//                taskService.complete(task.getId());
//                tasks = taskService.createTaskQuery().active().list();
//            }

        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {

        }

        while(!runtimeService.createExecutionQuery().list().isEmpty()) {
        }
        processEngine.close();
    }


}
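The loop at the end of App.java spins without sleeping and has no exit if the process never finishes. A minimal sketch of a bounded alternative follows, assuming a hypothetical helper class `ProcessWaiter` (not part of Activiti) that polls any condition until it holds or a timeout elapses. The stand-in conditions in `main` are illustrative only.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper, not part of Activiti: bounded polling instead of a busy-wait loop. */
public final class ProcessWaiter {

    private ProcessWaiter() {
    }

    /**
     * Polls the condition every pollMillis until it returns true or timeoutMillis elapses.
     * Returns true if the condition held in time, false on timeout or interruption.
     */
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in condition that becomes true after roughly 200 ms.
        boolean finished = waitUntil(() -> System.currentTimeMillis() - start >= 200, 2_000, 50);
        System.out.println("finished=" + finished);
        // Stand-in condition that never becomes true, to show the timeout path.
        boolean timedOut = !waitUntil(() -> false, 300, 50);
        System.out.println("timedOut=" + timedOut);
    }
}
```

In App.java the condition could be `() -> runtimeService.createExecutionQuery().list().isEmpty()`, the same query the original loop uses, so a stuck process shows up as a `false` return value instead of a program that never exits.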

Activiti 5.15

When the timer fires, the diagram above executes as described. We are using Activiti's DefaultJobExecutor, as we can see in the logs:

[main] INFO  org.activiti.engine.impl.ProcessEngineImpl  - ProcessEngine default created
[main] INFO  org.activiti.engine.impl.jobexecutor.JobExecutor  - Starting up the JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor].
[Thread-1] INFO  org.activiti.engine.impl.jobexecutor.AcquireJobsRunnable  - JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor] starting to acquire jobs
ProcessEngine [default] Version: [5.15]
[main] INFO  org.activiti.engine.impl.bpmn.deployer.BpmnDeployer  - Processing resource MyProcess.bpmn
Found process definition [My process] with id [myProcess:1:3]
Started Process Id: 4
[pool-1-thread-1] INFO  com.example.service.TimerService  - *** Executing Timer autocomplete ***
[pool-1-thread-1] INFO  com.example.service.TimerService  - *** Task: 9 autocompleted by timer ***
[pool-1-thread-1] INFO  com.example.service.StoreDocsService  - *** Executing Store Documents ***
[main] INFO  org.activiti.engine.impl.jobexecutor.JobExecutor  - Shutting down the JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor].
[Thread-1] INFO  org.activiti.engine.impl.jobexecutor.AcquireJobsRunnable  - JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor] stopped job acquisition

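Activiti >= 5.17

Simply changing the Activiti version in pom.xml to 5.17.0 or later (tested up to 5.22.0) and running the same code, the process executes the timer's Java delegate TimerService, which completes the user task (Evaluation), but the Store Documents Java delegate StoreDocsService is never invoked. Moreover, the process appears never to end: the execution remains stuck at the Store Documents asynchronous service task.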

Logs:

[main] INFO  org.activiti.engine.impl.ProcessEngineImpl  - ProcessEngine default created
[main] INFO  org.activiti.engine.impl.jobexecutor.JobExecutor  - Starting up the JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor].
[Thread-1] INFO  org.activiti.engine.impl.jobexecutor.AcquireJobsRunnableImpl  - JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor] starting to acquire jobs
ProcessEngine [default] Version: [5.17.0.2]
[main] INFO  org.activiti.engine.impl.bpmn.deployer.BpmnDeployer  - Processing resource MyProcess.bpmn
Found process definition [My process] with id [myProcess:1:3]
Started Process Id: 4
[pool-1-thread-2] INFO  com.example.service.TimerService  - *** Executing Timer autocomplete ***
[pool-1-thread-2] INFO  com.example.service.TimerService  - *** Task: 9 autocompleted by timer ***
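
  • Switching to the async job executor. One of the features of version 5.17 is the new async job executor (although the old, non-async executor is still configured as the default). So we tried enabling the async executor in App.java with the following lines: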
        DefaultAsyncJobExecutor demoAsyncJobExecutor = new DefaultAsyncJobExecutor();
        demoAsyncJobExecutor.setCorePoolSize(10);
        demoAsyncJobExecutor.setMaxPoolSize(50);
        demoAsyncJobExecutor.setKeepAliveTime(10000);
        demoAsyncJobExecutor.setMaxAsyncJobsDuePerAcquisition(50);

        ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
                .setJdbcUrl("jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000")
                .setJdbcUsername("sa")
                .setJdbcPassword("")
                .setJdbcDriver("org.h2.Driver")
                .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
                .setAsyncExecutorActivate(true)
                .setAsyncExecutorEnabled(true)
                .setAsyncExecutor(demoAsyncJobExecutor)
                ;

The process appears to execute correctly, invoking TimerService and then StoreDocsService, but it never ends (the condition of the while(!runtimeService.createExecutionQuery().list().isEmpty()) statement in App.java is always true)!

Logs:

[main] INFO  org.activiti.engine.impl.ProcessEngineImpl  - ProcessEngine default created
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Starting up the default async job executor [org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor].
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Creating thread pool queue of size 100
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Creating executor service with corePoolSize 10,maxPoolSize 50 and keepAliveTime 10000
[Thread-1] INFO  org.activiti.engine.impl.asyncexecutor.AcquireTimerJobsRunnable  - {} starting to acquire async jobs due
[Thread-2] INFO  org.activiti.engine.impl.asyncexecutor.AcquireAsyncJobsDueRunnable  - {} starting to acquire async jobs due
ProcessEngine [default] Version: [5.17.0.2]
[main] INFO  org.activiti.engine.impl.bpmn.deployer.BpmnDeployer  - Processing resource MyProcess.bpmn
Found process definition [My process] with id [myProcess:1:3]
Started Process Id: 4
[pool-1-thread-2] INFO  com.example.service.TimerService  - *** Executing Timer autocomplete ***
[pool-1-thread-2] INFO  com.example.service.TimerService  - *** Task: 9 autocompleted by timer ***
[pool-1-thread-3] INFO  com.example.service.StoreDocsService  - *** Executing Store Documents ***

!!! UPDATE !!!

Activiti 6.0.0

We tried the same scenario with Activiti version 6.0.0. A change was needed in TimerService, because EngineServices can no longer be obtained from DelegateExecution:

public class TimerService implements JavaDelegate {
    Logger LOGGER = LoggerFactory.getLogger(TimerService.class);

    @Override
    public void execute(DelegateExecution execution) {
        LOGGER.info("*** Executing Timer autocomplete ***");
        Task task = Context.getProcessEngineConfiguration().getTaskService().createTaskQuery().active().singleResult();
        Context.getProcessEngineConfiguration().getTaskService().complete(task.getId());
//        Task task = execution.getEngineServices().getTaskService().createTaskQuery().active().singleResult();
//        execution.getEngineServices().getTaskService().complete(task.getId());
        LOGGER.info("*** Task: {} autocompleted by timer ***",task.getId());
    }
}

Also, this version has only the async executor, so the ProcessEngineConfiguration in App.java changes to:

        ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
                .setJdbcUrl("jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000")
                .setJdbcUsername("sa")
                .setJdbcPassword("")
                .setJdbcDriver("org.h2.Driver")
                .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
                .setAsyncExecutorActivate(true)
//                .setAsyncExecutorEnabled(true)
//                .setAsyncExecutor(demoAsyncJobExecutor)
//                .setJobExecutorActivate(true)
                ;

With version 6.0.0 and the async executor, the process completes successfully, as we can see in the logs:

[main] INFO  org.activiti.engine.impl.ProcessEngineImpl  - ProcessEngine default created
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Starting up the default async job executor [org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor].
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Creating thread pool queue of size 100
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Creating executor service with corePoolSize 2,maxPoolSize 10 and keepAliveTime 5000
[Thread-1] INFO  org.activiti.engine.impl.asyncexecutor.AcquireAsyncJobsDueRunnable  - {} starting to acquire async jobs due
[Thread-2] INFO  org.activiti.engine.impl.asyncexecutor.AcquireTimerJobsRunnable  - {} starting to acquire async jobs due
[Thread-3] INFO  org.activiti.engine.impl.asyncexecutor.ResetExpiredJobsRunnable  - {} starting to reset expired jobs
ProcessEngine [default] Version: [6.0.0.4]
Found process definition [My process] with id [myProcess:1:3]
Started Process Id: 4
[activiti-async-job-executor-thread-2] INFO  com.example.service.TimerService  - *** Executing Timer autocomplete ***
[activiti-async-job-executor-thread-2] INFO  com.example.service.TimerService  - *** Task: 10 autocompleted by timer ***
[activiti-async-job-executor-thread-2] INFO  com.example.service.StoreDocsService  - *** Executing Store Documents ***
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Shutting down the default async job executor [org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor].
[activiti-reset-expired-jobs] INFO  org.activiti.engine.impl.asyncexecutor.ResetExpiredJobsRunnable  - {} stopped resetting expired jobs
[activiti-acquire-timer-jobs] INFO  org.activiti.engine.impl.asyncexecutor.AcquireTimerJobsRunnable  - {} stopped async job due acquisition
[activiti-acquire-async-jobs] INFO  org.activiti.engine.impl.asyncexecutor.AcquireAsyncJobsDueRunnable  - {} stopped async job due acquisition

Process finished with exit code 0

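Two questions:

  1. We have upgraded from Activiti 5.15 to 5.22.0, and we do not use the async job executor. Is there any way to keep this diagram behaving as it did in 5.15?
  2. If switching to the async job executor is unavoidable, what are we missing for this process to complete successfully?

The sample project above can be found at https://github.com/pleft/DemoActiviti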

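Solution

This question needs an environment set up and some debugging before it can be answered definitively. Short of that, I would recommend that you at least migrate to Activiti 6. The 5.x branch of Activiti has not been maintained for over five years and is effectively dead. Since the core developers have all moved on to the "Flowable" project, even the 6.x line has been abandoned. If you choose to stay on Activiti 5.x, your options are:

  1. Maintain the code base yourself (and, hopefully, contribute any changes or improvements back to the project).
  2. Sign an Activiti support contract. There are a couple of vendors that offer such services.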
