如何解决使用Google Data Fusion中的Spark工具重命名输出文件 编辑:
我在Google Data Fusion中有一个管道,该管道会在Google Cloud存储桶的目标目录中生成一个名为“ part-00000-XXXXXX”的CSV文件(以及名为“ _SUCCESS”的文件)。 “ part-00000”之后的其余文件名总是不同且随机的。
管道通过解析,处理并将输入文件连接在一起(全部来自某些Google Cloud Storage位置)来产生新的输出,然后将该新输出与较旧的现有输出文件连接起来,并吐出“ part-00000” ”文件,与名称为“ internal_dashboard.csv”的旧输出文件位于同一位置。
通过任何可行的方法,我需要以某种方式手动将“ part-00000”文件重命名为“ internal_dashboard.csv”并替换旧文件。
以下是我在Spark Sink中编写的尝试(我从here,here,here,here和here中获得了这些尝试。 )。想法是首先找到文件名中带有“ part-00000”的文件,然后重命名并覆盖旧文件。到目前为止,我所有的尝试都失败了:
- 尝试1
import java.nio.file.{Files,Paths,StandardcopyOption}
import scala.util.matching.Regex
def recursiveListFiles(f: File,r: Regex): Array[File] = {
val these = f.listFiles
val good = these.filter(f => r.findFirstIn(f.getName).isDefined)
good ++ these.filter(_.isDirectory).flatMap(recursiveListFiles(_,r))
}
def moveRenameFile(source: String,destination: String): Unit = {
val path = Files.move(
Paths.get(source),Paths.get(destination),StandardcopyOption.REPLACE_EXISTING
)
// Could return `path`
}
def sink(df: DataFrame,context: SparkExecutionPluginContext) : Unit = {
val fullpath = "gs://some_bucket/output/internal_dashboard"
val targetfilename = "internal_dashboad.csv"
df.coalesce(1)
.write.format("csv")
.option("header","true")
.mode("append") // "overwrite" "append"
.save(fullpath)
val existingfilename = recursiveListFiles(new File(fullpath),"part-00000-.*")
moveRenameFile(fullpath+existingfilename.head,fullpath+targetfilename)
}
- 尝试2:
import java.io.File
def getlistofFiles(dir: File,extensions: List[String]): List[File] = {
dir.listFiles.filter(_.isFile).toList.filter { file =>
extensions.exists(file.getName.startsWith(_))
}
}
def moveRenameFile(source: String,"true")
.mode("append") // "overwrite" "append"
.save(fullpath)
val suffixList = List("part-00000")
val existingfilename = getlistofFiles(new File(fullpath),suffixList )
moveRenameFile(fullpath+existingfilename.head,fullpath+targetfilename)
}
- 尝试3:
def sink(df: DataFrame,context: SparkExecutionPluginContext) : Unit = {
val fullpath = "gs://some_bucket/output/internal_dashboard"
val targetfilename = "internal_dashboad.csv"
val pathandfile = fullpath + "/" + targefilename
df.coalesce(1)
.write.format("csv")
.option("header","true")
.mode("append") // "overwrite" "append"
.save(pathandfile )
dbutils.fs.ls(fullpath).filter(file=>file.name.endsWith("csv")).foreach(f => dbutils.fs.rm(f.path,true))
dbutils.fs.mv(dbutils.fs.ls(pathandfile).filter(file=>file.name.startsWith("part-00000"))(0).path,pathandfile ")
dbutils.fs.rm(pathandfile,true)
}
我需要Scala或其他方式的帮助,以将“ part-00000”文件重命名为“ internal_dashboard.csv”并覆盖旧版本。
供未使用Data Fusion的用户参考,我可以使用的工具是:
Description
Executes user-provided Spark code in Scala.
Use Case
This plugin can be used when you want arbitrary Spark code.
Properties
mainClass: The fully qualified class name for the Spark application. It must either be an object that has a main method define inside,with the method signature as def main(args: Array[String]): Unit; or it is a class that extends from the CDAP co.cask.cdap.api.spark.SparkMain trait that implements the run method,with the method signature as def run(implicit sec: SparkExecutionContext): Unit
Description
Executes user-provided Spark code in Python.
Use Case
This plugin can be used when you want to run arbitrary Spark code.
编辑:
(2020年11月2日),我刚刚了解到,还有Google Cloud Functions可以用Python(或Java)编写,并且只要它所在的存储桶发生变化就可以触发。如果有人知道如何进行此类功能可以在触发时重命名并覆盖“ part-00000”文件,请告诉我。如果其他所有操作均失败,我将尝试一下。
解决方法
避免在AWS S3上重命名对象。没有这样的事情,它所做的只是“剪切并粘贴” =>非常昂贵的操作。
您可以尝试:
import org.apache.spark.sql.SaveMode
df.write.mode(SaveMode.Overwrite).parquet(outputBasePath)
如果您坚持使用“重命名”,请使用Hadoop库而不是Java:
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
val srcPath = new Path("source/...")
val destPath = new Path("dest/...")
srcPath.getFileSystem(new Configuration()).rename(srcPath,destPath)
注意:使用AWS S3时,两个路径必须位于同一存储桶中(它们具有不同的FileSystem对象,在使用重命名(...)时适用。)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。