如何解决如何创建以当前日期为输入的梁模板每天更新[根据GET请求创建]
我正在尝试创建一个每天使用Cloud Scheduler运行的Dataflow作业。我需要使用GET请求从外部API获取数据,因此需要输入当前日期。但是,当我将数据流作业导出为计划模板时,输入的日期保留在执行时,而不是每天更新。我一直在寻找解决方案,遇到了ValueProvider,但是我使用apache_beam.transforms.Create
声明的管道始终返回错误'RuntimeValueProvider(option:test,type:str,default_value:'killme')未指定ValueProvider时,不会从运行时上下文中调用.get()。
反正我可以克服吗?这似乎是一个简单的问题,但是无论如何我都无法使它起作用。如果有任何想法,我将不胜感激!
解决方法
您可以使用ValueProvider接口将运行时参数传递到管道,在DoFn中访问它,您需要将其作为参数传递。与下面的示例类似:
class LogValueProvidersFn(beam.DoFn):
def __init__(self,string_vp):
self.string_vp = string_vp
# Define the DoFn that logs the ValueProvider value.
# The DoFn is called when creating the pipeline branch.
# This example logs the ValueProvider value,but
# you could store it by pushing it to an external database.
def process(self,an_int):
logging.info('The string_value is %s' % self.string_vp.get())
# Another option (where you don't need to pass the value at all) is:
logging.info(
'The string value is %s' %
RuntimeValueProvider.get_value('string_value',str,''))
| beam.Create([None])
| 'LogValueProvs' >> beam.ParDo(
LogValueProvidersFn(my_options.string_value)))
您可能还想看看Flex模板:
https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。