如何解决并行读取数据时,Spark 不会将过滤器下推到 PostgreSQL 数据源,为下限和上限提供值
我正在尝试并行读取 Postgresql 表中的数据。我使用时间戳列作为分区列并提供下限、上限和 numPartitions 的值。它正在创建多个查询以并行读取数据,但它没有将过滤器下推到 Postgresql 数据库。当我在数据帧上使用解释命令时,它在物理计划中的推送过滤器中没有任何内容。我也试过在加载方法之后应用过滤器子句,但它仍然没有按下过滤器。
选项 1:这里我没有使用过滤条件
val df = spark.read
.format("jdbc")
.option("url",jdbcurl)
.option("dbtable",query)
.option("partitionColumn","transactionbegin")
.option("numPartitions",12)
.option("driver","org.postgresql.Driver")
.option("fetchsize",50000)
.option("user","user")
.option("password","password")
.option("lowerBound","2018-01-01 00:00:00")
.option("upperBound","2018-12-31 23:59:00")
.load
解释计划输出
== Physical Plan ==
*(1) Scan JDBCRelation(( SELECT columnnames
FROM schema.Transaction ) a) [numPartitions=12] [columnnames] PushedFilters: [],ReadSchema: struct<columnnames>
现在,如果我确实在 df 上进行了解释,推送的过滤器中没有任何内容,但是我能够使用 pg_stat_activity 从 Postgresql 获取的查询显示了 12 个具有 where 条件的不同查询。我在这里提供了一个查询。
SELECT 1 FROM ( SELECT columnnames
FROM schema.Transaction ) a WHERE "transactionbegin" >= '2018-03-02 19:59:50' AND "transactionbegin" < '2018-04-02 05:59:45'
我在这里有点困惑,它是过滤 Postgresql 中的记录还是按照解释计划在 spark 中执行此操作,您在推送的过滤器中没有任何内容,但基于生成的查询,它看起来像它正在过滤 Postgresql 中的数据。
选项 2:使用过滤条件
val df = spark.read
.format("jdbc")
.option("url","2018-12-31 23:59:00")
.load.filter(s"TransactionBegin between cast('2018-01-01 00:00:00' as TIMESTAMP) and cast('2018-12-31 23:59:00' as TIMESTAMP)")
解释上述数据框的计划
== Physical Plan ==
*(1) Scan JDBCRelation(( SELECT columnnames
FROM schema.Transaction ) a) [numPartitions=12] [columnnames]
PushedFilters: [*IsNotNull(transactionbegin),*GreaterThanorEqual(transactionbegin,2018-01-01 00:00:00.0),...,ReadSchema: struct<columnnames>
使用 pg_stat_activity 来自 Postgresql 的查询之一
SELECT 1 FROM ( SELECT columnnames
FROM schema.Transaction ) a
WHERE (("transactionbegin" IS NOT NULL) AND ("transactionbegin" >= '2018-01-01 00:00:00.0')
AND ("transactionbegin" <= '2018-12-31 23:59:00.0')) AND
("transactionbegin" >= '2018-06-02 01:59:35' AND "transactionbegin" < '2018-07-02 11:59:30')
我想理解的是,为什么在提供分区列和下限和上限时,它没有将过滤器推送到数据库,而是在通过将值转换为时间戳来应用显式过滤器后,它会向下推过滤器。框架也不应该足够聪明,将我们传递的值视为下限和上限,以将其视为时间戳列的范围。
如果您有大量数据需要在过滤条件之后读取,那么最有效的处理方法是什么?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。