How to fix "connection refused" when Spark Streaming connects to a Flume sink on a VPS
I am experimenting with a word count using Spark Streaming (2.2.0) and Flume (1.6.0) in pull mode. Everything works fine on a local VM, so I migrated everything online to a VPS on Vultr, which I manage through a third-party panel, and I configured the security group successfully.
Here is what I did:
- Start the Flume server, which binds the source on port 44444 and the spark-sink on port 41414. Checking with
netstat -anp | grep 44444
(and likewise for 41414) shows both ports in use, so the Flume sink is working correctly.
- telnet localhost 44444 to simulate input.
- Run my program from IDEA, and this is where the problem appears.
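The port checks above can be scripted; a small sketch, assuming netstat (from net-tools) is available on the VPS:

```shell
# Check whether anything is listening on the Flume source and sink ports.
# Pay attention to the local address column: a line like 127.0.0.1:41414
# (instead of 0.0.0.0:41414) means the sink only accepts local clients,
# and remote connections to the public IP will be refused.
for port in 44444 41414; do
  netstat -tlnp 2>/dev/null | grep ":$port " || echo "nothing listening on $port"
done
```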
Here is the error message:
20/09/11 13:39:33 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.io.IOException: Error connecting to hadoop/96.30.196.34:41414
at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:138)
at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:83)
at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:82)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.streaming.flume.FlumePollingReceiver.onStart(FlumePollingInputDStream.scala:82)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:607)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2173)
at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2173)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused: hadoop/96.30.196.34:41414
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
... 3 more
What could be causing the connection refused error when using a VPS?
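For reference, a pull-mode setup like the one described usually carries a sink section along these lines. This is a sketch, not my actual config: the agent name "a1" and the channel name are assumptions, and only the sink type and port 41414 come from the setup above. The hostname property is what decides which interface the sink binds to.

```
# Hypothetical Flume agent "a1": the spark-sink must bind to an address
# reachable from the Spark driver, not just localhost.
a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.spark.hostname = 0.0.0.0
a1.sinks.spark.port = 41414
a1.sinks.spark.channel = memory-channel
```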
Here is my word count example. In the program arguments I set the hostname and port to hadoop 41414, and in the hosts file hadoop is correctly mapped to the VPS's public IP.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Count {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: FlumePullWordCount <hostname> <port>")
      System.exit(1)
    }
    val Array(hostname, port) = args
    val sparkConf = new SparkConf().setAppName("Count").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // Pull events from the spark-sink on 41414, as configured in the Flume conf
    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
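Since the root cause in the stack trace is a plain java.net.ConnectException, it can help to rule out the network path before involving Spark at all. Below is a minimal sketch of such a preflight check; the object and method names are made up for illustration, and host/port are whatever you pass to the job:

```scala
import java.net.{InetSocketAddress, Socket}

object PortCheck {
  // Returns true if a plain TCP connection to host:port succeeds within
  // timeoutMs. If this already fails with "connection refused", the issue
  // is firewall rules or the sink's bind address, not Spark itself.
  def reachable(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: java.io.IOException => false
    } finally {
      socket.close()
    }
  }
}
```

Running PortCheck.reachable("hadoop", 41414) from the same machine as the Spark driver would distinguish a blocked or localhost-only port from a Spark-level problem.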