如何使用 PySpark 的 GraphFrames(Pregel API)计算员工层次结构(组织路径与深度)
我有一个 Spark/Scala GraphX 解决方案,用于处理员工层次结构问题,并给出每位员工相对于最高管理者的深度(层级)。它在内部使用 Pregel API。能否用 PySpark 的 GraphFrames 实现相同的功能?如果可以,能否使用其 Pregel API?
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
/** Alias for an employee's job title, e.g. "CEO" or "Dev". */
type Role = String

/**
 * Vertex attribute of the raw employee graph.
 *
 * @param name display name of the employee
 * @param role job title of the employee
 */
case class Employee(
  name: String,
  role: Role
)
// Sample org chart rows: (employeeId, firstName, lastName, role, supervisorId).
// The single row whose supervisorId is None is the top of the hierarchy.
// NOTE(review): the scraped literal was garbled (fields and parentheses missing), so the
// role/supervisor values for ids 3, 4, 8 and 9 are reconstructed. Confirm them against the
// original data.
val employeeRawData: Array[(Long, String, String, String, Option[Long])] = Array(
  (1L, "Steve", "Jobs", "CEO", None),
  (2L, "Leslie", "Lamport", "CTO", Some(1L)),
  (3L, "Jason", "Fried", "Manager", Some(1L)),
  (4L, "Joel", "Spolsky", "Manager", Some(2L)),
  (5L, "Jeff", "Dean", "Lead", Some(4L)),
  (6L, "Martin", "Odersky", "Sr.Dev", Some(5L)),
  (7L, "Linus", "Torvalds", "Dev", Some(6L)),
  (8L, "Steve", "Wozniak", "Dev", Some(6L)),
  (9L, "Matei", "Zaharia", "Dev", Some(6L)),
  (10L, "James", "Faeldon", "Intern", Some(7L))
)
// Distribute the raw rows over 4 partitions and name the columns. The result is cached
// because later statements read it more than once.
// Assumes a spark-shell session (`sc` and `spark.implicits._` in scope). TODO: confirm.
val employeeDf = {
  val columnNames = Seq("employeeId", "firstName", "lastName", "role", "supervisorId")
  sc.parallelize(employeeRawData, 4).toDF(columnNames: _*)
}.cache()
// One vertex per employee, keyed by employeeId. The attribute is
// Employee("<firstName> <lastName>", role).
val verticesRdd: RDD[(VertexId,Employee)] = employeeDf
  .select($"employeeId", concat($"firstName", lit(" "), $"lastName"), $"role")
  .rdd
  .map { row =>
    val vertexId = row.getLong(0)
    val fullName = row.getString(1)
    val jobTitle = row.getString(2)
    (vertexId, Employee(fullName, jobTitle))
  }
// Edges point from supervisor to subordinate (top-down). The edge attribute is the
// subordinate's role.
val edgesRdd: RDD[Edge[String]] = employeeDf
  // Remove employees without a supervisor: a Scala None is stored as SQL NULL.
  .filter($"supervisorId".isNotNull)
  // supervisorId comes first (not employeeId) because the edge direction is top-down.
  .select($"supervisorId", $"employeeId", $"role")
  // The edge property is the role.
  .rdd.map(row => Edge(row.getLong(0), row.getLong(1), row.getString(2)))
// Default vertex attribute used for any vertex id that an edge references but that
// verticesRdd does not contain.
val missingEmployee = Employee("John Doe","Unknown")
// Build the property graph: Employee vertices and role-labelled
// supervisor -> subordinate edges.
val employeeGraph: Graph[Employee,String] = Graph(verticesRdd,edgesRdd,missingEmployee)
// The structure of the message passed to vertices during the Pregel iterations.
case class EmployeeMessage(
  currentId: Long,     // Tracks the most recent vertex appended to path; used for flagging isCyclic
  level: Int,          // The number of up-line supervisors (level in the reporting hierarchy)
  head: String,        // The top-most supervisor
  path: List[String],  // The reporting path to the top-most supervisor
  isCyclic: Boolean,   // Whether the employee's reporting structure is cyclic
  isLeaf: Boolean      // Whether the employee is rank and file (no down-line reporting employee)
)
// The structure of the vertex values of the graph during the Pregel iterations.
case class EmployeeValue(
  name: String,        // The employee name
  currentId: Long,     // Initial value is the employeeId
  level: Int,          // Initial value is zero
  head: String,        // Initial value is this employee's name
  path: List[String],  // Initial value contains this employee's name only
  isCyclic: Boolean,   // Initial value is false
  isLeaf: Boolean      // NOTE(review): the original comment says "initially true", but the
                       // mapVertices initialization in this script sets false. Confirm intent.
)
// Initialize the employee vertices. Each employee starts at level 0 as its own head and
// path, with currentId = its own vertex id, and is not cyclic.
// NOTE(review): isLeaf starts as false here, and vprog only ever sets it to false. As a
// result the "src.isLeaf" branch of sendMsg never fires and the leaf column is always
// false. Confirm whether true was intended.
val employeeValueGraph: Graph[EmployeeValue,String] = employeeGraph.mapVertices { (id, v) =>
  EmployeeValue(
    name = v.name,
    currentId = id,
    level = 0,
    head = v.name,
    path = List(v.name),
    isCyclic = false,
    isLeaf = false
  )
}
/**
 * Pregel vertex program: folds the (merged) incoming message into the vertex value.
 *
 * Branches are tried in this order:
 *  - message.level == 0: treated as the superstep-0 initial message (assumes the initial
 *    message has level 0 and later messages carry a sender level >= 1); increments level.
 *  - message.isCyclic: marks this vertex as part of a cyclic reporting structure.
 *  - !message.isLeaf: this vertex has a subordinate, so it is marked as not a leaf.
 *  - otherwise: adopts the sender's currentId and head, prepends this employee's name to
 *    the sender's path and increments level.
 *
 * @param vertexId id of the vertex being updated (unused)
 * @param value    current vertex value
 * @param message  merged message received this superstep
 * @return the updated vertex value
 */
def vprog(
  vertexId: VertexId, value: EmployeeValue, message: EmployeeMessage
): EmployeeValue = {
  if (message.level == 0) { // superstep 0 - initialize
    value.copy(level = value.level + 1)
  } else if (message.isCyclic) { // set isCyclic
    value.copy(isCyclic = true)
  } else if (!message.isLeaf) { // set isLeaf
    value.copy(isLeaf = false)
  } else { // set new values propagated from the supervisor
    value.copy(
      currentId = message.currentId,
      level = value.level + 1,
      head = message.head,
      path = value.name :: message.path
    )
  }
}
/**
 * Pregel send function, run on a supervisor -> subordinate edge (triplet).
 *
 *  - If the source's currentId equals the destination id or the destination's currentId,
 *    the reporting structure is treated as cyclic. The cyclic flag is sent to the
 *    destination once, unless the source is already marked cyclic.
 *  - Otherwise, if the source is still flagged as a leaf, a message is sent back to the
 *    source telling it that it is not a leaf (it has this subordinate).
 *  - Otherwise the source's currentId, level, head and path are propagated to the
 *    destination.
 *
 * @param triplet the edge with both endpoint values
 * @return zero or one (target vertex id, message) pair
 */
def sendMsg(
  triplet: EdgeTriplet[EmployeeValue, String]
): Iterator[(VertexId, EmployeeMessage)] = {
  val src = triplet.srcAttr
  val dst = triplet.dstAttr

  // Message carrying the source vertex's current values with the given flags.
  def fromSource(isCyclic: Boolean, isLeaf: Boolean): EmployeeMessage =
    EmployeeMessage(
      currentId = src.currentId,
      level = src.level,
      head = src.head,
      path = src.path,
      isCyclic = isCyclic,
      isLeaf = isLeaf
    )

  // Handle cyclic reporting structure
  if (src.currentId == triplet.dstId || src.currentId == dst.currentId) {
    if (!src.isCyclic) { // Set isCyclic
      Iterator((triplet.dstId, fromSource(isCyclic = true, isLeaf = src.isLeaf)))
    } else { // Already marked as isCyclic (possibly from a previous superstep), so ignore
      Iterator.empty
    }
  } else { // Regular reporting structure
    if (src.isLeaf) {
      // A vertex with an outgoing edge is a supervisor, so it should NOT be a leaf.
      // isLeaf = false is the important value in this message.
      Iterator((triplet.srcId, fromSource(isCyclic = false, isLeaf = false)))
    } else {
      // Propagate the source's values to the destination. isCyclic = false makes vprog
      // skip cyclic updating, and isLeaf = true makes it skip leaf updating.
      Iterator((triplet.dstId, fromSource(isCyclic = false, isLeaf = true)))
    }
  }
}
/**
 * Pregel merge function: when a vertex receives several messages in one superstep,
 * only the second argument is kept and the first is discarded.
 *
 * NOTE(review): the pairing order is decided by GraphX's message reduction, so which
 * message survives is not specified here. Confirm this is acceptable when a vertex can
 * receive more than one message per superstep.
 */
def mergeMsg(msg1: EmployeeMessage,msg2: EmployeeMessage): EmployeeMessage = {
  // msg1 is intentionally ignored.
  msg2
}
// Message delivered to every vertex in superstep 0. vprog treats level == 0 as the
// superstep-0 marker, so level must stay 0 here. EmployeeMessage has no default
// arguments, so every field has to be supplied.
val initialMsg = EmployeeMessage(
  currentId = 0L,
  level = 0,
  head = "",
  path = Nil,
  isCyclic = false,
  isLeaf = true
)
// Run Pregel with no iteration cap. With EdgeDirection.Out, sendMsg runs only on the
// out-edges of vertices that received a message in the previous superstep.
val results = employeeValueGraph.pregel(
  initialMsg,
  maxIterations = Int.MaxValue,
  activeDirection = EdgeDirection.Out
)(vprog, sendMsg, mergeMsg)
// Flatten the final vertex values into a DataFrame. The path list starts with the
// employee itself, so it is reversed to read top-down and joined with ">".
val resultDf = results.vertices
  .map { case (vertexId, emp) =>
    val topDownPath = emp.path.reverse.mkString(">")
    (vertexId, emp.name, emp.level, emp.head, topDownPath, emp.isCyclic, emp.isLeaf)
  }
  .toDF("id", "employee", "level", "head", "path", "cyclic", "leaf")
// Split the ">"-joined path back into an array column, so that each position in the
// hierarchy can become its own column.
val df = resultDf.withColumn("letters", split(col("path"), ">"))

// The longest path determines how many levelN columns to emit. max() yields NULL on an
// empty DataFrame and Row.getInt throws on NULL, so fall back to 0 columns in that case.
val numCols = {
  val maxRow = df
    .withColumn("letters_size", size($"letters"))
    .agg(max($"letters_size"))
    .head()
  if (maxRow.isNullAt(0)) 0 else maxRow.getInt(0)
}

// Emit one column per position of the top-down path (level0 = top-most supervisor),
// ordered by hierarchy level.
df.drop("path", "leaf", "cyclic")
  .select(col("*") +: (0 until numCols).map(i => $"letters".getItem(i).as(s"level$i")): _*)
  .drop("letters")
  .orderBy(col("level"))
  .show()
有人能指导我如何将上述代码转换为 PySpark GraphFrames 实现吗?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。