如何解决在一个事务中执行多个 neo4j 密码查询的最佳实践 - 使用 Async API
我已经编写了测试代码从java应用程序连接到neo4j服务器并在一个事务中执行两个密码查询。我没有在neo4j Documentation中找到执行两个查询和检索结果的详细示例
虽然代码有效,但我的问题是:
- 这些是使用 neo4j(AsyncSession,Transaction) 和 java(CompletionStage/ CompletableFuture) 接口的最佳实践吗?
- 如果我在最后使用 CompletableFuture.get(),查询是否异步执行?
其他细节:
- Neo4j 版本:v4.2.1
- Neo4j 驱动程序版本:4.1.1
- Java jdk v8
package com.abc.test;
import com.abc.Logger;
import com.abc.LoggerFactory;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.async.AsyncSession;
import org.neo4j.driver.async.ResultCursor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
public class Neo4jConnectorTest {
private Logger logger = LoggerFactory.getLogger(Neo4jConnectorTest.class);
private Driver driver;
private SessionConfig sessionConfig = SessionConfig.builder()
//.withDefaultAccessMode(AccessMode.READ)
.withDatabase("testdb").build();
private Neo4jConnectorTest(String uri,String user,String password) {
driver = GraphDatabase.driver(uri,AuthTokens.basic(user,password));
}
public static void main(String... args) throws ExecutionException,InterruptedException {
Neo4jConnectorTest example = new Neo4jConnectorTest("neo4j://x.x.x.x:7687","neo4j","neo4j");
List<String> listofProductTitles = example.readProductTitles4();
}
private List<String> readProductTitles4() throws ExecutionException,InterruptedException {
AsyncSession session = driver.asyncSession(sessionConfig);
CompletionStage<List<CompletionStage<List<String>>>> listofProductTitles = session
.beginTransactionAsync()
.thenComposeAsync(tx -> {
logger.debug("Transaction created");
List<CompletionStage<ResultCursor>> list = new ArrayList<>(2);
String query = "MERGE (p:Product {id: $id }) ON CREATE SET p.id = $id RETURN p.id";
Map<String,Object> parameters = Collections.singletonMap("id","P1");
CompletionStage<ResultCursor> r1 = tx.runAsync(query,parameters);
list.add(r1);
query = "MERGE (p:Product {id: $id }) ON CREATE SET p.id = $id RETURN p.id";
parameters = Collections.singletonMap("id","P2");
CompletionStage<ResultCursor> r2 = tx.runAsync(query,parameters);
list.add(r2);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(list.toArray(new CompletableFuture[list.size()]));
CompletableFuture<List<CompletionStage<List<String>>>> result = allFutures
.thenApplyAsync(v -> {
logger.debug("v: {}",v);
List<CompletionStage<List<String>>> productList = list
.stream()
.map(stage -> {
return stage
.thenComposeAsync(resultCursor ->
{
return resultCursor
.listAsync(record ->
record.get(0).asstring()
);
});
})
.collect(Collectors.toList());
return CompletableFuture.completedFuture(productList);
})
.thenComposeAsync(ignore -> {
logger.debug("ignore: {}",ignore);
logger.debug("closing transaction Now..");
return tx
.commitAsync()
.thenComposeAsync((ignore2) -> {
logger.debug("transaction committed successfully: {}",ignore2);
return ignore;
});
//return ignore;
});
return result;
//return CompletableFuture.completedFuture(false);
})
.exceptionally(e -> {
logger.error(" 2-- e: {}",e);
List<CompletionStage<List<String>>> nn = Collections.emptyList();
return nn; //CompletableFuture.completedFuture(Collections.emptyList());
})
.thenComposeAsync((obj) ->
session.closeAsync()
.thenApplyAsync((ignore) -> {
logger.debug("session closed. result: {}",obj);
return obj;
})
)
.thenComposeAsync((obj) ->
driver.closeAsync()
.thenApplyAsync((ignore) -> {
logger.debug("driver closed. result: {}",obj);
return obj;
})
)
.exceptionally(e -> {
logger.error(" ERROR -- e: {}",e.getMessage());
List<CompletionStage<List<String>>> nn = Collections.emptyList();
return nn; //CompletableFuture.completedFuture(Collections.emptyList());
});
return getResult(listofProductTitles);
}
private List<String> getResult(CompletionStage<List<CompletionStage<List<String>>>> listofProductTitles) throws ExecutionException,InterruptedException {
CompletionStage<List<String>> finalList = listofProductTitles
.thenApplyAsync((x) -> {
return x.stream()
.map(xx -> {
List<String> r;
try {
r = xx.toCompletableFuture().get();
} catch (InterruptedException | ExecutionException e) {
logger.error(" error occurred: {}",e.getMessage());
r = Collections.emptyList();
}
return r;
})
.flatMap(List::stream)
.collect(Collectors.toList());
});
List<String> output = finalList.toCompletableFuture().get();
logger.debug("final list: {}",output);
return output;
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。