如何解决使用 Spring 从 SQL Server 到 Elasticsearch 的反应式索引
我想使用反应式 spring 将数据从 SQL Server 数据库(使用 R2DBC)索引到 Elasticsearch(使用反应式 Elasticsearch)。我有一个与 SQL Server 中的表相对应的实体类,我有一个与 Elasticsearch 索引的文档相对应的模型类。
对于两个数据库(SQL Server 和 Elasticsearch),我都创建了存储库:
SQL Server 存储库:
@Repository
public interface ProductRepository extends ReactiveCrudRepository<ProductTbl,Long> {
}
用于 Elasticsearch 的存储库:
@Repository
public interface ProductElasticRepository extends ReactiveElasticsearchRepository<Product,String> {
}
难道我不能通过调用 productElasticRepository.saveAll(productRepository.findAll())
来索引所有文档吗?
它不太工作。它要么超过 DataBufferLimit,因此引发异常,要么存在 ReadTimeOutException。执行时,我可以看到 R2DBC 为 SQL Server 数据库的每个数据条目创建 RowToken。然而,对 Elasticsearch 客户端的 POST 只会在获得所有行之前发生,这似乎不是反应代码应该如何工作?我肯定在这里遗漏了一些东西,希望有人能提供帮助。
解决方法
我不知道你的情况到底是什么问题,但我记得我过去做过类似的事情,我记得我遇到过的一些问题。
这取决于您需要迁移多少数据。如果有数百万行,那么它肯定会失败。在这种情况下,您可以创建一个将有一个窗口的算法,假设有 5000 行,读取它们,然后使用批量插入将它们写入 elasticsearch。然后执行此操作,直到没有更多行可供读取。
我遇到的另一个问题是 ElasticSearch WebClient 没有配置为支持我在正文中发送的数据量。
另一个:默认情况下,ElasticSearch 的队列容量为 200,如果超过该容量,则会出现错误。如果您以某种方式尝试并行插入数据,就会发生这种情况。
另一个:关系型数据库的连接如果长时间保持打开会在某个时候中断
请记住,elasticsearch 默认不是响应式的。目前有一个响应式驱动程序,但不是官方的。
另一个:在进行迁移时,尝试在没有那么多分片的单个节点上写入。
,你应该做类似的事情
productRepository.findAll()
.buffer(1000)
.onBackpressureBuffer()
.flatMap(list -> productElasticRepository.saveAll(list))
此外,如果您收到 ReadTimeOutException,请增加套接字超时
spring:
data:
elasticsearch:
client:
reactive:
socket-timeout: 5m
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。