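How to fix a Dataflow job that is in the Running state but doesn't process any elements from a Pub/Sub subscription
I'm trying to run a Python Dataflow job that reads from a Pub/Sub subscription and writes the messages to BigQuery.
The Dataflow job gets launched and then goes into the Running state without pulling any data from the subscription. Any ideas or clues as to what might be going wrong? The same code works fine on Python 2.7.
Python version is 3.8 and apache-beam is 2.25.0. I'm running the code from PyCharm on macOS 11.2.
Code (since the job isn't processing anything, I've kept only a print of the element in the ParDo function to see whether it works at all):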
import argparse
import logging

import apache_beam as beam
from apache_beam import DoFn
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


class DecodeMsgs(DoFn):
    def process(self, element):
        # Debug only: print the raw Pub/Sub payload and pass it through unchanged
        print(element)
        return [element]


def run(subscription, pipeline_args=None):
    # Forward the remaining command-line flags (--runner, --project, ...) to Beam
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=pipeline_options)
    p | beam.io.ReadFromPubSub(subscription=subscription) | beam.ParDo(DecodeMsgs())
    result = p.run()
    result.wait_until_finish()


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument('--subscription', required=True)
    known_args, pipeline_args = parser.parse_known_args()
    run(known_args.subscription, pipeline_args)
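On the Python 2.7 vs 3.8 point, one difference worth keeping in mind once messages do arrive: under Python 3, `beam.io.ReadFromPubSub` (with the default `with_attributes=False`) emits each message body as `bytes`, whereas in Python 2 `bytes` and `str` were the same type. This alone would not explain a job that receives nothing, since `print(element)` works on `bytes` too, but it matters for the BigQuery step. Below is a minimal sketch of a helper that could be called from `DecodeMsgs.process` (for example `yield decode_pubsub_payload(element)`). It assumes, hypothetically, that every message is a UTF-8 encoded JSON object, and `decode_pubsub_payload` is an illustrative name. It uses the `logging` module rather than `print`, since as far as I know that is what the Dataflow docs recommend for worker log messages.

```python
import json
import logging
from typing import Any, Dict


def decode_pubsub_payload(payload: bytes) -> Dict[str, Any]:
    """Turn one raw Pub/Sub message body into a dict (e.g. a BigQuery row).

    Assumption (hypothetical): each message is a UTF-8 encoded JSON object.
    """
    # Under Python 3 the element is bytes, so decode before treating it as text
    text = payload.decode("utf-8")
    # Log through the logging module so the record goes to the worker logs
    logging.info("Received Pub/Sub message: %s", text)
    return json.loads(text)
```

Keeping the decoding in a plain function like this also means it can be unit-tested locally without launching a Dataflow job.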
Arguments passed to the job:
--subscription <<projects/PROJECT/subscriptions/SUBSCRIPTION>>
--runner DataflowRunner
--project PROJECT
--temp_location gs://temp_location
--staging_location gs://staging_location
--subnetwork <<regions/REGION/subnetworks/SUBNETWORK>>
--service_account_email <<SERVICE_ACCOUNT_EMAIL>>
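To double-check which of these flags end up with Beam, here is a small standalone sketch of the same `argparse` split the script performs. The `split_args` and `subscription_path` names are hypothetical additions of mine. The validator only enforces the `projects/<project>/subscriptions/<name>` shape that the `subscription` argument of `ReadFromPubSub` expects, not Pub/Sub's full naming rules. Everything `parse_known_args` does not recognise comes back in `pipeline_args`, which is what gets handed to `PipelineOptions(pipeline_args)`.

```python
import argparse
import re
from typing import List, Optional, Tuple

# Hypothetical shape check: projects/<project>/subscriptions/<name>, no extra slashes
_SUBSCRIPTION_RE = re.compile(r"projects/[^/]+/subscriptions/[^/]+")


def subscription_path(value: str) -> str:
    """argparse type= callback that rejects bare subscription names."""
    if not _SUBSCRIPTION_RE.fullmatch(value):
        raise argparse.ArgumentTypeError(
            "expected projects/<project>/subscriptions/<name>, got %r" % value
        )
    return value


def split_args(argv: Optional[List[str]] = None) -> Tuple[str, List[str]]:
    """Split argv into the script's own --subscription and the flags meant for Beam."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--subscription", required=True, type=subscription_path)
    # Unrecognised flags (--runner, --project, --temp_location, ...) are returned
    # unchanged and in their original order as pipeline_args.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args.subscription, pipeline_args
```

With the arguments above, `split_args()` would return the subscription path plus a list starting with `--runner DataflowRunner`. A bare subscription name makes `argparse` exit with a usage error instead of silently launching a job.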
Here is the list of Python dependencies:
apache-beam==2.25.0
apitools==0.1.4
avro-python3==1.9.2.1
bcrypt==3.2.0
bigquery==0.0.8
BigQuery-Python==1.15.0
cachetools==4.2.1
certifi==2020.12.5
cffi==1.14.4
chardet==4.0.0
crcmod==1.7
cryptography==3.3.1
dacktool==0.0.7
dbstream==0.0.30
dill==0.3.1.1
docopt==0.6.2
fastavro==1.3.0
fasteners==0.16
flatten-json==0.1.7
future==0.18.2
google-api-core==1.25.1
google-api-python-client==1.7.11
google-apitools==0.5.31
google-auth==1.24.0
google-auth-httplib2==0.0.3
google-auth-oauthlib==0.4.2
google-cloud==0.34.0
google-cloud-bigquery==1.28.0
google-cloud-bigquery-storage==2.2.1
google-cloud-bigtable==1.6.1
google-cloud-build==2.0.0
google-cloud-core==1.5.0
google-cloud-datastore==1.15.3
google-cloud-dlp==1.0.0
google-cloud-language==1.3.0
google-cloud-pubsub==1.7.0
google-cloud-spanner==1.19.1
google-cloud-videointelligence==1.16.1
google-cloud-vision==1.0.0
google-crc32c==1.1.2
google-resumable-media==1.2.0
googleapis-common-protos==1.52.0
googleauthentication==0.0.14
grpc-google-iam-v1==0.12.3
grpcio==1.35.0
grpcio-gcp==0.2.2
hdfs==2.5.8
httplib2==0.17.4
idna==2.10
json-flatten==0.1
libcst==0.3.16
mock==2.0.0
mypy-extensions==0.4.3
numpy==1.19.5
oauth2client==4.1.3
oauthlib==3.1.0
pandas==1.2.1
paramiko==2.7.2
pbr==5.5.1
proto-plus==1.13.0
protobuf==3.14.0
pyarrow==0.17.1
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycparser==2.20
pydot==1.4.1
pymongo==3.11.2
PyNaCl==1.4.0
pyparsing==2.4.7
python-dateutil==2.8.1
pytz==2020.5
PyYAML==5.4.1
requests==2.25.1
requests-oauthlib==1.3.0
rsa==4.7
six==1.15.0
sshtunnel==0.1.5
typing-extensions==3.7.4.3
typing-inspect==0.6.0
uritemplate==3.0.1
urllib3==1.26.3
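Since the same code behaves differently on Python 2.7 and 3.8, it may also help to confirm what the interpreter PyCharm actually launches with reports. As far as I understand, Dataflow chooses the default worker container from the Python minor version and the Beam SDK version of the environment that submits the job, so here the two values should be 3.8 and 2.25.0. This is a minimal standard-library sketch (`importlib.metadata` needs Python 3.8+), and `launch_environment` is a hypothetical helper name.

```python
import sys
from importlib import metadata
from typing import Dict, Optional


def launch_environment() -> Dict[str, Optional[str]]:
    """Report the Python minor version and the installed apache-beam version."""
    try:
        beam_version: Optional[str] = metadata.version("apache-beam")
    except metadata.PackageNotFoundError:
        beam_version = None  # apache-beam is not installed in this interpreter
    return {
        "python": "%d.%d" % sys.version_info[:2],
        "apache-beam": beam_version,
    }


if __name__ == "__main__":
    print(launch_environment())
```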