微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

并行读取数据时,Spark 不会将过滤器下推到 PostgreSQL 数据源,为下限和上限提供值

如何解决并行读取数据时,Spark 不会将过滤器下推到 PostgreSQL 数据源,为下限和上限提供值

我正在尝试并行读取 Postgresql 表中的数据。我使用时间戳列作为分区列并提供下限、上限和 numPartitions 的值。它正在创建多个查询以并行读取数据,但它没有将过滤器下推到 Postgresql 数据库。当我在数据帧上使用解释命令时,它在物理计划中的推送过滤器中没有任何内容。我也试过在加载方法之后应用过滤器子句,但它仍然没有按下过滤器。

选项 1:这里我没有使用过滤条件

 val df = spark.read
        .format("jdbc")
        .option("url",jdbcurl)
        .option("dbtable",query)
        .option("partitionColumn","transactionbegin")
        .option("numPartitions",12) 
        .option("driver","org.postgresql.Driver")
        .option("fetchsize",50000)  
        .option("user","user")
        .option("password","password")
        .option("lowerBound","2018-01-01 00:00:00")
        .option("upperBound","2018-12-31 23:59:00")
        .load

解释计划输出

== Physical Plan ==
*(1) Scan JDBCRelation((   SELECT columnnames
FROM schema.Transaction ) a) [numPartitions=12] [columnnames] PushedFilters: [],ReadSchema: struct<columnnames>

现在,如果我确实在 df 上进行了解释,推送的过滤器中没有任何内容,但是我能够使用 pg_stat_activity 从 Postgresql 获取查询显示了 12 个具有 where 条件的不同查询。我在这里提供了一个查询

SELECT 1 FROM (   SELECT columnnames
FROM schema.Transaction ) a WHERE "transactionbegin" >= '2018-03-02 19:59:50' AND "transactionbegin" < '2018-04-02 05:59:45'

在这里有点困惑,它是过滤 Postgresql 中的记录还是按照解释计划在 spark 中执行此操作,您在推送的过滤器中没有任何内容,但基于生成查询,它看起来像它正在过滤 Postgresql 中的数据。

选项 2:使用过滤条件

val df = spark.read
        .format("jdbc")
        .option("url","2018-12-31 23:59:00")
        .load.filter(s"TransactionBegin between cast('2018-01-01 00:00:00' as TIMESTAMP) and cast('2018-12-31 23:59:00' as TIMESTAMP)")

解释上述数据框的计划

== Physical Plan ==
*(1) Scan JDBCRelation((   SELECT columnnames
FROM schema.Transaction ) a) [numPartitions=12] [columnnames] 
PushedFilters: [*IsNotNull(transactionbegin),*GreaterThanorEqual(transactionbegin,2018-01-01 00:00:00.0),...,ReadSchema: struct<columnnames>

使用 pg_stat_activity 来自 Postgresql查询之一

SELECT 1 FROM (   SELECT columnnames
FROM schema.Transaction ) a 
WHERE (("transactionbegin" IS NOT NULL) AND ("transactionbegin" >= '2018-01-01 00:00:00.0') 
AND ("transactionbegin" <= '2018-12-31 23:59:00.0')) AND 
("transactionbegin" >= '2018-06-02 01:59:35' AND "transactionbegin" < '2018-07-02 11:59:30')

我想理解的是,为什么在提供分区列和下限和上限时,它没有将过滤器推送到数据库,而是在通过将值转换为时间戳来应用显式过滤器后,它会向下推过滤器。框架也不应该足够聪明,将我们传递的值视为下限和上限,以将其视为时间戳列的范围。

如果您有大量数据需要在过滤条件之后读取,那么最有效的处理方法是什么?

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?