微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spark Core中RDD数值型算子

一、 数值型RDD算子之转换算子

1、 map算子

map算子是将RDD中的每一个元素通过map算子计算后得到一个新的结果,新的结果为一个新的RDD

object MapOperator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("map").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 0))
    val rdd1: RDD[Int] = rdd.map((a: Int) => {
      a * 3
    })
    rdd1.foreach(println(_))

    val rdd2: RDD[String] = sc.makeRDD(Array("hadoop", "spark", "flink", "hadoop"))
    val rdd3: RDD[(String, Int)] = rdd2.map((word: String) => {
      (word, 1)
    })
    rdd3.foreach(println(_))
    sc.stop()
  }
}

2、 mapPartitions算子

和map类似,区别在于mapPartitions是对每一个分区操作

object MapPartitionsOperator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mapPartitions").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val rdd:RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7,8),4)
    val rdd1:RDD[Int] = rdd.mapPartitions((a:Iterator[Int]) =>{
      var listBuffer:ListBuffer[Int] = ListBuffer()
      for( num <- a ){
        println(s"处理了一个分区数据$num")
        listBuffer.append(num*3)
      }
      listBuffer.iterator
    })
    rdd1.foreach(println(_))
    sc.stop()
  }
}

3、 mapPartitionsWithIndex算子

和map类似,区别在于mapPartitionsWithIndex是对每一个分区操作并且带有索引

object MapPartitionsWithIndex {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mapPartitionsWithIndex").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rdd:RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7,8),4)
    val rdd1:RDD[Int] = rdd.mapPartitionsWithIndex((index:Int,data:Iterator[Int])=>{
      val listBuffer:ListBuffer[Int] = ListBuffer()
      for(num <- data){
        println(s"正在处理$index 分区的数据$num")
        listBuffer.append(num *3)
      }
      listBuffer.iterator
    })
    rdd1.foreach(println(_))
    sc.stop()
  }
}

4、 flatMap压扁算子

原来的RDD一条数据经过flatMap算子操作返回一个集合数据,集合中的每一条数据都是新的RDD中的数据

object FlatMapOperator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("flatMap").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val rdd:RDD[String] = sc.makeRDD(Array("i am people", "dog is an animal"))
    val rdd1:RDD[String] = rdd.flatMap((a:String) => {
      a.split(" ")
    })
    rdd1.foreach(println(_))
    sc.stop()
  }
}

5、 filter过滤算子

将RDD中每一个元素通过fun函数计算得到一个boolean类型的返回值,如果返回值为true则保留数据
如果返回值为false,那么数据就舍弃,得到的新的RDD类型和旧的RDD类型一致

object FilterOperator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("filter").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val rdd:RDD[Int] = sc.makeRDD(1 to 100)
    val rdd2:RDD[Int] = rdd.filter((a: Int) => {
      if (a % 2 == 1) {
        true
      } else {
        false
      }
    })
    rdd2.foreach(println(_))
    sc.stop()
  }
}

6、 union算子

两个RDD求一个并集返回一个新的RDD 两个RDD的类型必须保持一致

object Unionoperator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("union").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9))
    val rdd1 = sc.makeRDD(Array(1, 12, 13, 14))
    val rdd2 = rdd.union(rdd1)
    rdd2.foreach(print(_))
    sc.stop()
  }
}

7、 intersection算子

交集 两个RDD类型一致

object Intersectionoperator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("inersection").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9))
    val rdd1 = sc.makeRDD(Array(1, 12, 13, 14))
    val rdd2 = rdd.intersection(rdd1)
    rdd2.foreach(println(_))
    sc.stop()
  }
}

8、 distinct算子

对rdd进行去重

object distinctOperator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("distinct").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(Array(1, 1, 1, 1, 5, 6, 7, 8, 9))
    val rdd2 = rdd.distinct()
    rdd2.foreach(println(_))
    sc.stop()
  }
}

9、 sample抽样算子

withReplacement:Boolean 是否放回抽样 true放回抽样,false代表不放回抽象
fraction:Double 如果是放回抽样 整数代表同一个数据期望被抽中几次 如果是不放回抽样,0-1之间的小数,代表的抽取比例
seed 种子 不用写(认值) 抽样底层有算法

object SampleOperator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sample").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3), 4)
    val rdd2: RDD[Int] = rdd.mapPartitionsWithIndex((index: Int, data: Iterator[Int]) => {
      println(s"有一个$index 数据为${data.mkString(",")}")
      data
    })
    val rdd1: RDD[Int] = rdd.sample(false, 0.5)
    val rdd3: RDD[Int] = rdd1.mapPartitionsWithIndex((index: Int, data: Iterator[Int]) => {
      println(s"有一个$index 数据为${data.mkString(",")}")
      data
    })
    rdd2.collect()
    rdd3.collect()

    sc.stop()
  }
}

10、 repartition算子

对RDD数据集指定的分区数重新分区

object Repartitionoperator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("repartition").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(Array(1, 1, 1, 1, 5, 6, 7, 8, 9), 10)
    println(rdd.getNumPartitions)
    val rdd2 = rdd.repartition(2)
    println(rdd2.getNumPartitions)
    sc.stop()
  }
}

二、 行动算子

1、 reduce算子

将原先RDD数据集中聚合起来算一个总的结果

object ReduceOperator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("reduce").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rdd:RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7,8,9,0))
    val sum:Int = rdd.reduce((a:Int,b:Int) => {
      println(s"a=$a,b=$b")
      if (a>b){
        a
      }else{
        b
      }
    })
    print(sum)
  }
}

2、 aggregate算子

zerovalue 初始值
seqOp函数 对RDD数据集中的每一个分区和zerovalue取一个聚合操作 得到一个结果
combOp函数 将每个分区通过seqOp函数计算出来的结果和zerovalue聚合操作得到一个最终结果

object AggregateOperator {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("aggregate").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)

    val rdd:RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7,8,9),4);
    /**
     * 计算rdd数据集中累加结果
     */
    val res:Int = rdd.aggregate(1)(
      (a: Int, b: Int) => {a + b},
      (a: Int, b: Int) => {a + b}
    )
    /**
     * 计算rdd数据集中最大值和最小值
     */
    val res1:Int = rdd.aggregate(rdd.first())(
      (a: Int, b: Int) => {
        if(a<b){
          a
        }else{
          b
        }
      },
      (a: Int, b: Int) => {
        if(a<b){
          a
        }else{
          b
        }
      }
    )
    println(res1)
  }
}

3、 first算子

获取RDD数据集中第一条数据

object FirstOperator {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("first").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)

    val rdd:RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7,8,9));
    println(rdd.first())
  }
}

4、 take算子

获取RDD数据集中前N条数据

object TakeOperator {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("take").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array(20,30,11,15,16,19,22,26,17,45,54,99))
    val array1:Array[Int] = rdd.take(5)
    println(array1.mkString("="))
  }
}

5、 takeSample算子

takeSample(withReplacement, num, [seed]) 抽样,抽取几条数据

object TakeSampleOperator {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("takesample").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array(20,30,11,15,16,19,22,26,17,45,54,99))
    val array2:Array[Int] = rdd.takeSample(false, 10)
    println(array2.mkString(","))
  }
}

6、 takeOrdered算子

takeOrdered(num):Array[T] 将RDD数据集元素排序之后获取前num条数据 认是升序排序

object TakeOrderedOperator {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("takeordered").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array(20,30,11,15,16,19,22,26,17,45,54,99))
    val array3:Array[Int] = rdd.takeOrdered(5)
    println(array3.mkString("="))
  }
}

7、 count算子

求总数

object CountOperator {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("count").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd:RDD[Int] = sc.makeRDD(1 to 6789)

    /**
     * 计算RDD中奇数一共有多少个
     */
    val rdd2:RDD[Int] = rdd.filter((a:Int)=>{
      if(a % 2==1){
        true
      }else{
        false
      }
    })
    rdd2.foreach((a:Int)=>{println(a)})
    val count:Long = rdd2.count()
    println(count)
  }
}

8、 foreach算子

foreach(func) 无返回值 一般用于累加器计算或者RDD数据集的遍历操作
func函数无返回值类型的函数

object ForeachOperator {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("action").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd:RDD[Int] = sc.makeRDD(1 to 6789)
    val rdd2 = rdd.foreach((a: Int) => {
      println(a)
    })
    println(rdd2)
  }
}

9、 collect算子

将RDD数据集中的全部数据拉取到Driver所在节点形成一个数组,数组包含RDD数据集中的所有数据,数组内存中的概念

object CollectOperator {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("collect").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(Array(20,30,11,15,16,19,22,26,17,45,54,99))
    val array:Array[Int] = rdd.collect()
    println(array.mkString(","))
  }
}

10、 saveAsTextFile算子

将rdd计算出来的结果集保存到本地或者是HDFS分布式文件存储系统

object SaveAsTextFileOperator {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("action").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)\
    val rdd = sc.makeRDD(1 to 100)
    rdd.saveAsTextFile("hdfs://node1:9000/action")
  }
}

原文地址:https://www.jb51.cc/wenti/3287612.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐