微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Azure Python SDK 数据表

如何解决Azure Python SDK 数据表

我需要帮助来完成此工作流程。 我有 2 个存储帐户,分别命名为 storage1storage2

storage1 包含包含一些数据的表列表,我想遍历所有这些表,将它们的内容复制到 storage2 中。我尝试过 azcopy,但我没有运气,因为此功能仅在 azcopy v7.3 中可用,而且我找不到适用于 MacOs M1 的此版本。另一种选择是数据工厂,但它对于我想要实现的目标来说太复杂了。所以我决定使用 azure Python sdk。

作为图书馆,我使用 azure.data.tables import TableServiceClient

我写的代码是这样的:

from azure.data.tables import TableServiceClient
my_conn_str_out = 'storage1-Conn-Str'

table_service_client_out = TableServiceClient.from_connection_string(my_conn_str_out)
list_table = []
for table in table_service_client_out.list_tables():
    list_table.append(table.table_name)

my_conn_str_in = 'Storage2-Conn-str'

table_service_client_in = TableServiceClient.from_connection_string(my_conn_str_in)
for new_tables in table_service_client_out.list_tables():
    table_service_client_in.create_table_if_not_exists(new_tables.table_name)
    print(f'tables created successfully {new_tables.table_name}')

这就是我构建代码的方式。

for table in table_service_client_out.list_tables():
    list_table.append(table.table_name)

我遍历存储帐户中的所有表并将它们附加到列表中。

然后:

for new_tables in table_service_client_out.list_tables():
    table_service_client_in.create_table_if_not_exists(new_tables.table_name)
    print(f'tables created successfully {new_tables.table_name}')

我在 storage2

中创建了相同的表

到目前为止一切正常。

我现在想要实现的是查询allstorage1中每个表中的数据并将其传递给storage2中的相应表

根据微软文档,我可以使用这个来实现查询表

query = table_service_client_out.query_tables(filter=table)

所以我像这样将它集成到我的循环中:

for table in table_service_client_out.list_tables():
    query = table_service_client_out.query_tables(filter=table)
    list_table.append(table.table_name)
    print(query)

当我运行我的 python 代码时,我得到了查询的内存分配而不是表中的数据:

<iterator object azure.core.paging.ItemPaged at 0x7fcd90c8fbb0>
<iterator object azure.core.paging.ItemPaged at 0x7fcd90c8f7f0>
<iterator object azure.core.paging.ItemPaged at 0x7fcd90c8fd60>

我想知道是否有办法查询表中的所有数据并将它们传递给我的 storage2

解决方法

试试这个:

from azure.cosmosdb.table.tableservice import TableService,ListGenerator

table_service_out = TableService(account_name='',account_key='')
table_service_in = TableService(account_name='',account_key='')

#query 100 items per request,in case of consuming too much menory load all data in one time
query_size = 100

#save data to storage2 and check if there is lefted data in current table,if yes recurrence
def queryAndSaveAllDataBySize(tb_name,resp_data:ListGenerator,table_out:TableService,table_in:TableService,query_size:int):
    for item in resp_data:
        #remove etag and Timestamp appended by table service
        del item.etag
        del item.Timestamp
        print("instet data:" + str(item) + "into table:"+ tb_name)
        table_in.insert_entity(tb_name,item)
    if resp_data.next_marker:
        data = table_out.query_entities(table_name=tb_name,num_results=query_size,marker=resp_data.next_marker)
        queryAndSaveAllDataBySize(tb_name,data,table_out,table_in,query_size)


tbs_out = table_service_out.list_tables()

for tb in tbs_out:
    #create table with same name in storage2
    table_service_in.create_table(tb.name)
    #first query 
    data = table_service_out.query_entities(tb.name,num_results=query_size)
    queryAndSaveAllDataBySize(tb.name,table_service_out,table_service_in,query_size)

当然,这是针对你的需求的一个简单的demo。为了提高效率,你也可以通过分区键查询表数据并提交by batch

如果您还有其他问题,请告诉我。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。