How can I parallelize the metadata ingestion jobs in Amundsen?
I am trying to parallelize the metadata ingestion jobs in a project that uses Amundsen, but I am running into a problem. The relevant code is below. I parallelize at the Snowflake account level, then ingest the metadata extracted from Snowflake into Neo4j.
import multiprocessing
import time

def process_all_snowflake_accounts():
    """Loop through all the Snowflake accounts, ingesting each in its own process."""
    snowflake_config = read_snowflake_configuration()
    start_time = time.time()
    processes = []
    for ac_key, ac_config in snowflake_config.items():
        process = multiprocessing.Process(
            target=multiprocessing_snowflake_accounts,
            args=(ac_key, ac_config),
        )
        processes.append(process)
        process.start()
    for process in processes:
        process.join()
    # Note: cpu_count is a function; without the parentheses this prints
    # the function object rather than the number of CPUs.
    print("CPU count: ", multiprocessing.cpu_count())
    print('****************************************************************')
    print('Total time taken: ', time.time() - start_time)
    print('****************************************************************')
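As a side note, on platforms that use the "spawn" start method (Windows, and macOS on Python 3.8+), multiprocessing requires the entry point to be guarded with `if __name__ == '__main__':`, or each child process re-executes the module's top level. A minimal sketch, using a stand-in worker in place of `multiprocessing_snowflake_accounts`:

```python
import multiprocessing

def worker(ac_key, ac_config):
    # Stand-in for multiprocessing_snowflake_accounts; the real
    # per-account ingestion logic would run here.
    print(f"Process account: {ac_key}")

if __name__ == '__main__':
    # Guarding the driver is required under the 'spawn' start method,
    # otherwise child processes re-import and re-run this module.
    p = multiprocessing.Process(target=worker, args=('Account-1', {}))
    p.start()
    p.join()
```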
Occasionally the code above silently skips some account databases without raising any error, but most of the time it fails with the error shown below:
"Scanning Snowflake ..."
"Process account: Account-1"
"Process account: Account-2"
"Process account: Account-3"
"Launching job for Account-1-DB-1"
"Launching job for Account-2-DB-1"
"Launching job for Account-3-DB-1"
"Launching job for Account-2-DB-2"
"Launching job for Account-1-DB-2"
ERROR:databuilder.publisher.neo4j_csv_publisher:Failed to publish. Rolling back.
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/amundsen_databuilder-2.6.4-py3.7.egg/databuilder/publisher/neo4j_csv_publisher.py", line 202, in publish_impl
    tx = self._publish_node(node_file, tx=tx)
File "/usr/local/lib/python3.7/site-packages/amundsen_databuilder-2.6.4-py3.7.egg/databuilder/publisher/neo4j_csv_publisher.py", line 266, in _publish_node
    with open(node_file, 'r', encoding='utf8') as node_csv:
FileNotFoundError: [Errno 2] No such file or directory: '/var/tmp/amundsen/tables/nodes/Description_4.csv'
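For context on why I suspect a shared-directory collision: the traceback shows every worker process reading and writing CSVs under the same `/var/tmp/amundsen/tables/...` path, so one account's job can delete or overwrite files that another account's publisher is still reading. A minimal sketch of one possible workaround, giving each account its own staging area (the `staging_dirs_for` helper is hypothetical, not part of Amundsen; the resulting paths would be passed to the loader/publisher config in place of the shared default):

```python
import os
import tempfile

def staging_dirs_for(account_key):
    # Hypothetical helper: create a unique staging area per account so that
    # concurrent ingestion processes never touch each other's CSV files.
    base = tempfile.mkdtemp(prefix=f'amundsen_{account_key}_')
    nodes = os.path.join(base, 'nodes')
    relationships = os.path.join(base, 'relationships')
    os.makedirs(nodes, exist_ok=True)
    os.makedirs(relationships, exist_ok=True)
    return nodes, relationships
```

Each worker would then configure its Neo4j CSV loader and publisher with these per-account directories instead of a path shared by all processes.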