如何解决使用 Spark Structured Streaming Java 附加文件
我正在尝试使用 spark 附加流文件夹中的所有文件内容,但每次触发微批处理时它都会创建很多部分文件。以下是我的代码。
SparkSession session = SparkSession.builder().appName("SparkJava").getorCreate();
JavaSparkContext sparkContext = new JavaSparkContext(session.sparkContext());
StructType personSchema = new StructType().add("firstName","string").add("lastName","string").add("age","long");
Dataset<Patient> personStream = session.readStream().schema(personSchema).json("file:///C:/jsons1")
.as(Encoders.bean(Patient.class));
//当数据从流中到达时,将执行这些步骤
personStream.createOrReplaceTempView("people");
String sql = "SELECT * FROM people";
Dataset<Row> ageAverage = session.sql(sql);
StreamingQuery query = ageAverage.coalesce(1).writeStream().outputMode(OutputMode.Append())
.option("path","file:///C:/output").format("json").trigger(Trigger.ProcessingTime("10 seconds")).option("checkpointLocation","file:///C:/output")
.partitionBy("age").start();
请提出一种将源文件夹中的所有文件内容合并到输出文件夹中的一个文件的方法
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。