微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

窗口重载方法无法在Spark结构化流标量中解析

如何解决窗口重载方法无法在Spark结构化流标量中解析

以下代码在Spark Scala结构化流中引发了过载错误

错误

Cannot resolve overloaded method window

Code
package Stream
import org.apache.spark.sql._
import org.apache.spark.sql.{DataFrame,SaveMode,SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.streaming.Trigger
import org.apache.log4j.{Level,Logger}
import org.apache.spark.SparkContext
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.sql.functions.window




object SparkRestApi {
  def main(args: Array[String]): Unit = {

    val logger = Logger.getLogger("Datapipeline")
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .appName("StreamTest")
      .config("spark.driver.memory","2g")
      .master("local[*]")
      //.enableHiveSupport()
      .getorCreate()

    import spark.implicits._

    val userSchema = new StructType()
      .add("id","string")
      .add("Faulttime","timestamp")
      .add("name","string")
      .add("Parentgroup","string")
      .add("childgroup","string")
      .add("MountStyle","string")


val JSONDF = spark
      .readStream
      .option("header",true)
      .option("sep",",")
      .schema(userSchema)      // Specify schema of the csv files
      .json("D:/TEST")
     

val windowColumn = window($"timestamp","10 minutes","5 minutes")

    val df2 = JSONDF.withWatermark("timestamp","1 minutes")
    .groupBy("Parentgroup","childgroup","MountStyle",window("timestamp","5 minutes","1 minutes"))
      .agg(countdistinct("id"))

 df2.
      writeStream
      .outputMode("Append")
      .format("csv")
      .option("checkpointLocation","D:/TEST/chkdir")
      .option("path","D:/TEST/OutDir")
      .option("truncate",false)
      .start()
      .awaitTermination()

    spark.stop()


  }

}

非常赞赏所有有价值的建议。 即使添加了所有库,也会引发错误。 ................................................... ................................................... ................................................... ................................................... ................................................... ...........

解决方法

以手册为例:

val windowedCounts = words
    .withWatermark("timestamp","10 minutes")
    .groupBy(
        window($"timestamp","10 minutes","5 minutes"),$"word")
    .count()

尝试将您的window子句放在前面,我可能会猜测。并在字段名称示例中使用$。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。