微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在一个事务中执行多个 neo4j Cypher 查询的最佳实践 - 使用 Async API

如何解决:在一个事务中执行多个 neo4j Cypher 查询的最佳实践 - 使用 Async API

我已经编写了测试代码,从 java 应用程序连接到 neo4j 服务器,并在一个事务中执行两个 Cypher 查询。我没有在 neo4j 官方文档(Documentation)中找到执行两个查询并检索结果的详细示例。

虽然代码有效,但我的问题是:

  • 这些是使用 neo4j(AsyncSession,Transaction) 和 java(CompletionStage/ CompletableFuture) 接口的最佳实践吗?
  • 如果我在最后使用 CompletableFuture.get(),查询是否异步执行?

其他细节:

  • Neo4j 版本:v4.2.1
  • Neo4j 驱动程序版本:4.1.1
  • Java jdk v8

    package com.abc.test;
    
    import com.abc.Logger;
    import com.abc.LoggerFactory;
    import org.neo4j.driver.AuthTokens;
    import org.neo4j.driver.Driver;
    import org.neo4j.driver.GraphDatabase;
    import org.neo4j.driver.SessionConfig;
    import org.neo4j.driver.async.AsyncSession;
    import org.neo4j.driver.async.ResultCursor;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.ExecutionException;
    import java.util.stream.Collectors;
    
    /**
     * Sample that runs two Cypher statements inside one explicit transaction using the
     * Neo4j async driver API and returns the values of the first column of every record.
     *
     * <p>The whole pipeline (begin transaction, run, collect, commit, close session, close
     * driver) is composed from {@link CompletionStage}s; the calling thread blocks exactly
     * once, in {@link #getResult(CompletionStage)}, to hand the final list back synchronously.
     */
    public class Neo4jConnectorTest {
        private static final Logger logger = LoggerFactory.getLogger(Neo4jConnectorTest.class);

        /** Cypher statement shared by both queries; only the {@code $id} parameter differs. */
        private static final String MERGE_PRODUCT_QUERY =
                "MERGE (p:Product {id: $id }) ON CREATE SET p.id = $id RETURN p.id";

        private final Driver driver;
        private final SessionConfig sessionConfig = SessionConfig.builder()
                .withDatabase("testdb").build();

        /**
         * Creates the connector and its driver.
         *
         * @param uri      connection URI passed to {@code GraphDatabase.driver}
         * @param user     user name for basic authentication
         * @param password password for basic authentication
         */
        private Neo4jConnectorTest(String uri, String user, String password) {
            driver = GraphDatabase.driver(uri, AuthTokens.basic(user, password));
        }

        /**
         * Entry point: connects, runs both queries in one transaction and logs the result.
         *
         * @param args unused
         * @throws ExecutionException   if the asynchronous pipeline completed exceptionally
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public static void main(String... args) throws ExecutionException, InterruptedException {
            Neo4jConnectorTest example = new Neo4jConnectorTest("neo4j://x.x.x.x:7687", "neo4j", "neo4j");
            example.readProductTitles4();
        }

        /**
         * Runs the two MERGE statements in a single transaction, commits once every record has
         * been collected, then closes the session and the driver.
         *
         * <p>A failure anywhere before the session is closed is logged and replaced by an empty
         * list so that the close steps still run; a failure while closing is logged and also
         * yields an empty list.
         *
         * @return the first-column values returned by both statements, in statement order,
         *         or an empty list if the pipeline failed
         * @throws ExecutionException   if waiting for the final stage fails
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        private List<String> readProductTitles4() throws ExecutionException, InterruptedException {
            AsyncSession session = driver.asyncSession(sessionConfig);

            CompletionStage<List<String>> listofProductTitles = session
                    .beginTransactionAsync()
                    .thenComposeAsync(tx -> {
                        logger.debug("Transaction created");

                        Map<String, Object> firstParameters = Collections.singletonMap("id", "P1");
                        Map<String, Object> secondParameters = Collections.singletonMap("id", "P2");

                        // Both statements are submitted before either result is awaited.
                        List<CompletableFuture<List<String>>> pending = new ArrayList<>(2);
                        pending.add(collectTitles(tx.runAsync(MERGE_PRODUCT_QUERY, firstParameters)));
                        pending.add(collectTitles(tx.runAsync(MERGE_PRODUCT_QUERY, secondParameters)));

                        // allOf completes only when every record list is available, so the
                        // join() calls below return immediately instead of blocking a pool thread.
                        CompletableFuture<List<String>> allTitles = CompletableFuture
                                .allOf(pending.toArray(new CompletableFuture<?>[0]))
                                .thenApplyAsync(ignored -> pending.stream()
                                        .map(CompletableFuture::join)
                                        .flatMap(List::stream)
                                        .collect(Collectors.toList()));

                        // Commit strictly after the results were consumed; if collecting failed,
                        // this stage is skipped and no commit is attempted.
                        CompletionStage<List<String>> committed = allTitles.thenComposeAsync(titles -> {
                            logger.debug("closing transaction Now..");
                            return tx.commitAsync().thenApplyAsync(ignore2 -> {
                                logger.debug("transaction committed successfully: {}", ignore2);
                                return titles;
                            });
                        });
                        return committed;
                    })
                    .exceptionally(e -> {
                        // Recover here so the session and driver are still closed below.
                        logger.error(" 2-- e: {}", e);
                        return Collections.<String>emptyList();
                    })
                    .thenComposeAsync(obj ->
                            session.closeAsync()
                                    .thenApplyAsync(ignore -> {
                                        logger.debug("session closed. result: {}", obj);
                                        return obj;
                                    })
                    )
                    .thenComposeAsync(obj ->
                            driver.closeAsync()
                                    .thenApplyAsync(ignore -> {
                                        logger.debug("driver closed. result: {}", obj);
                                        return obj;
                                    })
                    )
                    .exceptionally(e -> {
                        logger.error(" ERROR -- e: {}", e.getMessage());
                        return Collections.<String>emptyList();
                    });

            return getResult(listofProductTitles);
        }

        /**
         * Turns a pending cursor into a future holding the first column of each record as a string.
         *
         * @param cursorStage stage that yields the cursor of an already submitted statement
         * @return future completed with the collected values, or exceptionally if running the
         *         statement or consuming its records failed
         */
        private static CompletableFuture<List<String>> collectTitles(CompletionStage<ResultCursor> cursorStage) {
            CompletionStage<List<String>> titles = cursorStage
                    .thenComposeAsync(cursor -> cursor.listAsync(record -> record.get(0).asString()));
            // Convert explicitly: a CompletionStage is not guaranteed to be a CompletableFuture.
            return titles.toCompletableFuture();
        }

        /**
         * Blocks the calling thread until the pipeline has finished and returns its result.
         * This is the only blocking call in the class.
         *
         * @param listofProductTitles final stage of the asynchronous pipeline
         * @return the list produced by the pipeline
         * @throws ExecutionException   if the stage completed exceptionally
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        private List<String> getResult(CompletionStage<List<String>> listofProductTitles) throws ExecutionException, InterruptedException {
            List<String> output = listofProductTitles.toCompletableFuture().get();
            logger.debug("final list: {}", output);
            return output;
        }

    }

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。