微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

java – Spring无法将事务传播到ForkJoin的RecursiveAction

我正在尝试实现一个多线程解决方案,以便我可以并行化我的业务逻辑,包括读取和写入数据库.

技术堆栈:Spring 4.0.2,Hibernate 4.3.8

以下是一些要讨论的代码

组态

@Configuration
public class PartitionersConfig {

    @Bean
    public ForkJoinPoolfactorybean forkJoinPoolfactorybean() {
        final ForkJoinPoolfactorybean poolFactory = new ForkJoinPoolfactorybean();
        return poolFactory;
    }
}

服务

@Service
@Transactional
public class MyService {

    @Autowired
    private OtherService otherService;

    @Autowired
    private ForkJoinPool forkJoinPool;

    @Autowired
    private MyDao myDao;

    public void performPartitionedActionOnIds() {
        final ArrayList<UUID> ids = otherService.getIds();

        MyIdPartitioner task = new MyIdsPartitioner(ids,myDao,ids.size() - 1);
        forkJoinPool.invoke(task);
    }
}

存储库/ DAO

@Repository
@Transactional(propagation = Propagation.MANDATORY)
public class IdsDao {

    public MyData getData(List<UUID> list) {
        // ... 
    }
}

RecursiveAction

public class MyIdsPartitioner extends RecursiveAction {

    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 100;

    private ArrayList<UUID> ids;
    private int fromIndex;
    private int toIndex;

    private MyDao myDao;

    public MyIdsPartitioner(ArrayList<UUID> ids,MyDao myDao,int fromIndex,int toIndex) {
        this.ids = ids;
        this.fromIndex = fromIndex;
        this.toIndex = toIndex;
        this.myDao = myDao;
    }

    @Override
    protected void compute() {
        if (computationSetIsSamllEnough()) {
            computeDirectly();
        } else {
            int leftToIndex = fromIndex + (toIndex - fromIndex) / 2;
            MyIdsPartitioner leftPartitioner = new MyIdsPartitioner(ids,fromIndex,leftToIndex);
            MyIdsPartitioner rightPartitioner = new MyIdsPartitioner(ids,leftToIndex + 1,toIndex);

            invokeAll(leftPartitioner,rightPartitioner);
        }
    }

    private boolean computationSetIsSamllEnough() {
        return (toIndex - fromIndex) < THRESHOLD;
    }

    private void computeDirectly() {
        final List<UUID> subList = ids.subList(fromIndex,toIndex);
        final MyData myData = myDao.getData(sublist);
        modifyTheData(myData);
    }

    private void modifyTheData(MyData myData) {
        // ...
        // write to DB
    }
}

执行此操作后,我得到:

No existing transaction found for transaction marked with propagation ‘mandatory’

我理解这是完全正常的,因为事务不会通过不同的线程传播.因此,一个解决方案是在每个线程as proposed in another similar question中手动创建一个事务.但这对我来说不够令人满意,所以我一直在搜索.

在Spring的论坛中我找到了a discussion on the topic.我发现一段非常有趣:

“I can imagine one Could manually propagate the transaction context to another thread,but I don’t think you should really try it. Transactions are bound to single threads with a reason – the basic underlying resource – jdbc connection – is not threadsafe. Using one single connection in multiple threads would break fundamental jdbc request/response contracts and it would be a small wonder if it would work in more then trivial examples.”

因此,第一个问题出现了:是否值得对数据库的读/写进行简化,这是否真的会损害数据库的一致性?
如果上面的引用不正确,我怀疑,有没有办法实现以下目的:
MyIdPartitioner将被Spring管理 – 使用@Scope(“prototype”) – 并传递所需的参数以进行递归调用,这样就可以将事务管理留给Spring了吗?

解决方法

这应该可以使用atomikos( http://www.atomikos.com)并可选择使用嵌套事务.

如果这样做,那么如果同一根事务的多个线程写入数据库中的相同表,请注意避免死锁.

原文地址:https://www.jb51.cc/java/129185.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐