Why does my code take so long to execute in Spark Pregel?

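I have written a graph-processing program in Spark using Pregel, but it runs very slowly even on a small dataset. I have written Pregel programs before, and none were this slow. My cluster has 2 workers, each with a Core i5 CPU and 6 GB of RAM. Here is my Pregel code: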

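import scala.collection.mutable
import scala.reflect.ClassTag
import org.apache.spark.graphx._

// most_similar, broadcastvariable, beta and p are defined outside this snippet (not shown in the question).

def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int) = {

  // Vertex state: label -> (score, vertex the label came from); every vertex starts with its own label.
  val temp_graph = graph.mapVertices { case (vid, _) =>
    mutable.HashMap[VertexId, (Double, VertexId)]((vid, (1D, vid)))
  }

  // Send each endpoint's full label map to the other endpoint, wrapping every value in a List.
  def sendMessage(e: EdgeTriplet[mutable.HashMap[VertexId, (Double, VertexId)], ED])
      : Iterator[(VertexId, mutable.HashMap[VertexId, List[(Double, VertexId)]])] = {
    val msg1 = e.dstAttr.map { case (k, v) => (k, List(v)) }
    val msg2 = e.srcAttr.map { case (k, v) => (k, List(v)) }
    Iterator((e.srcId, msg1), (e.dstId, msg2))
  }

  // Merge two messages by concatenating the lists stored under each label.
  def mergeMessage(count1: mutable.HashMap[VertexId, List[(Double, VertexId)]],
                   count2: mutable.HashMap[VertexId, List[(Double, VertexId)]])
      : mutable.HashMap[VertexId, List[(Double, VertexId)]] = {

    val communityMap = new mutable.HashMap[VertexId, List[(Double, VertexId)]]

    (count1.keySet ++ count2.keySet).foreach { key =>
      val count1Val: List[(Double, VertexId)] = count1.getOrElse(key, Nil)
      val count2Val: List[(Double, VertexId)] = count2.getOrElse(key, Nil)
      communityMap += key -> (count1Val ::: count2Val)
    }
    communityMap
  }

  def vertexProgram(vid: VertexId,
                    attr: mutable.HashMap[VertexId, (Double, VertexId)],
                    message: mutable.HashMap[VertexId, List[(Double, VertexId)]])
      : mutable.HashMap[VertexId, (Double, VertexId)] = {

    if (message.isEmpty) attr // superstep 0 (empty initial message): keep the current labels
    else {

      // Most similar neighbour of this vertex. It does not depend on the label, so look it up once.
      // -1L follows the commented-out default in the original; the original match threw a MatchError here.
      val max_similar: VertexId = most_similar.find(x => x._1 == vid).map(x => x._2).getOrElse(-1L)

      val labels_score: mutable.HashMap[VertexId, Double] = message.map { case (label, entries) =>

        // Score sent by the most similar neighbour for this label, or 0 if it did not send one.
        val maxSimilar_result = entries.find(v => v._2 == max_similar).map(v => v._1).getOrElse(0D)

        // Similarity-weighted sum of the scores received for this label.
        var value_sum = entries.map { case (score, from) => score * broadcastvariable.value(vid)(from)._2 }.sum

        // Kept from the original: `+=` adds value_sum on top of the beta / (1 - beta) mix.
        // Use `=` instead if a convex combination was intended.
        value_sum += (beta * value_sum) + ((1 - beta) * maxSimilar_result)

        (label, value_sum) // label -> score
      }

      val max_value = labels_score.maxBy(x => x._2)._2
      // label -> (score, score divided by the maximum score)
      val dividedByMax: mutable.HashMap[VertexId, (Double, Double)] =
        labels_score.map(x => (x._1, (x._2, x._2 / max_value)))

      // Select labels whose score reaches the threshold p (0.75).
      // Note: this compares the raw score (._2._1), not the max-normalised one (._2._2).
      val resultMap = new mutable.HashMap[VertexId, (Double, Double)]
      dividedByMax.foreach { row =>
        if (row._2._1 >= p) resultMap += row
      }

      // If no label passed the threshold, fall back to one label (the first in hash order).
      val xx = if (resultMap.isEmpty) dividedByMax.take(1) else resultMap

      // Normalise the surviving raw scores so they sum to 1.
      val rr = xx.map(x => (x._1, x._2._1))
      val max_for_normalize = rr.values.sum

      val res: mutable.HashMap[VertexId, (Double, VertexId)] =
        rr.map(x => (x._1, (x._2 / max_for_normalize, vid)))

      res
    }
  }

  // Empty initial message, so superstep 0 leaves every vertex's labels unchanged.
  val initialMessage = mutable.HashMap[VertexId, List[(Double, VertexId)]]()

  val overlapCommunitiesGraph = Pregel(temp_graph, initialMessage, maxIterations = maxSteps)(
    vprog = vertexProgram, sendMsg = sendMessage, mergeMsg = mergeMessage)

  overlapCommunitiesGraph
}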
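One spot that may be worth checking in the code is the `most_similar` lookup inside `vertexProgram`. If `most_similar` is a local collection of `(VertexId, VertexId)` pairs (its type is not shown in the question, so this is an assumption), every vertex scans the whole collection on every superstep. The pure-Scala sketch below (no Spark; `pairs`, `buildIndex` and the other names are hypothetical) builds a hash index once so each lookup becomes a map probe instead of a linear scan:

```scala
// Pure-Scala sketch (no Spark). `pairs` is a hypothetical stand-in for the
// question's `most_similar`, assumed to hold (vertex, most similar neighbour) pairs.
object MostSimilarIndex {
  type VertexId = Long

  // What the question's vertexProgram does: a linear scan on every call.
  def lookupLinear(pairs: Seq[(VertexId, VertexId)], vid: VertexId): VertexId =
    pairs.find(_._1 == vid).map(_._2).getOrElse(-1L)

  // Build a hash index once (e.g. on the driver, then broadcast it).
  // groupBy keeps pairs in their original order, so `head` picks the first
  // match, the same one `find` returns if a vertex appears more than once.
  def buildIndex(pairs: Seq[(VertexId, VertexId)]): Map[VertexId, VertexId] =
    pairs.groupBy(_._1).map { case (k, vs) => k -> vs.head._2 }

  // Expected O(1) lookup instead of O(n).
  def lookupIndexed(index: Map[VertexId, VertexId], vid: VertexId): VertexId =
    index.getOrElse(vid, -1L)

  def main(args: Array[String]): Unit = {
    val pairs = Seq(1L -> 2L, 2L -> 3L, 3L -> 1L)
    val index = buildIndex(pairs)
    println(lookupIndexed(index, 2L)) // 3
    println(lookupLinear(pairs, 4L))  // -1
  }
}
```

In a GraphX job, the resulting `Map` would typically be broadcast the same way `broadcastvariable` already is, rather than captured in the closure.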

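Can anyone explain what is causing the slow execution? Is it right that, because I have only 2 workers and Pregel involves a lot of message passing and reduce operations, my code will perform much worse on large datasets?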
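On the message-passing question: as written, `sendMessage` ships both endpoints' complete label maps across every edge on every superstep, and because it never returns an empty iterator, Pregel keeps running until `maxSteps` instead of stopping early. `mergeMessage` also concatenates the per-label lists, so a vertex's merged message carries one entry per (neighbour, label) pair. The pure-Scala sketch below (no Spark; the neighbour maps are made-up values) reuses the same merge rule to show that growth:

```scala
import scala.collection.mutable

object MessageGrowth {
  type VertexId = Long
  type Msg = mutable.HashMap[VertexId, List[(Double, VertexId)]]

  // Same merge rule as the question's mergeMessage: concatenate per-label lists.
  def mergeMessage(a: Msg, b: Msg): Msg = {
    val out = new mutable.HashMap[VertexId, List[(Double, VertexId)]]
    (a.keySet ++ b.keySet).foreach { k =>
      out += k -> (a.getOrElse(k, Nil) ::: b.getOrElse(k, Nil))
    }
    out
  }

  // Total number of (score, origin) entries carried by one message.
  def entries(m: Msg): Int = m.valuesIterator.map(_.size).sum

  def main(args: Array[String]): Unit = {
    // Hypothetical vertex with 4 neighbours, each currently holding labels 10, 11 and 12.
    val perNeighbour: Seq[Msg] = (1L to 4L).map { n =>
      mutable.HashMap[VertexId, List[(Double, VertexId)]](
        (10L to 12L).map(label => label -> List((1.0 / 3, n))): _*)
    }
    val merged = perNeighbour.reduce(mergeMessage)
    println(merged.size)     // 3 distinct labels
    println(entries(merged)) // 12 entries = 4 neighbours x 3 labels
  }
}
```

So the cost per superstep scales with degree times labels per vertex, not just with the number of edges. For long runs, Spark 2.2 and later also let GraphX's Pregel checkpoint periodically via `spark.graphx.pregel.checkpointInterval` (disabled by default; requires a checkpoint directory set with `SparkContext.setCheckpointDir`), which keeps the lineage from growing with every iteration.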
