KeyError when passing a PCollection as a side input in Apache Beam

How to resolve a KeyError when passing a PCollection as a side input in Apache Beam

I am passing the side_input PCollection as a side input to a ParDo transform, but I get a KeyError.

Here is the code that produces the error:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db
from processors.appendcol import AppendCol
from side_inputs.config import sideinput_bq_config
from source.config import source_config


with beam.Pipeline(options=PipelineOptions()) as si:
  side_input = si | "Reading from BQ side input" >> relational_db.ReadFromDB(
    source_config=sideinput_bq_config,table_name='abc',query="SELECT * FROM abc"
  )

with beam.Pipeline(options=PipelineOptions()) as p:
  PCollection = p | "Reading records from database" >> relational_db.ReadFromDB(
    source_config=source_config,table_name='xyzzy',query="SELECT * FROM xyzzy",
  ) | beam.ParDo(
    AppendCol(),beam.pvalue.AsIter(side_input)
  )

I am reading the data from a PostgreSQL table, and each element of the PCollection is a dictionary.

Solution

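I think the problem is that you have two separate pipelines trying to work together. A PCollection belongs to the pipeline that created it, so a side input must come from the same pipeline as the ParDo that consumes it. Perform all of the transforms as part of a single pipeline: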

with beam.Pipeline(options=PipelineOptions()) as p:
  side_input = p | "Reading from BQ side input" >> relational_db.ReadFromDB(
    source_config=sideinput_bq_config,table_name='abc',query="SELECT * FROM abc")

  my_pcoll = p | "Reading records from database" >> relational_db.ReadFromDB(
        source_config=source_config,table_name='xyzzy',query="SELECT * FROM xyzzy",) | beam.ParDo(
        AppendCol(),beam.pvalue.AsIter(side_input))
