如何解决使用合并函数将 RDD 保存为 csv 文件
我正在尝试在 Intellij 中使用 Apache Spark 流式传输 Twitter 数据,但是当我使用函数 coalesce
时,它说它无法解析符号合并。这是我的主要代码:
val spark = SparkSession.builder().appName("twitterStream").master("local[*]").getorCreate()
import spark.implicits._
val sc: SparkContext = spark.sparkContext
val streamContext = new StreamingContext(sc,Seconds(5))
val filters = Array("Singapore")
val filtered = TwitterUtils.createStream(streamContext,None,filters)
val englishTweets = filtered.filter(_.getLang() == "en")
//englishTweets.print()
englishTweets.foreachRDD{rdd =>
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getorCreate()
import spark.implicits._
val tweets = rdd.map( field =>
(
field.getId,field.getUser.getScreenName,field.getCreatedAt.toInstant.toString,field.getText.toLowerCase.split(" ").filter(_.matches("^[a-zA-Z0-9 ]+$")).fold("")((a,b) => a + " " + b).trim,sentiment(field.getText)
)
)
val tweetsdf = tweets.toDF("userID","user","createdAt","text","sentimentType")
tweetsdf.printSchema()
tweetsdf.show(false)
}.coalesce(1).write.csv("hdfs://localhost:9000/usr/sparkApp/test/testing.csv")
解决方法
我已经尝试过使用我自己的数据集,并且我已经阅读了一个数据集,并且在编写时我已经应用了合并函数并且它给出了结果,请参考这个它可能对你有帮助。
import org.apache.spark.sql.SparkSession
import com.spark.Rdd.DriverProgram
import org.apache.log4j.{ Logger,Level }
import org.apache.spark.sql.SaveMode
import java.sql.Date
object JsonDataDF {
System.setProperty("hadoop.home.dir","C:\\hadoop");
System.setProperty("hadoop.home.dir","C:\\hadoop"); // This is the system property which is useful to find the winutils.exe
Logger.getLogger("org").setLevel(Level.WARN) // This will remove Logs
case class AOK(appDate:Date,arr:String,base:String,Comments:String)
val dp = new DriverProgram
val spark = dp.getSparkSession()
def main(args : Array[String]): Unit = {
import spark.implicits._
val jsonDf = spark.read.option("multiline","true").json("C:\\Users\\34979\\Desktop\\Work\\Datasets\\JSONdata.txt").as[AOK]
jsonDf.coalesce(1) // Refer Here
.write
.mode(SaveMode.Overwrite)
.option("header","true")
.format("csv")
.save("C:\\Users\\34979\\Desktop\\Work\\Datasets\\JsonToCsv")
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。