如何解决为什么我的代码需要那么多时间在Spark Pregel中执行?
我已经在Spark中使用pregel编写了代码,该代码可以处理图形,但是对于小型数据集,它的执行速度非常慢。我以前用pregel编写过程序,但是此代码确实运行缓慢。我的集群由2名工人组成。每个都有核心i5 cpu和6 GB RAM。 这是我在pregel中编写的代码:
def run[VD,ED: classtag](graph: Graph[VD,ED],maxSteps: Int) = {
val temp_graph = graph.mapVertices { case (vid,_) => mutable.HashMap[VertexId,(Double,VertexId)](vid -> (1,vid)) }
def sendMessage(e: EdgeTriplet[mutable.HashMap[VertexId,VertexId)],ED]): Iterator[(VertexId,mutable.HashMap[VertexId,List[(Double,VertexId)]])] = {
val msg1 = e.dstAttr.map{ case (k,v) => (k,List(v)) }
val msg2 = e.srcAttr.map{ case (k,List(v)) }
Iterator((e.srcId,msg1),(e.dstId,msg2))
}
def mergeMessage(count1: (mutable.HashMap[VertexId,VertexId)]]),count2: (mutable.HashMap[VertexId,VertexId)]]))= {
val communityMap = new mutable.HashMap[VertexId,VertexId)]]
(count1.keySet ++ count2.keySet).map(key => {
val count1Val: List[(Double,VertexId)] = count1.getorElse(key,Nil)
val count2Val: List[(Double,VertexId)] = count2.getorElse(key,Nil)
val pp = List(count1Val:::count2Val).flatten
communityMap += key-> pp
})
communityMap
}
def vertexProgram(vid: VertexId,attr: mutable.HashMap[VertexId,message: mutable.HashMap[VertexId,VertexId)]]) = {
if (message.isEmpty) attr
else {
val labels_score: mutable.HashMap[VertexId,Double] = message.map {
key =>
var value_sum = 0D
var maxSimilar_result = 0D
val max_similar = most_similar.filter(x => x._1 == vid).headOption match {
case Some(x) => x._2 // most similar neighbor
// case _ => -1
}
if (key._2.exists(x=>x._2==max_similar)) {
maxSimilar_result = key._2.filter(v => v._2 == max_similar).headOption match {
case Some(v) => v._1 // is the most similar node in the List?
// case _ => 0D
}
}
else maxSimilar_result = 0D
key._2.map {
values =>
value_sum += values._1 * (broadcastvariable.value(vid)(values._2)._2)
}
value_sum += (beta*value_sum)+((1-beta)*maxSimilar_result)
(key._1,value_sum) //label list
}
val max_value = labels_score.maxBy(x=>x._2)._2.todouble
val dividedByMax: mutable.Map[VertexId,Double)] = labels_score.map(x=>(x._1,(x._2,x._2/max_value))) // divide by maximum value
val resultMap: mutable.HashMap[VertexId,Double)] = new mutable.HashMap[VertexId,Double)]
dividedByMax.map{ row => // select labels more than threshold P = 0.75
if (row._2._1 >= p) resultMap += row
}
val xx = if (resultMap.isEmpty) dividedByMax.take(1).asInstanceOf[mutable.HashMap[VertexId,Double)]]
else resultMap
val rr = xx.map(x=>(x._1,x._2._1))
val max_for_normalize= rr.values.sum
val res: mutable.HashMap[VertexId,VertexId)] = rr.map(x=>(x._1->(x._2/max_for_normalize,vid))) // normalize labels
res
}
}
val initialMessage = mutable.HashMap[VertexId,VertexId)]]()
val overlapCommunitiesGraph = pregel(temp_graph,initialMessage,maxIterations = maxSteps)(
vprog = vertexProgram,sendMsg = sendMessage,mergeMsg = mergeMessage)
overlapCommunitiesGraph
}
谁能解释执行缓慢的问题所在?是对的,因为我有2个工作人员,并且pregel中有很多消息传递和减少操作,所以我的代码在大型数据集中的性能会大大降低?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。