如何解决迭代 GraphFrames AggregateMessages 达到内存限制
我正在使用 GraphFrame 的 aggregateMessages 功能来构建自定义聚类算法。我在一个小样本数据集(约 100 个项目)上测试了这个算法,并验证了它的工作原理。但是当我在包含 50k 个项目的真实数据集上运行它时,在大约 10 次迭代后出现 OOM 错误。有趣的是,前几次迭代在几分钟内处理完毕,而 mem 是正常范围。在第 6 次迭代之后,内存使用量攀升至约 30GB 并最终爆炸。我在 16 核 32GB 的 2 节点集群上运行它。
由于这是一个迭代算法,而且每次迭代后的内存只会增加,我想知道我是否需要以某种方式释放内存。我在循环的末尾添加了 unpersist 块,但这并没有帮助。
我可以使用其他任何效率吗?在迭代设置中使用 GraphFrames 是否有最佳实践?
我注意到的另一件事是,在执行程序页面的 spark UI 上,使用的“存储内存”大约为 300MB,但 spark 进程实际上占用了大约 30GB。不确定这是否是内存泄漏!
while ( true ) {
System.out.println("["+new Date()+"] Running " + i);
Dataset<Row> lastRoutesDs = groups;
Dataset<Row> groupUnwind = groups.withColumn("id",explode(col("routeItems")));
GraphFrame gf = new GraphFrame(groupUnwind,edgesDs);
Dataset<Row> lvl1 = gf.aggregateMessages()
.sendToSrc(when(
callUDF("contains_in_array_str",AggregateMessages.dst().getField("routeItems"),AggregateMessages.src().getField("id")).equalTo(false),struct(AggregateMessages.dst().getField("routeItems").as("routeItems"),AggregateMessages.dst().getField("routescores").as("routescores"),AggregateMessages.dst().getField("grpId").as("grpId"),AggregateMessages.dst().getField("grpscore").as("grpscore"),AggregateMessages.edge().getField("score").as("edgescore"))))
.agg(collect_set(AggregateMessages.msg()).as("incomings"))
.withColumn("inItem",explode(col("incomings")))
.groupBy("id","inItem.grpId")
.agg(first("inItem.routeItems").as("routeItems"),first("inItem.routescores").as("routescores"),first("inItem.grpscore").as("grpscore"),collect_list("inItem.edgescore").as("inscores"))
.groupBy("grpId")
.agg(bestRouteAgg.apply(col("routeItems"),col("routescores"),col("inscores"),col("grpscore"),col("id"),col("grpscore")).as("best"))
.withColumn("newscore",callUDF("calcRoutescores",expr("size(best.routeItems)+1"),col("best.routescores"),col("best.inscores")))
.withColumn("edgeCount",expr("size(best.routescores)"))
.persist(StorageLevel.MEMORY_AND_disK());
lvl1
.filter("newscore > " + groupMaxscore)
.withColumn("itr",lit(i))
.select("grpId","best.routeItems","best.routescores","best.grpscore","edgeCount","itr")
.write()
.mode(SaveMode.Append)
.json(workspaceDir + "clusters-rank-collect");
if (lvl1.count() == 0) {
System.out.println("****** End reached " + i);
break;
}
Dataset<Row> newGroups = lvl1.filter("newscore <= " + groupMaxscore)
.withColumn("routeItems_new",callUDF("merge2Array",col("best.routeItems"),array(col("best.newNode"))))
.withColumn("routescores_new",callUDF("merge2ArrayDouble",col("best.inscores")))
.select(col("grpId"),col("routeItems_new").as("routeItems"),col("routescores_new").as("routescores"),col("newscore").as("grpscore"));
if (i > 0 && (i % 2) == 0) {
newGroups = newGroups
.checkpoint();
}
newGroups = newGroups
.persist(StorageLevel.disK_ONLY());
System.out.println( newGroups.count() );
groups.unpersist();
lastRoutesDs.unpersist();
groupUnwind.unpersist();
lvl1.unpersist();
groups = newGroups;
i++;
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。