使用jdbc将行批量插入Spanner时的低加载性能

如何解决使用jdbc将行批量插入Spanner时的低加载性能

背景:我正在尝试将TSV格式的数据文件(从MySQL数据库转储)加载到GCP Spanner表中。

  • 客户端库:官方的Spanner JDBC依赖项v1.15.0
  • 表模式:2个字符串型列和10个int型列
  • GCP Spanner实例:配置为具有5个节点的多区域nam6

我的加载程序在GCP VM中运行,并且是访问Spanner实例的排他性客户端。自动提交已启用。批处理插入是我的程序执行的唯一DML操作,批处理大小约为1500。在每次提交中,它完全用尽了突变限制(即20000)。同时,提交大小小于5MB(值的两个字符串型列中的一个是小尺寸)。根据主键的第一列对行进行分区,以便每次提交都可以发送到很少的分区,以提高性能。

通过以上所有配置和优化,插入速率仅为每秒1k行。这让我很失望,因为我要插入的行超过8亿行。我确实注意到the official doc提到了大约。多区域Spanner实例的峰值写入(总计QPS)为1800。

所以我在这里有两个问题:

  1. 考虑到如此低的峰值写入QPS,是否意味着GCP不希望或不支持客户将大型数据集迁移到多区域Spanner实例?
  2. 我发现Spanner监控的读取延迟很高。我没有任何阅读要求。我的猜测是,写行Spanner需要首先读取并检查是否存在具有相同主键的行。如果我的猜测是正确的,为什么要花那么多时间?如果没有,我可以得到有关这些读取操作如何发生的任何指导吗?

    screenshot

解决方法

我不太清楚您是如何设置正在加载数据的客户端应用程序的。我的最初印象是您的客户端应用程序可能无法并行执行足够的事务。通常,您应该能够以每秒1000行以上的速度插入数据,但是这将要求您确实并行执行多个事务(可能来自多个VM)。我使用下面的简单示例测试了从本地计算机到单个节点Spanner实例的负载吞吐量,这使我的吞吐量约为1,500行/秒。

使用在与Spanner实例相同的网络区域中的一个或多个VM中运行的客户端应用程序进行的多节点设置应该能够实现更高的数量。

import com.google.api.client.util.Base64;
import com.google.common.base.Stopwatch;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TestJdbc {

  public static void main(String[] args) {
    final int threads = 512;
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    watch = Stopwatch.createStarted();
    for (int i = 0; i < threads; i++) {
      executor.submit(new InsertRunnable());
    }
  }

  static final AtomicLong rowCount = new AtomicLong();
  static Stopwatch watch;

  static final class InsertRunnable implements Runnable {
    @Override
    public void run() {
      try (Connection connection =
          DriverManager.getConnection(
              "jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
        while (true) {
          try (PreparedStatement ps =
              connection.prepareStatement("INSERT INTO Test (Id,Col1,Col2) VALUES (?,?,?)")) {
            for (int i = 0; i < 150; i++) {
              ps.setLong(1,rnd.nextLong());
              ps.setString(2,randomString(100));
              ps.setString(3,randomString(100));
              ps.addBatch();
              rowCount.incrementAndGet();
            }
            ps.executeBatch();
          }
          System.out.println("Rows inserted: " + rowCount);
          System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
        }
      } catch (SQLException e) {
        throw new RuntimeException(e);
      }
    }

    private final Random rnd = new Random();

    private String randomString(int maxLength) {
      byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
      rnd.nextBytes(bytes);
      return Base64.encodeBase64String(bytes);
    }
  }
}

您还可以尝试调整其他几项以获得更好的结果:

  • 减少每批的行数可以产生更好的总体结果。
  • 如果可能,使用InsertOrUpdate变异对象比使用DML语句要有效得多(请参见下面的示例)。

使用Mutation代替DML的示例:

import com.google.api.client.util.Base64;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.jdbc.CloudSpannerJdbcConnection;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TestJdbc {

  public static void main(String[] args) {
    final int threads = 512;
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    watch = Stopwatch.createStarted();
    for (int i = 0; i < threads; i++) {
      executor.submit(new InsertOrUpdateMutationRunnable());
    }
  }

  static final AtomicLong rowCount = new AtomicLong();
  static Stopwatch watch;

  static final class InsertOrUpdateMutationRunnable implements Runnable {
    @Override
    public void run() {
      try (Connection connection =
          DriverManager.getConnection(
              "jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
        CloudSpannerJdbcConnection csConnection = connection.unwrap(CloudSpannerJdbcConnection.class);
        CloudSpannerJdbcConnection csConnection =
            connection.unwrap(CloudSpannerJdbcConnection.class);
        while (true) {
          ImmutableList.Builder<Mutation> builder = ImmutableList.builder();
          for (int i = 0; i < 150; i++) {
            builder.add(
                Mutation.newInsertOrUpdateBuilder("Test")
                    .set("Id")
                    .to(rnd.nextLong())
                    .set("Col1")
                    .to(randomString(100))
                    .set("Col2")
                    .to(randomString(100))
                    .build());
            rowCount.incrementAndGet();
          }
          csConnection.write(builder.build());
          System.out.println("Rows inserted: " + rowCount);
          System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
        }
        }
      } catch (SQLException e) {
        throw new RuntimeException(e);
      }
    }

    private final Random rnd = new Random();

    private String randomString(int maxLength) {
      byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
      rnd.nextBytes(bytes);
      return Base64.encodeBase64String(bytes);
    }
  }
}

上面的简单示例为我提供了大约35,000行/秒的吞吐量,而无需进行任何进一步的调整。

附加信息2020-08-21 :突变对象比(批量)DML语句更有效的原因是,Cloud Spanner在内部将DML语句转换为读取查询,然后用于创建突变。需要对批处理中的每个DML语句执行此转换,这意味着具有1,500个简单插入语句的DML批处理将触发1,500个(小型)读取查询,并且需要转换为1,500个变异。这很可能也是您在监控中看到读取延迟的原因。

否则,您是否愿意分享一些有关客户端应用程序的外观以及您正在运行多少实例的信息?

,

要插入超过8亿行,并且看到您是Java程序员,我是否建议使用Beam on Dataflow?

spanner writer in Beam的设计使其写入时尽可能地高效-通过相似的键对行进行分组,然后按需对行进行批处理。 Beam on Dataflow还可以使用多个辅助VM并行执行多个文件读取和扳手写入...

使用多区域扳手实例,您应该能够获得大约1800 rows per node per second的插入速度(如Knut的答复所建议的,如果行很小且是批处理的,则插入速度会更高),并且如果有5个扳手节点,则大概可以有10个和20个并行运行的导入器线程-无论是使用导入器程序还是使用Dataflow。

(披露:我是Beam SpannerIO的维护者)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)&gt; insert overwrite table dwd_trade_cart_add_inc &gt; select data.id, &gt; data.user_id, &gt; data.course_id, &gt; date_format(
错误1 hive (edu)&gt; insert into huanhuan values(1,&#39;haoge&#39;); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive&gt; show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 &lt;configuration&gt; &lt;property&gt; &lt;name&gt;yarn.nodemanager.res