spark sql 读取kudu表向sqlserver数据库中插入70万条数据
1.废话不多说。直接上代码。
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object KuduTosqlserver {
val kuduMasters = "cdh-5:xx,cdh-6:xx"
//Todo 1:定义kudu表
val kudutableName = "impala::dw_etl.zxjk_day_etl_flow_rg_kudu"
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("SparkKudu")
conf.setMaster("local[*]")
val option = Map("kudu.master" -> kuduMasters, "kudu.table" -> kudutableName)
val sparkSession = SparkSession.builder()
.config(conf)
.config("hive.metastore.uris", "thrift://cdh-2:9083")
.config("hive.metastore.warehouse.dir", "hdfs://http://cdh-3:3306/user/hive/warehouse")
.getorCreate()
// Todo: 读取kudu
sparkSession.read.format("org.apache.kudu.spark.kudu")
.options(Map("kudu.master" -> kuduMasters, "kudu.table" -> kudutableName)).load.createOrReplaceTempView("tmp_kudu_table")
val result = sparkSession.sql(
"""
|select pscode,outputcode,pollutant_code,region_code,province,city,psname,outputname,case pollutant_code when '001' then '颗粒物'when '001' then '二氧化硫'when '003' then '氮氧化物'end pollutantname,
|focusindustrytype_new,monitortime,reviseflow, updateflow,qx_flow,reason
|from tmp_kudu_table
|""".stripMargin)
println("读取成功")
val prop = new Properties()
prop.setProperty("driver", "com.microsoft.sqlserver.jdbc.sqlServerDriver")
prop.setProperty("user", "你的登录名")
prop.setProperty("password", "密码")
result.write.mode("append")
.jdbc("jdbc:sqlserver://ip:1433;DatabaseName=AMDB_DataCleanDB", "[dbo].[tb_day11]", prop)
println("写入成功")
sparkSession.close()
}
}
总结,本次推入数据大约70多万条,总耗时8分钟,各位如有更快更好的方案,大家一起来讨论,共赏,谢谢大家。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。