微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

同时调度大量DAG时出现重复输入错误

如何解决同时调度大量DAG时出现重复输入错误

我正在运行此测试:

from unittest import TestCase

from backend.tasks.airflow import trigger_dag


class TestTriggerDag(TestCase):

    def test_trigger_dag(self):
        trigger_dag("update_game_dag",game_id=99)
        trigger_dag("update_game_dag",game_id=100)
        trigger_dag("update_game_dag",game_id=101)
        trigger_dag("update_game_dag",game_id=102)
        trigger_dag("update_game_dag",game_id=103)
        self.assertTrue(True)

trigger_dag的内部是:

from typing import List
import random
import time

from airflow.api.client.local_client import Client
from airflow.models.dagrun import DagRun

afc = Client(None,None)

...
def get_dag_run_state(dag_id: str,run_id: str):
    return DagRun.find(dag_id=dag_id,run_id=run_id)[0].state


def trigger_dag(dag_id: str,wait_for_complete: bool = False,**kwargs):
    run_hash = '%030x' % random.randrange(16**30)
    kwarg_list = [f"{str(k)}:{str(v)}" for k,v in kwargs.items()]
    run_id = f"{run_hash}-{'_'.join(kwarg_list)}"
    afc.trigger_dag(dag_id,run_id=run_id,conf=kwargs)
    while wait_for_complete and get_dag_run_state(dag_id,run_id) == "running":
        time.sleep(1)
        continue
    return get_dag_run_state(dag_id,run_id)

这将引发以下错误

sqlalchemy.exc.IntegrityError: (pyMysqL.err.IntegrityError) (1062,"Duplicate entry 'update_game_dag-2020-08-30 00:30:13.000000' for key 'dag_run.dag_id'")
[sql: INSERT INTO dag_run (dag_id,execution_date,start_date,end_date,state,run_id,external_trigger,conf) VALUES (%(dag_id)s,%(execution_date)s,%(start_date)s,%(end_date)s,%(state)s,%(run_id)s,%(external_trigger)s,%(conf)s)]
[parameters: {'dag_id': 'update_game_dag','execution_date': datetime.datetime(2020,8,30,13),'start_date': datetime.datetime(2020,13,262676),'end_date': None,'state': 'running','run_id': '3129c0272d7e3e5f018d04d2debf06-game_id:101','external_trigger': 1,'conf': b'\x80\x04\x95\x10\x00\x00\x00\x00\x00\x00\x00}\x94\x8c\x07game_id\x94Kes.'}]

问题似乎在于,将DAG运行记录到dag_run元数据表时,execution_date列时间戳记是在第二秒(datetime.datetime(2020,)而非微秒处保存的, 解析度。当触发一堆DAG时,这会产生重复的条目冲突。有趣的是,start_date不能这样工作:它保留了微秒信息(datetime.datetime(2020,262676))。

这是功能还是错误?对于给定的DAG ID,气流是否不允许在同一第二时间间隔内故意启动多个外部触发的DAG运行?是否可以快速解决此问题,还是应该发送PR或ASF Jira出票?

如果需要的话,我想这样做是因为我有很多资产需要在游戏级别上以5分钟的节奏为应用程序上的所有用户更新。我们将芹菜节拍用作应用程序调度程序,而不是气流。气流真正发光的地方是在弹性伸缩的工人群集上协调任务图的执行。所以我想每5分钟说一次“嘿,气流,请为这200场比赛触发DAGS。 DAG通过conf数据(与外部API调用一起传递给DAG上下文)来识别游戏ID。

解决方法

继续保持几分钟,找到解决方案。 local_client中的from airflow.api.client.local_client import Client只是将airflow.api.client中的基类与airflow.api.common.experimental中的几种方法捆绑在一起:

from airflow.api.client import api_client
from airflow.api.common.experimental import pool
from airflow.api.common.experimental import trigger_dag
from airflow.api.common.experimental import delete_dag


class Client(api_client.Client):
    """Local API client implementation."""

    def trigger_dag(self,dag_id,run_id=None,conf=None,execution_date=None):
        dag_run = trigger_dag.trigger_dag(dag_id=dag_id,run_id=run_id,conf=conf,execution_date=execution_date)
        return "Created {}".format(dag_run)

    def delete_dag(self,dag_id):
        count = delete_dag.delete_dag(dag_id)
        return "Removed {} record(s)".format(count)

    def get_pool(self,name):
        the_pool = pool.get_pool(name=name)
        return the_pool.pool,the_pool.slots,the_pool.description

    def get_pools(self):
        return [(p.pool,p.slots,p.description) for p in pool.get_pools()]

    def create_pool(self,name,slots,description):
        the_pool = pool.create_pool(name=name,slots=slots,description=description)
        return the_pool.pool,the_pool.description

    def delete_pool(self,name):
        the_pool = pool.delete_pool(name=name)
        return the_pool.pool,the_pool.description

有点奇怪的方法,因为这里没有一个类方法实际调用api_client.Client基类。 trigger_dag中的airflow.api.common.experimental有一个自变量replace_microseconds。这是清理信息的地方。

直接与airflow.api.common.experimental.trigger_dag调用replace_microseconds=True解决了我的问题:

from typing import List
import random
import time

from airflow.api.common.experimental import trigger_dag
from airflow.models.dagrun import DagRun


def log_headline(keys: tuple,values: List):
    headline_ls = [f"{key} = {value}" for key,value in zip(keys,values)]
    print("\n \n*** ARGUMENTS ***\n-----------------\n" + ",".join(headline_ls) + "\n-----------------\n")


def context_parser(context: dict,*args: str):
    """*args looks for an inventory of names from the context that we expect a given task to have access to. Use of
    the .get access method means that misses names will default to None rather than generate a key error"""
    return_values = [context['dag_run'].conf.get(arg) for arg in args]
    log_headline(args,return_values)
    return return_values


def get_dag_run_state(dag_id: str,run_id: str):
    return DagRun.find(dag_id=dag_id,run_id=run_id)[0].state


def start_dag(dag_id: str,wait_for_complete: bool = False,**kwargs):
    run_hash = '%030x' % random.randrange(16**30)
    kwarg_list = [f"{str(k)}:{str(v)}" for k,v in kwargs.items()]
    run_id = f"{run_hash}-{'_'.join(kwarg_list)}"
    trigger_dag.trigger_dag(dag_id,conf=kwargs,replace_microseconds=False)
    while wait_for_complete and get_dag_run_state(dag_id,run_id) == "running":
        time.sleep(1)
        continue
    return get_dag_run_state(dag_id,run_id)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。