微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

过滤后如何将数据帧写入S3

如何解决过滤后如何将数据帧写入S3

我正在尝试使用下面的Scala代码在脚本编辑中以CVS格式过滤到S3后写入数据帧。

当前状态:

  • 运行后不显示任何错误,只是不写入S3。

  • 日志屏幕显示“开始”,但是看不到“打印结束”。

  • 没有表明该问题的特定错误消息。

  • 以温度计数停止。

环境条件:我对所有S3都具有管理员权限。

import com.amazonaws.services.glue.glueContext
import <others>

object glueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: glueContext = new glueContext(spark)
    // @params: [JOB_NAME]
    val args = glueArgParser.getResolvedOptions(sysArgs,Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"),glueContext,args.asJava)
    
    val datasource0 = glueContext.getCatalogSource(database = "db",tableName = "table",redshiftTmpDir = "",transformationContext = "datasource0").getDynamicFrame()
    val appymapping1 = datasource0.appyMapping(mapping=........)

    val temp=appymapping1.toDF.filter(some filtering rules)
    print("start")
    if (temp.count() <= 0) {
    temp.write.format("csv").option("sep",",").save("s3://directory/error.csv")
  }
    print("End")
     

解决方法

您正在使用if条件将Dataframe写入S3(If条件是检查数据帧是否具有一行或多行),但是If条件是反转的。仅当数据帧具有0(或更小)行时才是正确的。所以改变它。

高级:Spark始终将文件保存为“ part-”名称。因此将S3路径更改为 s3:// directory / 。并添加 .mode(“ overwrite”)

因此您的写df查询应该是

temp.write.format(“ csv”)。option(“ sep”,“,”).mode(“ overwrite”).save(“ s3:// directory”)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。