如何解决为什么在Pyspark Dataframe上切片,拆分和存储数据会很费时?
我对Pyspark还是陌生的,我正在处理大量文件。我大约有2.5 TB的数据,并且正在从每个文件中提取一些元数据并将其存储在Dataframe中。下表中给出了这种元数据的示例
|-------|----------|----------|--------------|--------------|
| ID | FileType | Metadata | AssociatedID | Group ID |
|-------|----------|----------|--------------|--------------|
| 1 | Type 1 | xyz | 3 | A |
|-------|----------|----------|--------------|--------------|
| 2 | Type 1 | fgh | 4 | A |
|-------|----------|----------|--------------|--------------|
| 3 | Type 2 | | | A |
|-------|----------|----------|--------------|--------------|
| 4 | Type 2 | | | A |
|-------|----------|----------|--------------|--------------|
| 5 | Type 4 | | | A |
|-------|----------|----------|--------------|--------------|
一旦我提取了此元数据,便要将其存储在CSV文件中。我要存储它的方式基于以下逻辑。
- 获取FileType为1的唯一元数据-> ['xyz','fgh']
- 浏览每个唯一的元数据,并获取属于同一组且具有FileType Type 2的关联FileType-> 如果元数据值为xyz,则我的子集数据帧现在将包含第1行和第3行
- 获取属于同一组但不是类型1 /类型2的其余文件类型
因此,元数据xyz的最终子集数据帧将包含:
|-------|----------|----------|--------------|--------------|
| ID | FileType | Metadata | AssociatedID | Group ID |
|-------|----------|----------|--------------|--------------|
| 1 | Type 1 | xyz | 3 | A |
|-------|----------|----------|--------------|--------------|
| 3 | Type 2 | | | A |
|-------|----------|----------|--------------|--------------|
| 5 | Type 4 | | | A |
|-------|----------|----------|--------------|--------------|
执行此操作的代码如下:
uniqueMetadata= sdf_.select("Metadata").distinct().collect()
for meta in uniqueMetadata:
# Get the rows that have metadata 'xyz'
metadata_df = sdf_.filter(col("Metadata") == meta)
#Get All Group ID's of those groups that have a metadata xyz
groupIDs= metadata_df .select("GroupID").distinct().collect()
groupIDs= [v["GroupID"] for v in groupIDs] #Convert to List
for groupID in groupIDs: #Iterate Group IDS
#Get all records for particular Group ID
groupID_df = sdf_.filter(col("GroupID") == groupID)
#Get all Associated ID's of metadata xyz in that group
uniqueAssociatedIDs= groupID_df .select("AssociatedID").distinct().collect()
uniqueAssociatedIDs= [v["AssociatedID"] for v in uniqueAssociatedIDs] #Convert to List
#Get All Rows with matching Associated IDs
matchingAssociatedDF = groupID_df .filter(col("ID").isin(uniqueAssociatedIDs))
metadata_df = metadata_df .union(matchingAssociatedDF )
#Get All Other Rows in the same group that are not FileType1/Type2
other_df = groupID_df .filter((~col("ID").isin(uniqueAssociatedIDs)) & (~col("FileType").isin(["Type 1","Type 2"])))
metadata_df = metadata_df .union(other_df)
df_path = output_path + "/" + "/ByMetadata/" + metadata + "/" + shortuuid.uuid() + "/"
#Save in Location
metadata_df .coalesce(8).write.format("com.databricks.spark.csv").option("header","true").save(df_path)
此代码的问题是计算和存储需要太多时间。对于200个组和大约200个唯一的元数据,要花费48个小时以上,这不好,因为我必须分批处理大约100000个组。
在pyspark中是否有更好的解决方案可以对此进行优化?我可以尝试哪种优化? Pyspark是否不用于这种类型的查询和切片?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。