
MWAA (AWS Managed Airflow) fails to pause a DAG when using the CLI


I am trying to pause a DAG as described here, but I get "Dag id testDag not found" even though the DAG exists.

The error message says that my third-party module is "not found", even though the same import works when I trigger the DAG.

I saw this in the documentation:

Note: Any command that parses a DAG (such as list_dags, backfill) will fail if the DAG uses plugins that depend on packages installed through requirements.txt.

DAG code (testDag):

from datetime import timedelta
# my_client is the 3rd party library
from client import my_client
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}
dag = DAG(
    'testDag',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=None,
    catchup=False,
)

def print_me_func(**context):
    print("hiiiii")

printMe = PythonOperator(
    task_id='printMe',
    dag=dag,
    provide_context=True,
    python_callable=print_me_func,
)

Airflow API:

import boto3
import json
import requests
import base64

mwaa_env_name = 'YOUR_ENVIRONMENT_NAME'
dag_name = 'testDag'

client = boto3.client('mwaa')

mwaa_cli_token = client.create_cli_token(
    Name=mwaa_env_name
)

mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
raw_data = "pause {0}".format(dag_name)

mwaa_response = requests.post(
    mwaa_webserver_hostname,
    headers={
        'Authorization': mwaa_auth_token,
        'Content-Type': 'text/plain'
    },
    data=raw_data
)

mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')

print(mwaa_response.status_code)
print(mwaa_std_err_message)
print(mwaa_std_out_message)

Output:

200
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 37, in <module>
    args.func(args)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 76, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 421, in pause
    set_is_paused(True, args)
  File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 431, in set_is_paused
    is_paused=is_paused,
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dag.py", line 1887, in set_is_paused
    raise DagNotFound("Dag id {} not found".format(self.dag_id))
airflow.exceptions.DagNotFound: Dag id testDag not found

[2021-03-23 20:37:45,790] {{dagbag.py:259}} ERROR - Failed to import: /usr/local/airflow/dags/testDag.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 256, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/lib64/python3.7/imp.py", line 171, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 696, in _load
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/usr/local/airflow/dags/testDag.py", line 2, in <module>
    from client import my_client
ModuleNotFoundError: No module named 'client'
[2021-03-23 20:37:45,802] {{dagbag.py:259}} ERROR - Failed to import: /usr/local/airflow/dags/testDag.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dagbag.py",in <module>
    from client import my_client
ModuleNotFoundError: No module named 'client'
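
The output above shows HTTP status 200 even though the command itself failed, so the status code alone does not seem to be a reliable success signal. Below is a minimal sketch of a helper that decodes the base64 `stdout` and `stderr` fields and flags a failure when a Python traceback appears in `stderr`. The names `decode_cli_response` and `command_failed` are hypothetical, and the traceback check is only a heuristic assumed here, not a documented contract of the MWAA CLI endpoint.

```python
import base64


def decode_cli_response(payload):
    """Decode the base64 'stdout' and 'stderr' fields of the CLI response JSON.

    `payload` is expected to look like mwaa_response.json() in the post.
    """
    stdout = base64.b64decode(payload.get("stdout", "")).decode("utf8")
    stderr = base64.b64decode(payload.get("stderr", "")).decode("utf8")
    return stdout, stderr


def command_failed(stderr):
    # Heuristic (an assumption): treat any Python traceback in stderr as a failure.
    return "Traceback (most recent call last)" in stderr


# Synthetic payload shaped like the response in the post (not real MWAA output).
sample = {
    "stdout": base64.b64encode(b"").decode("ascii"),
    "stderr": base64.b64encode(
        b"Traceback (most recent call last):\n"
        b"airflow.exceptions.DagNotFound: Dag id testDag not found\n"
    ).decode("ascii"),
}

out, err = decode_cli_response(sample)
if command_failed(err):
    print("CLI command failed:")
    print(err)
```

In the script from the post, `decode_cli_response(mwaa_response.json())` would stand in for the two separate `base64.b64decode` lines.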

Is there any workaround I can use? Thanks.

Solution

It seems that MWAA 1.10.12 has a serialization issue. After migrating from MWAA 1.10.12 to MWAA 2.0.2, the same code works.
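
If upgrading is not an option, one workaround that may be worth trying is to defer the third-party import into the task callable, so that merely parsing the DAG file no longer needs the `client` package. This is an untested sketch and was not confirmed in this thread: it assumes the failure comes only from the module-level `from client import my_client` line shown in the traceback, and that the workers that execute the task can still import the package.

```python
def print_me_func(**context):
    # Deferred import: resolved only when the task actually runs, not when
    # the DAG file is parsed (for example by a CLI command such as `pause`).
    # `client` / `my_client` are the third-party library from the question.
    from client import my_client  # noqa: F401

    print("hiiiii")


# Defining the function does not trigger the import, so a file containing
# only this callable parses even where `client` is not installed.
print(callable(print_me_func))
```

In the DAG file, the module-level `from client import my_client` would be removed and `print_me_func` would still be passed as `python_callable` to the operator, unchanged.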
