By default, when the function passed to an operator references an external variable, the value of that variable is copied into every task, and each task can then only operate on its own private copy. If multiple tasks need to share one variable, this default mechanism cannot provide it.
For this, Spark provides two kinds of shared variables: the Broadcast Variable (广播变量) and the Accumulator (累加变量). A broadcast variable ships the referenced variable only once per node, rather than once per task; its main purpose is performance optimization, reducing network transfer and memory consumption. An accumulator lets multiple tasks operate on one shared variable, primarily for add-only accumulation.
Broadcast variable example
```java
package cn.spark.study.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;

import java.util.Arrays;
import java.util.List;

/**
 * @author: yangchun
 * @description: multiplies each element of an RDD by a broadcast factor
 * @date: Created in 2020-05-08 12:25
 */
public class BroadcastVariable {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("BroadcastVariable")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Broadcast the factor: one copy per node instead of one per task.
        final int factor = 3;
        final Broadcast<Integer> factorBroadcast = sc.broadcast(factor);

        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);

        JavaRDD<Integer> multipleNumbers = numbers.map(new Function<Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer number) throws Exception {
                // Read the broadcast value inside the task.
                return number * factorBroadcast.getValue();
            }
        });

        multipleNumbers.foreach(new VoidFunction<Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Integer number) throws Exception {
                System.out.println(number);
            }
        });

        sc.close();
    }
}
```
```scala
package cn.spark.study.core

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author: yangchun
 * @description: multiplies each element of an RDD by a broadcast factor
 * @date: Created in 2020-05-08 12:33
 */
object BroadcastVariable {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("broadcast")
      .setMaster("local")
    val sparkContext = new SparkContext(conf)

    // Broadcast the factor: one copy per node instead of one per task.
    val factor = 3
    val factorBroadcast = sparkContext.broadcast(factor)

    val numberArray = Array(1, 2, 3, 4, 5)
    val numbers = sparkContext.parallelize(numberArray, 1)

    val multipleNumbers = numbers.map { num => num * factorBroadcast.value }
    multipleNumbers.foreach { num => println(num) }

    sparkContext.stop()
  }
}
```
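The payoff of broadcasting is easiest to see with a larger read-only structure such as a lookup table. The sketch below is not Spark code; it imitates the access pattern with a plain Java parallel stream, where every task reads the same shared map rather than each task receiving its own serialized copy (which is what a closure-captured variable would mean in Spark). The map contents are made up for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BroadcastSketch {
    public static void main(String[] args) {
        // Read-only lookup table; in Spark this is what you would wrap with
        // sc.broadcast(...) so each node gets one copy instead of one per task.
        Map<Integer, String> lookup = Map.of(1, "one", 2, "two", 3, "three");

        List<String> result = List.of(1, 2, 3).parallelStream()
                .map(lookup::get)          // every task reads the same shared table
                .collect(Collectors.toList());

        System.out.println(result);        // [one, two, three]
    }
}
```

Note the table is never mutated by the tasks; broadcast variables carry the same restriction, which is what makes the single-copy-per-node optimization safe.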
Accumulator example
```java
package cn.spark.study.core;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import java.util.Arrays;
import java.util.List;

/**
 * @author: yangchun
 * @description: sums the elements of an RDD with an accumulator
 * @date: Created in 2020-05-08 12:40
 */
public class AccumulatorVariable {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("AccumulatorVariable")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Tasks can only add to the accumulator; only the driver reads it.
        final Accumulator<Integer> accumulator = sc.accumulator(0);

        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);

        numbers.foreach(new VoidFunction<Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Integer number) throws Exception {
                accumulator.add(number);
            }
        });

        // Read the accumulated value on the driver after the action has run.
        System.out.println(accumulator.value());

        sc.close();
    }
}
```
```scala
package cn.spark.study.core

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author: yangchun
 * @description: sums the elements of an RDD with an accumulator
 * @date: Created in 2020-05-08 12:44
 */
object AccumulatorVariable {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("accumulator")
      .setMaster("local")
    val sparkContext = new SparkContext(conf)

    // Tasks can only add to the accumulator; only the driver reads it.
    val sum = sparkContext.accumulator(0)

    val numberArray = Array(1, 2, 3, 4, 5)
    val numbers = sparkContext.parallelize(numberArray, 1)

    numbers.foreach { sum += _ }

    // Read the accumulated value on the driver after the action has run.
    println(sum.value)

    sparkContext.stop()
  }
}
```
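What the accumulator provides is a single aggregate that many tasks update concurrently and the driver reads afterwards; a plain captured variable cannot do this, because each task only ever sees its own copy. The sketch below is not Spark code, just a plain-Java analogy: an `AtomicLong` plays the role of the accumulator, a parallel stream plays the role of the tasks, and all additions fold safely into one value.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class AccumulatorSketch {
    public static void main(String[] args) {
        // Thread-safe shared counter standing in for a Spark Accumulator.
        // A plain captured int would not work here: in Spark each task would
        // increment its own serialized copy and the driver would see 0.
        AtomicLong sum = new AtomicLong(0);

        List.of(1, 2, 3, 4, 5).parallelStream()
                .forEach(n -> sum.addAndGet(n)); // all "tasks" add into one value

        System.out.println(sum.get()); // 15
    }
}
```

As in Spark, the pattern is write-only from the workers' side and read-only from the driver's side: the parallel branches never read `sum` to make decisions, which keeps the result deterministic.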