Spark Java PCA:Java堆空间和缺少用于混洗的输出位置

如何解决Spark Java PCA:Java堆空间和缺少用于混洗的输出位置

我尝试在具有 4.827行和40.107列的数据帧上执行PCA,但是我遇到了Java堆空间错误,并且缺少用于混洗的输出位置(根据执行程序上的sdterr文件)。该错误发生在PCA的“ RowMatrix.scala:122处的treeAggregate” 阶段。

集群

这是一个具有16个工作程序节点的独立群集,每个节点都有1个具有4个内核和21.504mb内存的执行程序。主节点具有15g的内存,我通过“ Java -jar -Xmx15g myapp.jar”提供了内存。另外,“ spark.sql.shuffle.partitions”是192,“ spark.driver.maxResultSize”是6g。

简化代码

df1.persist (From the Storage Tab in spark UI it says it is 3Gb)
df2=df1.groupby(col1).pivot(col2).mean(col3) (This is a df with 4.827 columns and 40.107 rows)
df2.collectFirstColumnAsList
df3=df1.groupby(col2).pivot(col1).mean(col3) (This is a df with 40.107 columns and 4.827 rows)

-----it hangs here for around 1.5 hours creating metadata for upcoming dataframe-----

df4 = (..Imputer or na.fill on df3..)
df5 = (..VectorAssembler on df4..)
(..PCA on df5 with error Missing output location for shuffle..)
df1.unpersist

我已经看到并尝试了许多解决方案,但没有任何结果。其中:

  1. 将df5或df4重新分区为16、64、192、256、1000、4000(尽管数据看起来不偏斜)
  2. 将spark.sql.shuffle.partitions更改为16、64、192、256、1000、4000
  3. 每个执行器使用1个和2个内核,以便为每个任务分配更多的内存。
  4. 有2个具有2个核心或4个核心的执行程序。
  5. 将“ spark.memory.fraction”更改为0.8,并将“ spark.memory.storageFraction”更改为0.4。

总是一样的错误!怎么可能耗尽所有这些记忆? df实际上是否可能不适合内存?请让我知道是否需要其他信息或打印屏幕。

编辑1

我将集群更改为2个spark工作者,每个有1个执行程序,每个spark.sql.shuffle.partitions = 48。每个执行器具有115g和8个核心。下面是我加载文件(2.2Gb),将每一行转换为密集向量并送入PCA的代码。

文件中的每一行都具有这种格式(4.568行,每个行具有40.107个双精度值):

 "[x1,x2,x3,...]"

和代码:

Dataset<Row> df1 = sp.read().format("com.databricks.spark.csv").option("header","true").load("/home/ubuntu/yolo.csv");
StructType schema2 = new StructType(new StructField[] {
                        new StructField("intensity",new VectorUDT(),false,Metadata.empty())
            });
Dataset<Row> df = df1.map((Row originalrow) -> {
                    String yoho =originalrow.get(0).toString();
                    int sizeyoho=yoho.length();
                    String yohi = yoho.substring(1,sizeyoho-1);
                    String[] yi = yohi.split(",");
                    int s = yi.length;
                    double[] tmplist= new double[s];
                    for(int i=0;i<s;i++){
                        tmplist[i]=Double.parseDouble(yi[i]);
                    }
                    
                    Row newrow = RowFactory.create(Vectors.dense(tmplist));
                    return newrow;
            },RowEncoder.apply(schema2));
PCAModel pcaexp = new PCA()
                    .setInputCol("intensity")
                    .setOutputCol("pcaFeatures")
                    .setK(2)
                    .fit(df);

我在2个工人之一的stderr上遇到的确切错误是:

ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 43)
java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

这是SparkUI的“阶段”标签:

stagestab

这是失败的阶段(RowMatrix.scala:122处的TreeAggregate):

treeaggregate

编辑2

console

sparkstages

编辑3

我读取了整个文件,但每行仅获取10个值并创建了密集矢量。我仍然遇到相同的错误!我有一个具有235g Ram的大师和3个工人(每个执行者1个具有4个核心)和每个执行者64g Ram。这怎么可能发生? (不要忘了文件的总大小只有2.3Gb!)

Dataset<Row> df1 = sp.read().format("com.databricks.spark.csv").option("header","true").load("/home/ubuntu/yolo.csv");

StructType schema2 = new StructType(new StructField[] {
                        new StructField("intensity",");//this string array has all 40.107 values
                    int s = yi.length;
                    double[] tmplist= new double[s];
                    for(int i=0;i<10;i++){//I narrow it down to take only the first 10 values of each row
                        tmplist[i]=Double.parseDouble(yi[i]);
                    }
                    Row newrow = RowFactory.create(Vectors.dense(tmplist));
                    return newrow;
            },RowEncoder.apply(schema2));
      
PCAModel pcaexp = new PCA()
                    .setInputCol("intensity")
                    .setOutputCol("pcaFeatures")
                    .setK(2)
                    .fit(df);

解决方法

当您的Spark应用程序执行较大的洗牌阶段时,会出现“缺少洗牌的输出位置” ,它会尝试在执行程序之间重新分配大量数据,并且您的集群网络中存在一些问题。

>

Spark说您在某个阶段没有记忆。您正在进行的转换需要不同的阶段,并且它们也占用内存。此外,您首先要持久存储数据帧,并且应该检查存储级别,因为有可能持久存储在内存中。

您正在链接多个Spark广泛的转换:例如,执行第一个枢轴阶段,Spark创建一个阶段并对您的列执行随机分组以分组,也许您有 数据偏斜 ,并且执行器消耗的内存比其他执行器大得多,并且其中之一可能会发生错误。

除数据帧转换外,PCA估计器还将数据帧转换为RDD,从而增加了更多的内存来计算协方差矩阵,并且可以与未分布的NxN元素的Breeze矩阵的密集表示形式配合使用。 >。例如,SVD是用Breeze制造的。这给执行者之一带来很大压力。

也许您可以将生成的数据帧保存在HDFS(或其他格式)中,并在PCA中使用另一个Spark应用程序。

主要问题。您拥有的是,在de SVD之前,该算法需要计算Grammian矩阵,并且使用RDD中的treeAggregate。这将创建一个非常大的Double矩阵,该矩阵将发送给驱动程序,并且由于驱动程序内存不足而导致错误。您需要大幅增加驱动程序内存。您遇到了网络错误,如果一位执行者失去了连接,则作业崩溃了,它不会尝试重新执行。

我个人而言,我将尝试直接在驱动程序的Breeze(或Smile)中进行PCA,因为数据集比协方差矩阵小得多,所以请收集RDD字段,并手动进行Float表示。

仅使用Breeze而不是Spark或TreeAgregation来计算PCA的代码:

import breeze.linalg._
import breeze.linalg.svd._

object PCACode {
  
  def mean(v: Vector[Double]): Double = v.valuesIterator.sum / v.size

  def zeroMean(m: DenseMatrix[Double]): DenseMatrix[Double] = {
    val copy = m.copy
    for (c <- 0 until m.cols) {
      val col = copy(::,c)
      val colMean = mean(col)
      col -= colMean
    }
    copy
  }

  def pca(data: DenseMatrix[Double],components: Int): DenseMatrix[Double] = {
    val d = zeroMean(data)
    val SVD(_,_,v) = svd(d.t)
    val model = v(0 until components,::)
    val filter = model.t * model
    filter * d
  }
  
  def main(args: Array[String]) : Unit = {
    val df : DataFrame = ???

    /** Collect the data and do the processing. Convert string to double,etc **/
    val data: Array[mutable.WrappedArray[Double]] =
      df.rdd.map(row => (row.getAs[mutable.WrappedArray[Double]](0))).collect()

    /** Once you have the Array,create the matrix and do the PCA **/
    val matrix = DenseMatrix(data.toSeq:_*)
    val pcaRes = pca(matrix,2)

    println("result pca \n" + pcaRes)
  }
}

此代码将在驱动程序中执行PCA,检查内存。如果崩溃,则可能是Float的提示。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)&gt; insert overwrite table dwd_trade_cart_add_inc &gt; select data.id, &gt; data.user_id, &gt; data.course_id, &gt; date_format(
错误1 hive (edu)&gt; insert into huanhuan values(1,&#39;haoge&#39;); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive&gt; show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 &lt;configuration&gt; &lt;property&gt; &lt;name&gt;yarn.nodemanager.res