Spark—累加器
本文记录了Spark三大数据结构中累加器的相关知识
文章目录
前言
Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于
处理不同的应用场景。三大数据结构分别是:
- RDD : 弹性分布式数据集
- 累加器:分布式共享只写变量
- 广播变量:分布式共享只读变量
1、实现原理
累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在
Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,
传回 Driver 端进行 merge(合并)
2、累加器的实现
2.1 系统累加器
val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)
val rdd = sc.makeRDD(List(1,2,3,4))
// 获取系统累加器
// Spark默认就提供了简单数据聚合的累加器
val sumAcc = sc.longAccumulator("sum")
// 其它的系统累加器
//sc.doubleAccumulator
//sc.collectionAccumulator // 返回的是 java.util.List
rdd.foreach(
num => {
// 使用累加器
sumAcc.add(num)
}
)
// 获取累加器的值
// result: 10
println(sumAcc.value)
sc.stop()
注意:
- 不能够通过在main函数中定义一个变量sum,用foreach进行遍历累加,main函数定义的sum是在Driver端,foreach执行是在Executor端,
虽然Driver端会把sum传到Executor端,但foreach实现的是分布式计算,sum的值会不确定,并且此时不能够将sum返回到Driver端进行累加
,所以只能通过累加器,才能实现在各个计算节点返回sum,并在Driver端合并聚合 - 在获取累加器的值,会出现少加和多加的情况
val sumAcc = sc.longAccumulator("sum") val mapRDD = rdd.map( num => { // 使用累加器 sumAcc.add(num) num } ) // 获取累加器的值 // 少加:转换算子中(例如本例中的map)调用累加器,如果没有行动算子的话,那么不会执行 // mapRDD.collect() // 即没有行动算子触发任务 // 多加:转换算子中调用累加器,如果有多个行动算子的话,那么就会多次触发任务,多加 // 一般情况下,累加器会放置在行动算子进行操作 mapRDD.collect() mapRDD.collect() println(sumAcc.value)
2.2 自定义累加器(WordCount)
代码如下(示例):
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)
val rdd = sc.makeRDD(List("hello", "spark", "hello"))
// 累加器 : WordCount
// 1. 创建累加器对象
val wcAcc = new MyAccumulator()
// 2. 向Spark进行注册
sc.register(wcAcc, "wordCountAcc")
rdd.foreach(
word => {
// 3. 数据的累加(使用累加器)
wcAcc.add(word)
}
)
// 4. 获取累加器累加的结果
// result: Map(spark -> 1, hello -> 2)
println(wcAcc.value)
sc.stop()
}
/*
自定义数据累加器:WordCount
1. 继承AccumulatorV2, 定义泛型
IN : 累加器输入的数据类型 String
OUT : 累加器返回的数据类型 mutable.Map[String, Long]
2. 重写方法(6)
*/
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
private var wcMap = mutable.Map[String, Long]()
// 判断是否初始状态
override def isZero: Boolean = {
wcMap.isEmpty
}
// 复制一个新的累加器
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
new MyAccumulator()
}
// 重置累加器
override def reset(): Unit = {
wcMap.clear()
}
// 向累加器中增加数据 (In)
// 添加累加器需要计算的值
override def add(word: String): Unit = {
// 查询 map 中是否存在相同的单词
// 如果有相同的单词,那么单词的数量加 1
// 如果没有相同的单词,那么在 map 中增加这个单词,给个初始值0,并在+1,实现累加统计
val newCnt = wcMap.getorElse(word, 0L) + 1
wcMap.update(word, newCnt)
}
// Driver合并多个累加器
// 两个 Map 的合并
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
val map1 = this.wcMap
val map2 = other.value // Executor 返回的计算结果
map2.foreach{
case ( word, count ) => {
// 合并Executor 返回的计算结果
val newCount = map1.getorElse(word, 0L) + count
map1.update(word, newCount)
}
}
}
// 累加器结果,获取累加器的值
// 返回累加器的结果 (Out)
override def value: mutable.Map[String, Long] = {
wcMap
}
}
总结
文章也是仅作知识点的记录,欢迎大家指出错误,如有新的感悟也会一并更新,一起探讨~~~
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。