如何解决如何使用 Pyspark 中的 Graphframes 和 Spark Dataframe 中的原始数据获取连接组件?
+--+-----+---------+
|id|phone| address|
+--+-----+---------+
| 0| 123| james st|
| 1| 177|avenue st|
| 2| 123|spring st|
| 3| 999|avenue st|
| 4| 678| 5th ave|
+--+-----+---------+
我正在尝试使用 graphframes
包从上面的火花数据中使用 phone 和 address 来识别 ids 的连接组件框架。所以这个数据框可以被视为图的顶点数据框。
我想知道创建图表的边数据框以馈送到 connectedComponents()
中的 graphframes
函数的最佳方法是什么?
理想情况下,edges
数据框应如下所示:
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| 0 | 2| same_phone|
| 1 | 3|same_address|
+---+---+------------+
最后,connectedComponents()
结果应如下所示。 id 0 & 1 基于 same_phone 关系在同一个组件中,1 & 3 基于 相同地址关系。然后,这将使 4 成为另一个与其他 id 无关的组件。
+---+-------------------+
|id |connected_component|
+---+-------------------+
|0 |1 |
|1 |2 |
|2 |1 |
|3 |2 |
|4 |3 |
+---+-------------------+
提前致谢!
解决方法
from functools import reduce
edges = reduce(
lambda x,y: x.union(y),[df.alias('t1')
.join(df.alias('t2'),c)
.filter('t1.id < t2.id')
.selectExpr('t1.id src','t2.id dst',"'same_%s' relationship"% c) for c in df.columns[1:]
]
)
edges.show()
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| 0| 2| same_phone|
| 1| 3|same_address|
+---+---+------------+
import pyspark.sql.functions as F
from pyspark.sql.window import Window
connect = edges.select(
F.array_sort(F.array('src','dst')).alias('arr')
).distinct().union(
df.join(edges,(df.id == edges.src) | (df.id == edges.dst),'anti').select(F.array('id'))
).withColumn(
'connected_component',F.row_number().over(Window.orderBy('arr'))
).select(F.explode('arr').alias('id'),'connected_component')
connect.show()
+---+-------------------+
| id|connected_component|
+---+-------------------+
| 0| 1|
| 2| 1|
| 1| 2|
| 3| 2|
| 4| 3|
+---+-------------------+
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。