微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

将内存中的文件下载并上传到 Google 云端硬盘 修改点:修改后的脚本:结果:注意:参考:

如何解决将内存中的文件下载并上传到 Google 云端硬盘 修改点:修改后的脚本:结果:注意:参考:

目标

使用 Google Drive API 的可恢复 URL 下载文件并将其上传到 Google Drive 纯内存中。

挑战/问题

我想缓冲文件被下载到内存(不是文件系统),然后上传到 Google Drive。 Google Drive API 要求块的最小长度为 256 * 1024,(262144 bytes)

进程应该从要上传的缓冲区中传递一个块。如果块出错,则该缓冲区块最多重试 3 次。如果块成功,则应清除缓冲区中的该块,并且该过程应继续直到完成。

背景工作/研究 (参考如下

我研究和测试过的大多数文章、示例和软件包都对流、管道和分块提供了一些见解,但使用 filesystem 作为可读流的起点。

我尝试了不同的方法,使用 passthroughhighWaterMark 之类的流以及 requestgaxiosgot 等第三方库,它们具有内置流/管道支持,但在过程的上传端无济于事。

意思是,我不确定如何构建 pipingchunking 机制,无论是使用 buffer 还是 pipeline 以正确地流向上传过程直到完成,并以有效的方式处理进度和完成事件。

问题

  1. 使用下面的代码,我如何使用正确的 PUTContent-Length 标头将文件Content-Range 适当地缓冲到谷歌提供的 URL,同时有足够的缓冲空间处理 3 次重试?

  2. 在处理背压或缓冲方面,利用 .cork().uncork() 是管理缓冲流的有效方法吗?

  3. 有没有办法使用带有 TransformhighWaterMarkpipeline 流来有效地管理缓冲区? 例如...

pipeline(
  downloadStream,transformStream,uploadStream,(err) => {
    if (err) {
      reject(err)
    } else {
        resolve(true)
      }
    }
  )

下面是一个可视化模型和我想要完成的代码

视觉示例

[====================]
File Length (20 MB)

[==========          ]
Download (10 MB)
       
      [======      ]
      Buffer (e.g. 6 MB,size 12 MB)

      [===]
      Upload Chunk (3 MB) => Error? Retry from Buffer (max 3 times)
                          => Success? Empty Buffer => Continue =>
      [===]
      Upload next Chunk (3 MB)

代码

/* 
   Assume resumable_drive_url was already obtained from Google Api
   with the proper access token,which already contains the 
   Content-Type and Content-Length in the session. 
*/

transfer(download_url,resumable_drive_url,file_type,file_length) {

    return new Promise((resolve,reject) => {

        let timeout = setTimeout(() => {
            reject(new Error("Transfer timed out."))
        },80000)


       // Question #1: Should the passthrough stream 
       // and .on events be declared here?

       const passthrough = new stream.Passthrough({
            highWaterMark: 256 * 1024
       })

       passthrough.on("error",(error) => {
            console.error(`Upload Failed: ${error.message}`)
            reject(error.message)
       })

       passthrough.on("end",() => {
            clearTimeout(timeout)
            resolve(true)
       })

        
        // Download file
        axios({
            method: 'get',url: download_url,responseType: 'stream',maxRedirects: 1
        }).then(result => {
            
            // QUESTION #2: How do we buffer the file from here 
            // via axios.put to the resumable_url with the correct 
            // header information Content-Range and Content-Length?

            // CURIOSITY #1: Do we pipe from here 
            // to a passthrough stream that maintains a minimum buffer size?

            result.data.pipe(passthrough)
        }
        ).catch(error => {
            reject(error)
        })


    })
}

参考资料

  1. Chunked Upload Class - (体面的分块机制但臃肿;似乎有一种更有效的流管道方法
  2. Google Drive API v3 - Upload via Resumable URL with Multiple Requests
  3. resumableUpload.js - (概念上正确,但使用文件系统)
  4. Google-Drive-Uploader - (概念上正确,但使用文件系统和自定义 StreamFactory)
  5. Resumable upload in Drive Rest API V3 - (体面但看起来臃肿和过时)

解决方法

我相信你的目标和现状如下。

  • 您想使用 Axios 和 Node.js 下载数据并将下载的数据上传到 Google Drive。
  • 为了上传数据,您希望通过从流中检索数据使用可恢复上传的多个块进行上传。
  • 您的访问令牌可用于将数据上传到 Google 云端硬盘。
  • 您已经知道要上传的数据的数据大小和 mimeType。

修改点:

  • 在这种情况下,为了实现多块的可续传,我想提出以下流程。

    1. 从 URL 下载数据。
    2. 为可恢复上传创建会话。
    3. 从流中检索下载的数据并将其转换为缓冲区。
      • 为此,我使用了 stream.Transform
      • 在这种情况下,我会停止流并将数据上传到 Google 云端硬盘。我想不出在不停止流的情况下可以实现的方法。
      • 我认为本节可能是您的问题 2 和问题 3 的答案。
    4. 当缓冲区大小与声明的块大小相同时,将缓冲区上传到 Google Drive。
      • 我认为本节可能是您的问题 3 的答案。
    5. 当上传发生错误时,再次上传相同的缓冲区。在此示例脚本中,运行了 3 次重试。 3 次重试后,发生错误。
      • 我认为本节可能是您的问题 1 的答案。

当上述流程反映到您的脚本中时,它变成如下所示。

修改后的脚本:

请在函数main()中设置变量。

const axios = require("axios");
const stream = require("stream");

function transfer(
  download_url,resumable_drive_url,file_type,file_length,accessToken,filename,chunkSize
) {
  return new Promise((resolve,reject) => {
    axios({
      method: "get",url: download_url,responseType: "stream",maxRedirects: 1,})
      .then((result) => {
        const streamTrans = new stream.Transform({
          transform: function (chunk,_,callback) {
            callback(null,chunk);
          },});

        // 1. Retrieve session for resumable upload.
        axios({
          method: "POST",url: resumable_drive_url,headers: {
            Authorization: `Bearer ${accessToken}`,"Content-Type": "application/json",},data: JSON.stringify({
            name: filename,mimeType: file_type,}),})
          .then(({ headers: { location } }) => {
            // 2. Upload the file.
            let startByte = 0;
            result.data.pipe(streamTrans);
            let bufs = [];
            streamTrans.on("data",async (chunk) => {
              bufs.push(chunk);
              const temp = Buffer.concat(bufs);
              if (temp.length >= chunkSize) {
                const dataChunk = temp.slice(0,chunkSize);
                const left = temp.slice(chunkSize);
                streamTrans.pause();
                let upcount = 0;
                const upload = function () {
                  console.log(
                    `Progress: from ${startByte} to ${
                      startByte + dataChunk.length - 1
                    } for ${file_length}`
                  );
                  axios({
                    method: "PUT",url: location,headers: {
                      "Content-Range": `bytes ${startByte}-${
                        startByte + dataChunk.length - 1
                      }/${file_length}`,data: dataChunk,})
                    .then(({ data }) => resolve(data))
                    .catch((err) => {
                      if (err.response.status == 308) {
                        startByte += dataChunk.length;
                        streamTrans.resume();
                        return;
                      }
                      if (upcount == 3) {
                        reject(err);
                      }
                      upcount++;
                      console.log("Retry");
                      upload();
                      return;
                    });
                };
                upload();
                bufs = [left];
              }
            });
            streamTrans.on("end",() => {
              const dataChunk = Buffer.concat(bufs);
              if (dataChunk.length > 0) {
                // 3. Upload last chunk.
                let upcount = 0;
                const upload = function () {
                  console.log(
                    `Progress(last): from ${startByte} to ${
                      startByte + dataChunk.length - 1
                    } for ${file_length}`
                  );
                  axios({
                    method: "PUT",})
                    .then(({ data }) => resolve(data))
                    .catch((err) => {
                      if (upcount == 3) {
                        reject(err);
                      }
                      upcount++;
                      upload();
                      return;
                    });
                };
                upload();
              }
            });
            streamTrans.on("error",(err) => reject(err));
          })
          .catch((err) => reject(err));
      })
      .catch((error) => {
        reject(error);
      });
  });
}

function main() {
  const download_url = "###";
  const resumable_drive_url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=resumable";
  const file_type = "###"; // Please set the mimeType of the downloaded data.
  const file_length = 12345; // Please set the data size of the downloaded data.
  const accessToken = "###"; // Please set the access token.
  const filename = "sample filename"; // Please set the filename on Google Drive.
  const chunkSize = 10485760; // This is used as the chunk size for the resumable upload. This is 10 MB as a sample. In this case,please set the multiples of 256 KB (256 x 1024 bytes).

  transfer(
    download_url,chunkSize
  )
    .then((res) => console.log(res))
    .catch((err) => console.log(err));
}

main();

结果:

当文件大小为23558108(样本数据)运行上述脚本时,控制台得到如下结果..

Progress: from 0 to 10485759 for 23558108
Progress: from 10485760 to 20971519 for 23558108
Progress(last): from 20971520 to 23558107 for 23558108
{
  kind: 'drive#file',id: '###',name: 'sample filename',mimeType: '###'
}

注意:

  • 如果您想使用单个块实现可续传上传,您可以在 here 处查看示例脚本。

参考:

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?