微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用Google Data Fusion中的Spark工具重命名输出文件 编辑:

如何解决使用Google Data Fusion中的Spark工具重命名输出文件 编辑:

我在Google Data Fusion中有一个管道,该管道会在Google Cloud存储桶的目标目录中生成一个名为“ part-00000-XXXXXX”的CSV文件(以及名为“ _SUCCESS”的文件)。 “ part-00000”之后的其余文件名总是不同且随机的。

enter image description here

管道通过解析,处理并将输入文件连接在一起(全部来自某些Google Cloud Storage位置)来产生新的输出,然后将该新输出与较旧的现有输出文件连接起来,并吐出“ part-00000” ”文件,与名称为“ internal_dashboard.csv”的旧输出文件位于同一位置。

通过任何可行的方法,我需要以某种方式手动将“ part-00000”文件重命名为“ internal_dashboard.csv”并替换旧文件

以下是我在Spark Sink中编写的尝试(我从hereherehereherehere中获得了这些尝试。 )。想法是首先找到文件名中带有“ part-00000”的文件,然后重命名并覆盖旧文件。到目前为止,我所有的尝试都失败了:

  • 尝试1
import java.nio.file.{Files,Paths,StandardcopyOption}
import scala.util.matching.Regex

def recursiveListFiles(f: File,r: Regex): Array[File] = {
  val these = f.listFiles
  val good = these.filter(f => r.findFirstIn(f.getName).isDefined)
  good ++ these.filter(_.isDirectory).flatMap(recursiveListFiles(_,r))
}


def moveRenameFile(source: String,destination: String): Unit = {
    val path = Files.move(
        Paths.get(source),Paths.get(destination),StandardcopyOption.REPLACE_EXISTING
    )
    // Could return `path`
}


def sink(df: DataFrame,context: SparkExecutionPluginContext) : Unit = {

  val fullpath = "gs://some_bucket/output/internal_dashboard"
  val targetfilename = "internal_dashboad.csv"

  df.coalesce(1)
    .write.format("csv")
    .option("header","true")
    .mode("append") // "overwrite" "append"
    .save(fullpath)
 
  val existingfilename = recursiveListFiles(new File(fullpath),"part-00000-.*")
  moveRenameFile(fullpath+existingfilename.head,fullpath+targetfilename)
}

  • 尝试2:

import java.io.File

def getlistofFiles(dir: File,extensions: List[String]): List[File] = {
    dir.listFiles.filter(_.isFile).toList.filter { file =>
        extensions.exists(file.getName.startsWith(_))
    }
}

def moveRenameFile(source: String,"true")
    .mode("append") // "overwrite" "append"
    .save(fullpath)
 
  val suffixList = List("part-00000")
  val existingfilename = getlistofFiles(new File(fullpath),suffixList )
  moveRenameFile(fullpath+existingfilename.head,fullpath+targetfilename)
}

  • 尝试3:
def sink(df: DataFrame,context: SparkExecutionPluginContext) : Unit = {

  val fullpath = "gs://some_bucket/output/internal_dashboard"
  val targetfilename = "internal_dashboad.csv"
  val pathandfile = fullpath + "/" + targefilename

  df.coalesce(1)
    .write.format("csv")
    .option("header","true")
    .mode("append") // "overwrite" "append"
    .save(pathandfile )

dbutils.fs.ls(fullpath).filter(file=>file.name.endsWith("csv")).foreach(f => dbutils.fs.rm(f.path,true))
dbutils.fs.mv(dbutils.fs.ls(pathandfile).filter(file=>file.name.startsWith("part-00000"))(0).path,pathandfile ")
dbutils.fs.rm(pathandfile,true)
}

我需要Scala或其他方式的帮助,以将“ part-00000”文件重命名为“ internal_dashboard.csv”并覆盖旧版本。

供未使用Data Fusion的用户参考,我可以使用的工具是:

  • 火花塞:

    enter image description here

  • Scala Spark程序(可以在Sink之前或之后出现):

    enter image description here

Description
Executes user-provided Spark code in Scala.

Use Case
This plugin can be used when you want arbitrary Spark code.

Properties
mainClass: The fully qualified class name for the Spark application. It must either be an object that has a main method define inside,with the method signature as def main(args: Array[String]): Unit; or it is a class that extends from the CDAP co.cask.cdap.api.spark.SparkMain trait that implements the run method,with the method signature as def run(implicit sec: SparkExecutionContext): Unit 

  • PySpark程序(可以在接收器之前或之后出现):

    enter image description here

Description
Executes user-provided Spark code in Python.

Use Case
This plugin can be used when you want to run arbitrary Spark code.

编辑:

(2020年11月2日),我刚刚了解到,还有Google Cloud Functions可以用Python(或Java)编写,并且只要它所在的存储桶发生变化就可以触发。如果有人知道如何进行此类功能可以在触发时重命名并覆盖“ part-00000”文件,请告诉我。如果其他所有操作均失败,我将尝试一下。

解决方法

避免在AWS S3上重命名对象。没有这样的事情,它所做的只是“剪切并粘贴” =>非常昂贵的操作。

您可以尝试:

import org.apache.spark.sql.SaveMode
df.write.mode(SaveMode.Overwrite).parquet(outputBasePath)

如果您坚持使用“重命名”,请使用Hadoop库而不是Java:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration

val srcPath = new Path("source/...")
val destPath = new Path("dest/...")

srcPath.getFileSystem(new Configuration()).rename(srcPath,destPath)

注意:使用AWS S3时,两个路径必须位于同一存储桶中(它们具有不同的FileSystem对象,在使用重命名(...)时适用。)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?