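How to fix a custom MySQL operator that won't work in Airflow: extra_dejson error with the hook
I am trying to write an operator that downloads some API data and puts it into a table using a DataFrame. I have written the following operator code: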
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
from typing import List
from pycoingecko import CoinGeckoAPI
import pandas as pd


class CryptoToMysql(BaseOperator):

    @apply_defaults
    def __init__(
            self,
            name: str,
            coins: List[str],
            mysql_conn_id: str = None,
            tablename: str = None,
            *args,
            **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.name = name
        self.coins = coins
        self.mysql_conn_id = mysql_conn_id
        self.tablename = tablename

    # TODO: Test this to see if it can return data
    @staticmethod
    def _get_cryptos(coins):
        cg = CoinGeckoAPI()
        data = cg.get_price(
            ids=coins,
            vs_currencies='usd',
            include_market_cap=True,
            include_24hr_vol=True,
            include_24hr_change=True,
            include_last_updated_at=True,
        )
        df = pd.DataFrame.from_dict(data, orient='index').reset_index()
        df.rename(columns={'index': 'crypto'}, inplace=True)
        return df

    def execute(self, context):
        hook = MySqlHook(schema='source', connection=self.mysql_conn_id)
        conn = hook.get_conn()
        data = self._get_cryptos(self.coins)
        data.to_sql(self.tablename, conn, if_exists='append', index=False)
        message = f" Saving data to {self.tablename}"
        print(message)
        return message
And the following DAG:
from datetime import timedelta, datetime
from airflow import DAG
from operators.operators import CryptoToMysql
from airflow.operators.dummy import DummyOperator

coins = ['bitcoin', 'litecoin', 'ethereum', 'dogecoin']

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['example@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

with DAG(
    'crypt dag',
    default_args=default_args,
    description='Pulls various crypto prices every interval',
    schedule_interval='@hourly',
    start_date=datetime(2021, 5, 9),
    tags=['crypto']
) as dag:
    t1 = DummyOperator(
        task_id='dummy-1'
    )
    t2 = CryptoToMysql(
        task_id='load_data',
        name='crypto_task',
        coins=coins,
        mysql_conn_id='pinwheel_source',
        tablename='stonks'
    )
    t1 >> t2
However, I get the following error on the hook.get_conn() call:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/mysql/hooks/mysql.py", line 162, in get_conn
    client_name = conn.extra_dejson.get('client', 'mysqlclient')
AttributeError: 'str' object has no attribute 'extra_dejson'
Digging further into the source code of the MySqlHook class turned up the following lines, where the problem occurs:
conn = self.connection or self.get_connection(
    getattr(self, self.conn_name_attr)
)  # pylint: disable=no-member
client_name = conn.extra_dejson.get('client', 'mysqlclient')
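The extra_dejson attribute tries to unpack the connection's JSON Extra field to get at its data. However, this is not described in the documentation, and several sources say the MySqlHook code simply expects a conn_id to be passed. I have confirmed that the connection I am passing exists in the database and that the AIRFLOW_HOME variable points to the correct directory.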
Connection:
id | conn_id         | conn_type | description | host      | schema | login    | password | port | is_encrypted | is_extra_encrypted | extra_dejson | get_uri
===+=================+===========+=============+===========+========+==========+==========+======+==============+====================+==============+========
48 | pinwheel_source | mysql     | connection  | localhost | source | pinwheel |          | 3306 | True         | True               | {}           |
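Solution
With MySQL, you can choose which Python wrapper to use: either mysql-connector-python or mysqlclient. The docs mention that you need to specify the client to connect with in the connection's Extra field.
So just add {"client": "mysql-connector-python"} or {"client": "mysqlclient"} to the Extra field.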
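As a rough illustration of how the Extra field feeds the client choice, the sketch below mimics the lookup seen in the traceback, conn.extra_dejson.get('client', 'mysqlclient'), using only the standard library. pick_client is a hypothetical helper, not part of Airflow, and it assumes the Extra field is stored as a JSON string where an empty value behaves like an empty object.

```python
import json


def pick_client(extra: str, default: str = "mysqlclient") -> str:
    """Hypothetical helper: parse an Extra JSON string and return the client name."""
    # Assumption: an empty Extra field is treated like an empty JSON object.
    parsed = json.loads(extra) if extra else {}
    # Same lookup shape as the line quoted from the hook's get_conn().
    return parsed.get("client", default)


if __name__ == "__main__":
    print(pick_client('{"client": "mysql-connector-python"}'))
    print(pick_client("{}"))
```

Under these assumptions, an empty Extra value such as {} falls back to the default client name rather than failing.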
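Note that the tests are a good source of information; the tests for this hook are worth checking, for example.

For anyone else who runs into this issue, here is what I found.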
The documentation for the MySqlHook class states that the parameter to use for the connection is connection. However, this does not work:
>>> hook = MySqlHook(schema='source', connection={'connection': 'mysql_pinwheel_source'})
>>> hook.get_conn()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/mysql/hooks/mysql.py", line 162, in get_conn
    client_name = conn.extra_dejson.get('client', 'mysqlclient')
AttributeError: 'str' object has no attribute 'extra_dejson'
The correct parameter name to pass is mysql_conn_id:
>>> hook = MySqlHook(schema='source',mysql_conn_id='mysql_pinwheel_source')
>>> hook.get_conn()
[2021-05-09 10:30:40,293] {base.py:65} INFO - Using connection to: id: mysql_pinwheel_source. Host: localhost,Port: 3306,Schema: source,Login: pinwheel,Password: XXXXXXXX,extra: XXXXXXXX
<_mysql.connection open to 'localhost' at 0x562af438f760>
I am not sure whether some other kind of argument can be used with the connection parameter, but this seems to be the solution.
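To make the failure mode concrete, here is a minimal sketch that copies the two lines quoted from get_conn() into stand-in classes. FakeConnection and FakeHook are hypothetical names used only for illustration; they are not Airflow classes, and the real hook looks the connection up in Airflow's metadata database rather than building one on the spot.

```python
class FakeConnection:
    """Hypothetical stand-in for an Airflow Connection object."""

    def __init__(self, conn_id, extra_dejson=None):
        self.conn_id = conn_id
        self.extra_dejson = extra_dejson or {}


class FakeHook:
    """Hypothetical stand-in that reuses the lines quoted from get_conn()."""

    conn_name_attr = "mysql_conn_id"

    def __init__(self, mysql_conn_id="mysql_default", connection=None):
        self.mysql_conn_id = mysql_conn_id
        self.connection = connection

    def get_connection(self, conn_id):
        # Assumption: the real hook resolves the ID to a stored connection here.
        return FakeConnection(conn_id)

    def client_name(self):
        # A truthy `connection` short-circuits the lookup by ID entirely.
        conn = self.connection or self.get_connection(
            getattr(self, self.conn_name_attr)
        )
        return conn.extra_dejson.get("client", "mysqlclient")


if __name__ == "__main__":
    # Passing the ID via mysql_conn_id triggers the lookup and succeeds.
    print(FakeHook(mysql_conn_id="mysql_pinwheel_source").client_name())

    # Passing a plain string as `connection` is used as-is and fails.
    try:
        FakeHook(connection="mysql_pinwheel_source").client_name()
    except AttributeError as exc:
        print(exc)
```

In the operator from the question, the equivalent change would be to build the hook as MySqlHook(schema='source', mysql_conn_id=self.mysql_conn_id), matching the working call shown above.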