微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用 Spark 和 Redshift 时如何优化 ETL 数据管道以实现容错?

如何解决使用 Spark 和 Redshift 时如何优化 ETL 数据管道以实现容错?

我正在使用 PySpark 编写一个大批量作业,该作业对 200 个表进行 ETL 处理并加载到 Amazon Redshift 中。 这 200 个表是从一个输入数据源创建的。所以只有当数据成功加载到所有 200 个表时,批处理作业才成功。批处理作业每天运行,同时将每个日期的数据附加到表中。

对于容错、可靠性和幂等性,我当前的工作流程如下:

  1. 使用临时表。使用 CREATE TEMP TABLE LIKE <target_table>
  2. 创建临时 Redshift 表
  3. 将数据转换并加载到临时表中。
  4. 对其他 200 个表重复 1-2。
  5. 开始 BEGIN 次交易。
  6. 将临时表数据复制到目标表中 使用 INSERT INTO <taget_table> SELECT * FROM <staging_table>
  7. END 交易
  8. DROP 所有临时表。

这样我就可以保证,如果第 3 步失败(更有可能),我不必担心从原始表中删除部分数据。相反,我将简单地重新运行整个批处理作业,因为在 JDBC 断开连接后会丢弃临时表。

虽然它解决了大部分问题,但它并不优雅、笨拙并且消耗额外的时间。我想知道 Spark 和/或 Redshift 是否提供了标准工具来解决 ETL 世界中这个非常普遍的问题。

谢谢

解决方法

COPY 命令可以在事务块中。你只需要:

  1. 开始
  2. 将数据复制到所有表
  3. 提交(如果成功)

Redshift 将为所有其他查看者维护表格的先前版本,并且他们对表格的视图在 COMMIT 之前不会更改。

您布置的进程的好处是,在事务运行期间,其他进程无法在表(ALTER TABLE 等)上获得排他锁。您的插入将比 COPY 运行得快,因此事务的打开时间将更短。如果其他进程在 ETL 运行的同时修改表,这只是一个问题,这通常不是一个好主意。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?