微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何通过将Pregel消息数组与先前迭代中的数组合并来使用VertexColumn更新

如何解决如何通过将Pregel消息数组与先前迭代中的数组合并来使用VertexColumn更新

我正在尝试实现一个简单的消息传递系统以了解其工作原理。我的目标是将每个顶点的名称发送给父对象。理想的实现是将以前的实现创建的现有“ iterationMsg”数组合并到具有这些值的新数组中。我已经使用与信念传播相同的逻辑,使用AggregateMessages函数实现了这一点,但是我愿意使用pregel API来实现。如何在“ iterationMsg”中现有数组的位置处引用“ iterationMsg”中的数组?

v = sqlc.createDataFrame([
            (0,"Anna",24),(1,"Bob",26),(2,"Charlie",27),(3,"David",(4,"Eric",],["id","name","age"])
    
        # Edge DataFrame
    e = sqlc.createDataFrame([
            (0,1,"friend"),2,"loves"),"pays"),(5,3,"boss"),(6,4,(7,"boss")
        ],"src","dst","label"])

   g = GraphFrame(v.withColumn('state',f.lit(True)).withColumn('iterationMsg',f.array()),e)  

   NR_ITER = 2
        
   msgpassing = g.pregel \
        .setMaxIter(NR_ITER) \
        .withVertexColumn("iterationMsg",f.array(),f.array_union('existing array in iterationMsg',pregel.msg())) \
        .sendMsgToSrc(f.when(pregel.dst('state'),f.array_union(f.array(pregel.dst('name')),pregel.dst('iterationMsg')))) \
        .aggMsgs(f.array(pregel.msg())) \
        .run()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。