如何解决如何根据气流时间戳添加数据库分区
在这里,我试图从开源获取数据,并将其作为基于气流时间戳的分区添加到表中。但它引发了气流异常。
def partition_sql(entity_type):
sql = """
ALTER TABLE db.table
ADD IF NOT EXISTS PARTITION (airflow_ts='{{ts}}')
LOCATION 's3://db/table/update/airflow_ts={{ts}}';
"""
return sql
with DAG(parameters)as dag:
update = DockerOperator(
task_id='update',cmd = 'python script.py 's3://db/table1/update/airflow_ts={{ts}}'
)
partition = AWSAthenaOperator(
task_id='partition',query=partition_sql("artist"),)
update >>partition
解决方法
因为 cmd
和 query
字段都是模板化的,所以应该可以:
items = ["artist"] #add more tables to be created dynamically
with DAG(
dag_id="dag_name",default_args=default_args,) as dag:
for item in items:
command = f'python script.py 's3://db/{item}/update/airflow_ts={{ ds }} '
update = DockerOperator(
task_id=f'update_table_{item}',cmd=command
)
sql = f'ALTER TABLE db.{item} ADD IF NOT EXISTS PARTITION (airflow_ts={{ ds }}) LOCATION s3://db/table/update/airflow_ts={{ ds }};'
partition = AWSAthenaOperator(
task_id=f'partition_table_{item}',query=sql
)
您可以将 {{ ds }}
更改为您喜欢的任何其他日期格式。您可以在 macros 页面查看可用的格式或自定义一种格式。
请注意,您不必在此处保存代码中的 SQL。您可以按照说明将其保存在 .sql
文件中here
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。