一、 数值型RDD算子之转换算子
1、 map算子
map算子是将RDD中的每一个元素通过map算子计算后得到一个新的结果,新的结果为一个新的RDD
object MapOperator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("map").setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 0))
val rdd1: RDD[Int] = rdd.map((a: Int) => {
a * 3
})
rdd1.foreach(println(_))
val rdd2: RDD[String] = sc.makeRDD(Array("hadoop", "spark", "flink", "hadoop"))
val rdd3: RDD[(String, Int)] = rdd2.map((word: String) => {
(word, 1)
})
rdd3.foreach(println(_))
sc.stop()
}
}
2、 mapPartitions算子
和map类似,区别在于mapPartitions是对每一个分区操作
object MapPartitionsOperator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("mapPartitions").setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd:RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7,8),4)
val rdd1:RDD[Int] = rdd.mapPartitions((a:Iterator[Int]) =>{
var listBuffer:ListBuffer[Int] = ListBuffer()
for( num <- a ){
println(s"处理了一个分区数据$num")
listBuffer.append(num*3)
}
listBuffer.iterator
})
rdd1.foreach(println(_))
sc.stop()
}
}
3、 mapPartitionsWithIndex算子
和map类似,区别在于mapPartitionsWithIndex是对每一个分区操作并且带有索引
object MapPartitionsWithIndex {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("mapPartitionsWithIndex").setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd:RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7,8),4)
val rdd1:RDD[Int] = rdd.mapPartitionsWithIndex((index:Int,data:Iterator[Int])=>{
val listBuffer:ListBuffer[Int] = ListBuffer()
for(num <- data){
println(s"正在处理$index 分区的数据$num")
listBuffer.append(num *3)
}
listBuffer.iterator
})
rdd1.foreach(println(_))
sc.stop()
}
}
4、 flatMap压扁算子
原来的RDD一条数据经过flatMap算子操作返回一个集合数据,集合中的每一条数据都是新的RDD中的数据
object FlatMapOperator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("flatMap").setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd:RDD[String] = sc.makeRDD(Array("i am people", "dog is an animal"))
val rdd1:RDD[String] = rdd.flatMap((a:String) => {
a.split(" ")
})
rdd1.foreach(println(_))
sc.stop()
}
}
5、 filter过滤算子
将RDD中每一个元素通过fun函数计算得到一个boolean类型的返回值,如果返回值为true则保留数据
如果返回值为false,那么数据就舍弃,得到的新的RDD类型和旧的RDD类型一致
object FilterOperator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("filter").setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd:RDD[Int] = sc.makeRDD(1 to 100)
val rdd2:RDD[Int] = rdd.filter((a: Int) => {
if (a % 2 == 1) {
true
} else {
false
}
})
rdd2.foreach(println(_))
sc.stop()
}
}
6、 union算子
两个RDD求一个并集返回一个新的RDD 两个RDD的类型必须保持一致
object Unionoperator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("union").setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9))
val rdd1 = sc.makeRDD(Array(1, 12, 13, 14))
val rdd2 = rdd.union(rdd1)
rdd2.foreach(print(_))
sc.stop()
}
}
7、 intersection算子
交集 两个RDD类型一致
object Intersectionoperator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("inersection").setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9))
val rdd1 = sc.makeRDD(Array(1, 12, 13, 14))
val rdd2 = rdd.intersection(rdd1)
rdd2.foreach(println(_))
sc.stop()
}
}
8、 distinct算子
对rdd进行去重
object distinctOperator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("distinct").setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(Array(1, 1, 1, 1, 5, 6, 7, 8, 9))
val rdd2 = rdd.distinct()
rdd2.foreach(println(_))
sc.stop()
}
}
9、 sample抽样算子
withReplacement:Boolean 是否放回抽样 true放回抽样,false代表不放回抽象
fraction:Double 如果是放回抽样 整数代表同一个数据期望被抽中几次 如果是不放回抽样,0-1之间的小数,代表的抽取比例
seed 种子 不用写(默认值) 抽样底层有算法
object SampleOperator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("sample").setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3), 4)
val rdd2: RDD[Int] = rdd.mapPartitionsWithIndex((index: Int, data: Iterator[Int]) => {
println(s"有一个$index 数据为${data.mkString(",")}")
data
})
val rdd1: RDD[Int] = rdd.sample(false, 0.5)
val rdd3: RDD[Int] = rdd1.mapPartitionsWithIndex((index: Int, data: Iterator[Int]) => {
println(s"有一个$index 数据为${data.mkString(",")}")
data
})
rdd2.collect()
rdd3.collect()
sc.stop()
}
}
10、 repartition算子
对RDD数据集指定的分区数重新分区
object Repartitionoperator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("repartition").setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(Array(1, 1, 1, 1, 5, 6, 7, 8, 9), 10)
println(rdd.getNumPartitions)
val rdd2 = rdd.repartition(2)
println(rdd2.getNumPartitions)
sc.stop()
}
}
二、 行动算子
1、 reduce算子
将原先RDD数据集中聚合起来算一个总的结果
object ReduceOperator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("reduce").setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd:RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7,8,9,0))
val sum:Int = rdd.reduce((a:Int,b:Int) => {
println(s"a=$a,b=$b")
if (a>b){
a
}else{
b
}
})
print(sum)
}
}
2、 aggregate算子
zerovalue 初始值
seqOp函数 对RDD数据集中的每一个分区和zerovalue取一个聚合操作 得到一个结果
combOp函数 将每个分区通过seqOp函数计算出来的结果和zerovalue聚合操作得到一个最终结果
object AggregateOperator {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setAppName("aggregate").setMaster("local[2]")
val sc: SparkContext = new SparkContext(sparkConf)
val rdd:RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7,8,9),4);
/**
* 计算rdd数据集中累加结果
*/
val res:Int = rdd.aggregate(1)(
(a: Int, b: Int) => {a + b},
(a: Int, b: Int) => {a + b}
)
/**
* 计算rdd数据集中最大值和最小值
*/
val res1:Int = rdd.aggregate(rdd.first())(
(a: Int, b: Int) => {
if(a<b){
a
}else{
b
}
},
(a: Int, b: Int) => {
if(a<b){
a
}else{
b
}
}
)
println(res1)
}
}
3、 first算子
获取RDD数据集中第一条数据
object FirstOperator {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setAppName("first").setMaster("local[2]")
val sc: SparkContext = new SparkContext(sparkConf)
val rdd:RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7,8,9));
println(rdd.first())
}
}
4、 take算子
获取RDD数据集中前N条数据
object TakeOperator {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setAppName("take").setMaster("local[2]")
val sc: SparkContext = new SparkContext(sparkConf)
val rdd = sc.makeRDD(Array(20,30,11,15,16,19,22,26,17,45,54,99))
val array1:Array[Int] = rdd.take(5)
println(array1.mkString("="))
}
}
5、 takeSample算子
takeSample(withReplacement, num, [seed]) 抽样,抽取几条数据
object TakeSampleOperator {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setAppName("takesample").setMaster("local[2]")
val sc: SparkContext = new SparkContext(sparkConf)
val rdd = sc.makeRDD(Array(20,30,11,15,16,19,22,26,17,45,54,99))
val array2:Array[Int] = rdd.takeSample(false, 10)
println(array2.mkString(","))
}
}
6、 takeOrdered算子
takeOrdered(num):Array[T] 将RDD数据集元素排序之后获取前num条数据 默认是升序排序
object TakeOrderedOperator {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setAppName("takeordered").setMaster("local[2]")
val sc: SparkContext = new SparkContext(sparkConf)
val rdd = sc.makeRDD(Array(20,30,11,15,16,19,22,26,17,45,54,99))
val array3:Array[Int] = rdd.takeOrdered(5)
println(array3.mkString("="))
}
}
7、 count算子
求总数
object CountOperator {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setAppName("count").setMaster("local[2]")
val sc: SparkContext = new SparkContext(sparkConf)
val rdd:RDD[Int] = sc.makeRDD(1 to 6789)
/**
* 计算RDD中奇数一共有多少个
*/
val rdd2:RDD[Int] = rdd.filter((a:Int)=>{
if(a % 2==1){
true
}else{
false
}
})
rdd2.foreach((a:Int)=>{println(a)})
val count:Long = rdd2.count()
println(count)
}
}
8、 foreach算子
foreach(func) 无返回值 一般用于累加器计算或者RDD数据集的遍历操作
func函数无返回值类型的函数
object ForeachOperator {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setAppName("action").setMaster("local[2]")
val sc: SparkContext = new SparkContext(sparkConf)
val rdd:RDD[Int] = sc.makeRDD(1 to 6789)
val rdd2 = rdd.foreach((a: Int) => {
println(a)
})
println(rdd2)
}
}
9、 collect算子
将RDD数据集中的全部数据拉取到Driver所在节点形成一个数组,数组包含RDD数据集中的所有数据,数组内存中的概念
object CollectOperator {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setAppName("collect").setMaster("local[2]")
val sc: SparkContext = new SparkContext(sparkConf)
val rdd = sc.makeRDD(Array(20,30,11,15,16,19,22,26,17,45,54,99))
val array:Array[Int] = rdd.collect()
println(array.mkString(","))
}
}
10、 saveAsTextFile算子
将rdd计算出来的结果集保存到本地或者是HDFS分布式文件存储系统
object SaveAsTextFileOperator {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setAppName("action").setMaster("local[2]")
val sc: SparkContext = new SparkContext(sparkConf)\
val rdd = sc.makeRDD(1 to 100)
rdd.saveAsTextFile("hdfs://node1:9000/action")
}
}
原文地址:https://www.jb51.cc/wenti/3287612.html
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。