微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在 google firestore 中使用 BulkWriter 批量处理

如何解决在 google firestore 中使用 BulkWriter 批量处理

有谁知道为什么这不起作用,我在这里做错了什么。在console.log“读后流”之后卡住了

我正在尝试读取一堆文件,将其转换为 json 并使用bulkwriter 上传到 firestore。 在每 400 个文档之后,我调用 close 将它们写入 firestore,然后我正在创建一个新的批量写入器

我也尝试过等待 bulkWriter.create(eventDoc,{}) 但它不起作用。它也卡住了,没有错误。为什么是这样 ? create 方法返回一个承诺。 为什么不能等待? https://googleapis.dev/nodejs/firestore/latest/BulkWriter.html#create

这个想法是一次处理1个文件,它可以包含需要上传到firestore的数万行

我在 for...of 循环中调用方法并等待 processBatch 方法

非常感谢任何帮助

async processBatch(document: string,file: string): Promise<void> {
    const db = admin.firestore();
    console.log('start: ',document);
    let bulkWriter;
    const writeBatchLimit = 400;
    let documentsInBatch = 0;
    let totalInDocument = 0;
    const eventsCollectionRef = db.collection('events');
    const eventDoc = eventsCollectionRef.doc(document);
    return new Promise((resolve,reject) => {
        console.log('promise');
        bulkWriter = db.bulkWriter();
        const csvStream = fs.createReadStream(file);
        console.log('after read stream');
        bulkWriter.create(eventDoc,{})
            .then(result => {
                console.log('Successfully: ',result);
                csvStream.pipe(csvParser())
                    .on('data',row => {
                        console.log('row');
                        bulkWriter.create(eventDoc.collection('event').doc(),row);
                        documentsInBatch++;
                        if (documentsInBatch > writeBatchLimit) {
                            bulkWriter.close();
                            totalInDocument = + documentsInBatch;
                            documentsInBatch = 0;
                            bulkWriter = db.bulkWriter();
                        }
                    })
                    .on('end',() => {
                        console.log('file: ',file + ',totalInDocument: ',totalInDocument);
                        resolve();
                    });
            })
            .catch(err => {
                console.log('Failed: ',err);
                reject();
            });
    });
}

解决方法

这似乎有效:

async processBatch(document: string,file: string): Promise<void> {
        const db = admin.firestore();
        console.log(`start: ${document}`);
        let bulkWriter;
        const writeBatchLimit = 400;
        let documentsInBatch = 0;
        let numOfBatches = 0;
        let totalInDocument = 0;
        const eventsCollectionRef = db.collection('events');
        const eventDoc = eventsCollectionRef.doc(document);
        bulkWriter = db.bulkWriter();
        const csvStream = fs.createReadStream(file);
        bulkWriter.create(eventDoc,{});
        csvStream.pipe(csvParser())
            .on('data',row => {
                bulkWriter.create(eventDoc.collection('event').doc(),row);
                documentsInBatch++;
                if (documentsInBatch > writeBatchLimit) {
                    numOfBatches++;
                    totalInDocument += documentsInBatch;
                    documentsInBatch = 0;
                    bulkWriter.close();
                    console.log(`Committing batch ${numOfBatches},cumulative: ${totalInDocument}`);
                    bulkWriter = db.bulkWriter();
                }
            })
            .on('end',() => {
                console.log(`file: ${file},totalInDocument: ${totalInDocument}`);
            });
    }

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?