微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark开发调优

Spark任务开发调优

1.避免创建重复的RDD + 尽可能复用同一个RDD + 对多次使用的RDD持久化

如果要对一个RDD进行持久化,只要对这个RDD调用cache()和persist()即可。

cache()方法表示:使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。

persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。

比如说,StorageLevel.MEMORY_AND_disK_SER表示,内存充足时优先持久化到内存中,内存不充足时持久化到磁盘文件中。

而且其中的_SER后缀表示,使用序列化的方式来保存RDD数据,此时RDD中的每个partition都会序列化成一个大的字节数组,然后再持久化到内存或磁盘中。

序列化的方式可以减少持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,从而发生频繁GC。

持久化级别含义解释
MEMORY_ONLY使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。
MEMORY_AND_disK使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
MEMORY_ONLY_SER基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
MEMORY_AND_disK_SER基本含义同MEMORY_AND_disK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
disK_ONLY使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
MEMORY_ONLY_2,
MEMORY_AND_disK_2,
等等.
对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。对于persist()方法而言,我们可以根据不同的业务场景选择不同的持久化级别。

2.尽量避免使用Shuffle类算子

如果有可能的话,要尽量避免使用 shuffle 类算子。因为 Spark 作业运行过程中,最消耗性能的地方就是 shuffle过程。shuffle 过程,简单来说,就是将分布在集群中多个节点上的同一个 key,拉取到同一个节点上,进行聚合或 join 等操作。比如 reduceByKey、join 等算子,都会触发 shuffle 操作。

shuffle 过程中,各个节点上的相同 key 都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同 key。而且相同 key 都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的 key 过多,导致内存不够存放,进而溢写到磁盘文件中。因此在 shuffle 过程中,可能会发生大量的磁盘文件读写的 IO 操作,以及数据的网络传输操作。磁盘 IO 和网络数据传输也是 shuffle 性能较差的主要原因。

因此在我们的开发过程中,能避免则尽可能避免使用 reduceByKey、join、distinct、repartition 等会进行 shuffle 的算子,尽量使用 map 类的非 shuffle 算子。这样的话,没有 shuffle 操作或者仅有较少shuffle 操作的Spark 作业,可以大大减少性能开销。

例如使用broadcast与map进行join代码示例

// 传统的join操作会导致shuffle操作。
// 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
val rdd3 = rdd1.join(rdd2)
// broadcast+map的join操作,不会导致shuffle操作。
// 使用broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data = rdd2.collect()
val rdd2Databroadcast = sc.broadcast(rdd2Data)
// 在rdd1.map算子中,可以从rdd2Databroadcast中,获取rdd2的所有数据。
// 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。
// 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或
Tuple)。
val rdd3 = rdd1.map(rdd2Databroadcast...)
// 注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。
// 因为每个Executor的内存中,都会驻留一份rdd2的全量数据。

3.使用Map-Side预聚合的Shuffle操作

所谓的 map-side 预聚合,说的是在每个节点本地对相同的 key 进行一次聚合操作,类似于MapReduce 中的本地 combiner。map-side 预聚合之后,每个节点本地就只会有一条相同的 key,因为多条相同的 key都被聚合起来了。其他节点在拉取所有节点上的相同 key 时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘 IO 以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey 或者 aggregateByKey 算子来替代掉 groupByKey 算子。因为 reduceByKey 和aggregateByKey 算子都会使用用户自定义函数对每个节点本地的相同key进行预聚合。而groupByKey 算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

4.使用高性能的算子

4.1使用reduceByKey/aggregateByKey替代groupByKey

4.2使用mapPartitions替代普通map

mapPartitions 类的算子,一次函数调用会处理一个 partition 所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用 mapPartitions 会出现 OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个 partition 所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现 OOM 异常。所以使用这类操作时要慎重!

rdd.map(element => {
val connection = MysqL.getConnection();
val user_data = connection.getTableData("user", id)
element + user_data
})

rdd.mapMapparitions(rdd_partition => {
val connection = MysqL.getConnection();
val user_data = connection.getTableData("user", id)
rdd_partition.map(element => {
element + user_data
})
}) + 数据库的连接池!

4.3. 使用foreachPartitions替代foreach

原理类似于“使用 mapPartitions 替代 map”,也是一次函数调用处理一个 partition 的所有数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions 类的算子,对性能的提升还是很有帮助的。比如在 foreach 函数中,将 RDD 中所有数据写 MysqL,那么如果是普通的 foreach 算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用 foreachPartitions 算子一次性处理一个 partition 的数据,那么对于每个 partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。实践中发现,对于 1 万条左右的数据量写 MysqL性能可以提升 30% 以上。

4.4使用filter之后进行coalesce操作

rdd.filter(age > 18) RDD 的数据量变少了。! 做了filter之后,分区个数没有变,但是分区的数据量减少了。原来每个 分区1G数据,经过 filter之后 变成了 100M

RDD的数据总量: 分区数 * 分区的平均数据量

思路:在经过filter之后,进行coalesce操作:多个分区合并成一个分区

4.5使用repartitionAndSortWithinPartitions替代repartition与sort类操作

1、rdd.repartition().sort() // 先进行了重分区,然后进行分区的排序 这个效率相对来说没有下面这种方高
2、rdd.repartitionAndSortWithinPartitions() // 边分区边排序

repartitionAndSortWithinPartitions 是 Spark 官网推荐的一个算子,官方建议,如果需要在repartition 重分区之后,还要进行排序,建议直接使用 repartitionAndSortWithinPartitions 算子。因为该算子可以一边进行重分区的 shuffle 操作,一边进行排序。shuffle 与 sort 两个操作同时进行,比先 shuffle 再 sort 来说,性能可能是要高的。

5.广播大变量

有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如 100M 以上的大集合),那么此时就应该使用Spark的广播(broadcast)功能来提升性能

在算子函数中使用到外部变量时,认情况下,Spark 会将该变量复制多个副本,通过网络传输到 task中,此时每个 task 都有一个变量副本。如果变量本身比较大的话(比如 100M,甚至 1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的 Executor 中占用过多内存导致的频繁 GC,都会极大地影响性能

因此对于上述情况,如果使用的外部变量比较大,建议使用 Spark 的广播功能,对该变量进行广播。广播后的变量,会保证每个 Executor 的内存中,只驻留一份变量副本,而 Executor 中的 task 执行时共享该 Executor 中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对 Executor 内存的占用开销,降低 GC 的频率。

// 以下代码在算子函数中,使用了外部的变量。
// 此时没有做任何特殊操作,每个task都会有一份list1的副本。
val list1 = ...
rdd1.map(list1...)

// 以下代码将list1封装成了broadcast类型的广播变量。
// 在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。
// 如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。
// 每个Executor内存中,就只会驻留一份广播变量副本。
val list1 = ...
val list1broadcast = sc.broadcast(list1)
rdd1.map(list1broadcast...)

提升效果明显:
1、如果待广播特别小,就没有广播的必要性了。
driver中声明了一个小的全局变量,最后在算子的函数参数中使用了
2、待广播的数据量大:如果使用广播变量的工作机制,则一个worker中启动的某个executor中的多个Task 就可以共用一份数据,这个广播数据就是存储在 存储内存中,这个内存有可能是堆内内存,也有可能是堆外内存
3、如果待广播的数据量特别大,不适合使用广播机制!

最终结论:在能广播的前提下,待广播的数据量越大,提升效率越明显。内存资源占用的减小很明显

6.使用Kryo优化序列化性能

序列化和反序列化: java对象,如果要进行IO操作,就需要被序列化成字节数组
1、java中的序列化机制: 让自定义的类实现Serializable接口, 这种方式太重。!!
       除了序列化属性的值意外,还会序列这个类的信息。
2、Hadoop当中:专门为hadoop提供了一种序列化机制:让自定义的类实现Writable
这两种方式,spark都不想用!
3、spark提供了一种新的优化方式:Kryo

在 Spark 中,主要有三个地方涉及到了序列化:

1、在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。
2、将自定义的类型作为 RDD 的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
3、使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

对于这三种出现序列化的地方,我们都可以通过使用 Kryo 序列化类库,来优化序列化和反序列化的性能。Spark 认使用的是 Java 的序列化机制,也就是 ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是 Spark 同时支持使用 Kryo序列化库,Kryo 序列化类库的性能比 Java序列化类库的性能要高很多。官方介绍,Kryo 序列化机制比 Java 序列化机制,性能高 10 倍左右。Spark 之所以认没有使用 Kryo 作为序列化类库,是因为 Kryo 要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
以下是使用 Kryo 的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为 RDD 泛型类型的自定义类型等):

// 创建SparkConf对象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
// 提供一种最暴力的方案:只要是RDD的泛型类,都应该使用这种方式

7.优化数据结构

Java 中,有三种类型比较耗费内存:

1、对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
2、字符串,每个字符串内部都有一个字符数组以及长度等额外信息。
3、集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。

因此 Spark 官方建议,在 Spark 编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如 Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低 GC 频率,提升性能
总结一下:表示相同信息的情况下:

1、字符串比对象占用内存少
2、普通基本类型,比字符串占用内存少
3、数组类型,比集合类型占用内存少。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐