
Spark - How Shared Variables Work

Shared variables are a very important feature of Spark.

By default, if a function passed to an operator uses an external variable, the value of that variable is copied into every task, and each task can then only operate on its own private copy. If multiple tasks need to share and update a single variable, this default mechanism cannot do it.
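A minimal sketch of this default behavior (the names here are illustrative, not taken from the article's examples): the driver-side counter below is captured by the closure and shipped to each task as an independent copy, so updates made on the executors never reach the driver.

package cn.spark.study.core

import org.apache.spark.{SparkConf, SparkContext}

object ClosureCopy {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("closureCopy").setMaster("local"))
    var counter = 0  // copied into every task; each task mutates only its own copy
    sc.parallelize(Array(1, 2, 3, 4, 5)).foreach(num => counter += num)
    println(counter)  // still 0 here: the executors only updated their task-local copies
    sc.stop()
  }
}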

To address this, Spark provides two kinds of shared variables: Broadcast Variables and Accumulators. A broadcast variable ships the variable to each node only once, rather than once per task; its main use is performance optimization, reducing network transfer and memory consumption. An accumulator lets multiple tasks operate on the same variable, mainly for add operations.

Broadcast example

package cn.spark.study.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;

import java.util.Arrays;
import java.util.List;

/**
 * @author: yangchun
 * @description:
 * @date: Created in 2020-05-08 12:25
 */
public class BroadcastVariable {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("BroadcastVariable")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        final int factor = 3;
        // broadcast the factor so each node gets one read-only copy instead of one per task
        final Broadcast<Integer> factorBroadcast = sc.broadcast(factor);
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);
        JavaRDD<Integer> multipleNumbers = numbers.map(new Function<Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer integer) throws Exception {
                return integer * factorBroadcast.value();
            }
        });
        multipleNumbers.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
        sc.close();
    }
}
Scala version:

package cn.spark.study.core

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author: yangchun
 * @description:
 * @date: Created in 2020-05-08 12:33 
 */
object BroadcastVariable {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("broadcast")
      .setMaster("local")
    val sparkContext = new SparkContext(conf)
    val factor = 3
    val broadcast = sparkContext.broadcast(factor)
    val numberArray = Array(1,2,3,4,5)
    val numbers = sparkContext.parallelize(numberArray,1)
    val multipleNumbers = numbers.map{num => num * broadcast.value}
    multipleNumbers.foreach{num => println(num)}
    sparkContext.stop()
  }
}
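A brief follow-up on lifecycle (my addition, not from the original article): continuing with the broadcast value from the Scala example above, a broadcast variable that is no longer needed can be released explicitly via the methods on org.apache.spark.broadcast.Broadcast.

broadcast.unpersist()  // drop the cached copies on the executors; they are re-sent if used again
broadcast.destroy()    // remove all copies permanently; any later access throws an exception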

Accumulator example

package cn.spark.study.core;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import java.util.Arrays;
import java.util.List;

/**
 * @author: yangchun
 * @description:
 * @date: Created in 2020-05-08 12:40
 */
public class AccumulatorVariable {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("AccumulatorVariable")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // create an accumulator on the driver; tasks may only add to it, the driver reads it
        final Accumulator<Integer> accumulator = sc.accumulator(0);
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);
        numbers.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
               accumulator.add(integer);
            }
        });
        System.out.println(accumulator.value());
        sc.close();
    }
}
Scala version:

package cn.spark.study.core

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author: yangchun
 * @description:
 * @date: Created in 2020-05-08 12:44 
 */
object AccumulatorVariable {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("accumulator")
      .setMaster("local")
    val sparkContext = new SparkContext(conf)
    val sum = sparkContext.accumulator(0)
    val numberArray = Array(1,2,3,4,5)
    val numbers = sparkContext.parallelize(numberArray,1)
    numbers.foreach(num => sum += num)
    println(sum.value)
    sparkContext.stop()
  }
}
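One caveat worth noting (an addition, not in the original post): sc.accumulator(...) and the Accumulator class used above belong to the older API and were deprecated in Spark 2.0 in favor of AccumulatorV2. A minimal sketch of the same sum using the built-in longAccumulator, assuming Spark 2.0+:

package cn.spark.study.core

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorV2Example {
  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext(new SparkConf().setAppName("accumulatorV2").setMaster("local"))
    val sum = sparkContext.longAccumulator("sum")  // built-in LongAccumulator, registered under a name
    sparkContext.parallelize(Array(1, 2, 3, 4, 5), 1).foreach(num => sum.add(num))
    println(sum.value)  // 15, read back on the driver after the action
    sparkContext.stop()
  }
}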
