如何解决如何在 Airflow 的每个循环中获得统一的当前日期时间?
以下是使用的 DAG 格式, 我在 DAG 中的输入文件上使用 for 循环,并使用
重命名输出文件CurrentDateTime = datetime.datetime.today().strftime("%Y%m%d%H%M%s")
#if currentDateTIme = 20210406010203
outputfile = ''
outputfile1 = ''
outputfile2 = ''
inputfiles = ['input1','input2']
API_CALL_TASK1 = {
source : inputfile1
filename : outputfile1 #20210406010513
}
API_CALL_TASK2 = {
source : inputfile2
filename : outputfile2
}
for file in inputfiles:
if file == 'input1'
outputfile1 = f'inputFileName_{CurrentDateTime}' #20210406010303
outputfile = f'inputFileName_{CurrentDateTime}' #20210406010303
if file == 'input2'
outputfile2 = f'inputFileName_{CurrentDateTime}'
outputfile = f'inputFileName_{CurrentDateTime}'
MOVE_OUTPUT_TO_BUCKET_TASK = (
filename = f{outputfile} #20210406010423
)
MOVE_OUTPUT_TO_BUCKET_TASK >> API_CALL_TASK1 >> API_CALL_TASK1
在任务中 - API_CALL_TASK1
、API_CALL_TASK2
和 MOVE_OUTPUT_TO_BUCKET_TASK
文件名中的日期时间不同,因为每个任务触发的时间不同。
我想将相同的文件名从循环传递给 MOVE_OUTPUT_TO_BUCKET_TASK
和 API_CALL_TASK1
或 API_CALL_TASK2
解决方法
据我所知,您希望为循环中的两个输出以 YYYYMMDD
格式传递相同的日期时间。
Airflow 为我们提供了一组默认变量,可用于所有模板。因此,您可以使用它在整个模板中传递一个常量值。在您的情况下,我看到您希望执行日期作为输出的后缀。因此,您应该使用 ds_nodash
,根据 documentation,它检索 the execution date as YYYYMMDD
。
如果您使用带有 **kwargs 的 Python 运算符,您可以使用 kwargs['ds_nodash']
访问它。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。