如何解决如何高效地并行查询 google-cloud-spanner 多线程?
(对不起,这是 TL;DR;但我很绝望,想要彻底!)
我们正在将一项服务从 AWS 迁移到 GCP,并从 DynamoDB 切换到 Cloud Spanner 作为后端数据存储。
数据存储(spanner)包含网络服务用户查询的数据。在生产负载中,被查询的数据在 1% 到 10% 的时间内被找到。我有一个简单的多线程 Java 测试客户端,用于查询我们的服务,只要过去 1 分钟的平均吞吐量增加,就会不断添加新线程。
我的测试客户端在 GCE 虚拟机(64 CPU)上运行,当使用 DynamoDB 数据源时,我可以获得多达 3700 个线程,一旦我们的服务自动扩展到配置的 pod,平均可以推动 50k req/s最大节点数。每个线程每 1000 个请求从 Dynamo 读取 100 个哈希值(10% 的命中率)。
我现在需要将我的 Java 客户端切换到查询 Spanner 以获取 10% 的请求中使用的数据。我的查询通常如下所示:
SELECT A,B,C FROM data_table LIMIT 250 OFFSET XXX
理论上,我希望每个线程选择唯一行的块。我使用 OFFSET 从唯一位置开始读取每个线程,一旦每个记录块用完,我将 OFFSET 增加到开始偏移 + totalRows 并选择另一个数据块。
我意识到这个查询可能不会转化为每个实现,但这个概念应该是正确的,即每个线程都可以在线程的生命周期内查询 Spanner 以获得唯一的数据集。
我尝试将 java-spanner-jdbc 与 c3p0 连接池一起使用,并仅通过标准 DriverManager.getConnection() 路由。我使用了 min/max Session 配置以及 numChannels,但似乎没有什么能帮助我扩大规模。 TBH,我还是不明白会话和渠道之间的相关性。
我还使用 singleUseReadOnlyTransaction()、batchReadOnlyTransaction() 和最近的 txn.partitionQuery() 尝试了原生 SpannerDB 客户端。
因为 partitionQuery() 感觉很像 DynamoDB 代码,所以这感觉是正确的方向,但是因为我的查询(基于 https://cloud.google.com/spanner/docs/reads 处的“并行读取数据”示例)有一个 LIMIT 子句,我收到错误:
com.google.cloud.spanner.SpannerException:INVALID_ARGUMENT: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: 查询不是 root 可分区,因为它在根目录中没有 DistributedUnion。 请运行 EXPLAIN 以获取查询计划详细信息。
删除 LIMIT 子句可以解决这个问题,但随后的查询需要一个永恒的时间!
所以问题是,如果 partitionQuery() 路由是正确的,我如何使用“分页”限制进行并行查询?如果这不是最佳途径,我应该使用什么来获得每个线程具有唯一数据集的最佳并行读取吞吐量?
[编辑] 根据 Knut Olav Loite 在下面的评论,分区或批量查询不是正确的方法,所以我又回到了单次使用的只读查询。
这是我创建spannerDbClient的代码:
RetrySettings retrySettings = RetrySettings.newBuilder()
.setInitialRpcTimeout(Duration.ofSeconds(SPANNER_INITIAL_TIMEOUT_RETRY_SECONDS))
.setMaxRpcTimeout(Duration.ofSeconds(SPANNER_MAX_TIMEOUT_RETRY_SECONDS))
.setMaxAttempts(SPANNER_MAX_RETRY_ATTEMPTS)
.setTotalTimeout(Duration.ofSeconds(SPANNER_TOTAL_TIMEOUT_RETRY_SECONDS))
.build();
SpannerOptions.Builder builder = SpannerOptions.newBuilder()
.setSessionPoolOption(SessionPoolOptions.newBuilder()
.setFailIfPoolExhausted()
.setMinSessions(SPANNER_MIN_SESSIONS)
.setMaxSessions(SPANNER_MAX_SESSIONS)
.build()
)
.setNumChannels(SPANNER_NUM_CHANNELS);
if (credentials != null) {
builder.setCredentials(credentials);
}
builder.getSpannerStubSettingsBuilder()
.executeSqlSettings()
.setRetryableCodes(StatusCode.Code.DEADLINE_EXCEEDED,StatusCode.Code.UNAVAILABLE)
.setRetrySettings(retrySettings);
spanner = builder.build().getService();
databaseId = DatabaseId.of(
projectName,instanceName,databaseName
);
spannerDbClient = spanner.getDatabaseClient(databaseId);
这是我执行实际查询的方法:
List<Entry> entry = new ArrayList<>();
try (ResultSet resultSet = spannerDbClient
.singleUseReadOnlyTransaction(TimestampBound.ofMaxStaleness(5,TimeUnit.SECONDS))
.executeQuery(Statement.newBuilder(String.format("SELECT * from %s LIMIT %d OFFSET %d",tableName,limit,offset)).build())) {
while (resultSet.next()) {
entry.add(getEntryFromResultSet(resultSet));
}
}
我添加了计时器代码来显示查询的时间,这就是 50 个线程的情况。这是使用 maxSession=50,minSession=50,numChannels=4(默认)的共享 spannerDbClient 实例:
--> [0h:00m:00s] Throughput: Total 0,Interval 0 (0 req/s),0/0 threads reporting
[tId:099][00:00:00.335] Spanner query,LIMIT 250 OFFSET 99000
[tId:146][00:00:00.382] Spanner query,LIMIT 250 OFFSET 146000
[tId:140][00:00:00.445] Spanner query,LIMIT 250 OFFSET 140000
[tId:104][00:00:00.494] Spanner query,LIMIT 250 OFFSET 104000
[tId:152][00:00:00.363] Spanner query,LIMIT 250 OFFSET 152000
[tId:149][00:00:00.643] Spanner query,LIMIT 250 OFFSET 149000
[tId:143][00:00:00.748] Spanner query,LIMIT 250 OFFSET 143000
[tId:163][00:00:00.682] Spanner query,LIMIT 250 OFFSET 163000
[tId:155][00:00:00.799] Spanner query,LIMIT 250 OFFSET 155000
[tId:166][00:00:00.872] Spanner query,LIMIT 250 OFFSET 166000
[tId:250][00:00:00.870] Spanner query,LIMIT 250 OFFSET 250000
[tId:267][00:00:01.319] Spanner query,LIMIT 250 OFFSET 267000
[tId:229][00:00:01.917] Spanner query,LIMIT 250 OFFSET 229000
[tId:234][00:00:02.256] Spanner query,LIMIT 250 OFFSET 234000
[tId:316][00:00:02.401] Spanner query,LIMIT 250 OFFSET 316000
[tId:246][00:00:02.844] Spanner query,LIMIT 250 OFFSET 246000
[tId:312][00:00:02.989] Spanner query,LIMIT 250 OFFSET 312000
[tId:176][00:00:03.497] Spanner query,LIMIT 250 OFFSET 176000
[tId:330][00:00:03.140] Spanner query,LIMIT 250 OFFSET 330000
[tId:254][00:00:03.879] Spanner query,LIMIT 250 OFFSET 254000
[tId:361][00:00:03.816] Spanner query,LIMIT 250 OFFSET 361000
[tId:418][00:00:03.635] Spanner query,LIMIT 250 OFFSET 418000
[tId:243][00:00:04.503] Spanner query,LIMIT 250 OFFSET 243000
[tId:414][00:00:04.006] Spanner query,LIMIT 250 OFFSET 414000
[tId:324][00:00:04.457] Spanner query,LIMIT 250 OFFSET 324000
[tId:498][00:00:03.865] Spanner query,LIMIT 250 OFFSET 498000
[tId:252][00:00:04.945] Spanner query,LIMIT 250 OFFSET 252000
[tId:494][00:00:04.211] Spanner query,LIMIT 250 OFFSET 494000
[tId:444][00:00:04.780] Spanner query,LIMIT 250 OFFSET 444000
[tId:422][00:00:04.951] Spanner query,LIMIT 250 OFFSET 422000
[tId:397][00:00:05.234] Spanner query,LIMIT 250 OFFSET 397000
[tId:420][00:00:05.106] Spanner query,LIMIT 250 OFFSET 420000
[tId:236][00:00:05.985] Spanner query,LIMIT 250 OFFSET 236000
[tId:406][00:00:05.429] Spanner query,LIMIT 250 OFFSET 406000
[tId:449][00:00:05.291] Spanner query,LIMIT 250 OFFSET 449000
[tId:437][00:00:05.929] Spanner query,LIMIT 250 OFFSET 437000
[tId:341][00:00:06.611] Spanner query,LIMIT 250 OFFSET 341000
[tId:475][00:00:06.223] Spanner query,LIMIT 250 OFFSET 475000
[tId:490][00:00:06.186] Spanner query,LIMIT 250 OFFSET 490000
[tId:416][00:00:06.460] Spanner query,LIMIT 250 OFFSET 416000
[tId:328][00:00:07.446] Spanner query,LIMIT 250 OFFSET 328000
[tId:322][00:00:07.679] Spanner query,LIMIT 250 OFFSET 322000
[tId:158][00:00:09.357] Spanner query,LIMIT 250 OFFSET 158000
[tId:496][00:00:08.183] Spanner query,LIMIT 250 OFFSET 496000
[tId:256][00:00:09.250] Spanner query,LIMIT 250 OFFSET 256000
--> [0h:00m:10s] Throughput: Total 9848,Interval +9848 (984 req/s),44/50 threads reporting
[tId:492][00:00:08.646] Spanner query,LIMIT 250 OFFSET 492000
[tId:390][00:00:09.810] Spanner query,LIMIT 250 OFFSET 390000
[tId:366][00:00:10.142] Spanner query,LIMIT 250 OFFSET 366000
[tId:320][00:00:10.451] Spanner query,LIMIT 250 OFFSET 320000
[tId:318][00:00:10.619] Spanner query,LIMIT 250 OFFSET 318000
--> [0h:00m:20s] Throughput: Total 56051,Interval +46203 (4620 req/s),50/50 threads reporting
--> [0h:00m:30s] Throughput: Total 102172,Interval +46121 (4612 req/s),50/50 threads reporting
请注意,无论偏移量如何,查询时间只会增加,并且在开始报告结果之前,初始 spanner 查询需要 10 到 20 秒才能返回所有 50 个线程的数据。如果我将限制增加到 1000,那么所有 50 个线程几乎需要 2 分钟才能从 Spanner 返回结果。
将其与 DynamoDb 等效项(限制为 1000 除外)进行比较,其中所有查询在不到 1 秒内返回并且所有 50 个线程都在显示 10 秒状态更新之前报告结果:
--> [0h:00m:00s] Throughput: Total 0,0/0 threads reporting
[tId:045] Dynamo query,LIMIT 1000 [00:00:00.851]
[tId:138] Dynamo query,LIMIT 1000 [00:00:00.463]
[tId:183] Dynamo query,LIMIT 1000 [00:00:00.121]
[tId:122] Dynamo query,LIMIT 1000 [00:00:00.576]
[tId:095] Dynamo query,LIMIT 1000 [00:00:00.708]
[tId:072] Dynamo query,LIMIT 1000 [00:00:00.778]
[tId:115] Dynamo query,LIMIT 1000 [00:00:00.619]
[tId:166] Dynamo query,LIMIT 1000 [00:00:00.296]
[tId:058] Dynamo query,LIMIT 1000 [00:00:00.814]
[tId:179] Dynamo query,LIMIT 1000 [00:00:00.242]
[tId:081] Dynamo query,LIMIT 1000 [00:00:00.745]
[tId:106] Dynamo query,LIMIT 1000 [00:00:00.671]
[tId:162] Dynamo query,LIMIT 1000 [00:00:00.348]
[tId:035] Dynamo query,LIMIT 1000 [00:00:00.889]
[tId:134] Dynamo query,LIMIT 1000 [00:00:00.513]
[tId:187] Dynamo query,LIMIT 1000 [00:00:00.090]
[tId:158] Dynamo query,LIMIT 1000 [00:00:00.405]
[tId:191] Dynamo query,LIMIT 1000 [00:00:00.095]
[tId:195] Dynamo query,LIMIT 1000 [00:00:00.096]
[tId:199] Dynamo query,LIMIT 1000 [00:00:00.144]
[tId:203] Dynamo query,LIMIT 1000 [00:00:00.112]
[tId:291] Dynamo query,LIMIT 1000 [00:00:00.102]
[tId:303] Dynamo query,LIMIT 1000 [00:00:00.094]
[tId:312] Dynamo query,LIMIT 1000 [00:00:00.101]
[tId:318] Dynamo query,LIMIT 1000 [00:00:00.075]
[tId:322] Dynamo query,LIMIT 1000 [00:00:00.086]
[tId:326] Dynamo query,LIMIT 1000 [00:00:00.096]
[tId:330] Dynamo query,LIMIT 1000 [00:00:00.085]
[tId:334] Dynamo query,LIMIT 1000 [00:00:00.114]
[tId:342] Dynamo query,LIMIT 1000 [00:00:00.096]
[tId:391] Dynamo query,LIMIT 1000 [00:00:00.081]
[tId:395] Dynamo query,LIMIT 1000 [00:00:00.088]
[tId:406] Dynamo query,LIMIT 1000 [00:00:00.088]
[tId:415] Dynamo query,LIMIT 1000 [00:00:00.078]
[tId:421] Dynamo query,LIMIT 1000 [00:00:00.089]
[tId:425] Dynamo query,LIMIT 1000 [00:00:00.068]
[tId:429] Dynamo query,LIMIT 1000 [00:00:00.088]
[tId:433] Dynamo query,LIMIT 1000 [00:00:00.105]
[tId:437] Dynamo query,LIMIT 1000 [00:00:00.092]
[tId:461] Dynamo query,LIMIT 1000 [00:00:00.110]
[tId:483] Dynamo query,LIMIT 1000 [00:00:00.071]
[tId:491] Dynamo query,LIMIT 1000 [00:00:00.078]
[tId:495] Dynamo query,LIMIT 1000 [00:00:00.075]
[tId:503] Dynamo query,LIMIT 1000 [00:00:00.064]
[tId:499] Dynamo query,LIMIT 1000 [00:00:00.108]
[tId:514] Dynamo query,LIMIT 1000 [00:00:00.163]
[tId:518] Dynamo query,LIMIT 1000 [00:00:00.135]
[tId:529] Dynamo query,LIMIT 1000 [00:00:00.163]
[tId:533] Dynamo query,LIMIT 1000 [00:00:00.079]
[tId:541] Dynamo query,LIMIT 1000 [00:00:00.060]
--> [0h:00m:10s] Throughput: Total 24316,Interval +24316 (2431 req/s),50/50 threads reporting
--> [0h:00m:20s] Throughput: Total 64416,Interval +40100 (4010 req/s),50/50 threads reporting
我在配置中遗漏了什么吗?如果我让它自动缩放,性能问题会被大大放大。
解决方法
编辑基于附加信息:
正如下面 Panagiotis Voulgaris 所指出的,我认为这种情况下的问题与客户端配置无关,而是与查询本身有关。查询似乎很慢,尤其是对于较高的 OFFSET
值。我用一个大约有 1,000,000 行的表进行了尝试,对于 900,000 的 OFFSET
值,单个查询运行了 4-5 秒。当您向上扩展时,问题变得更糟的原因可能是您用大量需要很长时间的并行查询使后端不堪重负,而不是因为客户端配置错误。
如果您可以重写查询以根据主键值选择一系列行,而不是使用 LIMIT x OFFSET y
构造,那么最好的方法是。因此,您的查询将如下所示:
SELECT A,B,C
FROM data_table
WHERE A >= x AND A < (x+250)
如果您的键列包含值之间的间隙,这显然不能保证您在每个分区中获得恰好 250 行。在这种情况下,您还可以稍微增加 +250
值以获得合理的分区。
如果由于键值是完全随机的值(或分布不均匀)而无法进行上述操作,那么我认为以下查询会比您当前的查询更有效:
SELECT A,C
FROM data_table
WHERE A >= (
SELECT ANY_VALUE(A)
FROM data_table
GROUP BY A
LIMIT 1 OFFSET y
)
ORDER BY A
LIMIT 250
我不太清楚在这种情况下您的最终目标是什么,这在具体问题上有所不同:
...如果 partitionQuery() 路由是正确的 (?)
BatchReadOnlyTransaction
和 partitionQuery()
路由用于读取单个时间点的大型数据集。例如,这可能是当您想要创建表中所有数据的转储时。 Spanner 将为您分区查询并返回分区列表。然后每个分区可以由单独的线程(甚至单独的 VM)处理。这可以说会自动替换查询的 LIMIT 250 OFFSET xxxx
部分,因为 Spanner 根据表中的实际数据创建不同的分区。
但是,如果您在这里的最终目标是模拟生产负载,那么 BatchReadOnlyTransaction
不是要遵循的路线。
如果您想要高效地查询数据集,那么您应该确保使用 single-use read-only transaction 进行查询。这就是您已经对本机客户端执行的操作。此外,只要连接处于自动提交模式,JDBC 驱动程序也将自动使用一次性只读事务进行查询。如果关闭自动提交,驱动程序将在您执行查询时自动启动事务。
关于会议和渠道:
- 会话有点类似于您通常所说的连接。 JDBC 驱动程序和本机客户端都使用内部会话池。在您的案例中,重要的部分是随时执行的并行读取次数。一个会话可以随时处理一个事务(即一个读操作)。因此,您将需要与并行读取操作一样多的会话。我假设在您使用 c3po 的设置中,您正在为正在读取的每个线程分配一个 JDBC 连接。在这种情况下,最大会话数应设置为等于 c3po 池中的最大连接数。
- 通道:通道是 gRPC 使用的低级网络连接。一个通道可以并行处理多个并发请求。据我所知,默认最大值是每个通道 100 个同时请求,因此您应该为每 100 个会话使用 1 个通道。这也是 JDBC 驱动程序和本机客户端库中的默认设置。
关于(示例)查询:
如上所述,我不太清楚这只是一个测试设置,还是一个实际的生产示例。但是,我希望查询包含显式 ORDER BY
子句以确保数据按预期顺序返回,并且 ORDER BY
子句显然应该使用索引列。
最后:是不是后端每次查询响应慢导致的问题?或者后端基本上是空闲的,客户端是否无法真正增加查询?
,我怀疑为了产生准确的结果
SELECT A,C FROM data_table LIMIT 250 OFFSET XXX
后端需要获取 250 + XXX 行,然后跳过其中的 XXX。因此,如果 XXX 非常大,这可能是一个非常昂贵的查询,并且需要扫描一大块 data_table
。
改为限制表键是否有意义?类似:
SELECT A,C FROM data_table WHERE TableKey1 > 'key_restriction' LIMIT 250;
此类查询最多只能读取 250 行。
独立地,最好了解此类查询对您的生产工作负载的代表性。您能解释一下您期望在生产中使用什么类型的查询吗?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。