微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark RDD分区效果

如何解决Spark RDD分区效果

我对分区操作感到困惑。请参见下面的代码

import org.apache.spark._
import org.apache.log4j._


object FriendsByAge {

  def parseLine(line: String)={
    val fields = line.split(",")
    val age = fields(2).toInt
    val numFriends = fields(3).toInt

    (age,numFriends)
  }

  def main(args: Array[String]) = {

    Logger.getLogger("org").setLevel(Level.ERROR)

    val sc = new SparkContext("local[*]","FriendsByAge")

    val lines = sc.textFile("./data/fakefriends-noheader.csv").repartition(1000)
    val rdd = lines.map(parseLine)

    println(rdd.getNumPartitions)

    val totalsByAge = rdd.mapValues(x=> (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2))

    println(totalsByAge.getNumPartitions)

    val averagesByAges = totalsByAge.mapValues(x => x._1/x._2)

    println(averagesByAges.getNumPartitions)
    val results = averagesByAges.collect()

    results.sortWith(_._2> _._2).foreach(println)
  }


}

在这里,我在将文件读入1000个分区后将rdd重新分区。由于映射操作会创建新的RDD,因此不会保留分区。我仍然看到相同数量的分区。

问题是我怎么知道子RDD是否将保留父RDD分区?子RDD将使重新分区失效的标准是什么。

解决方法

mapValues不会更改已经生效的分区,它是narrow转换。你有两个。

reduceByKey是关联的。 Spark在本地聚合,然后将这些结果发送到驱动程序或相关分区-在您的情况下。如果您没有在reduceByKey上使用number of partitions上的参数,则新RDD会保留相同数量的分区,尽管分布不同。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。