微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark JDBC 读取 API:动态确定日期时间类型的列的分区数

如何解决Spark JDBC 读取 API:动态确定日期时间类型的列的分区数

我正在尝试使用 PySpark 从 RDS MysqL 实例读取表。这是一个巨大的表,因此我想通过使用分区概念来并行化读取操作。该表没有用于查找分区数的数字列。相反,它有一个时间戳列(即日期时间类型)。

我通过检索时间戳列的最小值和最大值找到了下限和上限。但是,我不确定是否有标准公式可以动态找出分区数。这是我目前正在做的事情(对 numPartititons 参数的值进行硬编码):

select_sql = "SELECT {} FROM {}".format(columns,table)
partition_info = {'partition_column': 'col1','lower_bound': '<result of min(col1)>','upper_bound': '<result of max(col1)>','num_partitions': '10'}
read_df = spark.read.format("jdbc") \
        .option("driver",driver) \
        .option("url",url) \
        .option("dbtable",select_sql) \
        .option("user",user) \
        .option("password",password) \
        .option("useSSL",False) \
        .option("partitionColumn",partition_info['partition_column']) \
        .option("lowerBound",partition_info['lower_bound'])) \
        .option("upperBound",partition_info['upper_bound'])) \
        .option("numPartitions",partition_info['num_partitions']) \
        .load()

请建议我一个有效的解决方案/你的方法。谢谢

解决方法

如何设置 add_header 取决于您的集群定义。这里没有对错或自动设置。只要您了解 numPartitionspartitionColumnlowerBoundupperBound 以及可能的大量基准测试背后的逻辑,您就可以决定什么是正确的数字。

Pyspark - df.cache().count() taking forever to run

What is the meaning of partitionColumn,lowerBound,upperBound,numPartitions parameters?

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?