如何解决Python“FileHandler”到 Azure BlobStorage - 没有这样的东西
我遇到过这样的情况,我在 blob 存储中有一个非常大的 SAS 文件 (300GB),需要在 Azure ML 服务工作区中进行处理。他们的主要任务是将其转换为一堆镶木地板文件。
当然,我可以将文件下载到 Azure ML 工作区的 FileShare 容器,然后使用 Pandas read_sas() 并定义适当的块大小来处理文件:
local_file_path = "./dld/mysas.sas7bat"
with open(local_file_path,"wb") as local_file:
downloader = blobclient.download_blob()
downloader.readinto(local_file)
reader = pd.read_sas(local_file_path,format='sas7bdat',chunksize=1000000)
count = 0
prefix="chunks/testchunk"
chunk: pd.DataFrame
for chunk in reader:
count += 1
name = prefix + str(count) + ".parquet"
chunk.to_parquet(name,engine = "pyarrow")
这当然有效。但是,我希望有一种更有效的方法。就像能够直接从 blobstorage 创建“文件流”一样。无需先将其下载到挂载的 Azure 文件共享中。但我什么也没找到。所以我为 blob 存储编写了自己的“流包装器”:
class BlobStorageFileHandler(RawIOBase):
def __init__(self,storageDownloader: StorageStreamDownloader):
self.downloader = storageDownloader
self.chunks = self.downloader.chunks()
self.current_pos = 0
self.current_chunk_len = 0
self.current_chunk = None
self._read_next_chunk()
def _read_next_chunk(self):
self.current_chunk: bytes = next(self.chunks,None)
if self.current_chunk is None:
self.current_chunk_len = 0
else:
self.current_chunk_len = len(self.current_chunk)
def seekable(self) -> bool:
return True
def seek(self,offset,whence=SEEK_SET):
print("seek: ",offset)
def readable(self) -> bool:
return True
def read(self,size=-1):
end_pos = self.current_pos + size
# if more bytes are requested that are left in the current read bytes-chunk
if end_pos > self.current_chunk_len:
# number of bytes that have to be read from the old chunk
rest_part = self.current_chunk_len - self.current_pos
# number of bytes that have to be read from the next chunk
new_part = end_pos - self.current_chunk_len
old_chunk_end: bytes = b''
if rest_part > 0:
old_chunk_end = self.current_chunk[self.current_pos: self.current_pos + rest_part]
self._read_next_chunk()
new_chunk_part: bytes
# if there was no further chunk left to be read
if self.current_chunk is None:
if rest_part > 0:
return old_chunk_end
return b''
if self.current_chunk_len > new_part:
new_chunk_part = self.current_chunk[0: new_part]
else:
new_chunk_part = self.current_chunk
self.current_pos = new_part
return old_chunk_end + new_chunk_part
else:
result = self.current_chunk[self.current_pos:self.current_pos + size]
self.current_pos += size
return result
有了这个类,就不必先将数据下载到挂载的 FileShare(或任何本地目录,如果在本地使用):
sas_file = BlobStorageFileHandler(blobclient.download_blob())
reader = pd.read_sas(sas_file,chunksize=1000000)
然而,我无法想象只有我一个人有这种需求,因此,我想知道是否有其他解决方案或现有的包装类,就像我上面展示的那样。
感谢任何输入。
谢谢 汉斯约格
解决方法
您也可以使用 download_blob
方法分块下载 blob。基本上,您需要指定 offset
(起始位置)和 length
(要读取的字节数)参数,并且该方法只会将这些字节返回给您。
在您当前的实现中,由于您没有指定这些参数,因此 SDK 正在下载整个 blob。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。