如何解决将 python dicts 的 pyspark pipelineRDD 解压到 pyspark Dataframe
我正在使用平面图来解析数据框并且它工作正常,但我无法将最终结果重塑为多列数据集。我该如何解析这个 RDD?这是平面图后我的结果的示例行:
[Row(XXXX-XXXX-XXXX-XXXXX-XXXXXX={'m_ci_id': 'XXXX-XXXX-XXXX-XXXXX-XXXXXX','ci_id': 'XXXX-XXXX-XXXX-XXXXX-XXXXXX','pp_breaker_power_phase': 'L1_L2','pp_breaker_poles': 2,'pp_breaker_panel_circuit_number': 2,'cp_ci_id': None,'cp_value': None,'phase': 'L1','pole': 2})]
我正在传递一个与您在 dict 中看到的列相同的数据框,这是我与 flatmap 一起使用的函数:
def get_poles_phases(row):
"""
:param row:
:return:
"""
new_rows = []
initial_pole = row.pp_breaker_panel_circuit_number
phases = row.pp_breaker_power_phase.split('_')
for _ in range(row.pp_breaker_poles):
temp = row.asDict()
temp['phase'] = phases[_]
temp['pole'] = initial_pole
if row.cp_value != 'Phase Grouping':
initial_pole += 2
else:
logger.error('Panel configuration not recognized.')
new_rows.append(row(temp))
return new_rows
我尝试使用 Structfields 模式,但没有用
cols = [StructField('m_ci_id',StringType(),True),StructField('ci_id',StructField('pp_breaker_power_phase',StructField('pp_breaker_poles',StructField('pp_breaker_panel_circuit_number',StructField('cp_ci_id',StructField('cp_value',StructField('phase',StructField('pole',True)]
schema = StructType(cols)
poles_phases = poles_phases.toDF(schema)
我也尝试过传递列名列表。
poles_phases = poles_phases.toDF(['m_ci_id','ci_id','pp_breaker_power_phase','pp_breaker_poles','pp_breaker_panel_circuit_number','cp_ci_id','cp_value','phase','pole'])
我怀疑这不起作用,因为我得到的 RDD 只有一列,但我不知道如何解析该单个 dict 以便架构匹配。
解决方法
我想通了:
from pyspark.sql import Row
poles_phases = poles_phases.map(lambda row: Row(**list(row.asDict().values())[0]))
这是通过解压缩值字典来构建一个新行。之后,您可以使用
poles_phases = poles_phases.toDF(['m_ci_id','ci_id','pp_breaker_power_phase','pp_breaker_poles','pp_breaker_panel_circuit_number','cp_ci_id','cp_value','phase','pole'])
如果您有 None
值,模式推断可能会失败,因此您需要明确声明它,例如,
cols = [StructField('m_ci_id',StringType(),True),StructField('ci_id',StructField('pp_breaker_power_phase',StructField('pp_breaker_poles',StructField('pp_breaker_panel_circuit_number',StructField('cp_ci_id',StructField('cp_value',StructField('phase',StructField('pole',True)]
schema = StructType(cols)
poles_phases = poles_phases.toDF(schema)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。