动态创建的任务实例被标记为已删除当我使用 for 循环动态生成任务时

如何解决动态创建的任务实例被标记为已删除当我使用 for 循环动态生成任务时

请在下面找到我们似乎构建了 DAG 但任务处于已删除状态的代码

我们在下面的代码中尝试获取 blob 列表,然后基于该列表动态创建数据流模板调用者任务操作符,该操作符将通过动态接收输入文件参数来运行。因此,作为数据流模板调用方的动态任务的每次调用将迎合一个输入文件

DAG 代码

    list_of_blobs=[]
    def read_text_file(config_file_path):
        if os.path.exists(config_file_path):
            try:
                with open(config_file_path,'r',encoding='utf-8') as f:
                    configuration = f.read()#json.load(f) 
                    con = configuration.split(',')
                    global list_of_blobs
                    for c in con:
                        list_of_blobs.append(c)
                    
            except IOError as e:
                print(e)
    def read_config(**kwargs):
        today = date.today()
        bucket = "mystoragebucket"
        blob_name = "config/configuration_r2c.json"
        storage_client = storage.Client()
        bucket = storage_client.get_bucket(str(bucket))
        blob = bucket.blob(str(blob_name))
        downloaded_blob = blob.download_as_string()
        data = json.loads(downloaded_blob.decode("utf-8"))
        kwargs['ti'].xcom_push(key='input_file',value=data['input_file'])
        kwargs['ti'].xcom_push(key='delimiter',value=data['delimiter'])
        kwargs['ti'].xcom_push(key='cols',value=data['cols'])
        kwargs['ti'].xcom_push(key='p_output_file',value=data['p_output_file'])
        kwargs['ti'].xcom_push(key='b_output_file',value=data['b_output_file'])
        kwargs['ti'].xcom_push(key='Cleansed_Bucket_FolderPath',value=data['Cleansed_Bucket_FolderPath'])
        kwargs['ti'].xcom_push(key='GCSSourceFolder',value=data['GCSSourceFolder'])
        kwargs['ti'].xcom_push(key='File_name',value=data['File_name'])
        kwargs['ti'].xcom_push(key='Cleansed_Bucket_Name',value=data['Cleansed_Bucket_Name'])
        kwargs['ti'].xcom_push(key='Source_Raw_Bucket_Name',value=data['Source_Raw_Bucket_Name'])
        kwargs['ti'].xcom_push(key='BigQueryTargetTableName',value=data['BigQueryTargetTableName'])
        source_bucket = storage_client.get_bucket(data['Source_Raw_Bucket_Name'])
        folder = 'processing' + '/' + data['GCSSourceFolder'] + '/' + data['File_name'] + '/' + str(today.year) + '/' + str(today.month) + '/' + str(today.day)
        blobs = source_bucket.list_blobs(prefix=folder)
        
        print(blobs)
        #blob_list= ['aaa','hhhhhh']
        blob_list = ''
        i=0
        for blob in blobs:
            if(blob.name.endswith('.csv') or blob.name.endswith('.dat') or blob.name.endswith('.gz')):
                if(i==0):
                    blob_list=blob_list + blob.name
                else: 
                    print(blob.name)
                    blob_list=blob_list+ ','+ blob.name
        try:
            with open("/home/airflow/gcs/data/blob_list.txt","wt") as fout:
                fout.write(blob_list)
        except Exception as e:
            print(e)
        
        return data

    with models.DAG(
        # The id you will see in the DAG airflow page
        "raw_to_cleansed_example_4feb",default_args=default_args,# The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),# Override to match your needs
        ) as dag:
        start = dummy_operator.DummyOperator(
            task_id='start',trigger_rule='all_success'
        )
        
        end = dummy_operator.DummyOperator(
            task_id='end',trigger_rule='all_success'
        )
        def dataflow_call(filename,i):
            today = date.today()
            
            return DataflowTemplateOperator(
                    # The task id of your job
                    task_id='dataflow_dynamic_{}'.format(str(i)),template="gs://mystoragebucket/template/dataflowTemplate",parameters={
                        "input_file":'gs://' + "{{ task_instance.xcom_pull(task_ids='get_file_name',key='Source_Raw_Bucket_Name') }}" + '/' + filename,"delimiter":"{{ task_instance.xcom_pull(task_ids='get_file_name',key='delimiter') }}","no_of_cols":"{{ task_instance.xcom_pull(task_ids='get_file_name',key='cols') }}","p_output_file":"{{ task_instance.xcom_pull(task_ids='get_file_name',key='p_output_file') }}","b_output_file":"{{ task_instance.xcom_pull(task_ids='get_file_name',key='b_output_file') }}",},)
        py_op = Pythonoperator(
            task_id='get_file_name',provide_context=True,python_callable=read_config)
        
        
        start >> py_op 
        read_text_file('/home/airflow/gcs/data/blob_list.txt')
        
        i = 0
        for b in list_of_blobs:
            py_op >> dataflow_call(b,i) >> end
            i = i+1

解决方法

此支架已解决,因为缺少作业 ID 参数

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?