/**
 * Redis connection pool.
 *
 * Being a Scala `object`, this is initialized independently in every JVM that touches it
 * (driver or executor), so each executor builds its own `pool` lazily on first use.
 */
object RedisClient extends Serializable {
  val redisHost: String = "192.168.115.142"
  val redisPort: Int = 6379
  /** Timeout passed to the JedisPool constructor, in milliseconds. */
  val redisTimeout: Int = 30000

  /**
   * Lazily created Jedis pool.
   *
   * A JVM shutdown hook is registered at creation time so the pool (and the connections it
   * holds) is destroyed when the process exits, instead of being leaked.
   */
  lazy val pool: JedisPool = {
    val created = new JedisPool(new JedisPoolConfig, redisHost, redisPort, redisTimeout)
    sys.addShutdownHook {
      println("Destroying Redis pool in shutdown hook")
      created.destroy()
    }
    created
  }
}
以下是Streaming的代码:
/**
 * Spark Streaming job: reads JSON `PaymentInfo` records from Kafka topic `example_order`,
 * aggregates them per product every 3 seconds and accumulates the results in Redis database 1.
 *
 * Redis keys written:
 *  - hash `orderTotalKey`: field = productId, incremented by the batch's summed productPrice
 *  - key `totalKey`: incremented by the same per-product sums (i.e. a grand total)
 *
 * @param args unused
 */
def main(args: Array[String]): Unit = {
  // NOTE: a master set in code takes precedence over the one given to spark-submit.
  val conf = new SparkConf().setAppName("Streaming Process Data").setMaster("local")
  val sc = new SparkContext(conf)
  sc.setLogLevel("WARN")
  // Streaming context with a 3-second batch interval.
  val ssc = new StreamingContext(sc, Seconds(3))
  // Checkpoint directory. NOTE(review): Kafka offsets are only restored from here when the
  // context is rebuilt with StreamingContext.getOrCreate on restart; with `new StreamingContext`
  // as above, a restarted job does not resume from the checkpointed offsets.
  ssc.checkpoint("spark-receiver")

  // Kafka consumer configuration. Key/value decoding is chosen by the StringDecoder type
  // parameters below, so no deserializer settings are needed here.
  val kafkaParams = Map[String, String](
    "bootstrap.servers" -> "192.168.115.142:9092",
    "group.id" -> "spark_order"
  )
  val topics = Set("example_order")
  // Direct (receiver-less) stream built on Kafka's low-level API.
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc,
    kafkaParams,
    topics
  )

  // Parse each message value. Objects used inside RDD operations must be serializable; Gson is
  // not, so it is created inside the function rather than captured from the driver.
  // Option(...) drops records for which Gson returns null (e.g. an empty message body); they
  // would otherwise fail with a NullPointerException in the next step.
  val paymentInfos = kafkaStream.flatMap(record =>
    Option(new Gson().fromJson(record._2, classOf[PaymentInfo])))

  // Per batch: (productId, orderCount, totalPrice). reduceByKey combines map-side instead of
  // shuffling every individual price as groupByKey would.
  val batchInfos = paymentInfos
    .map(info => (info.productId, (1, info.productPrice)))
    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
    .map { case (productId, (count, total)) => (productId, count, total) }

  // NOTE(review): hincrBy/incrBy are not idempotent; if Spark re-executes a batch after a
  // partial write, the totals are incremented twice.
  batchInfos.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      // Skip empty partitions so no Redis connection is borrowed for them.
      if (partition.hasNext) {
        // One pooled connection per partition (not per record), always returned in finally.
        val jedis = RedisClient.pool.getResource
        try {
          // select database 1
          jedis.select(1)
          partition.foreach { case (productId, _, total) =>
            // Add this batch's sales for the product to its field in the hash.
            jedis.hincrBy("orderTotalKey", productId, total)
            // Add the same amount to the overall sales total.
            jedis.incrBy("totalKey", total)
            println(productId + " " + total)
          }
        } finally {
          jedis.close()
        }
      }
    }
  }

  ssc.start()
  ssc.awaitTermination()
}
需要说明的是:Spark 需要把代码(闭包)序列化后发送到执行器(Executor)上去执行,如果闭包中引用的对象不能完整地序列化,就会导致异常。比如 RedisClient 改用 properties 动态加载配置会导致出错;又如 Gson 本身不可序列化,若在 Driver 端把它声明为局部变量再在闭包中引用,也会导致出错,因此上面的代码是在函数内部创建 Gson 对象的。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。