How to fix dynamically created task instances being marked as removed when generating tasks with a for loop
Below is the code where we seem to build the DAG successfully, but the tasks end up in the removed state.
In the code below we first fetch a list of blobs, then based on that list we dynamically create Dataflow-template caller tasks, each of which receives its input file as a parameter at run time. In other words, each invocation of the dynamically created Dataflow task handles one input file.
DAG code:
# Imports assumed from the surrounding file (Airflow 1.10-style paths).
import datetime
import json
import os
from datetime import date

from airflow import models
from airflow.operators import dummy_operator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from google.cloud import storage

list_of_blobs = []

def read_text_file(config_file_path):
    # Runs at DAG parse time: load the comma-separated blob list written
    # by the get_file_name task and populate the module-level list.
    if os.path.exists(config_file_path):
        try:
            with open(config_file_path, 'r', encoding='utf-8') as f:
                configuration = f.read()
            con = configuration.split(',')
            global list_of_blobs
            for c in con:
                list_of_blobs.append(c)
        except IOError as e:
            print(e)

def read_config(**kwargs):
    # Download the JSON configuration from GCS and push each field to
    # XCom so the Dataflow tasks can pull them in via templating.
    today = date.today()
    bucket = "mystoragebucket"
    blob_name = "config/configuration_r2c.json"
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(str(bucket))
    blob = bucket.blob(str(blob_name))
    downloaded_blob = blob.download_as_string()
    data = json.loads(downloaded_blob.decode("utf-8"))
    kwargs['ti'].xcom_push(key='input_file', value=data['input_file'])
    kwargs['ti'].xcom_push(key='delimiter', value=data['delimiter'])
    kwargs['ti'].xcom_push(key='cols', value=data['cols'])
    kwargs['ti'].xcom_push(key='p_output_file', value=data['p_output_file'])
    kwargs['ti'].xcom_push(key='b_output_file', value=data['b_output_file'])
    kwargs['ti'].xcom_push(key='Cleansed_Bucket_FolderPath', value=data['Cleansed_Bucket_FolderPath'])
    kwargs['ti'].xcom_push(key='GCSSourceFolder', value=data['GCSSourceFolder'])
    kwargs['ti'].xcom_push(key='File_name', value=data['File_name'])
    kwargs['ti'].xcom_push(key='Cleansed_Bucket_Name', value=data['Cleansed_Bucket_Name'])
    kwargs['ti'].xcom_push(key='Source_Raw_Bucket_Name', value=data['Source_Raw_Bucket_Name'])
    kwargs['ti'].xcom_push(key='BigQueryTargetTableName', value=data['BigQueryTargetTableName'])
    # List today's input files and persist their names for the next parse.
    source_bucket = storage_client.get_bucket(data['Source_Raw_Bucket_Name'])
    folder = ('processing' + '/' + data['GCSSourceFolder'] + '/' + data['File_name']
              + '/' + str(today.year) + '/' + str(today.month) + '/' + str(today.day))
    blobs = source_bucket.list_blobs(prefix=folder)
    matched = [blob.name for blob in blobs
               if blob.name.endswith(('.csv', '.dat', '.gz'))]
    # Join with commas; the original index-based branching never incremented
    # its counter, so every name after the first lost its comma separator.
    blob_list = ','.join(matched)
    try:
        with open("/home/airflow/gcs/data/blob_list.txt", "wt") as fout:
            fout.write(blob_list)
    except Exception as e:
        print(e)
    return data
with models.DAG(
        # The id you will see in the Airflow DAGs page.
        "raw_to_cleansed_example_4feb",
        default_args=default_args,  # default_args is assumed to be defined earlier in the file
        # The interval with which to schedule the DAG; override to match your needs.
        schedule_interval=datetime.timedelta(days=1),
) as dag:

    start = dummy_operator.DummyOperator(
        task_id='start', trigger_rule='all_success'
    )
    end = dummy_operator.DummyOperator(
        task_id='end', trigger_rule='all_success'
    )

    def dataflow_call(filename, i):
        # Factory function: build one templated Dataflow task per input file.
        # Scalar parameters are pulled from the XComs pushed by get_file_name.
        return DataflowTemplateOperator(
            task_id='dataflow_dynamic_{}'.format(i),
            template="gs://mystoragebucket/template/dataflowTemplate",
            parameters={
                "input_file": 'gs://' + "{{ task_instance.xcom_pull(task_ids='get_file_name', key='Source_Raw_Bucket_Name') }}" + '/' + filename,
                "delimiter": "{{ task_instance.xcom_pull(task_ids='get_file_name', key='delimiter') }}",
                "no_of_cols": "{{ task_instance.xcom_pull(task_ids='get_file_name', key='cols') }}",
                "p_output_file": "{{ task_instance.xcom_pull(task_ids='get_file_name', key='p_output_file') }}",
                "b_output_file": "{{ task_instance.xcom_pull(task_ids='get_file_name', key='b_output_file') }}",
            },
        )

    py_op = PythonOperator(
        task_id='get_file_name',
        provide_context=True,
        python_callable=read_config,
    )

    start >> py_op

    # Runs at parse time: load the blob list written by a previous run.
    read_text_file('/home/airflow/gcs/data/blob_list.txt')

    for i, b in enumerate(list_of_blobs):
        py_op >> dataflow_call(b, i) >> end
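Worth noting about this pattern: the for loop runs on every scheduler parse of the DAG file, so the set of generated task_ids depends on whatever blob_list.txt contains at that moment. Task instances whose task_id exists in the metadata database but is absent from the current parse are shown as removed in the UI. A minimal, self-contained sketch of the same pattern with a list that is stable across parses (the DAG id, start date, and blob names are hypothetical):

import datetime

from airflow import models
from airflow.operators import dummy_operator

# Hypothetical hard-coded list: identical on every parse, so the
# generated task_ids never change between scheduler cycles.
list_of_blobs = ['file_a.csv', 'file_b.csv']

with models.DAG(
        "dynamic_tasks_sketch",
        start_date=datetime.datetime(2022, 2, 4),
        schedule_interval=datetime.timedelta(days=1),
) as dag:
    start = dummy_operator.DummyOperator(task_id='start')
    end = dummy_operator.DummyOperator(task_id='end')

    for i, blob in enumerate(list_of_blobs):
        # One task per input file; a stable task_id keeps the task from
        # flipping into the "removed" state on the next parse.
        task = dummy_operator.DummyOperator(task_id='dataflow_dynamic_{}'.format(i))
        start >> task >> end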
Solution
This has been resolved: the job ID parameter was missing.
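The answer does not include code, but here is a sketch of what the fix might look like, assuming the missing "job ID parameter" refers to the operator's job_name argument (available on recent versions of DataflowTemplateOperator; check the signature for your Airflow version):

from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator  # Airflow 1.10 path

def dataflow_call(filename, i):
    # Hypothetical sketch of the fix: give every dynamically generated
    # invocation an explicit, unique job name ("job ID").
    return DataflowTemplateOperator(
        task_id='dataflow_dynamic_{}'.format(i),
        job_name='raw-to-cleansed-{}'.format(i),  # assumed to be the missing "job ID parameter"
        template="gs://mystoragebucket/template/dataflowTemplate",
        parameters={"input_file": 'gs://mybucket/' + filename},  # trimmed for brevity
    )

Because each task gets its own job_name, concurrent Dataflow invocations launched from the same DAG run cannot collide with one another.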