微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spark – 如何减少JavaPairRDD的shuffle大小?

我有一个JavaPairRDD< Integer,Integer []>我想在其上执行groupByKey操作.

groupByKey动作给了我一个

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle

如果我没有弄错的话,这实际上是一个OutOfMemory错误.这只发生在大数据集中(在我的情况下,当Web UI中显示的“Shuffle Write”为~96GB时).

我已经设定:

spark.serializer org.apache.spark.serializer.KryoSerializer

在$SPARK_HOME / conf / spark-defaults.conf中,但我不确定Kryo是否用于序列化我的JavaPairRDD.

除了设置这个conf参数之外,我还应该做些什么才能使用Kryo序列化我的RDD?我可以在serialization instructions中看到:

Spark automatically includes Kryo serializers for the many commonly-used core Scala classes covered in the AllScalaRegistrar from the Twitter chill library.

然后:

Since Spark 2.0.0,we internally use Kryo serializer when shuffling RDDs with simple types,arrays of simple types,or string type.

我还注意到,当我将spark.serializer设置为Kryo时,Web UI中的Shuffle Write从~96GB(认序列化器)增加到243GB!

编辑:在评论中,我被问及我的程序的逻辑,以防groupByKey可以用reduceByKey替换.我不认为这是可能的,但无论如何它在这里

>输入具有以下形式:

> key:index bucket id,
> value:此存储桶中的整数实体ID数组

> shuffle write操作以以下形式生成对:

> entityId
>同一个桶中所有实体ID的整数数组(称为邻居)

> groupByKey操作收集每个实体的所有邻居数组,其中一些可能出现多次(在许多存储桶中).
>在groupByKey操作之后,我为每个桶保留一个权重(基于它包含的负实体id的数量),并且对于每个邻居id,我总结它所属的桶的权重.
>我将每个邻居id的分数标准化为另一个值(假设它已经给出)并且每个实体发出前3个邻居.

我得到的不同密钥的数量大约是1000万(大约500万个正实体ID和500万个负数).

EDIT2:我尝试分别使用Hadoop的Writables(VIntWritable和VIntArrayWritable扩展ArrayWritable)而不是Integer和Integer [],但是shuffle大小仍然比认的JavaSerializer大.

然后我将spark.shuffle.memoryFraction从0.2增加到0.4(即使在版本2.1.0中已弃用,也没有说明应该使用的内容)并启用offheap内存,并且shuffle大小减少了~20GB.即使这符合标题所要求的内容,我更倾向于使用更算法的解决方案,或者包含更好压缩的解决方案.

最佳答案
我认为这里可以推荐的最佳方法(没有输入数据的更多具体知识)通常是在输入RDD上使用持久化API.

作为第一步,我尝试在输入上调用.persist(MEMORY_ONLY_SER),RDD以降低内存使用量(尽管在某个cpu开销下,对于你的情况来说不应该是针对整数的问题).

如果这还不够,你可以试试.persist(MEMORY_AND_disK_SER)或者如果你的shuffle仍然占用了大量内存,那么输入数据集需要在内存上更容易.persist(disK_ONLY)可能是一个选项,但是会强烈选择性能恶化.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


应用场景 C端用户提交工单、工单创建完成之后、会发布一条工单创建完成的消息事件(异步消息)、MQ消费者收到消息之后、会通知各处理器处理该消息、各处理器处理完后都会发布一条将该工单写入搜索引擎的消息、最终该工单出现在搜索引擎、被工单处理人检索和处理。 事故异常体现 1、异常体现 从工单的流转记录发现、
线程类,设置有一个公共资源 package cn.org.chris.concurrent; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @Descrip
Java中的数字(带有0前缀和字符串)
在Java 9中使用JLink的目的是什么?
Java Stream API Filter(过滤器)
在Java中找到正数和负数数组元素的数量
Java 9中JShell中的不同启动脚本是什么?
使用Java的位填充错误检测技术
java中string是什么
如何使用Java中的JSON-lib API将Map转换为JSON对象?