如何解决是否可以将 BigQuery 查询导出到 GCS 存储桶中的 5000 行 CSV 文件中?
我知道可以将查询导出到 GCS Bucket 中的 CSV 文件;但是,当导出到多个文件时,似乎没有办法限制每个文件的行数。我想知道是否有人想出了一种解决方法来允许此功能。我当前的用例是我需要将表(152 列)的查询导出到多个 CSV 文件中,并将它们保存到 GCS 存储桶中。每个文件不能超过 5000 条记录。我希望找到某种可以放入 BigQuery 的语句,以避免必须用 Python 编写解决方案(因为这是一个短期需求,在某个时间点之后就不需要了)。
offset = 0
total = SELECT COUNT(*) FROM WEB_SCRAPING.scrapy_products WHERE spider = "{0}"
while offset < total + 5000:
EXPORT DATA OPTIONS(
uri='gs://webscraping/{0}_{offset}-{offset + 5000}.csv', format='CSV', overwrite=true, header=true
) AS (
SELECT * EXCEPT (id)
FROM WEB_SCRAPING.scrapy_products
WHERE spider = "{0}"
LIMIT 5000 OFFSET {offset};
)
offset += 5000
我想我可以以某种方式将 EXPORT DATA
语句包装在一个循环的 CASE 语句中,增加偏移量直到导出所有结果。但是,据我所知,EXPORT DATA
语句需要一个星号,这可能会导致 5000 条记录块被拆分为更小的文件。
从 BigQuery sql 的角度,我对如何使这项工作有任何想法吗?甚至有可能吗?
更新:这是我目前掌握的 BigQuery 语句
DECLARE spider STRING DEFAULT 'albertsons.albertsons';
DECLARE increment INT64 DEFAULT 5000;
DECLARE tt_name STRING;
DECLARE `offset` INT64;
DECLARE total INT64;
SET `offset` = 0;
SET total = (SELECT COUNT(*) as total_products FROM `WEB_SCRAPING.SCRAPY_PRODUCTS` WHERE spider = FORMAT('"%s"',spider));
WHILE offset < (total + increment) DO
SET tt_name = FORMAT("`WEB_SCRAPING.%s_%dto%d`",@spider,@offset,@`offset` + @increment);
CREATE TEMP TABLE tt_name AS (
SELECT * EXCEPT (id)
FROM `WEB_SCRAPING.SCRAPY_PRODUCTS`
WHERE spider = FORMAT('"%s"',@spider)
LIMIT @increment OFFSET @`offset`
);
-- Now,to just handle the export portion
SET `offset` = @`offset` + @increment;
END WHILE;
解决方法
您不能仅使用 SQL 代码执行此操作。您需要像以前一样创建临时表,然后迭代它们(使用 python 或其他语言)以将表提取到带有 extract API
的文件中 ,正如之前评论的那样,您不能仅使用 SQL 代码导出 BQ 数据。在此 post 中,社区提到了一些有关如何从 BQ 导出数据的变通方法。
我建议您使用属于 extract_table
的 BQ Extract API 函数。此 API 能够使用单个通配符 URI(用于将大于 1 GB 最大值的导出数据)将数据导出到 multiple files。有了这个,BQ 根据提供的模式将您的数据分片到多个文件中。导出文件的大小会有所不同。
我使用公共数据集创建了一个测试,该数据集的表大小为 4.58 GB,它有 33,319,019 行和 15 列
from google.cloud import bigquery
client = bigquery.Client()
bucket_name = "[BUCKET_NAME]"
project = "bigquery-public-data"
dataset_id = "new_york"
table_id = "citibike_trips"
def exporting_data():
destination_uri = "gs://{}/{}".format(bucket_name,"citibike_*.csv")
dataset_ref = bigquery.DatasetReference(project,dataset_id)
table_ref = dataset_ref.table(table_id)
extract_job = client.extract_table(
table_ref,destination_uri,# Location must match that of the source table.
location="US",) # API request
extract_job.result() # Waits for job to complete.
print(
"Exported {}:{}.{} to {}".format(project,dataset_id,table_id,destination_uri)
)
if __name__ == "__main__":
exporting_data()
运行此代码后,将 49 个文件导出到存储桶 [BUCKET_NAME]
。每个文件的大小约为 98 MB,大约有 600 000 行。请考虑创建时态表以导出所需数据。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。