How to resolve the Gremlin console and Spark UI becoming unresponsive when running an OLAP query on JanusGraph with Apache Spark
I have a graph on JanusGraph (v0.5.3) with roughly 2 million vertices and 20 million edges. I am running an OLAP query that is a modified version of the lowest_common_ancestor recipe (query added below).
The query takes far too long (over an hour). I see Managed memory leak detected warnings, after which the Spark web UI stops responding (so I can no longer debug). I also see Lost executor driver on localhost: Executor heartbeat timed out warnings, yet even after an hour the query has not exited. These warnings start about 30 minutes after the job begins. I was hoping Spark and Hadoop would make the query faster, but it seems extremely slow, and I cannot profile the query or check the Spark web UI to see any progress.
Note: I have Hadoop (3.2.2) installed and am using Apache Spark (2.4.0). I assume Spark comes bundled with the JanusGraph distribution, since I don't remember installing it separately. However, the JanusGraph docs say v0.5.3 is compatible with Spark 2.2.x, so I'm not sure whether this is a Spark compatibility issue.
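One way to confirm which Spark version the distribution actually bundles is to list the spark-core jar in its lib directory. A minimal Groovy sketch for the Gremlin console, assuming the console was started from the janusgraph-full-0.5.3 root (the 'lib' path is that assumption):

// List the bundled spark-core jar to see the shipped Spark/Scala versions,
// e.g. spark-core_2.11-x.y.z.jar
new File('lib').listFiles().
    findAll { it.name.startsWith('spark-core') }.
    each { println it.name }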
Here is how I run the query from the bin/gremlin.sh console:
graph = GraphFactory.open('conf/hadoop-graph/read-cql.properties')
g = graph.traversal().withComputer(SparkGraphComputer)
// OLAP query
input = [2437272, 4956336]
g.V().has(id, within(input)).
  aggregate('input').hasId(input.head()).                  // start from the first input vertex
  repeat(__.in('has_word')).emit().as('x').                // emit each of its ancestors, labeled 'x'
  select('input').unfold().has(id, within(input.tail())).  // switch to the remaining input vertices
  repeat(__.in('has_word')).emit(where(eq('x'))).          // emit only ancestors that match an 'x'
  group().
    by(select('x')).
    by(path().count(local).fold()).                        // collect path lengths per common ancestor
  unfold().filter(select(values).count(local).is(input.tail().size())).  // reached from every input
  order().
    by(select(values).unfold().sum()).                     // closest common ancestors first
  select(keys).limit(5).elementMap()
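As a sanity check, the two start vertices can first be resolved over a plain OLTP connection (no Spark) before handing the traversal to SparkGraphComputer. A minimal sketch, assuming the stock conf/janusgraph-cql.properties points at the same keyspace (that path is an assumption):

// OLTP sanity check (no Spark): confirm both input vertices resolve
// before running the full OLAP traversal.
oltpGraph = JanusGraphFactory.open('conf/janusgraph-cql.properties')
oltp = oltpGraph.traversal()
oltp.V().has(id, within([2437272, 4956336])).elementMap()  // expect two results
oltpGraph.close()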
I assume Hadoop is configured correctly:
user@xyz-WS:~/Downloads/janusgraph-full-0.5.3$ bin/gremlin.sh
\,/
(o o)
-----oOOo-(3)-oOOo-----
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/rrmerugu/Downloads/janusgraph-full-0.5.3/lib/slf4j-log4j12-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/rrmerugu/Downloads/janusgraph-full-0.5.3/lib/logback-classic-1.1.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
plugin activated: janusgraph.imports
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
00:56:59 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
plugin activated: tinkerpop.hadoop
plugin activated: tinkerpop.spark
plugin activated: tinkerpop.tinkergraph
gremlin>
gremlin> hdfs
==>storage[DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1845984070_1,ugi=rrmerugu (auth:SIMPLE)]]]
conf/hadoop-graph/read-cql.properties is shown below:
#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.cql.CqlInputFormat
gremlin.hadoop.graphWriter=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
gremlin.spark.persistContext=true
gremlin.spark.persistStorageLevel=DISK_ONLY #MEMORY_AND_DISK
#
# JanusGraph Cassandra InputFormat configuration
#
# These properties define the connection settings that were used when writing data to JanusGraph.
janusgraphmr.ioformat.conf.storage.backend=cql
# This specifies the hostname & port for Cassandra data store.
janusgraphmr.ioformat.conf.storage.hostname=127.0.0.1
janusgraphmr.ioformat.conf.storage.port=9042
# This specifies the keyspace where data is stored.
janusgraphmr.ioformat.conf.storage.cql.keyspace=janusgraph
# This defines the indexing backend configuration used while writing data to JanusGraph.
janusgraphmr.ioformat.conf.index.search.backend=elasticsearch
janusgraphmr.ioformat.conf.index.search.hostname=127.0.0.1
# Use the appropriate properties for the backend when using a different storage backend (HBase) or indexing backend (Solr).
#
# Apache Cassandra InputFormat configuration
#
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
cassandra.input.widerows=true
#
# SparkGraphComputer Configuration
#
spark.master=local[8]
spark.executor.memory=12g
spark.driver.memory=12g
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.janusgraph.hadoop.serialize.JanusGraphKryoRegistrator
spark.executor.memoryOverhead=1g
spark.driver.memoryOverhead=1g
spark.network.timeout=600s
spark.executor.heartbeatInterval=119s
spark.io.compression.codec=snappy
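After GraphFactory.open(...) the effective settings can be read back from the graph's configuration, which is a quick way to verify that values such as spark.executor.memory were actually picked up. A minimal sketch (key names as in the file above):

// Read back a few settings to verify the properties file was loaded.
graph = GraphFactory.open('conf/hadoop-graph/read-cql.properties')
println graph.configuration().getString('spark.master')           // expect local[8]
println graph.configuration().getString('spark.executor.memory')  // expect 12g
println graph.configuration().getString('gremlin.spark.persistStorageLevel')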
Gremlin console output:
rrmerugu@Code-WS:~/Downloads/janusgraph-full-0.5.3$ bin/gremlin.sh
\,/
(o o)
-----oOOo-(3)-oOOo-----
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/rrmerugu/Downloads/janusgraph-full-0.5.3/lib/slf4j-log4j12-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/rrmerugu/Downloads/janusgraph-full-0.5.3/lib/logback-classic-1.1.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
plugin activated: janusgraph.imports
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
07:57:25 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
plugin activated: tinkerpop.hadoop
plugin activated: tinkerpop.spark
plugin activated: tinkerpop.tinkergraph
gremlin> graph = GraphFactory.open('conf/hadoop-graph/read-cql.properties')
==>hadoopgraph[cqlinputformat->nulloutputformat]
gremlin> g = graph.traversal().withComputer(SparkGraphComputer)
==>graphtraversalsource[hadoopgraph[cqlinputformat->nulloutputformat],sparkgraphcomputer]
gremlin> input = [2437272,4956336]
==>2437272
==>4956336
gremlin> g.V().has(id,within(input)).
......1> aggregate('input').hasId(input.head()).
......2> repeat(__.in('has_word')).emit().as('x').
......3> select('input').unfold().has(id,within(input.tail())).
......4> repeat(__.in('has_word')).emit(where(eq('x'))).
......5> group().
......6> by(select('x')).
......7> by(path().count(local).fold()).
......8> unfold().filter(select(values).count(local).is(input.tail().size())).
......9> order().
.....10> by(select(values).unfold().sum()).
.....11> select(keys).limit(5).elementMap()
07:58:05 WARN org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer - class org.apache.hadoop.mapreduce.lib.output.NullOutputFormat does not implement PersistResultGraphAware and thus, persistence options are unknown -- assuming all options are possible
07:58:06 WARN org.apache.spark.util.Utils - Your hostname, Code-WS resolves to a loopback address: 127.0.1.1; using 192.168.0.10 instead (on interface enp7s0)
07:58:06 WARN org.apache.spark.util.Utils - Set SPARK_LOCAL_IP if you need to bind to another address
08:01:59 WARN org.apache.spark.executor.Executor - Managed memory leak detected; size = 40472352 bytes, TID = 1633
08:02:03 WARN org.apache.spark.executor.Executor - Managed memory leak detected; size = 41050016 bytes, TID = 1683
08:11:26 WARN org.apache.spark.rpc.netty.NettyRpcEnv - Ignored failure: java.util.concurrent.TimeoutException: Cannot receive any reply from 192.168.0.10:43701 in 119 seconds
08:11:29 WARN org.apache.spark.executor.Executor - Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [119 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:835)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:864)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:864)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:864)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:864)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [119 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
... 14 more
08:21:13 WARN org.apache.spark.rpc.netty.NettyRpcEnv - Ignored failure: java.util.concurrent.TimeoutException: Cannot receive any reply from 192.168.0.10:43701 in 119 seconds
08:21:37 WARN org.apache.spark.executor.Executor - Issue communicating with driver in heartbeater
08:32:07 WARN org.apache.spark.executor.Executor - Issue communicating with driver in heartbeater
08:35:18 WARN org.apache.spark.HeartbeatReceiver - Removing executor driver with no recent heartbeats: 614252 ms exceeds timeout 600000 ms
Exception in thread "dispatcher-event-loop-7" java.lang.OutOfMemoryError: Java heap space
08:44:53 ERROR org.apache.spark.util.Utils - Uncaught exception in thread driver-heartbeater
08:57:17 WARN org.spark_project.jetty.io.ManagedSelector -
java.lang.OutOfMemoryError: Java heap space
08:58:56 ERROR org.apache.spark.util.Utils - uncaught error in thread Spark Context Cleaner, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
08:58:56 ERROR org.apache.spark.util.Utils - throw uncaught fatal error in thread Spark Context Cleaner
java.lang.OutOfMemoryError: Java heap space
Exception in thread "Spark Context Cleaner" java.lang.OutOfMemoryError: Java heap space
08:58:56 ERROR org.apache.spark.executor.Executor - Exception in task 91.0 in stage 16.0 (TID 1890)
java.lang.OutOfMemoryError: Java heap space
08:58:56 ERROR org.apache.spark.util.SparkUncaughtExceptionHandler - Uncaught exception in thread Thread[Executor task launch worker for task 1890,5,main]
java.lang.OutOfMemoryError: Java heap space
08:58:56 WARN org.apache.spark.scheduler.TaskSetManager - Lost task 91.0 in stage 16.0 (TID 1890, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
08:58:56 ERROR org.apache.spark.scheduler.TaskSetManager - Task 91 in stage 16.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 91 in stage 16.0 failed 1 times, most recent failure: Lost task 91.0 in stage 16.0 (TID 1890, executor driver): java.lang.OutOfMemoryError: Java heap space
Driver stacktrace:
Type ':help' or ':h' for help.
System specs: I am running this on Ubuntu with a 6-core i7 CPU, 32GB of RAM, and a 1TB SSD.
What I hope to find out:
- Am I missing something? Any hints as to why this query takes so long and ends in these errors? Any suggestions are appreciated.
- Is there a better way to profile this query and see why it takes so long (see the sketch below)? Once the memory-leak errors appear, the console and Spark web UI stop responding, and I cannot debug any further.
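In OLTP mode a traversal can at least be instrumented with Gremlin's profile() step, though that does not help once the Spark job itself is stuck. A sketch on a bounded input, using the OLTP traversal source from the sanity check above:

// Per-step metrics for a reduced version of the traversal (OLTP only;
// profile() replaces the normal result with a metrics report).
oltp.V().has(id, within([2437272, 4956336])).
    repeat(__.in('has_word')).emit().times(3).  // bound the walk for profiling
    profile()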
Update