如何解决使用jdbc将行批量插入Spanner时的低加载性能
背景:我正在尝试将TSV格式的数据文件(从MySQL数据库转储)加载到GCP Spanner表中。
- 客户端库:官方的Spanner JDBC依赖项v1.15.0
- 表模式:2个字符串型列和10个int型列
- GCP Spanner实例:配置为具有5个节点的多区域nam6
我的加载程序在GCP VM中运行,并且是访问Spanner实例的排他性客户端。自动提交已启用。批处理插入是我的程序执行的唯一DML操作,批处理大小约为1500。在每次提交中,它完全用尽了突变限制(即20000)。同时,提交大小小于5MB(值的两个字符串型列中的一个是小尺寸)。根据主键的第一列对行进行分区,以便每次提交都可以发送到很少的分区,以提高性能。
通过以上所有配置和优化,插入速率仅为每秒1k行。这让我很失望,因为我要插入的行超过8亿行。我确实注意到the official doc提到了大约。多区域Spanner实例的峰值写入(总计QPS)为1800。
所以我在这里有两个问题:
- 考虑到如此低的峰值写入QPS,是否意味着GCP不希望或不支持客户将大型数据集迁移到多区域Spanner实例?
- 我发现Spanner监控的读取延迟很高。我没有任何阅读要求。我的猜测是,写行Spanner需要首先读取并检查是否存在具有相同主键的行。如果我的猜测是正确的,为什么要花那么多时间?如果没有,我可以得到有关这些读取操作如何发生的任何指导吗?
解决方法
我不太清楚您是如何设置正在加载数据的客户端应用程序的。我的最初印象是您的客户端应用程序可能无法并行执行足够的事务。通常,您应该能够以每秒1000行以上的速度插入数据,但是这将要求您确实并行执行多个事务(可能来自多个VM)。我使用下面的简单示例测试了从本地计算机到单个节点Spanner实例的负载吞吐量,这使我的吞吐量约为1,500行/秒。
使用在与Spanner实例相同的网络区域中的一个或多个VM中运行的客户端应用程序进行的多节点设置应该能够实现更高的数量。
import com.google.api.client.util.Base64;
import com.google.common.base.Stopwatch;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class TestJdbc {
public static void main(String[] args) {
final int threads = 512;
ExecutorService executor = Executors.newFixedThreadPool(threads);
watch = Stopwatch.createStarted();
for (int i = 0; i < threads; i++) {
executor.submit(new InsertRunnable());
}
}
static final AtomicLong rowCount = new AtomicLong();
static Stopwatch watch;
static final class InsertRunnable implements Runnable {
@Override
public void run() {
try (Connection connection =
DriverManager.getConnection(
"jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
while (true) {
try (PreparedStatement ps =
connection.prepareStatement("INSERT INTO Test (Id,Col1,Col2) VALUES (?,?,?)")) {
for (int i = 0; i < 150; i++) {
ps.setLong(1,rnd.nextLong());
ps.setString(2,randomString(100));
ps.setString(3,randomString(100));
ps.addBatch();
rowCount.incrementAndGet();
}
ps.executeBatch();
}
System.out.println("Rows inserted: " + rowCount);
System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
private final Random rnd = new Random();
private String randomString(int maxLength) {
byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
rnd.nextBytes(bytes);
return Base64.encodeBase64String(bytes);
}
}
}
您还可以尝试调整其他几项以获得更好的结果:
- 减少每批的行数可以产生更好的总体结果。
- 如果可能,使用
InsertOrUpdate
变异对象比使用DML语句要有效得多(请参见下面的示例)。
使用Mutation
代替DML的示例:
import com.google.api.client.util.Base64;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.jdbc.CloudSpannerJdbcConnection;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class TestJdbc {
public static void main(String[] args) {
final int threads = 512;
ExecutorService executor = Executors.newFixedThreadPool(threads);
watch = Stopwatch.createStarted();
for (int i = 0; i < threads; i++) {
executor.submit(new InsertOrUpdateMutationRunnable());
}
}
static final AtomicLong rowCount = new AtomicLong();
static Stopwatch watch;
static final class InsertOrUpdateMutationRunnable implements Runnable {
@Override
public void run() {
try (Connection connection =
DriverManager.getConnection(
"jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
CloudSpannerJdbcConnection csConnection = connection.unwrap(CloudSpannerJdbcConnection.class);
CloudSpannerJdbcConnection csConnection =
connection.unwrap(CloudSpannerJdbcConnection.class);
while (true) {
ImmutableList.Builder<Mutation> builder = ImmutableList.builder();
for (int i = 0; i < 150; i++) {
builder.add(
Mutation.newInsertOrUpdateBuilder("Test")
.set("Id")
.to(rnd.nextLong())
.set("Col1")
.to(randomString(100))
.set("Col2")
.to(randomString(100))
.build());
rowCount.incrementAndGet();
}
csConnection.write(builder.build());
System.out.println("Rows inserted: " + rowCount);
System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
}
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
private final Random rnd = new Random();
private String randomString(int maxLength) {
byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
rnd.nextBytes(bytes);
return Base64.encodeBase64String(bytes);
}
}
}
上面的简单示例为我提供了大约35,000行/秒的吞吐量,而无需进行任何进一步的调整。
附加信息2020-08-21 :突变对象比(批量)DML语句更有效的原因是,Cloud Spanner在内部将DML语句转换为读取查询,然后用于创建突变。需要对批处理中的每个DML语句执行此转换,这意味着具有1,500个简单插入语句的DML批处理将触发1,500个(小型)读取查询,并且需要转换为1,500个变异。这很可能也是您在监控中看到读取延迟的原因。
否则,您是否愿意分享一些有关客户端应用程序的外观以及您正在运行多少实例的信息?
,要插入超过8亿行,并且看到您是Java程序员,我是否建议使用Beam on Dataflow?
spanner writer in Beam的设计使其写入时尽可能地高效-通过相似的键对行进行分组,然后按需对行进行批处理。 Beam on Dataflow还可以使用多个辅助VM并行执行多个文件读取和扳手写入...
使用多区域扳手实例,您应该能够获得大约1800 rows per node per second的插入速度(如Knut的答复所建议的,如果行很小且是批处理的,则插入速度会更高),并且如果有5个扳手节点,则大概可以有10个和20个并行运行的导入器线程-无论是使用导入器程序还是使用Dataflow。
(披露:我是Beam SpannerIO的维护者)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。