How to resolve a KeyError when passing a PCollection as a side input in Apache Beam
I am passing a PCollection as a side_input to a ParDo transform, but I get a KeyError.
Below is the code that raises the error:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db
from processors.appendcol import AppendCol
from side_inputs.config import sideinput_bq_config
from source.config import source_config

with beam.Pipeline(options=PipelineOptions()) as si:
    side_input = si | "Reading from BQ side input" >> relational_db.ReadFromDB(
        source_config=sideinput_bq_config, table_name='abc', query="SELECT * FROM abc"
    )

with beam.Pipeline(options=PipelineOptions()) as p:
    PCollection = p | "Reading records from database" >> relational_db.ReadFromDB(
        source_config=source_config, table_name='xyzzy', query="SELECT * FROM xyzzy",
    ) | beam.ParDo(
        AppendCol(), beam.pvalue.AsIter(side_input)
    )
I am reading data from a PostgreSQL table, and each element of the PCollection is a dictionary.
Solution
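I think the problem is that you have two separate pipelines trying to work together. A PCollection belongs to the pipeline that created it, so side_input, which was created in pipeline si, cannot be used as a side input to a transform in pipeline p. You should perform all of the transforms as part of a single pipeline: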
with beam.Pipeline(options=PipelineOptions()) as p:
    side_input = p | "Reading from BQ side input" >> relational_db.ReadFromDB(
        source_config=sideinput_bq_config, table_name='abc', query="SELECT * FROM abc")

    my_pcoll = p | "Reading records from database" >> relational_db.ReadFromDB(
        source_config=source_config, table_name='xyzzy', query="SELECT * FROM xyzzy",
    ) | beam.ParDo(
        AppendCol(), beam.pvalue.AsIter(side_input))