微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Apache Spark:在Java中有效地使用mapPartitions

在当前早期发布的名为High Performance Spark的教科书中,Spark的开发人员注意到:

To allow Spark the flexibility to spill some records
to disk, it is important to represent your functions inside of mapPartitions in such a
way that your functions don’t force loading the entire partition in-memory (e.g.
implicitly converting to a list). Iterators have many methods we can write functional style
transformations on, or you can construct your own custom iterator. When a
transformation directly takes and returns an iterator without forcing it through
another collection, we call these iterator-to-iterator transformations.

但是,教科书缺少使用mapPartitions或类似方法变体的好例子.并且在线存在很少的好代码示例 – 其中大多数是Scala.例如,我们使用0732上由zero323编写的mapPartitions来查看此Scala代码.

def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)
sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show

不幸的是,Java并没有为迭代器提供像iter.map(…)那样好的东西.所以它引出了一个问题,如何有效地使用mapPartitions的迭代器到迭代器转换而不将RDD作为列表完全溢出到磁盘?

JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
    ArrayList<OutObj> out = new ArrayList<>();
    while(iter.hasNext()) {
        InObj current = iter.next();
        out.add(someChange(current));
    }
    return out.iterator();
});

这似乎是在Java示例中使用mapPartitions的一般语法,但我不知道这是多么有效,假设你有一个包含数万条记录的JavaRDD(甚至更多……因为,Spark是对于大数据).你最终会得到一个迭代器中所有对象的列表,只是把它变回迭代器(这就要求某种地图函数在这里效率更高).

注意:虽然使用mapPartitions的这8行代码可以写成带有map或flatMap的1行,但我有意使用mapPartitions来利用它对每个分区而不是RDD中的每个元素进行操作的事实.

请问有什么想法吗?

解决方法:

防止强制整个分区“实现”的一种方法是将Iterator转换为Stream,然后使用Stream的功能API(例如map函数).

How to convert an iterator to a stream?提出了一些将Iterator转换为Stream的好方法,因此我们可以选择其中一个选项:

rdd.mapPartitions((Iterator<InObj> iter) -> {
    Iterable<InObj> iterable = () -> iter;
    return StreamSupport.stream(iterable.spliterator(), false)
            .map(s -> transformRow(s)) // or whatever transformation
            .iterator();
});

哪个应该是“Itrator-to-Iterator”转换,因为所有使用的中间API(Iterable,Stream)都是懒惰的评估.

编辑:我自己没有测试过,但是OP评论说,“通过在列表上使用Stream没有提高效率”.我不知道为什么会这样,而且我不知道这一般是否属实,但值得一提.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐