微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

从Cloud Function触发数据流作业时出现莳萝错误 问题我尝试过的

如何解决从Cloud Function触发数据流作业时出现莳萝错误 问题我尝试过的

问题

我正在编写一个GCP云函数,该函数将从pubsub消息中获取输入ID,进行处理,并将表输出到BigQuery。

代码如下:

from __future__ import absolute_import
import base64
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from scrapinghub import ScrapinghubClient
import os


def processing_data_function():
    # do stuff and return desired data 

def create_data_from_id():
    # take scrapinghub's job id and extract the data through api 

def run(event,context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    # Take pubsub message and also Scrapinghub job's input id 
    pubsub_message = base64.b64decode(event['data']).decode('utf-8')  

    agrv = ['--project=project-name','--region=us-central1','--runner=DataflowRunner','--temp_location=gs://temp/location/','--staging_location=gs://staging/location/']
    p = beam.Pipeline(options=PipelineOptions(agrv))
    (p
        | 'Read from Scrapinghub' >> beam.Create(create_data_from_id(pubsub_message))
        | 'Trim b string' >> beam.FlatMap(processing_data_function)
        | 'Write Projects to BigQuery' >> beam.io.WritetoBigQuery(
                'table_name',schema=schema,# Creates the table in BigQuery if it does not yet exist.
                create_disposition=beam.io.BigQuerydisposition.CREATE_IF_NEEDED,write_disposition=beam.io.BigQuerydisposition.WRITE_APPEND)
    )
    p.run()


if __name__ == '__main__':
    run()

请注意,有2个函数create_data_from_idprocessing_data_function处理来自Scrapinghub(一个用于刮擦的刮擦站点)的数据,它们很长,因此我不想在这里包括它们。它们也与该错误无关,因为如果我从云外壳运行该代码并改为使用argparse.ArgumentParser()传递参数,则此代码将起作用。

关于我的错误,虽然部署代码没有问题,并且pubsub消息可以成功触发该功能,但数据流作业失败并报告了此错误

"Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py",line 279,in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",line 275,in loads
    return load(file,ignore,**kwds)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",line 270,in load
    return Unpickler(file,ignore=ignore,**kwds).load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",line 472,in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py",line 826,in _import_module
    return __import__(import_name)
ModuleNotFoundError: No module named 'main'

During handling of the above exception,another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py",line 649,in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py",line 179,in execute
    op.start()
  File "apache_beam/runners/worker/operations.py",line 662,in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py",line 664,line 665,line 284,in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py",line 290,line 611,in apache_beam.runners.worker.operations.DoOperation.setup
  File "apache_beam/runners/worker/operations.py",line 616,in apache_beam.runners.worker.operations.DoOperation.setup
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py",line 283,in _import_module
    return __import__(import_name)
ModuleNotFoundError: No module named 'main'

我尝试过的

鉴于我可以从云外壳运行相同的管道,但是使用参数解析器而不是指定选项,我认为选项说明的方式是问题所在。因此,我尝试了不同的选项组合,无论有无--save_main_session--staging_location--requirement_file=requirements.txt--setup_file=setup.py ...他们都或多或少地报告了同样的问题,所有莳萝都不知道该选择哪个模块。在指定save_main_session的情况下,无法启动主会话。在指定了require_file和setup_file的情况下,甚至没有成功创建作业,因此我可以省去查找其错误的麻烦。我的主要问题是我不知道这个问题是从哪里来的,因为我以前从未使用过莳萝,为什么从shell和云函数运行管道有何不同?有人有线索吗?

谢谢

解决方法

您也可以尝试将最后一部分修改为,并测试以下各项是否有效:

if __name__ == "__main__":
    ...

此外,请确保您在正确的文件夹中执行脚本,这可能与文件的命名或目录中的位置有关。

请考虑以下来源,您可能会有所帮助:Source 1Source 2

我希望这些信息对您有帮助。

,

您可能正在使用gunicorn在Cloud Run上启动应用程序(作为标准做法),例如:

from django.http import JsonResponse data = {'machines': machines} return JsonResponse(data)

我也遇到了同样的问题,并且找到了一种解决方法,可以在不使用gunicorn的情况下启动该应用程序:

CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app

可能是因为gunicorn跳过了 main 上下文并直接启动main:app对象。我不知道如何使用Gunicorn进行修复。

===附加说明===

我找到了一种使用金枪鱼的方法。

  1. 将一个函数(启动管道)移动到另一个模块,例如CMD exec python3 main.py
df_pipeline/pipe.py
.
├── df_pipeline
│   ├── __init__.py
│   └── pipe.py
├── Dockerfile
├── main.py
├── requirements.txt
└── setup.py
  1. # in main.py import df_pipeline as pipe result = pipe.preprocess(....) 所在的目录中创建setup.py
main.py
  1. # setup.py import setuptools setuptools.setup( name='df_pipeline',install_requires=[],packages=setuptools.find_packages(include=['df_pipeline']),) 中将管道选项setup_file设置为./setup.py

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。