如何使用 Cloud Functions 读取存储在 Google Cloud Storage 中的 CSV 数据

如何解决如何使用 Cloud Functions 读取存储在 Google Cloud Storage 中的 CSV 数据

作为与庞大用户群沟通工作的一部分,我每天需要发送超过 75,000 封电子邮件。我正在联系的用户的电子邮件存储在一个 CSV 文件中。我一直在使用 Postman Runner 通过 SendGrid(电子邮件 API)发送这些请求,但是由于数量如此之大,我的计算机要么速度变慢,要么 Postman 在批处理完成之前完全崩溃。即使没有崩溃,通过 Runner 发送这么多 POST 请求也需要 3 个小时以上的时间。

我想将包含电子邮件的 CSV 文件上传到 Cloud Storage 存储分区,然后使用 Cloud Functions 访问该文件以针对每封电子邮件发送 POST 请求。这样,所有的处理都可以由 GCP 处理,而不是由我的个人机器处理。但是,我似乎无法让 Cloud Function 逐行读取 CSV 数据。我已尝试将 Cloud Storage NodeJS 客户端库中的 createReadStream() 与 csv-parser 一起使用,但无法使此解决方案起作用。以下是我尝试过的:

const sendGridMail = require('@sendgrid/mail');
const { Storage } = require('@google-cloud/storage');
const fs = require('fs');
const csv = require('csv-parser');

exports.sendMailFromCSV = (file,context) => {

    console.log(`  Event: ${context.eventId}`);
    console.log(`  Event Type: ${context.eventType}`);
    console.log(`  Bucket: ${file.bucket}`);
    console.log(`  File: ${file.name}`);
    console.log(`  Metageneration: ${file.metageneration}`);
    console.log(`  Created: ${file.timeCreated}`);
    console.log(`  Updated: ${file.updated}`);

    const storage = new Storage();
    const bucket = storage.bucket(file.bucket);
    const remoteFile = bucket.file(file.name);
    console.log(remoteFile);

    let emails = [];
       
    fs.createReadStream(remoteFile)
        .pipe(csv())
        .on('data',function (row) {
            console.log(`Email read: ${row.email}`);
            emails.push(row.email);
        //send email using the SendGrid helper library
        const msg = {
                to: [{
                    "email": row.email;
                }],from: "fakeemail@gmail.com",template_id: "fakeTemplate",};

            sendGridMail.send(msg).then(() =>
                context.status(200).send(file.body))
                .catch(function (err) {
                    console.log(err);
                    context.status(400).send(file.body);
                });
        })
        .on('end',function () {
            console.table(emails);
        });    
};

云函数当前由上传到云存储存储桶触发。

有没有办法在不将文件加载到内存中的情况下构建此问题的解决方案? Cloud Functions 是向下移动的正确路径,还是使用 App Engine 或其他一些工具会更好?愿意尝试将这个过程移到云端的任何 GCP 解决方案

解决方法

Cloud Function 的内存可以作为临时目录 /tmp 共享/使用。因此,您可以将云存储桶中的 csv 文件作为本地文件下载到该目录中,然后对其进行处理,就像从本地驱动器处理该文件一样。

同时,您可能想记住 2 个主要限制:

  1. 内存 - 高达 2Gb 的所有内容
  2. 超时 - 每次调用不超过 540 秒。

我个人会根据一些 GCP 资源的组合创建一个解决方案。

第一个云函数由 'finlize' 事件触发 - 当 csv 文件保存在存储桶中时。此云功能读取文件,并为每条记录撰写包含相关详细信息的 Pub/Sub 消息(足以发送电子邮件)。该消息发布到 Pub/Sub 主题中。

Pub/Sub 主题用于传输来自第一个云函数的所有消息以触发第二个云函数。

第二个云函数由 Pub/Sub 消息触发,其中包含处理和发送电子邮件所需的所有详细信息。由于源 csv 文件中可能有 75K 条记录(例如),您应该期望第二个云函数调用 75K 次。

这在高层次上可能就足够了。 Pub/Sub 范式保证至少发送一次(但可能不止一次),因此如果您需要每个地址不超过一封电子邮件,则可能需要一些额外的资源来实现幂等行为。

,

基本上,您必须将文件本地下载到 Cloud Function 机器中才能以这种方式读取。

现在有多种方法可以解决此问题。

最基本/最简单的方法是配置 Compute Engine 机器,如果是一次性事件,则从它运行此操作。

如果您需要更频繁地(即每天)执行此操作,您可以使用在线工具将您的 csv 文件转换为 json 并将其导入 Firestore,然后您可以更快地阅读来自 Firestore 的电子邮件。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)> insert overwrite table dwd_trade_cart_add_inc > select data.id, > data.user_id, > data.course_id, > date_format(
错误1 hive (edu)> insert into huanhuan values(1,'haoge'); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive> show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 <configuration> <property> <name>yarn.nodemanager.res