如何解决Airflow:在 TaskGroup 中创建动态任务的问题
我正在尝试制作一个动态的工作流程。
我收到了这个损坏的 DAG 错误重复任务 ID
broken DAG: [/opt/airflow/dags/academi_dag.py] Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baSEOperator.py",line 430,in __init__
task_group.add(self)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/task_group.py",line 140,in add
raise DuplicateTaskIdFound(f"Task id '{key}' has already been added to the DAG")
airflow.exceptions.DuplicateTaskIdFound: Task id 'Review.extract__1' has already been added to the DAG
我的代码:
@task
def extract(filename):
some_extract_function
@task
def transform(item :list):
some_transform_function
with TaskGroup('Review') as Review:
data = []
filenames = os.listdir(DATA_PATH)
filtered_filenames = list(filter(lambda x: re.match(r"(^review)",x),filenames))
for filename in filtered_filenames:
extract_review = extract(filename)
data.append(extract_review)
transformed_data_review = transform(data)
当我尝试在 TaskGroup
内动态创建任务时出现问题。如果我删除了 TaskGroup
,它工作正常。
我在这里https://github.com/apache/airflow/issues/8057 发现了这个问题。有什么办法可以解决这个错误吗?例如动态创建自定义task_id?我知道这可以使用 Pythonoperator
。但我正在尝试使用 TaskFlow API
来做到这一点。
谢谢
解决方法
已修复感谢此视频here
所以我通过在 TaskGroup 中动态创建 TaskGroup 来解决这个问题。
这是代码
with TaskGroup('Review') as Review:
data = []
filenames = os.listdir(DATA_PATH)
filtered_filenames = list(filter(lambda x: re.match(r"(^review)",x),filenames))
for filename in filtered_filenames:
with TaskGroup(filename):
extract_review = extract(filename)
data.append(extract_review)
transformed_data_review = transform(data)
,
这是我的情况:
delay_python_task1 = PythonOperator(task_id="delay_python_task1",dag=dag3,python_callable=lambda: time.sleep(0.5))
delay_python_task2 = PythonOperator(task_id="delay_python_task2",python_callable=lambda: time.sleep(0.5))
with TaskGroup(group_id="tasks_1") as tg1:
df = psql.read_sql("""SELECT ticker FROM ...);""",connection)
result1 = df.to_json(orient="records")
tickers = json.loads(result1)
for d in tickers:
ticker = d['ticker'] # getting ticker value
with TaskGroup(ticker):
t=get_profile(ticker)
delay_python_task1
delay_python_task2
但是当我运行 dag CPU 为 100% 并且气流网络将无法打开时,你能帮我解决它吗,我认为 TaskGroup 和 delay_task 是错误的
在我的 python 文件中,我使用代码运行 smool:
df = psql.read_sql("""SELECT ticker FROM ...);""",connection)
result1 = df.to_json(orient="records")
tickers = json.loads(result1)
for d in tickers:
ticker = d['ticker'] # getting ticker value for loop
readAndSave(ticker)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。