微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark JDBC 过滤器记录超出边界

如何解决Spark JDBC 过滤器记录超出边界

我正在尝试优化一项日常工作,该工作将三个月的数据从 MysqL 表中提取到 HDFS 上的镶木地板中。他们目前正在以一种非常有创意的方式使用 MysqLdump,但有一个 spark/hdfs 生态系统,所以我想我会改用它。

背景

我像这样定义了如何读取数据库

# time_col column is epoch as an integer
# start_time is beginning of three month period
# end_time is end of three month period

df = session.read \
  .format("jdbc") \
  .option("url",url) \
  .option("driver",driver) \
  .option("dbtable","table1") \
  .option("user",username) \
  .option("password",password) \
  .option("partitionColumn","time_col") \
  .option("upperBound",end_time) \
  .option("lowerBound",start_time) \
  .option("numPartitions",partitions) \
  .load()

这真的很好用,除了第一个和最后一个分区有数十亿条记录,我什至不想要;

为了过滤掉表格的绝大部分,我更新了dtable

.option("dtable","(select * from table1 WHERE time_col >= {} and time_col < {}) as table2".format(start_time,end_time))

这种方式奏效了。当 end_time - start_time 较小时,作业运行良好,但不能扩展到 3 个月。

这是因为每个分区的查询现在都包含一个派生表

EXPLAIN SELECT * FROM (SELECT * From table1 WHERE time_col >=1585780000 AND time_col < 1585866400 ) as table2 WHERE `time_col` >= 1585808800 AND `time_col` < 1585812400;
+----+-------------+------------+-------+---------------+----------+---------+------+----------+-------------+
| id | select_type | table      | type  | possible_keys | key      | key_len | ref  | rows     | Extra       |
+----+-------------+------------+-------+---------------+----------+---------+------+----------+-------------+
|  1 | PRIMARY     | <derived2> | ALL   | NULL          | NULL     | NULL    | NULL | 25048354 | Using where |
|  2 | DERIVED     | table1     | range | time_col      | time_col | 4       | NULL | 25048354 | Using where |
+----+-------------+------------+-------+---------------+----------+---------+------+----------+-------------+

相比之下,当我只使用 dtable = "table1" 时,生成查询是这样的;更简单更快

explain SELECT * From table1 WHERE `time_col` >= 1585808800 AND `time_col` < 1585812400;
+----+-------------+--------+-------+---------------+----------+---------+------+---------+-------------+
| id | select_type | table  | type  | possible_keys | key      | key_len | ref  | rows    | Extra       |
+----+-------------+--------+-------+---------------+----------+---------+------+---------+-------------+
|  1 | SIMPLE      | table1 | range | time_col      | time_col | 4       | NULL | 1097631 | Using where |
+----+-------------+--------+-------+---------------+----------+---------+------+---------+-------------+

问题

有什么方法可以让我过滤掉 upperBoundlowerBound 之外的数据,同时仍然保留更简单的查询?比如阻止第一个和最后一个分区被运行,或者在运行时覆盖 dtable 以便它只用 table1 替换子查询

参数

我对 MysqL 5.7 上的表只有读取权限,不能创建视图或索引

我在 Spark 3.1 上开发,但我相信生产是在 Spark 2 上

是的,我已经考虑过 Spark Structured Streaming 和其他流媒体选项,但这不是我们目前要走的方向。

解决方法

我发现如果我添加 where() 方法,我可以避免子查询。示例:

# time_col column is epoch as an integer
# start_time is beginning of three month period
# end_time is end of three month period

df = session.read \
  .format("jdbc") \
  .option("url",url) \
  .option("driver",driver) \
  .option("dbtable","table1") \
  .option("user",username) \
  .option("password",password) \
  .option("partitionColumn","time_col") \
  .option("upperBound",end_time) \
  .option("lowerBound",start_time) \
  .option("numPartitions",partitions) \
  .load()

# This filters out everything outside of boundaries
# without creating a subquery
df.where('time_col >= {} AND time_col < {}'.format(start_time,end_time))

Spark 能够将子句与分区逻辑创建的子句一起添加。因此,没有子查询和更好的性能。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?