如何解决无法使用 alpakka 从 elasticsearch 读取数据
我试图使用 alpakka 从 elasticsearch 读取文档(存储在 ES 中的 json 数据)。
我得到了这个 alpakka-Elasticsearch。
这里说你可以使用
ElasticsearchSource,ElasticsearchFlow or the ElasticsearchSink
。
我试图实现 ElasticsearchSource 方法。所以我的代码看起来像这样
val url = "http://localhost:9200"
val connectionSettings = ElasticsearchConnectionSettings(url)
val sourceSettings = ElasticsearchSourceSettings(connectionSettings)
val elasticsearchParamsV7 = ElasticsearchParams.V7("category_index")
val copy = ElasticsearchSource
.typed[CategoryData](
elasticsearchParamsV7,query = query,sourceSettings
).map { message: ReadResult[CategoryData] =>
println("Inside message==================> "+message)
WriteMessage.createIndexMessage(message.id,message.source)
} .runWith(
ElasticsearchSink.create[CategoryData](
elasticsearchParamsV7,ElasticsearchWriteSettings(connectionSettings)
)
)
println("Final data==============>. "+copy)
最后,复制返回 Future[Done]
的值。
但是我无法从 ES 读取数据。
有什么我遗漏的吗?
还有其他方法可以使用 akka http 客户端 api 来做同样的事情吗?
在 akka 中使用 ES 的首选方式是什么?
解决方法
要从 Elasticsearch 中读取数据,像这样应该就足够了:
val matchAllQuery = """{"match_all": {}}"""
val result = ElasticsearchSource
.typed[CategoryData](
elasticsearchParamsV7,query = matchAllQuery,sourceSettings
).map { message: ReadResult[CategoryData] =>
println("Read message==================> "+message)
}.runWith(Sink.seq)
result.onComplete(res => res.foreach(col => println(s"Read: ${col.size} records")))
如果 CategoryData
类型与索引中存储的内容不正确匹配,则查询可能不会返回结果。
如果有疑问,可以读取原始 JSON:
val elasticsearchSourceRaw = ElasticsearchSource
.create(
elasticsearchParamsV7,settings = sourceSettings
)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。