如何解决Databricks Spark Pyspark RDD 重新分区 - “远程 RPC 客户端已断开关联可能是由于容器超过阈值或网络问题”
我的代码在小型数据集(几百万行)上运行良好,但在较大数据集(> 10 亿行)上运行失败。它抛出的错误是:
Remote RPC client disassociated. Likely due to containers exceeding thresholds,or network issues. Check driver logs for WARN messages.
我已经用细齿梳检查了 Executor 和 Driver 日志。没有任何内容可以表明两个大小的数据集之间发生了什么不同。我使用的代码是:
spark_df = spark_df.repartition([KEY COLUMNS])
rdd = spark_df.rdd.mapPartitions(lambda x: process_partition(x))
final_df = spark.createDataFrame(rdd,schema=schema,verifySchema=True)
final_df.write.format("delta").mode([MODE]).save([SAVE_LOCATION])
我尝试了很多东西:
- 更改了 groupby 以使组更小
- 增加集群中机器的资源
- 注释掉代码库中除 1 个之外的所有“转换”。
- 更改或添加了以下集群配置选项:
- spark.network.timeout 10000000
- spark.executor.heartbeatInterval 10000000
- 为作业添加了超时:10000000
在整个过程中,错误没有改变,日志似乎没有包含任何有用的信息来帮助我了解发生了什么。
解决方法
尽管日志和文档确实没有帮助将解决方案与问题联系起来,但解决方案最终非常简单。
默认情况下,Databricks/Spark 使用 200 个分区。对于较小的数据集,效果很好。对于较大的数据集,它太小了。解决方案是提供所需数量的分区作为重新分区调用的一部分。
spark_df = spark_df.repartition(1000,[KEY_COLUMNS])
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。