How do we avoid out-of-memory errors when creating large arrays from PySpark and Parquet for data analysis?
We are experimenting with a somewhat unusual requirement. We have a Parquet table containing roughly 320 GB of frequency pixel values; the data comes from 3-dimensional spectral line image cubes (FITS source files).
+--------------------+------------+-------+
| col_name| data_type|comment|
+--------------------+------------+-------+
| spi_index| int| null|
| spi_image|array<float>| null|
| spi_filename| string| null|
| spi_band| int| null|
|# Partition Infor...| | |
| # col_name| data_type|comment|
| spi_filename| string| null|
| spi_band| int| null|
+--------------------+------------+-------+
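(For context, the table is read in the usual way; the table name spectral_pixels below is just illustrative:)

from pyspark.sql import SparkSession

# In our environment the session is created via Jupyterhub/YARN; shown here for completeness.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Hypothetical table name; the real table holds ~320 GB of array<float> pixel data,
# partitioned by spi_filename and spi_band as shown above.
spi = spark.table("spectral_pixels")
spi.printSchema()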
There is a legacy source-finding algorithm written in C that analyses the original source files, but it is limited by the physical memory available on a single machine. We are running a CDH Express cluster: 3 management nodes (8 cores, 32 GB RAM), 10 worker nodes (16 cores, 64 GB RAM), and a gateway node running Jupyterhub (8 cores, 32 GB RAM). We have rebuilt the original C program as a shared object and distributed it across the cluster, and we have wrapped the shared object in a partitioner class so that we can process multiple partitions on multiple executors across the cluster. We launch and run it with pyspark.
The problem we are hitting is that ideally we need an input pixel array of at least ~15 GB, yet we seem to struggle to create arrays of around 7.3 GB, and we are not sure why.
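To put that in perspective, some back-of-the-envelope arithmetic (our own, assuming 4-byte float32 pixels as used in the code below). As we understand it, each such array has to materialise contiguously inside a single Python worker process on one executor, alongside that executor's JVM heap:

import numpy as np

BYTES_PER_PIXEL = np.dtype('<f4').itemsize           # 4 bytes, matching dtype='<f4' in doProcess
elements_at_7g  = 7.3e9  / BYTES_PER_PIXEL           # ~1.8e9 elements: where we currently fail
elements_at_15g = 15.0e9 / BYTES_PER_PIXEL           # ~3.75e9 elements: what we ideally need
print(f"{elements_at_7g:.2e} elements (~7.3 GB), {elements_at_15g:.2e} elements (~15 GB)")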
YARN maximum allocation setting:
yarn.scheduler.maximum-allocation-mb = 40GB
Spark configuration settings:
--executor-memory 18g \
--num-executors 29 \
--driver-memory 18g \
--executor-cores 5 \
--conf spark.executor.memoryOverhead='22g' \
--conf spark.driver.memoryOverhead='22g' \
--conf spark.driver.maxResultSize='24g' \
--conf "spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=20G -XX:+UseCompressedOops" \
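For what it's worth, our own arithmetic on how these settings add up per YARN container (a small illustrative script, not part of the job):

# Per-container request = JVM heap + memory overhead (off-heap, Python worker, C allocations)
executor_memory_gb = 18      # --executor-memory
overhead_gb        = 22      # spark.executor.memoryOverhead
yarn_max_alloc_gb  = 40      # yarn.scheduler.maximum-allocation-mb

container_gb = executor_memory_gb + overhead_gb
print(container_gb, container_gb <= yarn_max_alloc_gb)   # 40 True: exactly the YARN maximum, no headroom

So each executor (and the driver) requests exactly the 40 GB YARN maximum, and as we understand it the numpy arrays and C shared-object allocations made by the Python workers have to fit inside the 22 GB overhead portion of that.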
Summary of the Partitioner class:
import sys
import numpy as np
from pyspark import SparkFiles

class Partitioner:
    def __init__(self):
        self.callPerDriverSetup()

    def callPerDriverSetup(self):
        pass

    def callPerPartitionSetup(self):
        # Imports are done per partition so each executor's Python worker loads the shared object
        sys.path.append('sofia')
        #import example
        import sofia
        import myLib
        import faulthandler
        import time as tm
        from time import time, clock, process_time, sleep
        self.sofia = sofia
        self.myLib = myLib
        #self.example=example
        self.parameterFile = SparkFiles.get('sofia.par')

    def doProcess(self, element):
        # Here's the call to the C library for each row of the dataframe partition.
        # In here we have to transform the flattened array data to the format SoFiA
        # requires, as well as the header information.
        ra = np.array(element.right_ascension, dtype='<f4')
        dec = np.array(element.declination, dtype='<f4')
        frequencies = np.array(element.frequency, dtype='<f4')
        Pixels = np.array(element.pixels, dtype='<f4')
        dataPtr = Pixels.ravel()

        # Create the dictionary of the original header
        hdrKey = np.array(element.keys, dtype='U')
        hdrValue = np.array(element.values, dtype='U')
        hdrDict = dict(zip(hdrKey, hdrValue))
        newHdr = self.myLib.CreateHeader(hdrDict)

        # Get the CRPIX adjustment values for the new header
        crpix1, crpix2, crpix3, crpix4 = self.myLib.GetHeaderUpdates(newHdr,
                                                                     element.raHeaderIndex,
                                                                     element.decHeaderIndex,
                                                                     element.freqHeaderIndex)
        # Get the new axis values
        naxis1 = len(ra)
        naxis2 = len(dec)
        naxis4 = len(frequencies)
        newHdr['CRPIX1'] = float(crpix1)
        newHdr['CRPIX2'] = float(crpix2)
        newHdr['CRPIX3'] = float(crpix3)
        newHdr['CRPIX4'] = float(crpix4)
        newHdr['NAXIS1'] = float(naxis1)
        newHdr['NAXIS2'] = float(naxis2)
        newHdr['NAXIS4'] = float(naxis4)
        newHdr.pop("END", None)

        hdrstr, hdrsize = self.myLib.dict2FITsstr(newHdr)
        path_to_par = self.parameterFile
        parsize = len(path_to_par)

        # Pass off to the SoFiA C library
        try:
            # This is the call to the shared object
            ret = self.sofia.sofia_mainline(dataPtr, hdrstr, hdrsize, path_to_par, parsize)
            returnCode = ret[0]
            sofiaMsg = "Call to sofia has worked!"
        except RuntimeError:
            print("Caught general exception\n")
            sofiaMsg = "Caught general exception"
            returnCode = 1
            #sys.exit()
        except StopIteration:
            print("Caught Null pointer\n")
            sofiaMsg = "Caught Null pointer"
            returnCode = 2
            #sys.exit()
        except MemoryError:
            print("Caught ALLOC error\n")
            sofiaMsg = "Caught ALLOC error"
            returnCode = 3
            #sys.exit()
        except IndexError:
            print("Caught index range error\n")
            sofiaMsg = "Caught index range error"
            returnCode = 4
            #sys.exit()
        except IOError:
            print("Caught file error\n")
            sofiaMsg = "Caught file error"
            returnCode = 5
            #sys.exit()
        except OverflowError:
            print("Caught integer overflow error\n")
            sofiaMsg = "Caught integer overflow error"
            returnCode = 6
            #sys.exit()
        except TypeError:
            print("Caught user input error\n")
            sofiaMsg = "Caught user input error"
            returnCode = 7
            #sys.exit()
        except SystemExit:
            print("Caught no sources error\n")
            sofiaMsg = "Caught no sources error"
            returnCode = 8
            #sys.exit()

        if returnCode == 0:
            catalogXML = ret[5].tobytes()
        else:
            catalogXML = ""

        # Serialise the result fields into a single delimited string per row
        DELIMITER = chr(255)
        msg = "{}{}{}{}{}{}{}{}{}{}{}{}{}{}{}{}{}".format(
            str(returnCode), DELIMITER,
            str(element.raHeaderIndex), DELIMITER,
            str(element.decHeaderIndex), DELIMITER,
            str(len(ra)), DELIMITER,
            str(len(dec)), DELIMITER,
            str(len(frequencies)), DELIMITER,
            str(element.freqHeaderIndex), DELIMITER,
            str(catalogXML), DELIMITER,
            str(dataPtr.nbytes))
        return msg

    def processpartition(self, partition):
        self.callPerPartitionSetup()
        for element in partition:
            yield self.doProcess(element)
We create a dataframe in which each row represents one 3-dimensional data array: array columns for the three positional dimensions, plus an array holding the pixel values (referred to in doProcess as ra, dec, frequencies and Pixels). Each row also carries coordinate-system information from the original source file, which we use to build a new set of header information; the rows are then handed via a df.rdd.mapPartitions call to an instantiated Partitioner.
p = Partitioner()

try:
    ...
    ...
    ...
    ...
    # Create the positional array dataframes, and the single rows representing
    # the 3d images in finalSubcubedF
    ...
    ...
    ...
    finalSubcubedF = finalSubcubedF\
        .join(broadcast(rarangeDF), finalSubcubedF.bins == rarangeDF.grp, "inner")\
        .join(broadcast(decRangeDF), finalSubcubedF.bins == decRangeDF.grp, "inner")\
        .join(broadcast(freqRangeDF), finalSubcubedF.bins == freqRangeDF.grp, "inner")\
        .join(broadcast(hdrDF), finalSubcubedF.bins == hdrDF.grp, "inner")\
        .select("bins", "right_ascension", "declination", "frequency", "raHeaderIndex",
                "decHeaderIndex", "freqHeaderIndex", "pixels", "keys", "values")

    # Repartition on the bins column to distribute the processing
    finalSubcubedF = finalSubcubedF.repartition("bins")
    finalSubcubedF.persist(StorageLevel.MEMORY_AND_DISK_SER)
    ...
    # Call the Partitioner class, which contains the C shared object calls, as above
    ...
    rddout = finalSubcubedF.rdd.mapPartitions(p.processpartition)
    DELIMITER = chr(255)
    rddout = rddout.map(lambda x: x.split(DELIMITER))
    ...
    ...
    # Write the results (which triggers the computation), including the catalogue XML file
    rddout.saveAsTextFile("hdfs:///<file path>")
except Exception as e:
    ...
    ...
    ...
When we run this, the job fails with the following error:

2021-02-16 11:03:42,830 INFO ERROR! Processthread FAILURE...An error occurred while calling o1989.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 47.1 Failed 4 times, most recent failure: Lost task 5.3 in stage 47.1 (TID 5875, hercules-1-2.nimbus.pawsey.org.au, executor 12): ExecutorLostFailure (executor 12 exited caused by one of the running tasks) Reason: Container marked as Failed: container_1612679558527_0181_01_000014 on host: hercules-1-2.nimbus.pawsey.org.au. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
Killed by external signal
And the error message from the YARN container logs:
LogType:stdout
Log Upload Time:Tue Feb 16 11:19:03 +0000 2021
LogLength:124
Log Contents:
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
# Executing /bin/sh -c "kill 20417"...
So the problem is clearly memory related, but we are not entirely sure why it happens, given that the executor and driver memory settings are set fairly high. At the moment we are just grasping at straws.
We realise that collecting distributed data into an array is not normally recommended; however, it seems that being able to run multiple C shared objects in parallel across the cluster could be more efficient than running 30-40 GB extractions serially on a single machine.
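In case it helps diagnose things, a sketch (untested, reusing the finalSubcubedF, pixels and bins names from above) of how the per-row pixel payloads handed to the C library could be measured:

from pyspark.sql import functions as F

# Approximate bytes per row: number of float32 pixels times 4 (cast to long to avoid int overflow)
rowSizes = finalSubcubedF.select(
    "bins",
    (F.size("pixels").cast("long") * 4).alias("pixel_bytes"))

# Largest single-row pixel payload per bin; ~7.3e9 bytes is where things currently fall over
rowSizes.groupBy("bins") \
        .agg(F.max("pixel_bytes").alias("max_pixel_bytes")) \
        .orderBy(F.desc("max_pixel_bytes")) \
        .show(10, truncate=False)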
Thanks in advance for any ideas and help.