微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

带有 PySpark 的 AWS Glue - DynamicFrame 导出到 S3 中途失败并出现 UnsupportedOperationException

如何解决带有 PySpark 的 AWS Glue - DynamicFrame 导出到 S3 中途失败并出现 UnsupportedOperationException

首先我应该说我一直在使用 AWS Glue Studio 来学习如何将 glue 与 PySpark 结合使用,并且到目前为止一切顺利。直到我遇到一个我无法理解的错误(更不用说解决了)。可以在底部找到数据示例。

背景

我所做的只是一个简单的数据转换。 Input S3 Bucket --> CustomTransform --> Output S3。但是程序在导出一些数据后一直崩溃。我后来也提到过,但我什至尝试删除 CustomTransformation,但 S3 数据导出仍然失败,即使只是从一个 Bucket 到另一个 Bucket。

错误

这是我得到的错误的 Python 部分(从 CloudWatch 复制):

2021-03-26 09:03:09,200 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last):
  File "/tmp/glueTest.py",line 69,in <module>
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0,connection_type = "s3",format = "glueparquet",connection_options = {
    "path": "s3://example-bucket-here/data/","compression": "snappy","partitionKeys": []
},transformation_ctx = "DataSink0")
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py",line 640,in from_options
    format_options,transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py",line 242,in write_dynamic_frame_from_options
    format,format_options,line 265,in write_from_options
    return sink.write(frame_or_dfc)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py",line 35,in write
    return self.writeFrame(dynamic_frame_or_dfc,info)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py",line 31,in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf,callsite(),info),dynamic_frame.glue_ctx,dynamic_frame.name + "_errors")
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",line 1257,in __call__
    answer,self.gateway_client,self.target_id,self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",line 63,in deco
    return f(*a,**kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",line 328,in get_return_value
    format(target_id,".",name),value)
py4j.protocol.Py4JJavaError: An error occurred while calling o85.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 1.0 Failed 4 times,most recent failure: Lost task 4.3 in stage 1.0 (TID 76,172.36.109.34,executor 6): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary

真正的谜题

最让我困惑的是,这次崩溃发生在之后它已经将大部分数据导出到 S3。这会立即表明数据有问题,因为它会处理一些损坏(或格式错误)的数据,然后崩溃。

于是我查看了成功导出的数据和输入数据之间的差异,并找到了所有未导出的行。没有什么让我感到奇怪或导出失败的原因。

当我选择 S3 存储桶作为输入源时,了解 AWS glue 正在推断架构可能会有所帮助。

我的尝试

所以我尝试以 glue 支持的所有不同格式导出数据,但都没有奏效。我还尝试跳过所有数据转换,只使用 Input S3 Bucket 并直接导出到 Output S3 Bucket,但它仍然因相同的错误而崩溃(实际上这就是我上面包含的错误消息!)。

同样,这一切都表明数据有问题,但我已经查看了所有未通过该过程的数据(只有大约 180 条记录),它们看起来都与确实产生的数据一样它通过。

为了进行完整性检查,我在其他一些(非常相似的)数据上使用了 Input S3 --> Output S3 方法效果很好,基本上起到了复制粘贴的作用。

我也遇到了this article。但这并没有真正的帮助,当我尝试更改输出格式以获取更多信息时,我遇到了同样的错误 - 没有额外的信息。

有没有人能够帮助确定这里的问题?没有任何迹象表明这应该会崩溃。如果这对人们有帮助,我很乐意提供 Java 错误的其余部分。

数据示例

这是我的数据的样子:

Date        ticker_name  currency exchange_name instrument_type first_Trade_date Amount
1612229400  0382.HK      HKD      HKG           EQUITY          1563240600       0.049
1613140200  SO           USD      NYQ           EQUITY          378657000        0.64
1613053800  SIGI         USD      NMS           EQUITY          322151400        0.25
1614240000  SIGT.L       GBp      LSE           EQUITY          828601200        1.68
1612249200  SIH.BE       EUR      BER           EQUITY          1252044000       0.038

除Date(long)、first_Trade_date(long)和Amount(double)之外的所有字段都是字符串。

当我调用 .printSchema() 时,我得到以下信息:

root
|-- Date: long
|-- currency: string
|-- exchange_name: string
|-- instrument_type: string
|-- first_Trade_date: long
|-- Amount: double

解决方法

解决方案

因此,如果有人遇到此问题,可能会令人沮丧,因为此错误似乎无法提供有关实际问题的任何信息。我得到的唯一线索之一是this article。这表明我的架构有问题。

我不得不非常仔细地查看我的数据,并最终注意到只有当我将某些文件与其他文件结合运行时才会出现此错误。

事实证明,我的某些镶木地板文件的日期采用 int 格式,而在其他时候则采用 float 格式。该数据是在不同的函数中使用 .to_parquet() 从 Pandas DataFrame 创建的,所以我不确定为什么数据类型不一致。

最让我困惑的是,为什么当我尝试将日期类型强制转换为 int(如所见 here)时,我仍然遇到错误。

无论如何,我的解决方案是修复 Pandas 输出数据的方式,并确保它在 Glue 处理数据之前始终将日期输出为整数。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。