1 工程目录
pom.xml
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> <version>1.10.0</version> </dependency>
2 flink 写入 hbase
package com.atguigu.flink.app import java.util import com.atguigu.flink.bean.SensorReading import com.atguigu.flink.source.HbaseSource import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.streaming.api.scala import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer} import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink import org.apache.http.HttpHost import org.elasticsearch.client.Requests object ESSinkApp { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //调用addSource以此来作为数据输入端 val stream: scala.DataStream[SensorReading] = env.addSource(new HbaseSource) val httpHosts = new util.ArrayList[HttpHost]() httpHosts.add(new HttpHost("127.0.0.1", 9200, "http")) val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading]( httpHosts, new ElasticsearchSinkFunction[SensorReading]{ override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = { // 构造数据格式 val hashMap = new util.HashMap[String, String]() hashMap.put("data", t.toString) // 创建请求 val indexRequest = Requests .indexRequest() .index("sensor") // 索引是sensor,相当于数据库 .`type`("readingData") // es6必须写这一行代码 .source(hashMap)// 数据源 // 提交数据 requestIndexer.add(indexRequest) } } ) // 设置每一批写入es多少数据 esSinkBuilder.setBulkFlushMaxActions(1) stream.addSink(esSinkBuilder.build()) // 打印流 stream.print() // 执行主程序 env.execute() } }
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。