如何解决Azure Python SDK 数据表
我需要帮助来完成此工作流程。
我有 2 个存储帐户,分别命名为 storage1
和 storage2
storage1
包含包含一些数据的表列表,我想遍历所有这些表,将它们的内容复制到 storage2
中。我尝试过 azcopy,但我没有运气,因为此功能仅在 azcopy v7.3 中可用,而且我找不到适用于 MacOs M1 的此版本。另一种选择是数据工厂,但它对于我想要实现的目标来说太复杂了。所以我决定使用 azure Python sdk。
作为图书馆,我使用 azure.data.tables import TableServiceClient
我写的代码是这样的:
from azure.data.tables import TableServiceClient
my_conn_str_out = 'storage1-Conn-Str'
table_service_client_out = TableServiceClient.from_connection_string(my_conn_str_out)
list_table = []
for table in table_service_client_out.list_tables():
list_table.append(table.table_name)
my_conn_str_in = 'Storage2-Conn-str'
table_service_client_in = TableServiceClient.from_connection_string(my_conn_str_in)
for new_tables in table_service_client_out.list_tables():
table_service_client_in.create_table_if_not_exists(new_tables.table_name)
print(f'tables created successfully {new_tables.table_name}')
这就是我构建代码的方式。
for table in table_service_client_out.list_tables():
list_table.append(table.table_name)
我遍历存储帐户中的所有表并将它们附加到列表中。
然后:
for new_tables in table_service_client_out.list_tables():
table_service_client_in.create_table_if_not_exists(new_tables.table_name)
print(f'tables created successfully {new_tables.table_name}')
我在 storage2
到目前为止一切正常。
我现在想要实现的是查询all
中storage1
中每个表中的数据并将其传递给storage2
中的相应表
根据微软文档,我可以使用这个来实现查询表:
query = table_service_client_out.query_tables(filter=table)
所以我像这样将它集成到我的循环中:
for table in table_service_client_out.list_tables():
query = table_service_client_out.query_tables(filter=table)
list_table.append(table.table_name)
print(query)
当我运行我的 python 代码时,我得到了查询的内存分配而不是表中的数据:
<iterator object azure.core.paging.ItemPaged at 0x7fcd90c8fbb0>
<iterator object azure.core.paging.ItemPaged at 0x7fcd90c8f7f0>
<iterator object azure.core.paging.ItemPaged at 0x7fcd90c8fd60>
我想知道是否有办法查询表中的所有数据并将它们传递给我的 storage2
解决方法
试试这个:
from azure.cosmosdb.table.tableservice import TableService,ListGenerator
table_service_out = TableService(account_name='',account_key='')
table_service_in = TableService(account_name='',account_key='')
#query 100 items per request,in case of consuming too much menory load all data in one time
query_size = 100
#save data to storage2 and check if there is lefted data in current table,if yes recurrence
def queryAndSaveAllDataBySize(tb_name,resp_data:ListGenerator,table_out:TableService,table_in:TableService,query_size:int):
for item in resp_data:
#remove etag and Timestamp appended by table service
del item.etag
del item.Timestamp
print("instet data:" + str(item) + "into table:"+ tb_name)
table_in.insert_entity(tb_name,item)
if resp_data.next_marker:
data = table_out.query_entities(table_name=tb_name,num_results=query_size,marker=resp_data.next_marker)
queryAndSaveAllDataBySize(tb_name,data,table_out,table_in,query_size)
tbs_out = table_service_out.list_tables()
for tb in tbs_out:
#create table with same name in storage2
table_service_in.create_table(tb.name)
#first query
data = table_service_out.query_entities(tb.name,num_results=query_size)
queryAndSaveAllDataBySize(tb.name,table_service_out,table_service_in,query_size)
当然,这是针对你的需求的一个简单的demo。为了提高效率,你也可以通过分区键查询表数据并提交by batch
如果您还有其他问题,请告诉我。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。