






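  1. In early Spark versions, a ShuffleMapTask wrote all of its data into the bucket buffers before anything was flushed to disk, so these buffers could easily cause an OOM. Later versions therefore gave the buffer a threshold (100 KB by default): once the data written into a buffer reaches the threshold, it is flushed to disk. This makes OOM much less likely, but frequent flushing means more I/O operations, so this threshold is also a performance-tuning knob.
  2. This differs from MapReduce. In MapReduce, all map output must be written to local disk files before reduce can start pulling data. MapReduce sorts, and sorting needs all of the data, so reduce has to wait until everything has been written. Spark (in this early hash-based shuffle) does not sort: as soon as a ShuffleMapTask has written some data, a ResultTask can fetch it and run the user-defined functions and operators on it locally. The advantage of this mechanism is speed, but it has a drawback. MapReduce's reduce function receives each key together with all of its values, which is very convenient. Spark's mechanism means it cannot offer an operator that directly processes the values of each key. Instead you have to call groupByKey, which shuffles first and yields a MapPartitionsRDD, and then use a map operator to process each key's values, which is less convenient.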
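The flush-threshold tradeoff in item 1 can be illustrated with a small simulation. This is a toy model, not Spark code. `BucketBuffer` and `BufferDemo` are hypothetical names, a "flush" is only counted rather than written to a real file, and the record sizes are made up. It shows that a smaller threshold means more flushes (more I/O calls) for the same data, while a larger one keeps more bytes in memory.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of a per-bucket write buffer that "flushes" once it reaches a
// byte threshold. Hypothetical names; nothing here is Spark's API.
final class BucketBuffer(thresholdBytes: Int) {
  private val pending = ArrayBuffer.empty[Array[Byte]]
  private var pendingBytes = 0
  var flushCount = 0                    // number of simulated disk writes
  val flushed = ArrayBuffer.empty[Int]  // bytes written by each flush

  def write(record: Array[Byte]): Unit = {
    pending += record
    pendingBytes += record.length
    if (pendingBytes >= thresholdBytes) flush()
  }

  def flush(): Unit = if (pendingBytes > 0) {
    flushed += pendingBytes
    flushCount += 1
    pending.clear()
    pendingBytes = 0
  }
}

object BufferDemo {
  // Writes `n` records of `size` bytes and returns how many flushes happened.
  def flushesFor(threshold: Int, n: Int, size: Int): Int = {
    val b = new BucketBuffer(threshold)
    (1 to n).foreach(_ => b.write(new Array[Byte](size)))
    b.flush() // final flush when the task finishes
    b.flushCount
  }
}
```

For 1,000 records of 1 KB each, a 100 KB threshold gives 10 flushes. A 32 KB threshold gives 32: 31 full buffers plus the final partial one.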






/**
 * Obtained inside a map task to write out records to the shuffle system.
 */
private[spark] abstract class ShuffleWriter[K, V] {
  /** Write a sequence of records to this task's output */
  def write(records: Iterator[Product2[K, V]]): Unit

  /** Close this writer, passing along whether the map completed */
  def stop(success: Boolean): Option[MapStatus]
}



  /** Write a bunch of records to this task's output */
  override def write(records: Iterator[Product2[K, V]]): Unit = {
    // Decide whether map-side combine is needed. Of the two branches below, the second one
    // uses neither aggregation nor ordering.
    sorter = if (dep.mapSideCombine) {
      // Map-side aggregation is required
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    } else {
      // No map-side aggregation.
      /*
       * Pass neither an aggregator nor an ordering to the sorter, because we don't care whether
       * the keys are sorted within each partition. If the operation being run is sortByKey,
       * the sorting is done on the reduce side.
       */
      new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
    }
    // Insert all records; the sorter keeps track of every spill file this produces
    sorter.insertAll(records)

    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val tmp = Utils.tempFileWith(output)
    try {
      val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
      }
    }
  }
First, let's look at what sorter is: follow `new ExternalSorter` into the ExternalSorter class.

/**
 * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner
 * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then
 * optionally sorts keys within each partition using a custom Comparator. Can output a single
 * partitioned file with a different byte range for each partition, suitable for shuffle fetches.
 *
 * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
 *
 * Note: Although ExternalSorter is a fairly generic sorter, some of its configuration is tied
 * to its use in sort-based shuffle (for example, its block compression is controlled by
 * `spark.shuffle.compress`).  We may need to revisit this if ExternalSorter is used in other
 * non-shuffle contexts where we might want to use different configuration settings.
 *
 * @param aggregator optional Aggregator with combine functions to use for merging data
 * @param partitioner optional Partitioner; if given, sort by partition ID and then key
 * @param ordering optional Ordering to sort keys within each partition; should be a total ordering
 * @param serializer serializer to use when spilling to disk
 *
 * Note that if an Ordering is given, we'll always sort using it, so only provide it if you really
 * want the output keys to be sorted. In a map task without map-side combine for example, you
 * probably want to pass None as the ordering to avoid extra sorting. On the other hand, if you do
 * want to do combining, having an Ordering is more efficient than not having it.
 *
 * Users interact with this class in the following way:
 *
 * 1. Instantiate an ExternalSorter.
 *
 * 2. Call insertAll() with a set of records.
 *
 * 3. Request an iterator() back to traverse sorted/aggregated records.
 *     - or -
 *    Invoke writePartitionedFile() to create a file containing sorted/aggregated outputs
 *    that can be used in Spark's sort shuffle.
 *
 * At a high level, this class works internally as follows:
 *
 *  - We repeatedly fill up buffers of in-memory data, using either a PartitionedAppendOnlyMap if
 *    we want to combine by key, or a PartitionedPairBuffer if we don't.
 *    Inside these buffers, we sort elements by partition ID and then possibly also by key.
 *    To avoid calling the partitioner multiple times with each key, we store the partition ID
 *    alongside each record.
 *
 *  - When each buffer reaches our memory limit, we spill it to a file. This file is sorted first
 *    by partition ID and possibly second by key or by hash code of the key, if we want to do
 *    aggregation. For each file, we track how many objects were in each partition in memory, so we
 *    don't have to write out the partition ID for every element.
 *
 *  - When the user requests an iterator or file output, the spilled files are merged, along with
 *    any remaining in-memory data, using the same sort order defined above (unless both sorting
 *    and aggregation are disabled). If we need to aggregate by key, we either use a total ordering
 *    from the ordering parameter, or read the keys with the same hash code and compare them with
 *    each other for equality to merge values.
 *
 *  - Users are expected to call stop() at the end to delete all the intermediate files.
 */

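This sorter sorts, and possibly merges, a number of key-value pairs of type (K, V) to produce key-combiner pairs of type (K, C). Turning (K, V) into (K, C) is what an operator such as combineByKey does. The sorter first uses a Partitioner to group the keys into partitions, and then optionally sorts the keys within each partition with a custom Comparator. It can output a single partitioned file with a different byte range for each partition, suitable for shuffle fetches.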
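To make the (K, V) to (K, C) conversion concrete, here is a minimal sketch of the three functions an aggregator carries (createCombiner, mergeValue, mergeCombiners). It computes a per-key average, where V is Int and C is a (sum, count) pair. `Agg` is a local stand-in that mirrors the shape of Spark's Aggregator, not the real class. The map-side and reduce-side steps are simplified to plain Scala collections.

```scala
// Stand-in for Spark's Aggregator: same three functions, local hypothetical class.
final case class Agg[V, C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C)

object CombineDemo {
  // (sum, count) combiner used to compute a per-key average.
  val avgAgg = Agg[Int, (Int, Int)](
    v => (v, 1),
    (c, v) => (c._1 + v, c._2 + 1),
    (a, b) => (a._1 + b._1, a._2 + b._2))

  // "Map side": fold one task's (K, V) records into (K, C).
  def combineLocal[K](records: Seq[(K, Int)]): Map[K, (Int, Int)] =
    records.foldLeft(Map.empty[K, (Int, Int)]) { case (acc, (k, v)) =>
      acc.get(k) match {
        case Some(c) => acc.updated(k, avgAgg.mergeValue(c, v))
        case None    => acc.updated(k, avgAgg.createCombiner(v))
      }
    }

  // "Reduce side": merge combiners produced by different tasks.
  def mergeAll[K](parts: Seq[Map[K, (Int, Int)]]): Map[K, (Int, Int)] =
    parts.flatten.groupBy(_._1).map { case (k, kcs) =>
      k -> kcs.map(_._2).reduce(avgAgg.mergeCombiners)
    }
}
```

The point of C being different from V is that the combiner can carry extra state, here the count, that a plain value could not.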

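The constructor parameters are:
  • aggregator: optional Aggregator with the combine functions used to merge data
  • partitioner: optional Partitioner; if given, records are sorted by partition ID first and then by key
  • ordering: optional Ordering used to sort keys within each partition; it should be a total ordering
  • serializer: the serializer used when spilling to disk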


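Users interact with this class as follows:
  1. Instantiate an ExternalSorter.
  2. Call insertAll() with a set of records.
  3. Request an iterator() back to traverse the sorted/aggregated records, or call writePartitionedFile() to create a file containing the sorted/aggregated output, which can be used in Spark's sort shuffle.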


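Internally, at a high level, the sorter repeatedly fills up in-memory buffers. It uses a PartitionedAppendOnlyMap if we want to combine by key, or a PartitionedPairBuffer otherwise. Inside these buffers, elements are sorted by partition ID and then possibly also by key. To avoid calling the partitioner multiple times for each key, the partition ID is stored alongside each record.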
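Storing the partition ID next to each record and sorting by (partition ID, key) can be sketched as follows. `ToyPairBuffer` is a hypothetical simplification. It uses an ArrayBuffer and a standard stable sort instead of Spark's PartitionedPairBuffer and in-place sorter. Its partitioner only assumes the non-negative `hashCode % numPartitions` rule of a hash partitioner.

```scala
import java.util.Comparator
import scala.collection.mutable.ArrayBuffer

// Toy pair buffer: each record is stored as ((partitionId, key), value), so the
// partitioner runs once per record. Hypothetical, simplified types.
final class ToyPairBuffer[K, V](numPartitions: Int) {
  private val data = ArrayBuffer.empty[((Int, K), V)]

  // Non-negative hashCode modulo, in the spirit of a hash partitioner.
  def partitionOf(key: K): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  def insert(key: K, value: V): Unit = data += (((partitionOf(key), key), value))

  // Sort by partition ID, then by key only if a key comparator is given.
  def sorted(keyCmp: Option[Comparator[K]]): Seq[((Int, K), V)] = {
    val cmp: Comparator[(Int, K)] = new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        val byPartition = Integer.compare(a._1, b._1)
        if (byPartition != 0 || keyCmp.isEmpty) byPartition
        else keyCmp.get.compare(a._2, b._2)
      }
    }
    data.toSeq.sortWith((x, y) => cmp.compare(x._1, y._1) < 0)
  }
}
```

Without a key comparator, records only end up grouped by partition, which is all a plain shuffle write needs.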

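When a buffer reaches the memory limit, it is spilled to a file. This file is sorted first by partition ID and then, if we want to aggregate, possibly by key or by the key's hash code. For each file, we track how many objects each partition had in memory, so we don't have to write out the partition ID for every element.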
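This sketch shows why per-partition counts make the partition ID redundant in the spill file. Once records are sorted by partition, knowing how many records each partition has is enough to split the file again. The sketch assumes Int keys and values written with a DataOutputStream to an in-memory byte array. Spark's real spill format uses the configured serializer and writes to disk, so `ToySpill` is only an illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Toy spill format (hypothetical): records arrive sorted by partition, so only
// key/value pairs are written, plus a per-partition element count.
object ToySpill {
  final case class Spilled(bytes: Array[Byte], elementsPerPartition: Array[Long])

  def spill(sorted: Seq[((Int, Int), Int)], numPartitions: Int): Spilled = {
    val counts = new Array[Long](numPartitions)
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    for (((partition, key), value) <- sorted) {
      out.writeInt(key)      // no partition ID is written per record
      out.writeInt(value)
      counts(partition) += 1
    }
    out.flush()
    Spilled(bos.toByteArray, counts)
  }

  // Reads every partition back using only the per-partition counts.
  def read(s: Spilled): Seq[(Int, Seq[(Int, Int)])] = {
    val in = new DataInputStream(new ByteArrayInputStream(s.bytes))
    s.elementsPerPartition.indices.map { p =>
      p -> (0L until s.elementsPerPartition(p)).map(_ => (in.readInt(), in.readInt()))
    }
  }
}
```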



  def insertAll(records: Iterator[Product2[K, V]]): Unit = {
    // TODO: stop combining if we find that the reduction factor isn't high
    val shouldCombine = aggregator.isDefined

    // Decide whether map-side combining is needed. This is the buffering logic described above:
    // use a PartitionedAppendOnlyMap to combine by key, or a PartitionedPairBuffer otherwise.
    if (shouldCombine) {
      // Combine values in-memory first using our AppendOnlyMap:
      // every incoming record first updates the map in memory.
      // The next two values are functions.
      // mergeValue corresponds to the seqOp parameter of rdd.aggregateByKey:
      // if the map already holds the key, mergeValue updates its value.
      val mergeValue = aggregator.get.mergeValue
      // createCombiner corresponds to the zeroValue parameter of rdd.aggregateByKey, which is
      // used to build the combiner: if the map does not hold the key yet, createCombiner
      // creates a new combiner.
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      // Whether the key has been seen before decides between updating and creating a combiner
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (records.hasNext) {
        // Count the record; maybeSpill uses this count to decide how often to check for spills
        addElementsRead()
        // Fetch the next record
        kv = records.next()
        // The map key is (partition, key); its value is updated through the update closure
        map.changeValue((getPartition(kv._1), kv._1), update)
        // Check whether the in-memory data must be spilled to disk
        maybeSpillCollection(usingMap = true)
      }
    } else {
      // Stick values into our buffer
      while (records.hasNext) {
        addElementsRead()
        val kv = records.next()
        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
        maybeSpillCollection(usingMap = false)
      }
    }
  }

As you can see, there are two code paths for filling the in-memory buffer: a PartitionedAppendOnlyMap if we want to combine by key, and a PartitionedPairBuffer otherwise.
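The combine branch of insertAll hinges on a single `update` closure that reads the current record `kv`. A minimal sketch of the same pattern follows. `ToyMap` uses a mutable.HashMap as a hypothetical stand-in for AppendOnlyMap.changeValue, and there is no size tracking or spilling.

```scala
import scala.collection.mutable

// Hypothetical stand-in for AppendOnlyMap.changeValue on top of a HashMap.
final class ToyMap[K, C] {
  val underlying = mutable.HashMap.empty[K, C]
  def changeValue(key: K, updateFunc: (Boolean, C) => C): C = {
    val newValue = underlying.get(key) match {
      case Some(old) => updateFunc(true, old)
      case None      => updateFunc(false, null.asInstanceOf[C])
    }
    underlying(key) = newValue
    newValue
  }
}

object InsertAllDemo {
  // Mirrors the combine branch: one closure chooses createCombiner or mergeValue.
  def insertAll[K, V, C](
      records: Iterator[(K, V)],
      getPartition: K => Int,
      createCombiner: V => C,
      mergeValue: (C, V) => C): ToyMap[(Int, K), C] = {
    val map = new ToyMap[(Int, K), C]
    var kv: (K, V) = null
    // The closure reads the *current* kv, as in ExternalSorter.insertAll.
    val update = (hadValue: Boolean, oldValue: C) =>
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    while (records.hasNext) {
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
    }
    map
  }
}
```

Creating the closure once outside the loop, rather than once per record, is the design choice being mirrored here.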


  // Data structures to store in-memory objects before we spill. Depending on whether we have an
  // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
  // store them in an array buffer.
  @volatile private var map = new PartitionedAppendOnlyMap[K, C]
  @volatile private var buffer = new PartitionedPairBuffer[K, C]

map is an instance of PartitionedAppendOnlyMap, and buffer is an instance of PartitionedPairBuffer.



/**
 * Implementation of WritablePartitionedPairCollection that wraps a map in which the keys are tuples
 * of (partition ID, K)
 */
private[spark] class PartitionedAppendOnlyMap[K, V]
  extends SizeTrackingAppendOnlyMap[(Int, K), V] with WritablePartitionedPairCollection[K, V] {

  def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)] = {
    val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
    destructiveSortedIterator(comparator)
  }

  def insert(partition: Int, key: K, value: V): Unit = {
    update((partition, key), value)
  }
}


/**
 * An append-only map that keeps track of its estimated size in bytes.
 */
private[spark] class SizeTrackingAppendOnlyMap[K, V]
  extends AppendOnlyMap[K, V] with SizeTracker
{
  override def update(key: K, value: V): Unit = {
    super.update(key, value)
    super.afterUpdate()
  }

  override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    val newValue = super.changeValue(key, updateFunc)
    super.afterUpdate()
    newValue
  }

  override protected def growTable(): Unit = {
    super.growTable()
    resetSamples()
  }
}


  /**
   * Set the value for key to updateFunc(hadValue, oldValue), where oldValue will be the old value
   * for key, if any, or null otherwise. Returns the newly updated value.
   */
  def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      if (!haveNullValue) {
        incrementSize()
      }
      nullValue = updateFunc(haveNullValue, nullValue)
      haveNullValue = true
      return nullValue
    }
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (curKey.eq(null)) {
        val newValue = updateFunc(false, null.asInstanceOf[V])
        data(2 * pos) = k
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        incrementSize()  // Since we added a new key
        return newValue
      } else if (k.eq(curKey) || k.equals(curKey)) {
        val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        return newValue
      } else {
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    null.asInstanceOf[V] // Never reached but needed to keep compiler happy
  }
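The probing loop in changeValue stores keys and values in alternating slots of one array. On a collision it moves forward by 1, 2, 3, ... slots, so the offsets from the home slot are triangular numbers. The toy table below reproduces only that loop. Its hash mixer is an assumption standing in for Spark's `rehash`. The table does not grow and does not handle null keys, so it must never be filled completely.

```scala
// Toy open-addressing table with AppendOnlyMap's layout: key at data(2*pos),
// value at data(2*pos + 1). Hypothetical class, fixed capacity, no null keys.
final class ToyAppendOnlyMap(capacity: Int) {
  require(Integer.bitCount(capacity) == 1, "capacity must be a power of two")
  private val mask = capacity - 1
  private val data = new Array[AnyRef](2 * capacity)
  var probes = 0 // extra slots visited because of collisions

  // Assumed mixer; Spark uses its own rehash (a Murmur3-based hash).
  private def mix(h: Int): Int = h ^ (h >>> 16)

  def changeValue(key: AnyRef, updateFunc: (Boolean, AnyRef) => AnyRef): AnyRef = {
    var pos = mix(key.hashCode) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (curKey eq null) {                        // empty slot: insert a new entry
        val newValue = updateFunc(false, null)
        data(2 * pos) = key
        data(2 * pos + 1) = newValue
        return newValue
      } else if ((curKey eq key) || curKey == key) { // same key: update in place
        val newValue = updateFunc(true, data(2 * pos + 1))
        data(2 * pos + 1) = newValue
        return newValue
      } else {                                     // collision: step 1, 2, 3, ...
        pos = (pos + i) & mask
        i += 1
        probes += 1
      }
    }
    null // unreachable, keeps the compiler happy (same trick as the Spark code)
  }
}
```

The strings "Aa" and "BB" have the same hashCode (2112), so "BB" lands one slot after "Aa", and a later update of "Aa" finds it again at its home slot.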

After the value has been updated, super.afterUpdate() is called:

  /**
   * Callback to be invoked after every update.
   */
  protected def afterUpdate(): Unit = {
    numUpdates += 1
    if (nextSampleNum == numUpdates) {
      takeSample()
    }
  }

  /**
   * Take a new sample of the current collection's size.
   */
  private def takeSample(): Unit = {
    samples.enqueue(Sample(SizeEstimator.estimate(this), numUpdates))
    // Only use the last two samples to extrapolate
    if (samples.size > 2) {
      samples.dequeue()
    }
    val bytesDelta = samples.toList.reverse match {
      case latest :: previous :: tail =>
        (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
      // If fewer than 2 samples, assume no change
      case _ => 0
    }
    bytesPerUpdate = math.max(0, bytesDelta)
    nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
  }
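The sampling in takeSample() can be mimicked with a toy tracker. An expensive measurement runs only when the update count reaches `nextSampleNum`, which grows by a factor of 1.1 each time. Sizes in between are extrapolated linearly from the last two samples, the way SizeTracker's estimateSize() uses bytesPerUpdate. `ToySizeTracker` is hypothetical, and `measure` stands in for SizeEstimator.estimate.

```scala
import scala.collection.mutable

// Toy SizeTracker: measure rarely (geometric schedule), extrapolate in between.
final class ToySizeTracker(measure: () => Long) {
  private val SampleGrowthRate = 1.1
  private case class Sample(size: Long, numUpdates: Long)
  private val samples = mutable.Queue.empty[Sample]
  private var bytesPerUpdate = 0.0
  private var numUpdates = 1L
  private var nextSampleNum = 1L
  var samplesTaken = 0

  takeSample() // initial sample, like resetSamples()

  def afterUpdate(): Unit = {
    numUpdates += 1
    if (nextSampleNum == numUpdates) takeSample()
  }

  private def takeSample(): Unit = {
    samples.enqueue(Sample(measure(), numUpdates))
    samplesTaken += 1
    if (samples.size > 2) samples.dequeue() // keep only the last two samples
    val bytesDelta = samples.toList.reverse match {
      case latest :: previous :: _ =>
        (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
      case _ => 0.0
    }
    bytesPerUpdate = math.max(0.0, bytesDelta)
    nextSampleNum = math.ceil(numUpdates * SampleGrowthRate).toLong
  }

  // Last measured size plus bytesPerUpdate for every update since that sample.
  def estimateSize(): Long = {
    val last = samples.last
    (last.size + bytesPerUpdate * (numUpdates - last.numUpdates)).toLong
  }
}
```

When the real size grows linearly (16 bytes per update in the check below), the estimate is exact even though only a few dozen measurements are taken over 1,000 updates.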

That completes changeValue. Next, let's look at maybeSpillCollection(usingMap = true), which runs after changeValue and spills the data to disk when needed.

  /**
   * Spill the current in-memory collection to disk if needed.
   *
   * @param usingMap whether we're using a map or buffer as our current in-memory collection
   */
  private def maybeSpillCollection(usingMap: Boolean): Unit = {
    var estimatedSize = 0L
    if (usingMap) {
      estimatedSize = map.estimateSize()
      if (maybeSpill(map, estimatedSize)) {
        map = new PartitionedAppendOnlyMap[K, C]
      }
    } else {
      estimatedSize = buffer.estimateSize()
      if (maybeSpill(buffer, estimatedSize)) {
        buffer = new PartitionedPairBuffer[K, C]
      }
    }

    if (estimatedSize > _peakMemoryUsedBytes) {
      _peakMemoryUsedBytes = estimatedSize
    }
  }


  /**
   * Spills the current in-memory collection to disk if needed. Attempts to acquire more
   * memory before spilling.
   *
   * @param collection collection to spill to disk
   * @param currentMemory estimated size of the collection in bytes
   * @return true if `collection` was spilled to disk; false otherwise
   */
  protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    var shouldSpill = false // the return value
    // elementsRead: number of records read since the last spill; the check runs every 32 records
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Claim up to double our current memory from the shuffle memory pool
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      val granted = acquireMemory(amountToRequest)
      myMemoryThreshold += granted
      // If we were granted too little memory to grow further (either tryToAcquire returned 0,
      // or we already had more memory than myMemoryThreshold), spill the current collection
      shouldSpill = currentMemory >= myMemoryThreshold
    }
    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
    // Actually spill
    if (shouldSpill) {
      _spillCount += 1
      logSpillage(currentMemory)
      spill(collection)
      _elementsRead = 0
      _memoryBytesSpilled += currentMemory
      releaseMemory()
    }
    shouldSpill
  }
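The grow-then-spill decision in maybeSpill can be traced with a toy memory pool. `ToyPool` and `ToySpillable` are hypothetical. The element counter is folded into maybeSpill here, whereas Spark increments it in insertAll through addElementsRead. There is no force-spill threshold, and the memory numbers are arbitrary.

```scala
// Hypothetical memory pool: grants at most what is still free.
final class ToyPool(var free: Long) {
  def acquire(requested: Long): Long = {
    val granted = math.min(free, math.max(0L, requested))
    free -= granted
    granted
  }
}

// Toy version of the maybeSpill logic from Spillable.
final class ToySpillable(pool: ToyPool, initialThreshold: Long) {
  var threshold = initialThreshold
  var elementsRead = 0L
  var spillCount = 0

  def maybeSpill(currentMemory: Long): Boolean = {
    elementsRead += 1 // Spark does this in insertAll via addElementsRead()
    var shouldSpill = false
    if (elementsRead % 32 == 0 && currentMemory >= threshold) {
      // Ask for up to double the current usage
      val granted = pool.acquire(2 * currentMemory - threshold)
      threshold += granted
      shouldSpill = currentMemory >= threshold // not enough memory was granted
    }
    if (shouldSpill) {
      spillCount += 1
      elementsRead = 0
      // Like releaseMemory(): give back everything above the initial threshold
      pool.free += threshold - initialThreshold
      threshold = initialThreshold
    }
    shouldSpill
  }
}
```

Consider a pool of 1,000 bytes, an initial threshold of 100 bytes and 10 bytes per record. The checks at records 32 and 64 obtain more memory. At record 96 usage is still below the threshold, so nothing happens. At record 128 the pool is empty, so the collection spills and its memory is returned.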


  /**
   * Spill our in-memory collection to a sorted file that we can merge later.
   * We add this file into `spilledFiles` to find it later.
   *
   * @param collection whichever collection we're using (map or buffer)
   */
  override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
    // destructiveSortedWritablePartitionedIterator iterates over the data and writes elements out
    // instead of returning them. Records come back ordered by partition ID and then by the given
    // comparator. This may destroy the underlying collection.
    val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
    val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
    spills += spillFile
  }


  private def comparator: Option[Comparator[K]] = {
    if (ordering.isDefined || aggregator.isDefined) {
      Some(keyComparator)
    } else {
      None
    }
  }


  // A comparator for keys K that orders them within a partition to allow aggregation or sorting.
  // Can be a partial ordering by hash code if a total ordering is not provided through by the
  // user. (A partial ordering means that equal keys have comparator.compare(k, k) = 0, but some
  // non-equal keys also have this, so we need to do a later pass to find truly equal keys).
  // Note that we ignore this if no aggregator and no ordering are given.
  private val keyComparator: Comparator[K] = ordering.getOrElse(new Comparator[K] {
    override def compare(a: K, b: K): Int = {
      val h1 = if (a == null) 0 else a.hashCode()
      val h2 = if (b == null) 0 else b.hashCode()
      if (h1 < h2) -1 else if (h1 == h2) 0 else 1
    }
  })
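Here is a quick check of why the hash-code comparator is only a partial ordering. Two different strings can share a hash code, and the fallback comparator then reports them as equal. The comparator body is the same as above; wrapping it in the hypothetical `HashOrderDemo` object is just for the example.

```scala
import java.util.Comparator

// Same logic as the fallback keyComparator: compare keys by hashCode only.
object HashOrderDemo {
  def hashComparator[K]: Comparator[K] = new Comparator[K] {
    override def compare(a: K, b: K): Int = {
      val h1 = if (a == null) 0 else a.hashCode()
      val h2 = if (b == null) 0 else b.hashCode()
      if (h1 < h2) -1 else if (h1 == h2) 0 else 1
    }
  }
}
```

"Aa" and "BB" both hash to 2112, so this comparator returns 0 for them even though they are different keys. That is why the aggregation pass later has to compare keys with equals.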




private[spark] object WritablePartitionedPairCollection {
  /**
   * A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering.
   */
  def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] = {
    new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        val partitionDiff = a._1 - b._1
        if (partitionDiff != 0) {
          partitionDiff
        } else {
          keyComparator.compare(a._2, b._2)
        }
      }
    }
  }

  /**
   * A comparator for (Int, K) pairs that orders them by only their partition ID.
   */
  def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {
    override def compare(a: (Int, K), b: (Int, K)): Int = {
      a._1 - b._1
    }
  }
}


  /**
   * Return an iterator of the map in sorted order. This provides a way to sort the map without
   * using additional memory, at the expense of destroying the validity of the map.
   */
  def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
    destroyed = true
    // Pack KV pairs into the front of the underlying array
    var keyIndex, newIndex = 0
    while (keyIndex < capacity) {
      if (data(2 * keyIndex) != null) {
        data(2 * newIndex) = data(2 * keyIndex)
        data(2 * newIndex + 1) = data(2 * keyIndex + 1)
        newIndex += 1
      }
      keyIndex += 1
    }
    assert(curSize == newIndex + (if (haveNullValue) 1 else 0))

    new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)

    new Iterator[(K, V)] {
      var i = 0
      var nullValueReady = haveNullValue
      def hasNext: Boolean = (i < newIndex || nullValueReady)
      def next(): (K, V) = {
        if (nullValueReady) {
          nullValueReady = false
          (null.asInstanceOf[K], nullValue)
        } else {
          val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
          i += 1
          item
        }
      }
    }
  }
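The compact-then-sort step of destructiveSortedIterator can be sketched on the same flat layout, with the key at 2*i and the value at 2*i+1. This is a simplification. Spark sorts the packed prefix in place with its Sorter and KVArraySortDataFormat, whereas the toy copies the prefix into pairs and uses a plain stable `sortWith`. It also ignores the separate null-key slot. `PackAndSort` is a hypothetical name.

```scala
import java.util.Comparator

object PackAndSort {
  // Compacts non-empty slots to the front of `data` (destroying the hash
  // layout), then returns the packed (key, value) pairs sorted by key.
  def sortedEntries[K, V](data: Array[AnyRef], cmp: Comparator[K]): Seq[(K, V)] = {
    val capacity = data.length / 2
    var keyIndex = 0
    var newIndex = 0
    while (keyIndex < capacity) {
      if (data(2 * keyIndex) != null) {
        data(2 * newIndex) = data(2 * keyIndex)
        data(2 * newIndex + 1) = data(2 * keyIndex + 1)
        newIndex += 1
      }
      keyIndex += 1
    }
    val pairs = (0 until newIndex).map { i =>
      (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
    }
    pairs.sortWith((a, b) => cmp.compare(a._1, b._1) < 0)
  }
}
```

After the call, the front of the array holds the packed entries, so the original hash positions are gone. This is why Spark marks the map as destroyed.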



  /**
   * Spill the in-memory Iterator to a temporary file on disk.
   */
  private[this] def spillMemoryIteratorToDisk(inMemoryIterator: Iterator[(K, C)])
      : DiskMapIterator = {
    val (blockId, file) = diskBlockManager.createTempLocalBlock()
    val writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetrics)
    var objectsWritten = 0

    // List of batch sizes (bytes) in the order they are written to disk
    val batchSizes = new ArrayBuffer[Long]

    // Flush the disk writer's contents to disk, and update relevant variables
    def flush(): Unit = {
      val segment = writer.commitAndGet()
      batchSizes += segment.length
      _diskBytesSpilled += segment.length
      objectsWritten = 0
    }

    var success = false
    try {
      while (inMemoryIterator.hasNext) {
        val kv = inMemoryIterator.next()
        writer.write(kv._1, kv._2)
        objectsWritten += 1

        // private val serializerBatchSize = sparkConf.getLong("spark.shuffle.spill.batchSize", 10000)
        if (objectsWritten == serializerBatchSize) {
          flush()
        }
      }
      if (objectsWritten > 0) {
        flush()
        writer.close()
      } else {
        writer.revertPartialWritesAndClose()
      }
      success = true
    } finally {
      if (!success) {
        // This code path only happens if an exception was thrown above before we set success;
        // close our stuff and let the exception be thrown further
        writer.revertPartialWritesAndClose()
        if (file.exists()) {
          if (!file.delete()) {
            logWarning(s"Error deleting ${file}")
          }
        }
      }
    }

    // Return an iterator that yields the (K, C) pairs from the disk map in sorted order
    new DiskMapIterator(file, blockId, batchSizes)
  }
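The batching in spillMemoryIteratorToDisk records the byte size of each batch of serializerBatchSize objects, so the file can later be read back one batch at a time. The toy below assumes Int pairs written with a DataOutputStream into memory. `ToyBatchedSpill` is hypothetical and only mirrors the flush-every-N-objects logic and the final partial flush, not Spark's serializer or DiskBlockObjectWriter.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import scala.collection.mutable.ArrayBuffer

object ToyBatchedSpill {
  // Returns the written bytes and the byte length of each committed batch.
  def spill(records: Iterator[(Int, Int)], batchSize: Int): (Array[Byte], Seq[Long]) = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    val batchSizes = ArrayBuffer.empty[Long]
    var committed = 0L     // bytes covered by already-committed batches
    var objectsWritten = 0

    // Plays the role of flush()/commitAndGet(): close off the current batch.
    def flush(): Unit = {
      out.flush()
      batchSizes += (bos.size() - committed)
      committed = bos.size()
      objectsWritten = 0
    }

    while (records.hasNext) {
      val (k, v) = records.next()
      out.writeInt(k)
      out.writeInt(v)
      objectsWritten += 1
      if (objectsWritten == batchSize) flush()
    }
    if (objectsWritten > 0) flush() // last, partially filled batch
    (bos.toByteArray, batchSizes.toSeq)
  }
}
```

With 25 records of 8 bytes and a batch size of 10, the batches are 80, 80 and 40 bytes.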





    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
    // shuffleBlockResolver creates and maintains the mapping between logical shuffle blocks
    // and their physical file locations.
    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val tmp = Utils.tempFileWith(output)
    try {
      val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
      // Write all data added to this ExternalSorter into a file in the disk store. This is called
      // by SortShuffleWriter and returns an array with the length of each partition in the file.
      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
      // The resulting mapStatus is sent to the driver together with the status updates
      // that the executor reports to the driver
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
      }
    }


  /**
   * Write all the data added into this ExternalSorter into a file in the disk store. This is
   * called by the SortShuffleWriter.
   *
   * @param blockId block ID to write to. The index file will be blockId.name + ".index".
   * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
   */
  def writePartitionedFile(
      blockId: BlockId,
      outputFile: File): Array[Long] = {

    // Track location of each range in the output file
    val lengths = new Array[Long](numPartitions)
    val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
      context.taskMetrics().shuffleWriteMetrics)

    if (spills.isEmpty) {
      // Case where we only have in-memory data
      val collection = if (aggregator.isDefined) map else buffer
      val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
      while (it.hasNext) {
        val partitionId = it.nextPartition()
        while (it.hasNext && it.nextPartition() == partitionId) {
          it.writeNext(writer)
        }
        val segment = writer.commitAndGet()
        lengths(partitionId) = segment.length
      }
    } else {
      // We must perform merge-sort; get an iterator by partition and write everything directly.
      for ((id, elements) <- this.partitionedIterator) {
        if (elements.hasNext) {
          for (elem <- elements) {
            writer.write(elem._1, elem._2)
          }
          val segment = writer.commitAndGet()
          lengths(id) = segment.length
        }
      }
    }

    writer.close()
    // ... (task metrics updates omitted)
    lengths
  }




  /**
   * Return an iterator over all the data written to this object, grouped by partition and
   * aggregated by the requested aggregator. For each partition we then have an iterator over its
   * contents, and these are expected to be accessed in order (you can't "skip ahead" to one
   * partition without reading the previous one). Guaranteed to return a key-value pair for each
   * partition, in order of partition ID.
   *
   * For now, we just merge all the spilled files in once pass, but this can be modified to
   * support hierarchical merging.
   * Exposed for testing.
   */
  def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
    val usingMap = aggregator.isDefined
    val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
    if (spills.isEmpty) {
      // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
      // we don't even need to sort by anything other than partition ID
      if (!ordering.isDefined) {
        // The user hasn't requested sorted keys, so only sort by partition ID, not key
        groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
      } else {
        // We do need to sort by both partition ID and key
        groupByPartition(destructiveIterator(
          collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
      }
    } else {
      // Merge spilled and in-memory data
      merge(spills, destructiveIterator(
        collection.partitionedDestructiveSortedIterator(comparator)))
    }
  }


  /**
   * Merge a sequence of sorted files, giving an iterator over partitions and then over elements
   * inside each partition. This can be used to either write out a new file or return data to
   * the user.
   *
   * Returns an iterator over all the data written to this object, grouped by partition. For each
   * partition we then have an iterator over its contents, and these are expected to be accessed
   * in order (you can't "skip ahead" to one partition without reading the previous one).
   * Guaranteed to return a key-value pair for each partition, in order of partition ID.
   */
  private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
      : Iterator[(Int, Iterator[Product2[K, C]])] = {
    val readers = spills.map(new SpillReader(_))
    val inMemBuffered = inMemory.buffered
    (0 until numPartitions).iterator.map { p =>
      val inMemIterator = new IteratorForPartition(p, inMemBuffered)
      val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
      if (aggregator.isDefined) {
        // Perform partial aggregation across partitions
        (p, mergeWithAggregation(
          iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
      } else if (ordering.isDefined) {
        // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
        // sort the elements without trying to merge them
        (p, mergeSort(iterators, ordering.get))
      } else {
        (p, iterators.iterator.flatten)
      }
    }
  }
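merge() relies on IteratorForPartition, which is not shown above. It hands out one sub-iterator per partition over a single buffered iterator that is already sorted by partition ID. The sketch below follows that idea with hypothetical names (`PartitionSlice`, `PartitionSliceDemo`). Because all slices share the same underlying iterator, partitions must be consumed in order.

```scala
// Hypothetical version of IteratorForPartition: yields records of one
// partition and stops as soon as the next record belongs to a later one.
final class PartitionSlice[K, C](partitionId: Int, data: BufferedIterator[((Int, K), C)])
    extends Iterator[(K, C)] {
  override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionId
  override def next(): (K, C) = {
    if (!hasNext) throw new NoSuchElementException
    val ((_, key), value) = data.next()
    (key, value)
  }
}

object PartitionSliceDemo {
  // Walks partitions 0 until numPartitions in order over one shared iterator.
  def byPartition[K, C](sorted: Iterator[((Int, K), C)], numPartitions: Int): Seq[(Int, List[(K, C)])] = {
    val buffered = sorted.buffered
    (0 until numPartitions).map(p => p -> new PartitionSlice(p, buffered).toList)
  }
}
```

An empty partition simply yields an empty slice, which matches the guarantee that every partition ID gets an entry.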


  // Merge-sort the data of one partition that comes from several iterators
  private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K])
      : Iterator[Product2[K, C]] =
  {
    // A BufferedIterator can peek at its next element (head) without consuming it
    val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)
    type Iter = BufferedIterator[Product2[K, C]]
    // A priority queue (backed by a heap) holds every non-empty iterator and is ordered
    // by comparing the head element of each iterator
    val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
      // Use the reverse order because PriorityQueue dequeues the max
      // (compares the head elements of two buffered iterators)
      override def compare(x: Iter, y: Iter): Int = comparator.compare(y.head._1, x.head._1)
    })
    heap.enqueue(bufferedIters: _*)  // Will contain only the iterators with hasNext = true
    new Iterator[Product2[K, C]] {
      override def hasNext: Boolean = !heap.isEmpty

      override def next(): Product2[K, C] = {
        if (!hasNext) {
          throw new NoSuchElementException
        }
        // Remove the iterator at the top of the heap and take its first element;
        // if that iterator still has data, put it back into the heap
        val firstBuf = heap.dequeue()
        val firstPair = firstBuf.next()
        if (firstBuf.hasNext) {
          heap.enqueue(firstBuf)
        }
        firstPair
      }
    }
  }
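mergeSort is a standard k-way merge over a heap. A standalone version using plain (K, C) tuples is sketched below. `KWayMerge` is a hypothetical name. It enqueues iterators one at a time instead of with varargs but otherwise follows the same steps: reverse ordering on the head key, dequeue, take one element, re-enqueue if non-empty.

```scala
import java.util.Comparator
import scala.collection.mutable

object KWayMerge {
  def mergeSort[K, C](iterators: Seq[Iterator[(K, C)]], comparator: Comparator[K]): Iterator[(K, C)] = {
    type Iter = BufferedIterator[(K, C)]
    // PriorityQueue dequeues the maximum, so compare heads in reverse order
    val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
      override def compare(x: Iter, y: Iter): Int = comparator.compare(y.head._1, x.head._1)
    })
    iterators.filter(_.hasNext).foreach(it => heap.enqueue(it.buffered))
    new Iterator[(K, C)] {
      override def hasNext: Boolean = heap.nonEmpty
      override def next(): (K, C) = {
        if (!hasNext) throw new NoSuchElementException
        val firstBuf = heap.dequeue()        // iterator with the smallest head
        val firstPair = firstBuf.next()
        if (firstBuf.hasNext) heap.enqueue(firstBuf) // re-insert with its new head
        firstPair
      }
    }
  }
}
```

Each output element costs one heap operation, O(log k) for k inputs, and only the heads of the inputs need to be in memory at once.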


  // Merge the iterators that hold data of the same partition; if a total order is required,
  // the data must also be sorted
  private def mergeWithAggregation(
      iterators: Seq[Iterator[Product2[K, C]]],
      mergeCombiners: (C, C) => C,
      comparator: Comparator[K],
      totalOrder: Boolean)
      : Iterator[Product2[K, C]] =
  {
    if (!totalOrder) {
      // No total ordering is required.
      // Because no total-order comparator is defined, keys are sorted by their hashCode.
      new Iterator[Iterator[Product2[K, C]]] {
        // As before, merge-sort with the heap
        val sorted = mergeSort(iterators, comparator).buffered

        // Buffers reused across elements to decrease memory allocation
        val keys = new ArrayBuffer[K]
        val combiners = new ArrayBuffer[C]

        override def hasNext: Boolean = sorted.hasNext

        // next() aggregates the records whose keys share one hash code and returns the results
        override def next(): Iterator[Product2[K, C]] = {
          if (!hasNext) {
            throw new NoSuchElementException
          }
          keys.clear()
          combiners.clear()
          // Take the next smallest element
          val firstPair = sorted.next()

          // keys/combiners collect keys that share a hash code but are not equal
          keys += firstPair._1
          combiners += firstPair._2
          val key = firstPair._1
          // Look for records with the same key and merge them. The default comparator only
          // compares hash codes, so two keys with equal hash codes may still differ; this loop
          // walks every record whose key has the same hash code as the first key.
          while (sorted.hasNext && comparator.compare(sorted.head._1, key) == 0) {
            val pair = sorted.next()
            var i = 0
            var foundKey = false
            // Check whether keys already holds a key equal to the current one;
            // if so, merge into that key's combiner
            while (i < keys.size && !foundKey) {
              if (keys(i) == pair._1) {
                combiners(i) = mergeCombiners(combiners(i), pair._2)
                foundKey = true
              }
              i += 1
            }
            // If no equal key was found, append the current key to the buffers
            if (!foundKey) {
              keys += pair._1
              combiners += pair._2
            }
          }

          // Note that we return an iterator of elements since we could've had many keys marked
          // equal by the partial order; we flatten this below to get a flat iterator of (K, C).
          keys.iterator.zip(combiners.iterator)
        }
      }.flatMap(i => i)
    } else {
      // We have a total ordering, so the objects with the same key are sequential.
      new Iterator[Product2[K, C]] {
        val sorted = mergeSort(iterators, comparator).buffered

        override def hasNext: Boolean = sorted.hasNext

        override def next(): Product2[K, C] = {
          if (!hasNext) {
            throw new NoSuchElementException
          }
          val elem = sorted.next()
          val k = elem._1
          var c = elem._2
          // With a total order, equal keys are adjacent and different keys never compare
          // as equal, so no buffer is needed
          while (sorted.hasNext && sorted.head._1 == k) {
            val pair = sorted.next()
            c = mergeCombiners(c, pair._2)
          }
          (k, c)
        }
      }
    }
  }
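The partial-ordering branch of mergeWithAggregation reduces to a small function. It walks input that is sorted only by key hashCode, gathers each run of equal hash codes, and merges inside the run by comparing keys with ==. `HashRunAggregation` is a hypothetical sketch that returns a List instead of a lazy iterator and assumes non-null keys.

```scala
import scala.collection.mutable.ArrayBuffer

object HashRunAggregation {
  def aggregate[K, C](sortedByHash: Iterator[(K, C)], mergeCombiners: (C, C) => C): List[(K, C)] = {
    val sorted = sortedByHash.buffered
    val out = List.newBuilder[(K, C)]
    // Parallel buffers for the distinct keys inside one hash-code run
    val keys = new ArrayBuffer[K]
    val combiners = new ArrayBuffer[C]
    while (sorted.hasNext) {
      keys.clear(); combiners.clear()
      val (firstKey, firstC) = sorted.next()
      keys += firstKey
      combiners += firstC
      val h = firstKey.hashCode // assumes non-null keys
      while (sorted.hasNext && sorted.head._1.hashCode == h) {
        val (k, c) = sorted.next()
        val i = keys.indexWhere(_ == k) // truly equal key already seen?
        if (i >= 0) combiners(i) = mergeCombiners(combiners(i), c)
        else { keys += k; combiners += c }
      }
      out ++= keys.zip(combiners)
    }
    out.result()
  }
}
```

With "Aa" and "BB" colliding on hash code 2112, both land in the same run but are still aggregated separately.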


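  1. Create an ExternalSorter object; all of the following operations are performed on it.
  2. Call the ExternalSorter's insertAll, which writes the data to disk through spills while keeping the most recent part of the data in memory.
  3. For each record, call the map's changeValue(). It first checks whether the hash table already contains the record's (partitionId, key). If not, the record is added at the position computed from the hash of (partitionId, key), probing forward on collisions. If it does, the record is aggregated into the value at that position.
  4. After each update, check whether the number of updates has reached the next sampling point. If so, estimate the number of bytes the map occupies with SizeEstimator, keep the two most recent estimates in a samples queue, compute the number of bytes added per update, and set the update count at which the next sample will be taken.
  5. After each record is added, decide whether to spill to disk. The current size of the map is estimated as the last sampled size plus the bytes per update from the previous step multiplied by the number of updates since that sample. If this size meets the spill conditions, spilling starts. The conditions are that, at a check every 32 records, the size exceeds the memory threshold and not enough extra memory can be acquired, or that too many records have been read since the last spill.
  6. The spill works as follows. Because the map is a hash table, not every slot of the underlying array holds data, so all entries are first moved to the front of the array. They are then sorted by partition. Within each partition, records are sorted by the user-defined ordering if there is one, and otherwise by the default comparator, which compares key hashCodes. The sorted data is written to a file.
  7. Once all data has been written to files, the spill files must be merged into one file. The records in each file are sorted by partitionId, and partitions are merged in increasing partitionId order, so the data of the current partition can be read quickly from each file.
  8. If no aggregation is needed but sorting is, the current partition's data from each file forms an iterator. All of these iterators are added to a heap ordered by each iterator's first element. Each time, the iterator at the top of the heap is removed and its first element is appended to the sorted output. If the iterator still has elements, it is put back into the heap. This completes the sort.
  9. If aggregation is needed, the data is also sorted first, so that identical keys end up next to each other, and then the elements are traversed in order and aggregated. Without a user-defined ordering, the default key-hashCode ordering is used, so records whose keys share a hashCode but are different can end up adjacent. A buffer is therefore needed to hold the separate aggregation results of keys with equal hashCodes but different values.
  10. Each partition is processed as above, and all results are written into one file, which is the final data file.
  11. The index file is generated from the size each partition occupies in the data file.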
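Step 11 can be made concrete. The sketch assumes the index-file layout used by IndexShuffleBlockResolver: numPartitions + 1 cumulative offsets starting at 0. A reduce task for partition p then reads bytes [offsets(p), offsets(p + 1)) of the data file. `ToyIndex` is hypothetical and only computes the offsets in memory; it does not write the 8-byte longs to a file.

```scala
object ToyIndex {
  // Partition lengths (as returned by writePartitionedFile) -> cumulative offsets.
  def offsets(partitionLengths: Array[Long]): Array[Long] =
    partitionLengths.scanLeft(0L)(_ + _)

  // Byte range [start, end) of one reduce partition inside the data file.
  def rangeFor(offsets: Array[Long], reduceId: Int): (Long, Long) =
    (offsets(reduceId), offsets(reduceId + 1))
}
```

An empty partition shows up as two equal consecutive offsets, so a reducer can tell it has nothing to fetch without reading the data file.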

