微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Mongoose 使用异步迭代器流式传输聚合查询

如何解决Mongoose 使用异步迭代器流式传输聚合查询

我想使用 Mongoose 流式传输聚合查询的结果,以允许客户端处理巨大的 JSON 响应(最终传输到 CSV 转换器)。

到目前为止我的代码

const pipeline = [
    {
      $unwind: {
        path: '$samples',// The name of a new field to hold the array index of the element.
        includeArrayIndex: 'num_sample',preserveNullAndEmptyArrays: true,},{
       $limit: 10000,{
      $project: {
        _id: 0,t: '$samples.t',station: '$station.name',loc: '$station.location',data: '$samples.data',];
  // const samples = await fixed.aggregate([pipeline]);

  const cursor = fixed
    .aggregate(pipeline)
    .cursor({ batchSize: 1000 })
    .exec();
 res.writeHead(200,{ 'content-type': 'application/json' });
 res.write('[');
 await cursor.eachAsync(async (doc,i) => {
    res.write(JSON.stringify(doc));
    res.write(',');
  });
  res.write('{}]');
  res.end();

但是如何将响应通过管道传送到 CSV 转换器作为 json2csv? 上面的代码功能上是否正确? 我必须向响应流写入额外的字符才能正确格式化 JSON,但找到的解决方案(带有最后的 {})在最终的 JSON 中引入了一个空记录(我还没有找到在每个文档之后写“,”的方法)从猫鼬游标返回,除了最后一个,为此,我不得不引入一个空记录)。

解决方法

它是一个流,这意味着您必须将该流 piperes,如下所示:

cursor.pipe(JSONStream.stringify()).pipe(res)

有关详细信息,请参阅此 question

,

我终于用下面的代码解决了。

欢迎任何改进所提供解决方案的建议。

exports.get_FIXED_Samples_CSV = catchAsync(async (req,res,next) => {
  // retrieve start and end dates
  // if start is not provided then set it to current date - 30 days
  const startDate =
    moment(req.query.start).isValid() && req.query.start
      ? moment(new Date(req.query.start))
      : moment(new Date()).subtract(30,'d');

  // if end is not provided or invalid set it to current date
  const endDate =
    moment(req.query.end).isValid() && req.query.end
      ? moment(new Date(req.query.end))
      : moment(new Date());

  // retrieve station name and check if valid,if not returns null
  const station = [
    'AQ101','AQ102','AQ103','AQ104','AQ105','AQ106','AQ107',].includes(req.query.station)
    ? req.query.station
    : null;

  // eslint-disable line no-unused-vars
  const pipeline = [
    // sort by date DESC adn station.name ASC
    {
      $sort: {
        'station.name': 1,date: -1,},// unwind by samples array adding num_sample counting
    {
      $unwind: {
        path: '$samples',// The name of a new field to hold the array index of the element.
        includeArrayIndex: 'num_sample',preserveNullAndEmptyArrays: true,// ~~~~~~~~~~~~ set limit to returning docs ~~~~~~~~~~~~~~~~~~~
    {
      $limit: 216000,// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    // select fields to show
    {
      $project: {
        _id: 0,t: '$samples.t',station: '$station.name',loc: '$station.location',data: '$samples.data',];

  // add as first stage to the pipeline a match aggregation
  if (station) {
    // if valid station  is provided
    pipeline.unshift({
      $match: {
        'station.name': station,date: {
          $gte: new Date(startDate.format('YYYY-MM-DD')),$lte: new Date(endDate.format('YYYY-MM-DD')),});
  } else {
    // if station is INVALID OR NOT provided
    pipeline.unshift({
      $match: {
        date: {
          $gte: new Date(startDate.format('YYYY-MM-DD')),});
  }

  // transform to apply to generate CSV
  const custTransf = (item,strFormat = 'DD/MM/YYYY HH:mm:ss') => ({
    utc: item.t,t: moment(item.t).format(strFormat),station: item.station,...item.data,});
  // Unwind Samples properties and flatten arrays
  const transforms = [
    // flatten({ objects: false,arrays: true }),custTransf,];
  // const fields = ['t','station','data'];
  const opts = { transforms };
  const transformOpts = { highWaterMark: 8192 };
  const pipeTransf = new Transform(opts,transformOpts);
  // remove data prefix from fields
  const regex = /(data.)/gi;
  const filename = 'FixedStations';
  const strAtt = `attachment;filename=${filename}-${startDate.format(
    'YYYY-MMM-DD'
  )}-${endDate.format('YYYY-MMM-DD')}.csv`;
  res.header('Content-Type','text/csv');
  res.setHeader('Content-Type','text/csv');
  res.setHeader(
    'Content-Disposition',strAtt
    // 'attachment;filename=download.csv'
  );
  res.setHeader('Cache-Control','no-cache');
  res.setHeader('Pragma','no-cache');

  const cursor = fixed
    .aggregate(pipeline)
    .allowDiskUse(true)
    .cursor({ transfor: JSON.stringify,batchSize: 1000 })
    .exec()
    .pipe(JSONStream.stringify())
    .pipe(pipeTransf)
    .pipe(replace(regex,''))
    .pipe(res);
});

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。