
Spark Streaming → Flume sink connection refused on a VPS

How to fix a connection-refused error when Spark Streaming pulls from a Flume sink on a VPS

I am experimenting with a word count using Spark Streaming (2.2.0) and Flume (1.6.0) with the pull-based approach. Everything works fine on a local VM, so I migrated the whole setup to a VPS on Vultr, managed through a third-party panel, and configured the security group.

Here is what I did:

  1. Start the Flume agent, which binds its source on port 44444 and the spark-sink on port 41414. Checking with `netstat -anp | grep 44444` (and likewise for 41414) shows both ports in use, so Flume itself is running correctly.
  2. Run `telnet localhost 44444` to simulate input.
  3. Run my program from IDEA, and this is where the problem appears:
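
For reference, the agent described in step 1 would look roughly like the Flume properties file below. This is a hedged reconstruction, not the poster's actual config: the agent/source/channel names and the netcat source type are assumptions; only the two ports and the spark-sink come from the post. Note that the sink's `hostname` property is the address it binds to on the VPS, not the address the client dials.

```properties
# flume.conf -- sketch of the agent described above (names are assumed)
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# telnet-able source on 44444 (assumed to be a netcat source)
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

a1.channels.c1.type = memory

# pull-based sink that the Spark receiver connects to on 41414
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 0.0.0.0
a1.sinks.k1.port = 41414

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```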

Here is the error message:

20/09/11 13:39:33 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.io.IOException: Error connecting to hadoop/96.30.196.34:41414
    at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
    at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
    at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:138)
    at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:83)
    at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:82)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.streaming.flume.FlumePollingReceiver.onStart(FlumePollingInputDStream.scala:82)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:607)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
    at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2173)
    at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2173)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused: hadoop/96.30.196.34:41414
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
    at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
    at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
    at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
    at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    ... 3 more

What could cause a connection-refused error like this when running on a VPS?
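
One way to narrow this down: "Connection refused" means the VPS answered with a TCP RST, i.e. the host was reachable but nothing was accepting connections on 41414 from the outside, whereas a security group that silently drops packets shows up as a timeout instead. A small probe can separate the two cases; this is a generic sketch, not part of the original program:

```python
import socket

def check_port(host: str, port: int, timeout: float = 3.0) -> str:
    """Try a TCP connect and classify the failure mode."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return "open"
    except ConnectionRefusedError:
        return "refused"        # host reachable, but nothing listening on that port
    except socket.gaierror:
        return "unresolvable"   # hostname does not resolve -- check /etc/hosts
    except (socket.timeout, OSError):
        return "timeout"        # packets silently dropped -- typical of a firewall

# Self-contained demo against a throwaway local listener:
srv = socket.socket()
srv.bind(("127.0.0.1", 0))          # port 0 = let the OS pick a free port
srv.listen(1)
demo_port = srv.getsockname()[1]
while_open = check_port("127.0.0.1", demo_port)
srv.close()
after_close = check_port("127.0.0.1", demo_port)
print(while_open, after_close)      # open refused
```

Running `check_port("hadoop", 41414)` from the machine the Spark driver runs on would then point either at the sink's bind address or the panel's port mapping (`refused`) or at the security group (`timeout`).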

Here is my word count example. The hostname and port are passed as program arguments (`hadoop 41414`), and `hadoop` is correctly mapped to the VPS's public IP in the hosts file.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.flume.FlumeUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object Count {

      def main(args: Array[String]): Unit = {

        if (args.length != 2) {
          System.err.println("Usage: FlumePullWordCount <hostname> <port>")
          System.exit(1)
        }

        val Array(hostname, port) = args
        val sparkConf = new SparkConf().setAppName("Count").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        // Pull events from the spark-sink on 41414, matching the Flume conf
        val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)

        // Decode each event body, then do a classic word count per 5s batch
        flumeStream.map(x => new String(x.event.getBody.array()).trim)
          .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
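
Unrelated to the error itself, but for anyone reproducing this: the pull-based receiver needs the spark-streaming-flume integration artifact on the driver's classpath. A minimal sbt sketch, with the Spark version taken from the post (the Scala version is an assumption):

```scala
// build.sbt -- minimal sketch (Scala version assumed)
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "2.2.0",
  "org.apache.spark" %% "spark-streaming-flume" % "2.2.0"
)
```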
