
Airflow GoogleCloudStorageToGoogleCloudStorageOperator error

How to resolve an Airflow GoogleCloudStorageToGoogleCloudStorageOperator error

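I have been facing a strange problem in my DAGs for 2 weeks. My use case is as follows: a colleague manually uploads a file to a GCS bucket. This triggers a Cloud Function, which in turn starts an Airflow DAG through the API. The first task of the DAG transfers the file from the "landing zone" to the "safe zone", and then the rest of the DAG continues.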

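I use GoogleCloudStoragetoGoogleCloudStorageOperator to move the file from bucket A to bucket B. Everything worked fine until 2 or 3 weeks ago. The oldest DAG is 6 months old, and even where we changed something, it was in another part of the DAG. So we never touched this part, and for a long time we had no problems with it.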

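Now, most of the time, the first task (the transfer) fails. The file itself is moved correctly, but for an unknown reason the task still fails. If I retry, hit this error twice in a row, and then try again with exactly the same file, it works. I cannot find what is causing this. It is driving me crazy.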
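One possible reading of the symptom above (the file ends up moved while the task is marked as failed) is that the move is attempted more than once for the same file. That is only an assumption; nothing in this post confirms it. The sketch below uses plain dictionaries as hypothetical stand-ins for the two buckets, and `move_if_needed` is an illustrative helper, not part of Airflow or the GCS client. It shows the difference between a move that fails on a second attempt and one that recognises the work is already done.

```python
from typing import Dict

# Hypothetical in-memory stand-in for a bucket: {object_name: content}.
Bucket = Dict[str, bytes]


def move_if_needed(src: Bucket, src_name: str, dst: Bucket, dst_name: str) -> str:
    """Move an object, tolerating a move that has already happened.

    Returns "moved" when the copy and delete ran, "already_moved" when the
    source is gone but the destination exists, and raises otherwise.
    """
    if src_name in src:
        dst[dst_name] = src[src_name]  # copy to the destination
        del src[src_name]              # delete the source, which makes it a move
        return "moved"
    if dst_name in dst:
        # Source missing, destination present: an earlier attempt did the work.
        return "already_moved"
    raise FileNotFoundError(f"{src_name} is in neither bucket")


SRC = "geometrie/Track_Geometry-20201005_032915.csv"
DST = "geometrie/original/track_geometry_20201005_032915.csv"

landing: Bucket = {SRC: b"example content"}
safe: Bucket = {}

first = move_if_needed(landing, SRC, safe, DST)   # normal move
second = move_if_needed(landing, SRC, safe, DST)  # repeated attempt, no error
```

With a plain move, the second attempt would fail because the source no longer exists. Whether a duplicate attempt is what actually happens here would still need to be checked, for example by comparing DAG run timestamps with the upload time.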

--------------------------------------------------------------------------------
[2020-10-15 09:34:50,599] {taskinstance.py:867} INFO - Starting attempt 1 of 1
[2020-10-15 09:34:50,599] {taskinstance.py:868} INFO - 
--------------------------------------------------------------------------------
[2020-10-15 09:34:50,620] {taskinstance.py:887} INFO - Executing <Task(GoogleCloudStoragetoGoogleCloudStorageOperator): transfer-landing-to-safe> on 2020-10-15T07:34:40+00:00
[2020-10-15 09:34:50,626] {standard_task_runner.py:53} INFO - Started process 31555 to run task
[2020-10-15 09:34:50,775] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: geometrie-preprocessing.transfer-landing-to-safe 2020-10-15T07:34:40+00:00 [running]> blablabla.internal
[2020-10-15 09:34:50,860] {gcs_to_gcs.py:193} INFO - Executing copy of gs://blablabla-landing/geometrie/Track_Geometry-20201005_032915.csv to gs://blablabla-safe/geometrie/original/track_geometry_20201005_032915.csv
[2020-10-15 09:34:50,861] {logging_mixin.py:112} INFO - [2020-10-15 09:34:50,860] {gcp_api_base_hook.py:146} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2020-10-15 09:34:50,980] {taskinstance.py:1128} ERROR - 404 POST https://storage.googleapis.com/storage/v1/b/blablabla-landing/o/geometrie%2FTrack_Geometry-20201005_032915.csv/rewriteto/b/blablabla-safe/o/geometrie%2Foriginal%2Ftrack_geometry_20201005_032915.csv: No such object: blablabla-landing/geometrie/Track_Geometry-20201005_032915.csv
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/dist-packages/airflow/contrib/operators/gcs_to_gcs.py", line 178, in execute
    destination_object=self.destination_object)
  File "/usr/local/lib/python3.7/dist-packages/airflow/contrib/operators/gcs_to_gcs.py", line 196, in _copy_single_object
    self.destination_bucket, destination_object)
  File "/usr/local/lib/python3.7/dist-packages/airflow/contrib/hooks/gcs_hook.py", line 135, in rewrite
    source=source_object
  File "/usr/local/lib/python3.7/dist-packages/google/cloud/storage/blob.py", line 2098, in rewrite
    timeout=timeout,
  File "/usr/local/lib/python3.7/dist-packages/google/cloud/_http.py", line 423, in api_request
    raise exceptions.from_http_response(response)
    raise exceptions.from_http_response(response)
google.api_core.exceptions.NotFound: 404 POST https://storage.googleapis.com/storage/v1/b/blablabla-landing/o/geometrie%2FTrack_Geometry-20201005_032915.csv/rewriteto/b/blablabla-safe/o/geometrie%2Foriginal%2Ftrack_geometry_20201005_032915.csv: No such object: blablabla-landing/geometrie/Track_Geometry-20201005_032915.csv
[2020-10-15 09:34:50,984] {taskinstance.py:1185} INFO - Marking task as Failed.dag_id=geometrie-preprocessing,task_id=transfer-landing-to-safe,execution_date=20201015T073440,start_date=20201015T073450,end_date=20201015T073450
[2020-10-15 09:35:00,556] {logging_mixin.py:112} INFO - [2020-10-15 09:35:00,556] {local_task_job.py:103} INFO - Task exited with return code 1
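To make the 404 in the log easier to read, the rewrite URL can be split into its source and destination parts. The sketch below uses only the Python standard library; `parse_rewrite_url` is a hypothetical helper written for this post, and it assumes the URL has exactly the shape seen in the log line above.

```python
from urllib.parse import unquote, urlsplit


def parse_rewrite_url(url: str):
    """Split a rewrite URL into (src_bucket, src_object, dst_bucket, dst_object).

    Expected path shape (taken from the log, not from a specification):
    /storage/v1/b/<src_bucket>/o/<src_object>/rewriteto/b/<dst_bucket>/o/<dst_object>
    """
    parts = urlsplit(url).path.split("/")
    # The log shows "rewriteto" in lowercase, so compare case-insensitively.
    if len(parts) != 12 or parts[7].lower() != "rewriteto":
        raise ValueError("unexpected rewrite URL shape")
    # Object names are percent-encoded in the path ("/" appears as %2F).
    return parts[4], unquote(parts[6]), parts[9], unquote(parts[11])


url = (
    "https://storage.googleapis.com/storage/v1/b/blablabla-landing/o/"
    "geometrie%2FTrack_Geometry-20201005_032915.csv/rewriteto/b/blablabla-safe/o/"
    "geometrie%2Foriginal%2Ftrack_geometry_20201005_032915.csv"
)
src_bucket, src_object, dst_bucket, dst_object = parse_rewrite_url(url)
```

Decoded this way, the request asks to copy `geometrie/Track_Geometry-20201005_032915.csv` from `blablabla-landing`, and the "No such object" message refers to that source object, not to the destination.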

The operator part:

transfer_landing_to_safe = GoogleCloudStoragetoGoogleCloudStorageOperator(
    task_id=f"transfer-landing-to-safe{env_extension}",
    source_bucket=f"blablabla-landing{env_extension}",
    source_object="{{ dag_run.conf['file_name'] }}",
    destination_bucket=f"blablabla-safe{env_extension}",
    destination_object="geometrie/original/track_geometry_{{ dag_run.conf['file_name'][-19:] }}",
    move_object=True,
    google_cloud_storage_conn_id="gcp_conn",
)
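For reference, the `[-19:]` slice in `destination_object` can be reproduced in plain Python. The sketch below does not run Jinja or Airflow; `destination_for` is a hypothetical helper that mirrors the template expression, and the sample `file_name` is the source object name taken from the log.

```python
def destination_for(file_name: str) -> str:
    """Mirror the template: fixed prefix plus the last 19 characters of file_name."""
    return "geometrie/original/track_geometry_" + file_name[-19:]


# Source object name as it appears in the task log.
file_name = "geometrie/Track_Geometry-20201005_032915.csv"

suffix = file_name[-19:]  # date, underscore, time, and ".csv"
destination = destination_for(file_name)
```

The slice only gives the intended name when the file name ends in a timestamp of exactly this length. For a name shorter than 19 characters, the slice returns the whole string instead.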
