我正在写一个ETL过程,我将需要读取小时日志文件,分割数据并保存.我正在使用Spark(在Databricks中).
日志文件是CSV,因此我读取它们并应用模式,然后执行我的转换.
日志文件是CSV,因此我读取它们并应用模式,然后执行我的转换.
我的问题是,如何保存每小时的数据作为镶木地板格式,但是附加到现有的数据集?保存时,我需要按数据帧中存在的4列进行分区.
这是我的保存线
data .filter(validPartnerIds($"partnerID")) .write .partitionBy("partnerID","year","month","day") .parquet(saveDestination)
问题是如果目标文件夹存在,则保存会抛出错误.
如果目的地不存在,那么我不会附加我的文件.
我尝试使用.mode(“append”),但是我发现Spark有时会在中途失败,所以我最终失去了我的数据写入量以及我还需要写多少.
我正在使用镶木地板,因为分区大大增加了我在未来的查询.同样,我必须将数据写入磁盘上的一些文件格式,并且不能使用诸如Druid或Cassandra之类的数据库.
任何关于如何分割数据框和保存文件的建议(坚持使用镶木地板或其他格式)都是非常感激的.
解决方法
如果你需要附加文件,你必须使用追加模式.我不知道你希望生成多少个分区,但是我发现如果你有很多分区,partitionBy会导致一些问题(内存和IO问题一样).
如果您认为您的问题是由于写入操作太长时间造成的,我建议您尝试以下两件事情:
1)通过添加配置来使用snappy:
conf.set("spark.sql.parquet.compression.codec","snappy")
2)禁止在SparkContext上的hadoopConfiguration中生成元数据文件,如下所示:
sc.hadoopConfiguration.set("parquet.enable.summary-Metadata","false")
元数据文件会产生一些耗时(见this blog post),但根据this,它们并不重要.就个人而言,我总是禁用它们,没有问题.
如果生成多个分区(> 500),我恐怕我能做的最好的是建议您查看一个不使用追加模式的解决方案 – 我根本没有设法使partitionBy与多个分区一起工作.
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。