在多个s3键上执行气流任务,然后执行下一个任务

如何解决在多个s3键上执行气流任务,然后执行下一个任务

我有一个用例,我们在s3目录中有一组10个文件(比方说)。我们正在尝试将这些文件重命名为第二个目录中对应的映射的重命名文件名模式。我通过传递文件名动态创建了任务ID唯一性。

 for file in rename_list:
    rename_tak = RenameOperator(
        task_id="file_rename_task_{}".format(str(file.split(":")[0])),s3_conn_id=s3_CONN_ID,source_s3_bucket=source_bucket,destination_s3_bucket=destination_bucket,s3_key = source_prefix + source_key,rename_key = destination_key,output_prefix = output_prefix,dag=dag))

然后,我再次需要对其执行另一操作,最后将其移至最终的s3目录。这也将在for循环中运行,因为我们有多个文件

现在,问题是第二个操作员/任务执行没有被调用,没有错误,但是Airflow日志显示“任务处于“已删除”状态,这不是有效的执行状态。必须清除任务为了运行。”重命名运算符的第一个任务全部成功,但是第二个运算符只是被删除,没有输出,没有日志。

任何反馈,这里可能出什么问题。

解决方法

气流工作流是静态的,不建议使用动态任务ID。据我所知,动态DAG可以通过一些黑客手段来实现,但Airflow并不直接支持。

现在,问题是第二个操作员/任务执行没有被调用,没有错误,但是Airflow日志显示“任务处于“已删除”状态,这不是有效的执行状态。必须清除任务为了运行。”重命名运算符的第一个任务全部成功,但是第二个运算符只是被删除,没有输出,没有日志。

在不知道如何在任务之间添加依赖性的情况下,很难确定问题所在。

我对问题的理解:

  1. 您有一个s3存储桶,该存储桶在路径(源)中包含一些文件
  2. 您需要对每个文件进行一些操作
  3. 将完成的文件移动到新的s3路径(目标)

如果我的理解是正确的,那么我设计工作流程的方式是:

download_files_from_source >> apply_some_operation_on_downloaded_files >> move_files_to_destination

如果跨工作人员共享文件系统,这将起作用。 DAG的每次运行都应具有自己的登台目录,以使属于不同DAG运行的文件不会重叠。

否则,您还可以编写一个自定义运算符来执行上一个解决方案中的所有三个任务,即从源下载文件,对下载的文件进行一些操作,然后将文件移到目标位置。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?