微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark—累加器

Spark—累加器

本文记录了Spark三大数据结构中累加器的相关知识

文章目录


前言

Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于
处理不同的应用场景。三大数据结构分别是:

  1. RDD : 弹性分布式数据集
  2. 累加器:分布式共享只写变量
  3. 广播变量:分布式共享只读变量

提示:以下是本篇文章正文内容,下面案例可供参考

1、实现原理

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在
Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,
传回 Driver 端进行 merge(合并)

2、累加器的实现

2.1 系统累加器

实现简单聚合功能
代码如下(示例):

val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
val sc = new SparkContext(sparConf)

val rdd = sc.makeRDD(List(1,2,3,4))

// 获取系统累加器
// Spark认就提供了简单数据聚合的累加器
val sumAcc = sc.longAccumulator("sum")

// 其它的系统累加器
//sc.doubleAccumulator
//sc.collectionAccumulator // 返回的是 java.util.List

rdd.foreach(
    num => {
        // 使用累加器
        sumAcc.add(num)
    }
)

// 获取累加器的值
// result: 10
println(sumAcc.value)

sc.stop()

注意:

  1. 不能够通过在main函数中定义一个变量sum,用foreach进行遍历累加,main函数定义的sum是在Driver端,foreach执行是在Executor端,虽然Driver端会把sum传到Executor端,但foreach实现的是分布式计算,sum的值会不确定,并且此时不能够将sum返回到Driver端进行累加,所以只能通过累加器,才能实现在各个计算节点返回sum,并在Driver端合并聚合
  2. 获取累加器的值,会出现少加和多加的情况
    val sumAcc = sc.longAccumulator("sum")
    val mapRDD = rdd.map(
          num => {
               // 使用累加器
               sumAcc.add(num)
               num
           }
       )
    
    // 获取累加器的值
    // 少加:转换算子中(例如本例中的map)调用累加器,如果没有行动算子的话,那么不会执行
    // mapRDD.collect() // 即没有行动算子触发任务
    // 多加:转换算子中调用累加器,如果有多个行动算子的话,那么就会多次触发任务,多加
    // 一般情况下,累加器会放置在行动算子进行操作
    mapRDD.collect()
    mapRDD.collect()
    println(sumAcc.value)
    

2.2 自定义累加器(WordCount)

代码如下(示例):

def main(args: Array[String]): Unit = {

    val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
    val sc = new SparkContext(sparConf)

    val rdd = sc.makeRDD(List("hello", "spark", "hello"))

    // 累加器 : WordCount
    // 1. 创建累加器对象
    val wcAcc = new MyAccumulator()
    // 2. 向Spark进行注册
    sc.register(wcAcc, "wordCountAcc")

    rdd.foreach(
        word => {
            // 3. 数据的累加(使用累加器)
            wcAcc.add(word)
        }
    )

    // 4. 获取累加器累加的结果
    // result: Map(spark -> 1, hello -> 2)
    println(wcAcc.value)

    sc.stop()

}
/*
  自定义数据累加器:WordCount

  1. 继承AccumulatorV2, 定义泛型
     IN : 累加器输入的数据类型 String
     OUT : 累加器返回的数据类型 mutable.Map[String, Long]

  2. 重写方法(6)
 */
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {

    private var wcMap = mutable.Map[String, Long]()

    // 判断是否初始状态
    override def isZero: Boolean = {
        wcMap.isEmpty
    }

	// 复制一个新的累加器
    override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
        new MyAccumulator()
    }

	// 重置累加器
    override def reset(): Unit = {
        wcMap.clear()
    }

	// 向累加器中增加数据 (In)
    // 添加累加器需要计算的值
    override def add(word: String): Unit = {
    	// 查询 map 中是否存在相同的单词
		// 如果有相同的单词,那么单词的数量加 1
		// 如果没有相同的单词,那么在 map 中增加这个单词,给个初始值0,并在+1,实现累加统计
        val newCnt = wcMap.getorElse(word, 0L) + 1
        wcMap.update(word, newCnt)
    }

    // Driver合并多个累加器
    // 两个 Map 的合并
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {

        val map1 = this.wcMap
        val map2 = other.value // Executor 返回的计算结果

        map2.foreach{
            case ( word, count ) => {
            	// 合并Executor 返回的计算结果
                val newCount = map1.getorElse(word, 0L) + count 
                map1.update(word, newCount)
            }
        }
    }

    // 累加器结果,获取累加器的值
    // 返回累加器的结果 (Out)
    override def value: mutable.Map[String, Long] = {
        wcMap
    }
}

总结

文章也是仅作知识点的记录,欢迎大家指出错误,如有新的感悟也会一并更新,一起探讨~~~

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐