微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark中利用Scala进行数据清洗代码

2019-05-07 18:56:18
1

  package com.amoscloud.log.analyze

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark job that cleans a comma-separated HTTP log and prints simple aggregates.
 *
 * Field positions used below (3, 7, 16, 17, 61-63, 70, 71) are taken from how the
 * code indexes each split line; their meaning is inferred from the local variable
 * names only -- confirm against the actual log schema.
 */
object LogAnalyze1 {

  /** Input file used when no path is passed on the command line. */
  private val DefaultInputPath = "C:\\Users\\Administrator\\Desktop\\HTTP.txt"

  /**
   * Entry point.
   *
   * @param args optional; `args(0)` overrides the input path, otherwise
   *             [[DefaultInputPath]] is read.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("LogAnalyze2")
    val sc = new SparkContext(conf)

    // Always release the context, even when an analysis throws.
    try {
      val data = sc.textFile(args.headOption.getOrElse(DefaultInputPath))
      // Cached so that several analyses can be enabled without re-reading the file.
      data.cache()

      // 1. (phone number, home location, device brand, device model, connection duration)
      // analyze1(data)
      // 2. (time bucket in seconds, traffic volume)
      analyze2(data)
      // 3. (brand, Array[(String, Int)]((model1, count1), (model2, count2)))
      // analyze(data)
    } finally {
      sc.stop()
    }
  }

  /**
   * Prints, for every brand (field 70), the grouped `(model, count)` pairs,
   * where model is field 71. Lines with fewer than 72 fields are skipped.
   */
  private def analyze(data: RDD[String]): Unit =
    data
      .map(_.split(","))
      .filter(_.length >= 72)
      .map(fields => ((fields(70), fields(71)), 1))
      .reduceByKey(_ + _)
      .map { case ((brand, model), count) => (brand, (model, count)) }
      .groupByKey()
      .collect()
      .foreach(println)

  /**
   * Prints the summed traffic (field 7) per time bucket.
   *
   * The bucket is field 16 with its last four characters removed, which presumably
   * strips a millisecond suffix so that records group by second -- confirm the
   * timestamp format.
   */
  private def analyze2(data: RDD[String]): Unit =
    data
      .map(_.split(","))
      // Skip lines too short to contain field 16, matching the guards in the
      // sibling analyses instead of failing the whole job on a truncated line.
      .filter(_.length > 16)
      .map { fields =>
        val time = fields(16).take(fields(16).length - 4)
        val flow = fields(7).toLong
        (time, flow)
      }
      .reduceByKey(_ + _)
      // .map(x => (x._1, (x._2 / 1024.0).formatted("%.3f") + "KB"))
      .collect()
      .foreach(println)

  /**
   * Prints the total connection time per "phone|location|brand|model" key.
   * Lines with fewer than 72 fields are skipped.
   */
  private def analyze1(data: RDD[String]): Unit =
    data
      .map(_.split(","))
      .filter(_.length >= 72)
      .map { fields =>
        val phoneNum = fields(3).takeRight(11)
        val local = fields(61) + fields(62) + fields(63)
        val brand = fields(70)
        val model = fields(71)
        val connectTime = timeDiff(fields(16), fields(17))
        // 1. (phone number, home location, device brand, device model, connection duration)
        (s"$phoneNum|$local|$brand|$model", connectTime)
      }
      .reduceByKey(_ + _)
      .map { case (key, totalMillis) => (key, formatTime(totalMillis)) }
      .collect()
      .foreach(println)

  /**
   * Difference `time2 - time1` in milliseconds.
   *
   * Each argument is parsed as `yyyy-MM-dd HH:mm:ss` followed by a four-character
   * suffix whose last three characters are read as milliseconds.
   *
   * @param time1 start timestamp
   * @param time2 end timestamp
   * @return elapsed milliseconds; negative when `time2` precedes `time1`
   * @throws java.text.ParseException when the date part cannot be parsed
   * @throws NumberFormatException    when the last three characters are not a number
   */
  def timeDiff(time1: String, time2: String): Long = {
    // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    def toEpochMillis(stamp: String): Long =
      sdf.parse(stamp.take(stamp.length - 4)).getTime + stamp.takeRight(3).toLong

    val timeStamp2 = toEpochMillis(time2)
    val timeStamp1 = toEpochMillis(time1)
    timeStamp2 - timeStamp1
  }

  /**
   * Formats a millisecond duration as `h:m:s` without zero padding.
   *
   * NOTE(review): hours are taken modulo 24, so totals of a day or more wrap
   * around (25 hours prints as `1:0:0`). Kept as in the original -- confirm
   * whether that is intended for summed durations.
   *
   * @param time duration in milliseconds
   * @return the formatted duration
   */
  def formatTime(time: Long): String = {
    val timeS = time / 1000
    val s = timeS % 60
    val m = timeS / 60 % 60
    val h = timeS / 60 / 60 % 24
    s"$h:$m:$s"
  }

}

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐