如何解决GCP 数据流,argparse.ArgumentError 使用 DataflowRunner 而不是 DirectRunner
带有运行时参数的数据流管道使用 DirectRunner 运行良好,但在切换到 DataflowRunner 时遇到参数错误。
File "/home/user/miniconda3/lib/python3.8/site-packages/apache_beam/options/pipeline_options.py",line 124,in add_value_provider_argument
self.add_argument(*args,**kwargs)
File "/home/user/miniconda3/lib/python3.8/argparse.py",line 1386,in add_argument
return self._add_action(action)
File "/home/user/miniconda3/lib/python3.8/argparse.py",line 1749,in _add_action
self._optionals._add_action(action)
File "/home/user/miniconda3/lib/python3.8/argparse.py",line 1590,in _add_action
action = super(_ArgumentGroup,self)._add_action(action)
File "/home/user/miniconda3/lib/python3.8/argparse.py",line 1400,in _add_action
self._check_conflict(action)
File "/home/user/miniconda3/lib/python3.8/argparse.py",line 1539,in _check_conflict
conflict_handler(action,confl_optionals)
File "/home/user/miniconda3/lib/python3.8/argparse.py",line 1548,in _handle_conflict_error
raise ArgumentError(action,message % conflict_string)
argparse.ArgumentError: argument --bucket_input: conflicting option string: --bucket_input
这里是参数的定义和调用方式
class CustomPipelineOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls,parser):
parser.add_value_provider_argument(
'--bucket_input',default="device-file-dev",help='Raw device file bucket')
pipeline = beam.Pipeline(options=pipeline_options)
custom_options = pipeline_options.view_as(CustomPipelineOptions)
_ = (
pipeline
| 'Initiate dataflow' >> beam.Create(["Start"])
| 'Create P collection with file paths' >> beam.ParDo(
CreateGcsPCol(input_bucket=custom_options.bucket_input)
)
请注意,这只发生在 DataflowRunner 中。有谁知道如何解决它?非常感谢。
解决方法
从这里的评论中复制答案:
该错误是通过相对路径导入本地Python子模块引起的。使用 DirectRunner,相对路径有效,因为它在本地机器上。但是,DataflowRunner 位于不同的机器(GCE 实例)上,需要绝对路径。因此,通过安装 Dataflow 管道模块和子模块,并从安装的子模块导入,而不是使用相对路径,问题得以解决。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。