通用 Airflow 数据暂存操作符

如何解决通用 Airflow 数据暂存操作符

我想了解如何使用 Airflow 管理大数据。文档清楚地表明我们需要使用外部存储,而不是 XCom,但我找不到任何干净的数据进出工作节点的示例。

我的期望是应该有一个操作员可以在操作中运行暂存,运行主操作,然后再次暂存。

有这样的操作符或模式吗?我发现的关闭一个 S3 File Transform,但它运行一个可执行文件来进行转换,而不是一个通用的 Operator,例如我们想要使用的 DockerOperator。

我见过的其他“解决方案”依赖于在单个主机上运行的所有内容,并使用已知路径,这不是生产就绪的解决方案。

是否有支持数据暂存的运营商,或者是否有不依赖于每个运营商都配备云应对能力的 Airflow 处理大数据的具体示例?

解决方法

是和否。传统上,Airflow 主要是协调器——所以它通常不会“做”这些东西,它通常会告诉其他人该做什么。您很少需要将实际数据带给 Airflow Worker,Worker 主要是在那里告诉其他人数据来自哪里,如何处理以及将数据发送到哪里。

有例外(一些传输操作员实际上从一个服务下载数据并将其上传到另一个服务) - 所以数据通过 Airflow 节点,但这是一个例外而不是规则(更有效和更好的模式是调用一个外部服务来进行传输并有一个传感器等待它完成)。

这更像是 Airflow 的“历史”和“当前”运作方式,但是随着 Airflow 2 及更高版本,我们正在扩展这一点,并且越来越有可能执行类似于您所描述的模式,这是XCom 在那里发挥了重要作用。

您可以 - 从最近开始 - 开发自定义 XCom 后端,不仅可以共享元数据,还可以共享数据。您可以在此处查看文档 https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html#custom-backends,但您也可以阅读来自 Astronomer 的这篇精彩文章 https://www.astronomer.io/guides/custom-xcom-backends 以及来自 Airflow Summit 2021(上周)的精彩演讲:https://airflowsummit.org/sessions/2021/customizing-xcom-to-enhance-data-sharing-between-tasks/。我强烈推荐观看演讲!

看看你的模式 - XCom Pull 是登入,Operator 的 execute() 是操作,XCom Push 是登出。

这种模式将得到加强,我认为通过即将发布的 Airflow 版本和即将到来的一些很酷的集成。未来可能会有更酷的数据共享选项(但我认为它们都将基于 XCom 实现——也许略有增强)。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?