无法让自定义 MySQLOperator 在 Airflow 中工作:带有钩子的 extra_dejson 错误

如何解决无法让自定义 MySQLOperator 在 Airflow 中工作:带有钩子的 extra_dejson 错误

我正在尝试编写一个运算符,该运算符将下载一些 API 数据并使用数据框将其放入表中。我已经编写了以下操作员代码

from airflow.providers.MysqL.hooks.MysqL import MysqLHook
from datetime import datetime
from airflow.models.baSEOperator import BaSEOperator
from airflow.utils.decorators import apply_defaults
from typing import List
from pycoingecko import CoinGeckoAPI
import pandas as pd


class CryptoToMysqL(BaSEOperator):
    @apply_defaults
    def __init__(
            self,name: str,coins: List[str],MysqL_conn_id: str = None,tablename: str = None,*args,**kwargs) -> None:
        super().__init__(*args,**kwargs)
        self.name = name
        self.coins = coins
        self.MysqL_conn_id = MysqL_conn_id
        self.tablename = tablename

    # Todo: Test this to see if it can return data
    @staticmethod
    def _get_cryptos(coins):
        cg = CoinGeckoAPI()
        data = cg.get_price(ids=coins,vs_currencies='usd',include_market_cap=True,include_24hr_vol=True,include_24hr_change=True,include_last_updated_at=True)

        df = pd.DataFrame.from_dict(data,orient='index').reset_index()
        df.rename(columns={'index': 'crypto'},inplace=True)
        return df

    def execute(self,context):
        hook = MysqLHook(schema='source',connection=self.MysqL_conn_id)
        conn = hook.get_conn()
        data = self._get_cryptos(self.coins)

        data.to_sql(self.tablename,conn,if_exists='append',index=False)

        message = f" Saving data to {self.tablename}"
        print(message)
        return message

以及以下 DAG:

from datetime import timedelta,datetime
from airflow import DAG
from operators.operators import CryptoToMysqL
from airflow.operators.dummy import DummyOperator

coins = ['bitcoin','litecoin','ethereum','dogecoin']

default_args = {
    'owner': 'airflow','depends_on_past': False,'email': ['example@gmail.com'],'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5),# 'queue': 'bash_queue',# 'pool': 'backfill',# 'priority_weight': 10,# 'end_date': datetime(2016,1,1),# 'wait_for_downstream': False,# 'dag': dag,# 'sla': timedelta(hours=2),# 'execution_timeout': timedelta(seconds=300),# 'on_failure_callback': some_function,# 'on_success_callback': some_other_function,# 'on_retry_callback': another_function,# 'sla_miss_callback': yet_another_function,# 'trigger_rule': 'all_success'
}

with DAG(
    'crypt dag',default_args=default_args,description='Pulls varIoUs crypto prices every interval',schedule_interval='@hourly',start_date=(datetime(2021,5,9)),tags=['crypto']
) as dag:
    t1 = DummyOperator(
        task_id='dummy-1'
    )

    t2 = CryptoToMysqL(
        task_id='load_data',name='crypto_task',coins=coins,MysqL_conn_id='pinwheel_source',tablename='stonks'
    )

    t1 >> t2

但是我在 hook = get_conn() 调用中收到以下错误

Traceback (most recent call last):
  File "<stdin>",line 1,in <module>
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/MysqL/hooks/MysqL.py",line 162,in get_conn
    client_name = conn.extra_dejson.get('client','MysqLclient')
AttributeError: 'str' object has no attribute 'extra_dejson'

MysqLHook 类的源代码的进一步研究揭示了出现问题的以下行:

        conn = self.connection or self.get_connection(
            getattr(self,self.conn_name_attr)
        )  # pylint: disable=no-member

        client_name = conn.extra_dejson.get('client','MysqLclient')

extra_dejson 方法尝试解压 json 连接字符串以获取数据。但是,文档中没有描述这一点,MysqLHook 代码只是期望 conn_id 按照 hereherehere

的描述传递

我已经确认我传递的连接存在于数据库中,并且 AIRFLOW_HOME 变量指向正确的目录。

连接:


id | conn_id    | conn_type | descriptio | host      | schema | login    | password   | port | is_encrypt | is_extra_e | extra_dejs | get_uri   
   |            |           | n          |           |        |          |            |      | ed         | ncrypted   | on         |           
===+============+===========+============+===========+========+==========+============+======+============+============+============+===========
48 | pinwheel_source | MysqL     | connection | localhost | source | pinwheel | |          | 3306 | True       | True       | {}         |           

解决方法

使用 MySQL 时,您可以选择要使用的 Python 包装器。您可以使用 mysql-connector-pythonmysqlclientdocs 中提到您需要在 Extra 字段中指定要连接的客户端。

因此只需将 {"client": "mysql-connector-python"}{"client": "mysqlclient"} 添加到额外字段。

请注意,测试是一个很好的信息来源。例如检查 this

,

对于遇到此问题的任何其他人,这就是我发现的。

MySQLHook 类 here 的文档引用您用于连接的参数是 connection。但是这不起作用。

hook = MySqlHook(schema='source',connection={'connection': 'mysql_pinwheel_source'})

>>> hook.get_conn()
Traceback (most recent call last):
  File "<stdin>",line 1,in <module>
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/mysql/hooks/mysql.py",line 162,in get_conn
    client_name = conn.extra_dejson.get('client','mysqlclient')
AttributeError: 'str' object has no attribute 'extra_dejson'

要传递的正确参数名称是mysql_conn_id

>>> hook = MySqlHook(schema='source',mysql_conn_id='mysql_pinwheel_source')
>>> hook.get_conn()
[2021-05-09 10:30:40,293] {base.py:65} INFO - Using connection to: id: mysql_pinwheel_source. Host: localhost,Port: 3306,Schema: source,Login: pinwheel,Password: XXXXXXXX,extra: XXXXXXXX
<_mysql.connection open to 'localhost' at 0x562af438f760>

我不确定是否还有其他参数可以使用 connection 变量,但这似乎是解决方案。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?