微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

流管道上的 Node.Js 异步迭代器

如何解决流管道上的 Node.Js 异步迭代器

我有以下管道:

readFile > parseCSV > otherProcess

readFile 是标准的 Node.Js createReadStream,而 parseCSV 是 Node.js 转换流(模块 link)。

我想逐行遍历一个 csv 文件并在当时处理一行。因此,流和异步迭代器是绝配。

我有以下运行正常的代码

async function* readByLine(path,opt) {
  const readFileStream = fs.createReadStream(path);
  const csvParser = parse(opt);
  const parser = readFileStream.pipe(csvParser);
  for await (const record of parser) {
    yield record;
  }
}

我对 Node.Js 流很陌生,但我从许多来源了解到,模块 stream.pipeline 比读取流的 .pipe 方法更受欢迎。

如何更改上面的代码以使用 stream.pipeline(实际上是从 util.promisify(pipeline) 获得的 promise 版本)并同时生成一行?

解决方法

您实际上应该能够将 fs-stream 和解析器流都传递给 pipeline() 并在解析器流上使用您的异步迭代器:

const fs = require('fs');
const parse = require('csv-parse');
const stream = require('stream')
const util = require('util');
const pipeline = util.promisify(stream.pipeline);

async function* readByLine(path,opt) {
    const readFileStream = fs.createReadStream(path);
    const csvParser = parse(opt);
    await pipeline(readFileStream,csvParser);
    for await (const record of csvParser) {
        yield record;
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。