如何解决从云存储加载多个文件到不同表中的大查询
我是 GCP 的新手,我可以将 1 个文件从我的 VM 导入 GCS,然后将其传输到 bigquery。 如何将多个文件从 GCS 传输到 Bigquery。我知道通配符 URI 是它的解决方案,但在下面的代码中还需要哪些其他更改?
def hello_gcs(event,context):
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client()
# Todo(developer): Set table_id to the ID of the table to create.
table_id = "test_project.test_dataset.test_Table"
job_config = bigquery.LoadJobConfig(
autodetect=True,skip_leading_rows=1,# The source format defaults to CSV,so the line below is optional.
source_format=bigquery.sourceFormat.CSV,)
uri = "gs://test_bucket/*.csv"
load_job = client.load_table_from_uri(
uri,table_id,job_config=job_config
) # Make an API request.
load_job.result() # Waits for the job to complete.
destination_table = client.get_table(table_id) # Make an API request.
print(f"Processing file: {file['name']}.")
因为可能有多个上传,所以我无法定义特定的表名或文件名?是否可以自动执行此任务?
每当 GCS 存储桶中有新文件时,此功能由 PubSub 触发。 谢谢
解决方法
如果我错了,请纠正我,我了解您的云功能是由 finalize
事件 (Google Cloud Storage Triggers) 触发的,当新文件(或对象)出现在存储桶中时。这意味着存储桶中的每个“新”对象都有一个事件。因此,至少为每个对象调用一次云函数。
上面的链接有一个来自 event
字典的数据示例。那里有大量信息,包括要加载的对象(文件)的详细信息。
例如,您可能希望进行一些配置,在文件名模式和目标 BigQuery 表之间进行映射以进行数据加载。使用该地图,您将能够决定应该使用哪个表进行加载。或者您可能有其他选择目标表的机制。
其他一些需要考虑的事情:
- 异常处理 - 如果出现异常,您将如何处理该文件 数据未加载(出于任何原因)?谁以及如何被告知? 要做什么(更正源数据或目标表 和)重复加载等
- 如果加载时间比云函数多,会发生什么 超时(目前最大 540 秒)?
- 如果有多个云函数会发生什么
来自一个
finalize
事件的调用,或来自不同事件但 来自语义相同的源文件(重复数据、重复、 等)
不要回答我,如果你还没有做过,就想想这种情况。
,要将多个文件从 GCS 传输到 Bigquery,您只需遍历所有文件即可。下面是带有注释的工作代码示例。
我相信 event
和 context
(函数参数)默认由 Google 云函数处理,因此无需修改该部分。或者您可以通过利用 event
而不是循环来简化代码。
def hello_gcs(event,context):
import re
from google.cloud import storage
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
bq_client = bigquery.Client()
bucket = storage.Client().bucket("bucket-name")
for blob in bucket.list_blobs(prefix="folder-name/"):
if ".csv" in blob.name: #Checking for csv blobs as list_blobs also returns folder_name
job_config = bigquery.LoadJobConfig(
autodetect=True,skip_leading_rows=1,source_format=bigquery.SourceFormat.CSV,)
csv_filename = re.findall(r".*/(.*).csv",blob.name) #Extracting file name for BQ's table id
bq_table_id = "project-name.dataset-name."+csv_filename[0] # Determining table name
try: #Check if the table already exists and skip uploading it.
bq_client.get_table(bq_table_id)
print("Table {} already exists. Not uploaded.".format(bq_table_id))
except NotFound: #If table is not found,upload it.
uri = "gs://bucket-name/"+blob.name
print(uri)
load_job = bq_client.load_table_from_uri(
uri,bq_table_id,job_config=job_config
) # Make an API request.
load_job.result() # Waits for the job to complete.
destination_table = bq_client.get_table(bq_table_id) # Make an API request.
print("Table {} uploaded.".format(bq_table_id))
,
如果您的数据源是 GCS 而目的地是 BQ,您可以使用 BigQuery Data Transfer Service 在 BQ 中对您的数据进行 ETL。每个传输作业都是针对某个表的,您可以选择是否要使用流式模式附加或覆盖某个表中的数据。
您也可以安排此作业。每日、每周等。
,要在单个 Cloud Function 调用中将多个 GCS 文件加载到多个 BQ 表中,您需要列出这些文件,然后迭代它们,为每个文件创建一个加载作业,就像您为一个文件所做的那样。但是在单个函数调用中完成所有工作,有点违背了使用 Cloud Functions 的目的。
如果您的要求不强迫您这样做,您可以利用 Cloud Functions 的强大功能,并让每个文件在添加到存储桶后触发单个 CF,因为它是一个事件驱动的函数。请参考https://cloud.google.com/functions/docs/writing/background#cloud-storage-example。每次有指定活动时都会触发它,为此会有事件元数据。
因此,在您的应用程序中,我们无需获取 URI 中的整个存储桶内容,而是获取触发事件的文件的名称,然后仅将该文件加载到 bigquery 表中,如下面的代码示例所示。
>以下是解决代码中问题的方法。在您的代码中尝试以下更改。
-
您可以从云函数事件字典中提取事件的详细信息和触发事件的文件的详细信息。在您的情况下,我们可以获取文件名作为 event[‘name’] 并更新“uri”变量。
-
生成一个新的唯一的table_id(这里作为一个例子,table_id 与文件名相同)。您可以根据需要使用其他方案生成唯一的文件名。
参考下面的代码
def hello_gcs(event,context):
from google.cloud import bigquery
client = bigquery.Client() # Construct a BigQuery client object.
print(f"Processing file: {event['name']}.") #name of the file which triggers the function
if ".csv" in event['name']:
# bq job config
job_config = bigquery.LoadJobConfig(
autodetect=True,)
file_name = event['name'].split('.')
table_id = "<project_id>.<dataset_name>."+file_name[0] #[generating new id for each table]
uri = "gs://<bucket_name>/"+event['name']
load_job = client.load_table_from_uri(
uri,table_id,job_config=job_config
) # Make an API request.
load_job.result() # Waits for the job to complete.
destination_table = client.get_table(table_id) # Make an API request.
print("Table {} uploaded.".format(table_id))
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。