微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在 spark scala 中按顺序聚合键值

如何解决在 spark scala 中按顺序聚合键值

我正在尝试在 spark (Scala) 中实现矩阵 A 的分布式奇异值分解。我已经设法将乘积 A.t*A 的所有元素计算为 RDD 上的转换(A.t A 的转置),并将其作为 RDD[(Int,Int),Double)] 形式的 RDD

Array(((0,0),66.0),((0,2),90.0),((1,78.0),108.0),((2,1),93.0),126.0),90.0))

其中键 (j,k) 表示值应该位于矩阵 A.t*A 中的哪一行和哪一列。 最后,我希望将行作为密集矩阵(但我愿意接受其他建议)。

我尝试在元组的第一部分使用这样的aggregateByKey(指示值应该在矩阵中的哪一行):

aggregateByKey(new HashSet[Double])(_+_,_++_)

但是我没有在最终矩阵的行中以正确的顺序获取元素。

有什么好的办法吗?我在下面发布了代码,所以也许它可能有用。

谢谢您并致以亲切的问候。

import org.apache.spark.mllib.linalg.distributed.{IndexedRow,IndexedRowMatrix,RowMatrix}

var m = sc.parallelize(Array((1,2,3),(4,5,6),(7,8,9)))  

import scala.collection.mutable.ArrayBuffer


//Function that maps an indexed row (a_1,...,a_n) to   ((j,k),a_j*a_k)
def f(v: IndexedRow): Array[((Int,Double)]={
var keyvaluepairs = ArrayBuffer[((Int,Double)]()
for(j<-0 to v.vector.size-1){
  for(k<-0 to v.vector.size-1){
  keyvaluepairs.append(((j,v.vector(j)*v.vector(k)))
  }
}
keyvaluepairs.toArray
}

//map M to key-value rdd where key =(j,k) and value = a_ij*a_ik.
val keyvalRDD = A.flatMap(row =>f(row))


//Sum up all key-value pairs that have the same key (j,k) (corresponts to getting the element of A.T*A on the j:th row and k:th column).
val keyvalSum = keyvalRDD.reduceByKey((x,y)=>x+y)

val rowkeySum = keyvalSum.map(x=>(x._1._2,x._2))  // The keys are of the form (j,k). just save the index that indicate of which row it should be in the matrix.

import scala.collection.immutable.HashSet
val mergeRows = rowkeySum.aggregateByKey(new HashSet[Double])(_+_,_++_)

import breeze.linalg.{Vector,DenseMatrix}

val Rows = mergeRows.map(x=>x._2.toArray)

//Throw away the keys,turn the rows to Arrays and collect.

val dm = DenseMatrix(Rows:_*)

解决方法

尝试用坐标矩阵构建矩阵:

  def calculate(sc: SparkContext) = {
    val matrix =
      sc.parallelize(Array(((0,0),66.0),((0,2),90.0),((1,78.0),108.0),((2,1),93.0),126.0),90.0)))
      .map(el => MatrixEntry(el._1._1,el._1._2,el._2))

    var i = 0
    val mat = new CoordinateMatrix(matrix)

    val m = mat.numRows()
    val n = mat.numCols()
    val result = DenseMatrix.zeros[Double](m.toInt,n.toInt)

    mat.toRowMatrix().rows.collect().foreach { vec =>
      vec.foreachActive { case (index,value) =>
        result(i,index) = value
      }
      i += 1
    }

    println("Result: " + result)
  }

结果:

66.0  78.0   90.0   
78.0  93.0   108.0  
90.0  108.0  126.0

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。