How to fix a dask-yarn job failing with a dumps_msgpack ImportError when reading parquet
I am trying to use dask-yarn on an AWS EMR cluster to run a simple read-and-count over a small parquet file (10K records). The cluster has one master node and one worker node, both m5.xlarge instances.
I am running the following code to test my cluster:
import os
os.environ['ARROW_LIBHDFS_DIR'] = '/usr/lib/hadoop/lib/native/'

from dask_yarn import YarnCluster
from dask.distributed import Client
import dask.dataframe as dd

cluster = YarnCluster(environment='conf/conda_envs/dask_yarn.tar.gz',
                      worker_vcores=1, worker_memory="2GiB")
cluster.scale(2)
client = Client(cluster)

# path = 's3://bucket-name/samples/data_10K/*'
path = 'hdfs:///samples/data_10K/*'

df = dd.read_parquet(path, engine='pyarrow',
                     columns=['YEAR', 'MONTH', 'DAY_OF_MONTH', 'FL_DATE', 'DEP_TIME',
                              'ARR_TIME', 'ORIGIN', 'DEST', 'ACTUAL_ELAPSED_TIME'])
print(df.count().compute())

client.shutdown()
cluster.shutdown()
But I get this exception:
Traceback (most recent call last):
  File "dask_test.py", line 30, in <module>
    print(df.count().compute())
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/dask/base.py", line 284, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/dask/base.py", line 566, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/distributed/client.py", line 2646, in get
    futures = self._graph_to_futures(
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/distributed/client.py", line 2554, in _graph_to_futures
    dsk = dsk.__dask_distributed_pack__(self, keyset)
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/dask/highlevelgraph.py", line 946, in __dask_distributed_pack__
    from distributed.protocol.core import dumps_msgpack
ImportError: cannot import name 'dumps_msgpack' from 'distributed.protocol.core' (/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/distributed/protocol/core.py)
Exception ignored in: <function YarnCluster.__del__ at 0x7f6584a2ac10>
I get this exception when reading from both s3 and hdfs.
My current conda environment is as follows:
# packages in environment at /home/hadoop/miniconda3/envs/dask_yarn:
#
# Name Version Build Channel
_libgcc_mutex 0.1 conda_forge conda-forge
_openmp_mutex 4.5 1_gnu conda-forge
aiobotocore 1.3.0 pyhd8ed1ab_0 conda-forge
aiohttp 3.7.4 py38h497a2fe_0 conda-forge
aioitertools 0.7.1 pyhd8ed1ab_0 conda-forge
async-timeout 3.0.1 py_1000 conda-forge
attrs 20.3.0 pyhd3deb0d_0 conda-forge
blas 1.0 openblas anaconda
bokeh 2.2.3 py38_0 anaconda
boost-cpp 1.74.0 hc6e9bd1_2 conda-forge
botocore 1.20.49 pyhd8ed1ab_0 conda-forge
brotlipy 0.7.0 py38h497a2fe_1001 conda-forge
bzip2 1.0.8 h7f98852_4 conda-forge
c-ares 1.17.1 h7f98852_1 conda-forge
ca-certificates 2020.12.5 ha878542_0 conda-forge
certifi 2020.12.5 py38h578d9bd_1 conda-forge
cffi 1.14.5 py38ha65f79e_0 conda-forge
chardet 4.0.0 py38h578d9bd_1 conda-forge
click 7.1.2 pyh9f0ad1d_0 conda-forge
cloudpickle 1.6.0 py_0 conda-forge
conda-pack 0.6.0 pyhd3deb0d_0 conda-forge
cryptography 3.4.7 py38ha5dfef3_0 conda-forge
curl 7.76.1 h979ede3_1 conda-forge
cytoolz 0.11.0 py38h497a2fe_3 conda-forge
dask 2021.4.0 pyhd3eb1b0_0
dask-core 2021.4.0 pyhd3eb1b0_0
dask-yarn 0.9 py38h578d9bd_0 conda-forge
distributed 2021.4.1 py38h578d9bd_0 conda-forge
freetype 2.10.4 h5ab3b9f_0 anaconda
fsspec 2021.4.0 pyhd8ed1ab_0 conda-forge
gettext 0.19.8.1 h0b5b191_1005 conda-forge
greenlet 1.0.0 py38h709712a_0 conda-forge
grpcio 1.37.0 py38hdd6454d_0 conda-forge
heapdict 1.0.1 py_0 conda-forge
icu 68.1 h58526e2_0 conda-forge
idna 3.1 pyhd3deb0d_0 conda-forge
jinja2 2.11.2 py_0 anaconda
jmespath 0.10.0 pyh9f0ad1d_0 conda-forge
jpeg 9b habf39ab_1 anaconda
krb5 1.17.2 h926e7f8_0 conda-forge
lcms2 2.11 h396b838_0 anaconda
ld_impl_linux-64 2.35.1 hea4e1c9_2 conda-forge
libcurl 7.76.1 hc4aaa36_1 conda-forge
libedit 3.1.20191231 he28a2e2_2 conda-forge
libev 4.33 h516909a_1 conda-forge
libffi 3.3 h58526e2_2 conda-forge
libgcc-ng 9.3.0 h2828fa1_19 conda-forge
libgcrypt 1.9.3 h7f98852_0 conda-forge
libgfortran-ng 7.3.0 hdf63c60_0 anaconda
libgomp 9.3.0 h2828fa1_19 conda-forge
libgpg-error 1.42 h9c3ff4c_0 conda-forge
libgsasl 1.8.0 2 conda-forge
libhdfs3 2.3 hb485604_1015 conda-forge
libiconv 1.16 h516909a_0 conda-forge
libnghttp2 1.43.0 h812cca2_0 conda-forge
libntlm 1.4 h7f98852_1002 conda-forge
libopenblas 0.3.10 h5a2b251_0 anaconda
libpng 1.6.37 hbc83047_0 anaconda
libprotobuf 3.15.8 h780b84a_0 conda-forge
libssh2 1.9.0 ha56f1ee_6 conda-forge
libstdcxx-ng 9.3.0 h6de172a_19 conda-forge
libtiff 4.1.0 h2733197_1
libuuid 2.32.1 h7f98852_1000 conda-forge
libxml2 2.9.10 h72842e0_4 conda-forge
locket 0.2.0 py_2 conda-forge
lz4-c 1.9.3 h9c3ff4c_0 conda-forge
markupsafe 1.1.1 py38h7b6447c_0 anaconda
msgpack-python 1.0.2 py38h1fd1430_1 conda-forge
multidict 5.1.0 py38h497a2fe_1 conda-forge
ncurses 6.2 h58526e2_4 conda-forge
numpy 1.19.1 py38h30dfecb_0 anaconda
numpy-base 1.19.1 py38h75fe3a5_0 anaconda
olefile 0.46 py_0 anaconda
openssl 1.1.1k h7f98852_0 conda-forge
packaging 20.4 py_0 anaconda
pandas 1.1.3 py38he6710b0_0 anaconda
partd 1.2.0 pyhd8ed1ab_0 conda-forge
pillow 8.0.0 py38h9a89aac_0 anaconda
pip 21.1 pyhd8ed1ab_0 conda-forge
protobuf 3.15.8 py38h709712a_0 conda-forge
psutil 5.8.0 py38h497a2fe_1 conda-forge
pyarrow 4.0.0 pypi_0 pypi
pycparser 2.20 pyh9f0ad1d_2 conda-forge
pyopenssl 20.0.1 pyhd8ed1ab_0 conda-forge
pyparsing 2.4.7 py_0 anaconda
pysocks 1.7.1 py38h578d9bd_3 conda-forge
python 3.8.8 hffdb5ce_0_cpython conda-forge
python-dateutil 2.8.1 py_0 anaconda
python_abi 3.8 1_cp38 conda-forge
pytz 2020.1 py_0 anaconda
pyyaml 5.4.1 py38h497a2fe_0 conda-forge
readline 8.1 h46c0cb4_0 conda-forge
s3fs 2021.4.0 pyhd8ed1ab_0 conda-forge
setuptools 49.6.0 py38h578d9bd_3 conda-forge
six 1.15.0 pyh9f0ad1d_0 conda-forge
skein 0.8.1 py38h578d9bd_1 conda-forge
sortedcontainers 2.3.0 pyhd8ed1ab_0 conda-forge
sqlalchemy 1.4.11 py38h497a2fe_0 conda-forge
sqlite 3.35.5 h74cdb3f_0 conda-forge
tblib 1.7.0 pyhd8ed1ab_0 conda-forge
tk 8.6.10 h21135ba_1 conda-forge
toolz 0.11.1 py_0 conda-forge
tornado 6.1 py38h497a2fe_1 conda-forge
typing-extensions 3.7.4.3 0 conda-forge
typing_extensions 3.7.4.3 py_0 anaconda
urllib3 1.26.4 pyhd8ed1ab_0 conda-forge
wheel 0.36.2 pyhd3deb0d_0 conda-forge
wrapt 1.12.1 py38h497a2fe_3 conda-forge
xz 5.2.5 h516909a_1 conda-forge
yaml 0.2.5 h516909a_0 conda-forge
yarl 1.6.3 py38h497a2fe_1 conda-forge
zict 2.0.0 py_0 conda-forge
zlib 1.2.11 h516909a_1010 conda-forge
zstd 1.4.9 ha95c52a_0 conda-forge
I had to install pyarrow with pip3; otherwise I ran into another exception that prevented me from reading from hdfs or s3.
The YARN logs are as follows:
21/04/28 23:28:31 INFO client.RMProxy: Connecting to ResourceManager at ip-XXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:8032
21/04/28 23:28:31 INFO client.AHSProxy: Connecting to Application History server at ip-XXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:10200
Container: container_1619645048753_0020_01_000002 on ip-xxxxxxxxx.us-west-1.compute.internal_8041
LogAggregationType: AGGREGATED
===================================================================================================
LogType:dask.scheduler.log
LogLastModifiedTime:Wed Apr 28 23:28:10 +0000 2021
LogLength:787
LogContents:
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://XXXXXXXXXXXX:32843
distributed.scheduler - INFO - dashboard at: :34205
distributed.scheduler - INFO - Receive client connection: Client-6371f976-a879-11eb-aff0-063a3c27c63d
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-6371f976-a879-11eb-aff0-063a3c27c63d
distributed.scheduler - INFO - Remove client Client-6371f976-a879-11eb-aff0-063a3c27c63d
distributed.scheduler - INFO - Close client connection: Client-6371f976-a879-11eb-aff0-063a3c27c63d
End of LogType:dask.scheduler.log
***********************************************************************************
End of LogType:prelaunch.err
******************************************************************************
Container: container_1619645048753_0020_01_000002 on ip-xxxxxxxxx.us-west-1.compute.internal_8041
LogAggregationType: AGGREGATED
===================================================================================================
LogType:prelaunch.out
LogLastModifiedTime:Wed Apr 28 23:28:10 +0000 2021
LogLength:70
LogContents:
Setting up env variables
Setting up job resources
Launching container
End of LogType:prelaunch.out
******************************************************************************
Container: container_1619645048753_0020_01_000001 on ip-xxxxxxxxx.us-west-1.compute.internal_8041
LogAggregationType: AGGREGATED
===================================================================================================
LogType:application.master.log
LogLastModifiedTime:Wed Apr 28 23:28:10 +0000 2021
LogLength:2700
LogContents:
21/04/28 23:27:54 INFO skein.ApplicationMaster: Starting Skein version 0.8.1
21/04/28 23:27:54 INFO skein.ApplicationMaster: Running as user hadoop
21/04/28 23:27:54 INFO conf.Configuration: resource-types.xml not found
21/04/28 23:27:54 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
21/04/28 23:27:54 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
21/04/28 23:27:54 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
21/04/28 23:27:54 INFO skein.ApplicationMaster: Application specification successfully loaded
21/04/28 23:27:55 INFO client.RMProxy: Connecting to ResourceManager at ip-XXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:8030
21/04/28 23:27:55 INFO skein.ApplicationMaster: gRPC server started at ip-xxxxxxxxx.us-west-1.compute.internal:46405
21/04/28 23:27:56 INFO skein.ApplicationMaster: WebUI server started at ip-xxxxxxxxx.us-west-1.compute.internal:46231
21/04/28 23:27:56 INFO skein.ApplicationMaster: Registering application with resource manager
21/04/28 23:27:56 INFO client.RMProxy: Connecting to ResourceManager at ip-XXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:8032
21/04/28 23:27:56 INFO client.AHSProxy: Connecting to Application History server at ip-XXXXXXXXX.us-west-1.compute.internal/XXXXXXXXXX:10200
21/04/28 23:27:56 INFO skein.ApplicationMaster: Initializing service 'dask.worker'.
21/04/28 23:27:56 INFO skein.ApplicationMaster: Initializing service 'dask.scheduler'.
21/04/28 23:27:56 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0
21/04/28 23:27:57 INFO skein.ApplicationMaster: Starting container_1619645048753_0020_01_000002...
21/04/28 23:27:57 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_1619645048753_0020_01_000002
21/04/28 23:28:07 INFO skein.ApplicationMaster: Scaling service 'dask.worker' to 2 instances, a delta of 2.
21/04/28 23:28:07 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0
21/04/28 23:28:07 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1
21/04/28 23:28:09 INFO skein.ApplicationMaster: Shutting down: Shutdown requested by user.
21/04/28 23:28:09 INFO skein.ApplicationMaster: Unregistering application with status SUCCEEDED
21/04/28 23:28:09 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
21/04/28 23:28:09 INFO skein.ApplicationMaster: Deleted application directory hdfs://ip-XXXXXXXXX.us-west-1.compute.internal:8020/user/hadoop/.skein/application_1619645048753_0020
21/04/28 23:28:09 INFO skein.ApplicationMaster: WebUI server shut down
21/04/28 23:28:09 INFO skein.ApplicationMaster: gRPC server shut down
End of LogType:application.master.log
***************************************************************************************
End of LogType:prelaunch.err
******************************************************************************
Container: container_1619645048753_0020_01_000001 on ip-xxxxxxxxx.us-west-1.compute.internal_8041
LogAggregationType: AGGREGATED
===================================================================================================
LogType:prelaunch.out
LogLastModifiedTime:Wed Apr 28 23:28:10 +0000 2021
LogLength:70
LogContents:
Setting up env variables
Setting up job resources
Launching container
End of LogType:prelaunch.out
******************************************************************************
Does anyone know a workaround for this issue?
Thanks!
Solution
Your dask and distributed versions are out of sync: 2021.4.0 vs. 2021.4.1. Updating dask should fix this. Note that you also need to make sure exactly the same versions are present in the environment you ship to YARN.
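To spot this kind of mismatch before submitting a job, you can compare the two versions in the client environment. A minimal sketch using only the standard library (for the deployed cluster itself, distributed's `Client.get_versions(check=True)` performs a similar check across the scheduler and workers):

```python
from importlib.metadata import version, PackageNotFoundError

def installed_versions(*packages):
    """Map each package name to its installed version string, or None if absent."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found

# In the environment from the question this reports dask 2021.4.0
# but distributed 2021.4.1 -- the mismatch behind the ImportError.
print(installed_versions("dask", "distributed"))
```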
I solved the problem by installing a previous version of distributed rather than the latest one. The changelog (https://distributed.dask.org/en/latest/changelog.html) shows that dumps_msgpack was removed in distributed 2021.4.1, so downgrading to 2021.4.0 did the trick.
However, removing packages from an existing environment caused more problems, so I recommend pinning matching versions (e.g. dask=2021.4.0 and distributed=2021.4.0 with conda) when you first create the environment, to make sure the right distributed version is installed from the start.