
Spark text file stream analysis

How to fix a Spark text file stream analysis that returns no records

I am new to Spark Streaming. I want to analyze text files that are copied from different application hosts into a common HDFS target location. I am getting an empty DataFrame :( with no records. The XML record extraction logic is correct; I have tested it on the console against an RDD[String], but there seems to be some problem with the DStream[String]. Can anyone help?

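For reference, the kind of console test described above can be sketched as follows. This is a minimal sketch: the sample record is hypothetical and only mirrors the element names (RECORD, DATE, HOST, IP) used in the code below.

    import scala.xml.XML

    // Hypothetical one-line XML record matching the element names used in the streaming job
    val sample = "<LOG><RECORD><DATE>2021-01-01</DATE><HOST>app01</HOST><IP>10.0.0.1</IP></RECORD></LOG>"
    val x = XML.loadString(sample)
    // Extract the same three fields the streaming job extracts
    println(((x \ "RECORD" \ "DATE").text, (x \ "RECORD" \ "HOST").text, (x \ "RECORD" \ "IP").text))
    // prints: (2021-01-01,app01,10.0.0.1)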

    package com.sparkstreaming.loganalysis

    import org.apache.spark._
    import org.apache.spark.sql._
    import org.apache.spark.streaming._
    import scala.xml.XML

    class addStreaming(sc: SparkContext, sqlContext: SQLContext, cpDir: String) {

      def creatingFunc(): StreamingContext = {
        val batchInterval = Seconds(4)
        val ssc = new StreamingContext(sc, batchInterval)
        // Set the active SQLContext so that it can be accessed statically within foreachRDD
        SQLContext.setActive(sqlContext)
        ssc.checkpoint(cpDir)
        // Monitor the directory for new files; each element of the DStream is one line of text
        val lineStream = ssc.textFileStream("/home/dataflair/SparkProjects/SparkStreamingProject/src/main/resources/")

        // Parse every line as an XML document and pull out the DATE, HOST and IP fields
        val recordStream = lineStream
          .map(XML.loadString _)
          .map { x =>
            val dt   = (x \ "RECORD" \ "DATE").text
            val host = (x \ "RECORD" \ "HOST").text
            val ip   = (x \ "RECORD" \ "IP").text
            (dt, host, ip)
          }

        // For each micro-batch, turn the RDD of tuples into a DataFrame and print it
        recordStream.foreachRDD { rdd =>
          val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
          import spark.implicits._

          val streamDF = rdd.toDF("DATE", "HOST", "IP")
          streamDF.show(false)
        }

        ssc
      }
    }

    object textStreamAnalysis {

      def main(args: Array[String]) {
        val cpDir = "/tmp/checkpoint"
        val conf = new SparkConf().setMaster("local[*]").setAppName("textStream")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val addStr = new addStreaming(sc, sqlContext, cpDir)
        val xsc = StreamingContext.getActiveOrCreate(cpDir, addStr.creatingFunc _)
        xsc.start()
        xsc.awaitTermination()
      }
    }
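
A likely cause of the empty DataFrame is how textFileStream works: it only processes files that appear in the monitored directory after the streaming context has started, and each file must appear atomically (moved or renamed into the directory, not written to in place); files already present at startup are ignored. With a bare local path, Spark may also resolve the directory against the default Hadoop filesystem, so an explicit file:// scheme is safer when testing locally. Note too that the DStream carries one String per line, so XML.loadString only succeeds if every record sits on a single line. A minimal sketch to verify the directory is being picked up (the file:// path and file names are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Minimal check that the watched directory actually produces records, assuming local testing
    val conf = new SparkConf().setMaster("local[*]").setAppName("textFileStreamCheck")
    val ssc = new StreamingContext(conf, Seconds(4))
    // An explicit file:// scheme keeps the path from being resolved against the default filesystem
    val lines = ssc.textFileStream("file:///home/dataflair/SparkProjects/SparkStreamingProject/src/main/resources/")
    lines.count().print()  // should print non-zero counts once new files arrive
    ssc.start()
    // After start(), move complete files into the directory atomically, e.g. from a shell:
    //   mv /tmp/staging/records1.xml /home/dataflair/SparkProjects/SparkStreamingProject/src/main/resources/
    ssc.awaitTermination()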
