微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

从源容器复制blob并上传到目标容器异步

如何解决从源容器复制blob并上传到目标容器异步

我正在尝试使用azure.storage.blob.aio一个容器中的Blob上传到另一个容器,但是如果该功能按预期运行,我会收到警告消息:

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f6a2ebf6e50>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7f6a2e3553a0>,32966.677798886)]']
connector: <aiohttp.connector.TCPConnector object at 0x7f6a2ebf6eb0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f6a2ec03400>
Unclosed connector
...

我的代码有什么问题,以及如何解决以上消息?

async def upload_blob(blob_args):
    # srource blob
    src_container_client = ContainerClient.from_connection_string(blob_args["src_connect_str"],blob_args["src_container"])

    # destination blob
    target_container_client = ContainerClient.from_connection_string(blob_args["target_connect_str"],blob_args["target_container"])
  
    tasks = []
    print(f"\nReading blobs from the container:")
    async for source_blob in src_container_client.list_blobs():
        # push data into specified stream
        source_blob_client = src_container_client.get_blob_client(source_blob.name)
        stream_downloader = await source_blob_client.download_blob()
        stream = await stream_downloader.readall()
            
        # create the BlobClient from the ContainerClient to interact with a specific blob
        target_blob_client = target_container_client.get_blob_client(source_blob.name)

        print(f"\t- Transfering {source_blob.name}.")
        tasks.append(asyncio.create_task(target_blob_client.upload_blob(stream)))

    await asyncio.gather(*tasks)
    print("Finished")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(upload_blob(blob_args))

解决方法

关于警告,您不要使用ContainerClient运行异步上下文管理器。

例如

import io
from azure.storage.blob.aio import ContainerClient
import asyncio


async def copy_blobs():
    # srource blob
    source_container_client = ContainerClient.from_connection_string(
        conn_str,'upload')

    # destination blob
    target_container_client = ContainerClient.from_connection_string(
        conn_str,'copy')
    async with source_container_client,target_container_client:
        tasks = []
        print(f"\nReading blobs from the container:")
        async for source_blob in source_container_client.list_blobs():
            source_blob_client = src_container_client.get_blob_client(source_blob.name)
            stream_downloader = await source_blob_client.download_blob()
            stream = await stream_downloader.readall()
            
            tasks.append(asyncio.create_task(target_blob_client.upload_blob(stream)))
            
            print(f"\t- Transfering {source_blob.name}.")

        await asyncio.gather(*tasks)
        print("Finished")

if __name__ == "__main__":
    asyncio.set_event_loop(asyncio.new_event_loop())
    loop = asyncio.get_event_loop()
    loop.run_until_complete(copy_blobs())

enter image description here

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。