微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spark 写入 es

spark 2.4

es 7.10.2

Scala2.11  

<dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>7.10.2</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>7.10.2</version>
        </dependency>

        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>6.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
        </dependency>

        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.1</version>
        </dependency>

        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.4</version>
        </dependency>

        <dependency>
            <groupId>commons-httpclient</groupId>
            <artifactId>commons-httpclient</artifactId>
            <version>3.0.1</version>
        </dependency>

================

package com

import com.constant.PropConstants
import com.javaUtil.PropertieUtil
import io.searchbox.client.JestClient
import io.searchbox.core.{Bulk, BulkResult, Index}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
import org.elasticsearch.spark.sql.EsSparkSQL

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util
import java.util.{Date, Properties}

/**

 */
/**
 * Reads rows from the Hive table `dws.center_materiel_hotspot` with Spark SQL
 * and bulk-indexes them into the Elasticsearch index
 * "center_materiel_hotspot_local" through the Jest REST client.
 *
 * Kerberos credentials and keytab paths are taken from config.properties.
 */
object center_materiel_hotspot2es {

  def main(args: Array[String]): Unit = {
    val log: Logger = Logger.getRootLogger
    // Load the cluster configuration file.
    val prop: Properties = PropertieUtil.load("config.properties")
    // For local testing, read the file from the classpath instead:
//    val prop: Properties = PropertieUtil.getProperties("/config.properties")

    // Kerberos authentication so Spark can read from Hive.
    System.setProperty("java.security.krb5.conf", prop.getProperty(PropConstants.KRB5_CONF_PATH))
    System.setProperty("HADOOP_USER_NAME", prop.getProperty(PropConstants.HADOOP_USER_NAME))
    System.setProperty("user.name", prop.getProperty(PropConstants.USER_NAME))
    // FIX: class is UserGroupInformation (original `UserGroupinformation` does not compile).
    UserGroupInformation.loginUserFromKeytab(
      prop.getProperty(PropConstants.KEYTAB_NAME), prop.getProperty(PropConstants.KEYTAB_FILE_PATH)
    )

    val session: SparkSession = SparkSession.builder()
//      .master("local[9]")
//      .config("spark.testing.memory","4718592000")
//      .appName("spark to es")
//      .config("spark.yarn.am.waitTime", "1000")
      .config("spark.hadoop.hive.exec.dynamic.partition", "true") // enable dynamic partitioning
      .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport() // Hive support
      .getOrCreate() // FIX: was `getorCreate()` (does not compile)

    import session.implicits._

    // Select the hotspot columns to be exported.
    val dataFrame: DataFrame = session.sql(
      """
        |select id,create_by,create_at,update_by,update_at,position_top,position_left,materiel_group_child_id,
        |drawing_no,width,height,drawing_type from
        | dws.center_materiel_hotspot
        |""".stripMargin)

    val rdd = dataFrame.map(x => {
      // Null-safe conversion of an arbitrary column to String (null stays null);
      // replaces the repeated `if (getAs(i) != null) ... else getAs(i)` dance.
      def strOrNull(i: Int): String = Option(x.get(i)).map(_.toString).orNull

      // One formatter per row instead of two allocations per row.
      // NOTE(review): assumes create_at/update_at are non-null "yyyy-MM-dd HH:mm:ss"
      // values — a null or malformed timestamp throws, same as the original.
      val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

      val id: String = x.getAs(0)
      val create_by: String = x.getAs(1)
      val create_at: String = x.getAs(2).toString
      val update_by: String = x.getAs(3)
      val update_at: String = x.getAs(4).toString
      val position_top: String = strOrNull(5)
      val position_left: String = strOrNull(6)
      val materiel_group_child_id: String = x.getAs(7)
      val drawing_no: String = x.getAs(8)
      val width: String = strOrNull(9)
      val height: String = strOrNull(10)
      val drawing_type: String = x.getAs(11)
      val createTime: Long = fmt.parse(create_at).getTime
      val updateTime: Long = fmt.parse(update_at).getTime
      center_materiel_hotspot_local(id, create_by, createTime, update_by, updateTime, position_top, position_left,
        materiel_group_child_id, drawing_no, width, height, drawing_type)
    })
//    rdd.show(2)

    // One bulk request per partition; the document id doubles as the ES _id.
    rdd.foreachPartition(x => {
      val tuples: List[(String, center_materiel_hotspot_local)] = x.toList.map { data => (data.id, data) }
      // args: data, index name, mapping type
      savaBulk(tuples, "center_materiel_hotspot_local", "_doc")
    })

    session.stop()
  }

  // Document shape written to Elasticsearch; timestamps are epoch millis.
  case class center_materiel_hotspot_local(id:String, createBy:String, createTime:Long, updateBy:String,
                                           updateTime:Long,positionTop:String, positionLeft:String,
                                           materielGroupChildId:String,drawingNo:String, width:String,
                                           height:String, drawingType:String )

  /**
   * Bulk-saves (id, document) pairs into Elasticsearch via Jest.
   *
   * @param dataList  (documentId, document) pairs; null or empty lists are a no-op
   * @param indexName target index
   * @param typeName  mapping type (e.g. "_doc")
   */
  def savaBulk(dataList:List[(String,AnyRef)],indexName:String,typeName:String):Unit={
    if (dataList!=null && dataList.nonEmpty) {
      val client: JestClient = jestClient
      try {
        val builder = new Bulk.Builder
        builder.defaultIndex(indexName).defaultType(typeName)
        for ((id, data) <- dataList) {
          val index: Index = new Index.Builder(data).id(id).build()
          builder.addAction(index)
        }
        val bulk: Bulk = builder.build()
        // Execute the bulk request and report how many items came back.
        val items: util.List[BulkResult#BulkResultItem] = client.execute(bulk).getItems
        println("以保存:" + items.size() + "条数据!")
      } finally {
        // FIX: close in finally — the original leaked the client when execute() threw.
        client.close()
      }
    }
  }

  // FIX: Jest's real package is lowercase `io.searchbox` (original `io.searchBox` does not compile).
  import io.searchbox.client.JestClient
  import io.searchbox.client.JestClientFactory
  import io.searchbox.client.config.HttpClientConfig

  /**
   * Builds a pooled Jest HTTP client for the ES cluster.
   * NOTE(review): the endpoint is hard-coded — consider moving it to config.properties.
   */
  def jestClient: JestClient = {
    val factory = new JestClientFactory
    factory.setHttpClientConfig(new HttpClientConfig.Builder(
      "http://es-cn-tl32b5q0t003a6n6g.public.elasticsearch.aliyuncs.com:9200")
      .multiThreaded(true)
      .defaultMaxTotalConnectionPerRoute(2)
      .maxTotalConnection(10)
      .build)
    factory.getObject // FIX: was `getobject` (does not compile)
  }

}
 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐