微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Databricks Spark Pyspark RDD 重新分区 - “远程 RPC 客户端已断开关联可能是由于容器超过阈值或网络问题”

如何解决Databricks Spark Pyspark RDD 重新分区 - “远程 RPC 客户端已断开关联可能是由于容器超过阈值或网络问题”

我的代码在小型数据集(几百万行)上运行良好,但在较大数据集(> 10 亿行)上运行失败。它抛出的错误是:

Remote RPC client disassociated. Likely due to containers exceeding thresholds,or network issues. Check driver logs for WARN messages. 

我已经用细齿梳检查了 Executor 和 Driver 日志。没有任何内容可以表明两个大小的数据集之间发生了什么不同。我使用的代码是:

spark_df = spark_df.repartition([KEY COLUMNS])
rdd = spark_df.rdd.mapPartitions(lambda x: process_partition(x))
final_df = spark.createDataFrame(rdd,schema=schema,verifySchema=True)
final_df.write.format("delta").mode([MODE]).save([SAVE_LOCATION])

我尝试了很多东西:

  1. 更改了 groupby 以使组更小
  2. 增加集群中机器的资源
  3. 注释掉代码库中除 1 个之外的所有“转换”。
  4. 更改或添加了以下集群配置选项:
    • spark.network.timeout 10000000
    • spark.executor.heartbeatInterval 10000000
  5. 为作业添加了超时:10000000

在整个过程中,错误没有改变,日志似乎没有包含任何有用的信息来帮助我了解发生了什么。

解决方法

尽管日志和文档确实没有帮助将解决方案与问题联系起来,但解决方案最终非常简单。

默认情况下,Databricks/Spark 使用 200 个分区。对于较小的数据集,效果很好。对于较大的数据集,它太小了。解决方案是提供所需数量的分区作为重新分区调用的一部分。

spark_df = spark_df.repartition(1000,[KEY_COLUMNS])

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。