Add the pom.xml dependencies
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.1.1</version>
</dependency>
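The job in the next section reads its HBase connection settings with Typesafe Config's ConfigFactory.load(), so an application.conf has to be on the classpath (typically src/main/resources). A minimal sketch follows; the keys are the ones the code reads, but the values are placeholders, not the author's actual cluster settings:

# application.conf -- keys match what ConfigFactory.load() reads in the job below;
# the values are placeholders, replace them with your own quorum and table name
hbase.table.name = "dmp_tags"
hbase.zookeeper.host = "node1:2181,node2:2181,node3:2181"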
Write the tags to an HBase table by day
package com.dmp.tags

import com.dmp.utils.TagsUtils
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/*
 Example local arguments:
 C:\Users\admin\Desktop\result1
 C:\Users\admin\Desktop\data\app_mapping.txt
 C:\Users\admin\Desktop\data\stops.txt
 C:\Users\admin\Desktop\data\resultTags1
*/
object Tags4Ctx {

  def main(args: Array[String]): Unit = {
    // 0. Validate the number of arguments
    if (args.length != 5) {
      println(
        """
          |com.dmp.tags.Tags4Ctx
          |Arguments:
          |  logInputPath
          |  dictionaryPath
          |  stopwordpath
          |  resultOutputPath
          |  day
        """.stripMargin)
      sys.exit()
    }

    // 1. Read the program arguments
    val Array(logInputPath, dictionaryPath, stopwordpath, resultOutputPath, day) = args

    // 2. Create SparkConf -> SparkContext
    val sparkConf = new SparkConf()
    sparkConf.setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.setMaster("local[*]")
    // Kryo serialization for RDDs written to disk and shuffled between workers
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Dictionary file: app mapping
    val dicMap = sc.textFile(dictionaryPath)
      .map(line => {
        val fields = line.split("\t", -1)
        (fields(4), fields(5))
      }).collect().toMap

    // Stop words
    val stopWordsMap = sc.textFile(stopwordpath)
      .map((_, 0))
      .collect().toMap

    // Broadcast the lookup tables
    val broadcastAppDict = sc.broadcast(dicMap)
    val broadcastStopWordsDict = sc.broadcast(stopWordsMap)

    // Check whether the HBase table exists; create it if it does not
    val configuration = sc.hadoopConfiguration
    val load = ConfigFactory.load()
    val hbaseTabName = load.getString("hbase.table.name")
    configuration.set("hbase.zookeeper.quorum", load.getString("hbase.zookeeper.host"))
    val hbConn = ConnectionFactory.createConnection(configuration)
    val hbAdmin = hbConn.getAdmin
    if (!hbAdmin.tableExists(TableName.valueOf(hbaseTabName))) {
      println(s"$hbaseTabName does not exist")
      println(s"creating $hbaseTabName")
      val tabNameDescriptor = new HTableDescriptor(TableName.valueOf(hbaseTabName))
      // Create the column family
      val columnDescriptor = new HColumnDescriptor("cf")
      tabNameDescriptor.addFamily(columnDescriptor)
      // Create the table
      hbAdmin.createTable(tabNameDescriptor)
      // Release resources
      hbAdmin.close()
      hbConn.close()
    }

    // As in MapReduce, specify the output format and target table
    val jobConf = new JobConf(configuration)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    // Target table name
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTabName)

    // Read the parquet log files
    sqlContext.read.parquet(logInputPath)
      .where(TagsUtils.hasSomeUserIdCondition) // keep rows that carry a user id
      .map(row => {
        // Turn each row into tags
        // Ad tags
        val ads = Tags4Ads.makeTags(row)
        // App (media) tags
        val apps = TagsApp.makeTags(row, broadcastAppDict.value)
        // Device tags
        val devices = Tags4Devices.makeTags(row)
        // Keyword tags (uses the stop-word dictionary)
        val keywords = Tags4KeyWords.makeTags(row, broadcastStopWordsDict.value)
        val allUserId = TagsUtils.getAllUserId(row)
        (allUserId(0), (ads ++ apps ++ devices).toList)
      }).reduceByKey((a, b) => {
        // List(("K电视剧",1),("K电视剧",1)) => groupBy => Map["K电视剧", List(...)]
        // foldLeft(0)(_ + _._2) accumulates the counts; an alternative one-liner:
        // (a ++ b).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2)).toList
        (a ++ b).groupBy(_._1).map {
          // partial function over each (tag, occurrences) group
          case (k, smaTags) => (k, smaTags.map(_._2).sum)
        }.toList
      })
      // .saveAsTextFile(resultOutputPath)
      .map {
        case (userId, userTags) =>
          // Use the user id as the rowkey
          val put = new Put(Bytes.toBytes(userId))
          // userTags is a list of (tag, count) tuples; render them as "tag:count" and join with commas
          val tags = userTags.map(t => t._1 + ":" + t._2).mkString(",")
          // Column family "cf", column "day" + date, value = the tag string
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(s"day$day"), Bytes.toBytes(tags))
          // Convert to Hadoop's output key/value types
          (new ImmutableBytesWritable(), put)
      }.saveAsHadoopDataset(jobConf)

    sc.stop()
  }
}
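After a run, you can spot-check what was written by reading one row back with the plain HBase client. This is only a minimal verification sketch, not part of the original job: the ZooKeeper quorum, table name, rowkey, and day suffix are hypothetical placeholders, and only standard HBase 1.1 client calls are used.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

// Read back the "cf:day<day>" cell for a single user id to confirm the job's output.
// Quorum, table name, rowkey and day below are placeholders, not the author's values.
object VerifyTags {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "node1:2181")                 // placeholder quorum
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val table = conn.getTable(TableName.valueOf("dmp_tags"))       // placeholder table name
      val get = new Get(Bytes.toBytes("some-user-id"))               // placeholder rowkey
      val result = table.get(get)
      val value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("day20190101")) // placeholder day
      if (value != null) println(Bytes.toString(value))              // e.g. "K电视剧:3,D华为:1"
      else println("row or column not found")
      table.close()
    } finally {
      conn.close()
    }
  }
}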