Gremlin Console and Spark UI become unresponsive when running an OLAP query on JanusGraph with Apache Spark

I have a graph on JanusGraph (v0.5.3) with about 2 million vertices and 20 million edges. I am running an OLAP query that is a modified version of the lowest_common_ancestor recipe (query added below).

The query takes too long (more than 1 hour). I see "Managed memory leak detected;" warnings, and then the Spark Web UI stops responding, so I cannot debug any further.

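I also see the warning "Lost executor driver on localhost: Executor heartbeat timed out". Even so, the query had not exited after 1 hour. These warnings start about 30 minutes into the job. I expected Spark and Hadoop to make the query faster, but it seems very slow. I cannot profile the query or look at the Spark Web UI to see its progress.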

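Note: I installed Hadoop (3.2.2) and am using Apache Spark (2.4.0). I assume Spark ships with the JanusGraph distribution, since I do not remember installing it. However, the JanusGraph documentation says v0.5.3 is compatible with Spark 2.2.x, so could this be a Spark compatibility issue?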

Here is how I run the query from the bin/gremlin.sh console:

graph = GraphFactory.open('conf/hadoop-graph/read-cql.properties')
g = graph.traversal().withComputer(SparkGraphComputer)

// OLAP query
input = [2437272,4956336]
g.V().has(id,within(input)).
           aggregate('input').hasId(input.head()). 
           repeat(__.in('has_word')).emit().as('x').
           select('input').unfold().has(id,within(input.tail())).
           repeat(__.in('has_word')).emit(where(eq('x'))).
           group().
             by(select('x')).
             by(path().count(local).fold()).
           unfold().filter(select(values).count(local).is(input.tail().size())).
           order().
             by(select(values).unfold().sum()).
           select(keys).limit(5).elementMap()

I assume Hadoop is configured correctly:

user@xyz-WS:~/Downloads/janusgraph-full-0.5.3$ bin/gremlin.sh 

         \,/
         (o o)
-----oOOo-(3)-oOOo-----
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/rrmerugu/Downloads/janusgraph-full-0.5.3/lib/slf4j-log4j12-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/rrmerugu/Downloads/janusgraph-full-0.5.3/lib/logback-classic-1.1.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
plugin activated: janusgraph.imports
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
00:56:59 WARN  org.apache.hadoop.util.NativeCodeLoader  - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
plugin activated: tinkerpop.hadoop
plugin activated: tinkerpop.spark
plugin activated: tinkerpop.tinkergraph
gremlin> 
gremlin> hdfs
==>storage[DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1845984070_1,ugi=rrmerugu (auth:SIMPLE)]]]

conf/hadoop-graph/read-cql.properties is as follows:

#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.cql.CqlInputFormat
gremlin.hadoop.graphWriter=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat

gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
gremlin.spark.persistContext=true
gremlin.spark.persistStorageLevel=DISK_ONLY #MEMORY_AND_DISK

#
# JanusGraph Cassandra InputFormat configuration
#
# These properties defines the connection properties which were used while write data to JanusGraph.
janusgraphmr.ioformat.conf.storage.backend=cql
# This specifies the hostname & port for Cassandra data store.
janusgraphmr.ioformat.conf.storage.hostname=127.0.0.1
janusgraphmr.ioformat.conf.storage.port=9042
# This specifies the keyspace where data is stored.
janusgraphmr.ioformat.conf.storage.cql.keyspace=janusgraph
# This defines the indexing backend configuration used while writing data to JanusGraph.
janusgraphmr.ioformat.conf.index.search.backend=elasticsearch
janusgraphmr.ioformat.conf.index.search.hostname=127.0.0.1
# Use the appropriate properties for the backend when using a different storage backend (HBase) or indexing backend (Solr).

#
# Apache Cassandra InputFormat configuration
#
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
cassandra.input.widerows=true

#
# SparkGraphComputer Configuration
#
spark.master=local[8]
spark.executor.memory=12g
spark.driver.memory=12g
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.janusgraph.hadoop.serialize.JanusGraphKryoRegistrator


spark.executor.memoryOverhead=1g
spark.driver.memoryOverhead=1g
spark.network.timeout=600s
spark.executor.heartbeatInterval=119s

spark.io.compression.codec=snappy
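
With spark.master=local[8], Spark runs the driver and the executor threads inside a single JVM, which here is the Gremlin Console process itself. Spark's documentation notes that spark.driver.memory cannot take effect once the driver JVM has already started, so the 12g values above may not be the heap the job actually gets. The sketch below is one way to check what the console JVM really has; it uses only the standard java.lang.Runtime API, and the helper name toMb is made up for illustration.

```groovy
// Sketch for the Gremlin Console (Groovy). Run it in the same session that
// submits the Spark job, because local[8] keeps driver and executor in this JVM.

// Hypothetical helper: convert a byte count to whole MiB.
toMb = { long bytes -> bytes >> 20 }

rt = Runtime.getRuntime()
println "max heap (MiB):        ${toMb(rt.maxMemory())}"    // upper bound set by -Xmx or the JVM default
println "committed heap (MiB):  ${toMb(rt.totalMemory())}"  // heap currently reserved from the OS
println "free in committed:     ${toMb(rt.freeMemory())}"   // unused part of the committed heap
```

If the reported maximum is far below 12 GB, the heap would have to be raised through the JVM options used to launch the console rather than through the properties file. bin/gremlin.sh is expected to read such options from an environment variable (JAVA_OPTIONS in TinkerPop's own script, which is an assumption worth confirming in the JanusGraph copy).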

Gremlin Console output:

rrmerugu@Code-WS:~/Downloads/janusgraph-full-0.5.3$ bin/gremlin.sh 

         \,/
         (o o)
-----oOOo-(3)-oOOo-----
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/rrmerugu/Downloads/janusgraph-full-0.5.3/lib/slf4j-log4j12-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/rrmerugu/Downloads/janusgraph-full-0.5.3/lib/logback-classic-1.1.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
plugin activated: janusgraph.imports
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
07:57:25 WARN  org.apache.hadoop.util.NativeCodeLoader  - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
plugin activated: tinkerpop.hadoop
plugin activated: tinkerpop.spark
plugin activated: tinkerpop.tinkergraph
gremlin> graph = GraphFactory.open('conf/hadoop-graph/read-cql.properties')
==>hadoopgraph[cqlinputformat->nulloutputformat]
gremlin> g = graph.traversal().withComputer(SparkGraphComputer)
==>graphtraversalsource[hadoopgraph[cqlinputformat->nulloutputformat],sparkgraphcomputer]
gremlin> input = [2437272,4956336]
==>2437272
==>4956336
gremlin> g.V().has(id,within(input)).
......1>            aggregate('input').hasId(input.head()). 
......2>            repeat(__.in('has_word')).emit().as('x').
......3>            select('input').unfold().has(id,within(input.tail())).
......4>            repeat(__.in('has_word')).emit(where(eq('x'))).
......5>            group().
......6>              by(select('x')).
......7>              by(path().count(local).fold()).
......8>            unfold().filter(select(values).count(local).is(input.tail().size())).
......9>            order().
.....10>              by(select(values).unfold().sum()).
.....11>            select(keys).limit(5).elementMap()
07:58:05 WARN  org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer  - class org.apache.hadoop.mapreduce.lib.output.NullOutputFormat does not implement PersistResultGraphAware and thus, persistence options are unknown -- assuming all options are possible
07:58:06 WARN  org.apache.spark.util.Utils  - Your hostname, Code-WS resolves to a loopback address: 127.0.1.1; using 192.168.0.10 instead (on interface enp7s0)
07:58:06 WARN  org.apache.spark.util.Utils  - Set SPARK_LOCAL_IP if you need to bind to another address
08:01:59 WARN  org.apache.spark.executor.Executor  - Managed memory leak detected; size = 40472352 bytes, TID = 1633
08:02:03 WARN  org.apache.spark.executor.Executor  - Managed memory leak detected; size = 41050016 bytes, TID = 1683
08:11:26 WARN  org.apache.spark.rpc.netty.NettyRpcEnv  - Ignored failure: java.util.concurrent.TimeoutException: Cannot receive any reply from 192.168.0.10:43701 in 119 seconds
08:11:29 WARN  org.apache.spark.executor.Executor  - Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [119 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:835)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:864)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:864)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:864)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:864)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [119 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 14 more
08:21:13 WARN  org.apache.spark.rpc.netty.NettyRpcEnv  - Ignored failure: java.util.concurrent.TimeoutException: Cannot receive any reply from 192.168.0.10:43701 in 119 seconds
08:21:37 WARN  org.apache.spark.executor.Executor  - Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [119 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:835)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:864)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:864)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:864)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:864)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [119 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 14 more
08:32:07 WARN  org.apache.spark.executor.Executor  - Issue communicating with driver in heartbeater
08:35:18 WARN  org.apache.spark.HeartbeatReceiver  - Removing executor driver with no recent heartbeats: 614252 ms exceeds timeout 600000 ms
Exception in thread "dispatcher-event-loop-7" java.lang.OutOfMemoryError: Java heap space
08:44:53 ERROR org.apache.spark.util.Utils  - Uncaught exception in thread driver-heartbeater
08:57:17 WARN  org.spark_project.jetty.io.ManagedSelector  - 
java.lang.OutOfMemoryError: Java heap space
08:58:56 ERROR org.apache.spark.util.Utils  - uncaught error in thread Spark Context Cleaner, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
08:58:56 ERROR org.apache.spark.util.Utils  - throw uncaught fatal error in thread Spark Context Cleaner
java.lang.OutOfMemoryError: Java heap space
Exception in thread "Spark Context Cleaner" java.lang.OutOfMemoryError: Java heap space
08:58:56 ERROR org.apache.spark.executor.Executor  - Exception in task 91.0 in stage 16.0 (TID 1890)
java.lang.OutOfMemoryError: Java heap space
08:58:56 ERROR org.apache.spark.util.SparkUncaughtExceptionHandler  - Uncaught exception in thread Thread[Executor task launch worker for task 1890,5,main]
java.lang.OutOfMemoryError: Java heap space
08:58:56 WARN  org.apache.spark.scheduler.TaskSetManager  - Lost task 91.0 in stage 16.0 (TID 1890, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space

08:58:56 ERROR org.apache.spark.scheduler.TaskSetManager  - Task 91 in stage 16.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 91 in stage 16.0 failed 1 times, most recent failure: Lost task 91.0 in stage 16.0 (TID 1890, executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:
Type ':help' or ':h' for help.

System specs: I am running this on Ubuntu with a 6-core i7 processor, 32 GB RAM, and a 1 TB SSD.

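What I would like answered

  1. Am I missing something? Are there any hints as to why this query takes so long and fails with these errors? Any suggestions are appreciated.
  2. Is there a better way to profile this query and find out why it takes so long? Once the memory leak warnings appear, the console and the Spark Web UI stop responding, and I cannot debug any of this.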
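
On the second point, one option that does not depend on the live Web UI is Spark's event log, which records job, stage, and task events to disk so they can be inspected after the driver has become unresponsive. The fragment below is a sketch of what could be appended to the SparkGraphComputer section of conf/hadoop-graph/read-cql.properties. The two property names are standard Spark settings; the directory is only an example and has to exist before the job starts.

```properties
# Hypothetical additions, not part of the original configuration.
# Write Spark job/stage/task events to disk while the job runs.
spark.eventLog.enabled=true
# Example location only; create the directory beforehand.
spark.eventLog.dir=file:///tmp/spark-events
```

The resulting log can be opened with the Spark History Server from a separate Spark distribution of a matching version, pointed at the same directory through spark.history.fs.logDirectory. Whether the JanusGraph bundle ships the history server scripts is not something the post establishes.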

Update

Updated the log with the OOM error on which the program exited.
