如何解决Mongoose 使用异步迭代器流式传输聚合查询
我想使用 Mongoose 流式传输聚合查询的结果,以允许客户端处理巨大的 JSON 响应(最终传输到 CSV 转换器)。
到目前为止我的代码:
const pipeline = [
{
$unwind: {
path: '$samples',// The name of a new field to hold the array index of the element.
includeArrayIndex: 'num_sample',preserveNullAndEmptyArrays: true,},{
$limit: 10000,{
$project: {
_id: 0,t: '$samples.t',station: '$station.name',loc: '$station.location',data: '$samples.data',];
// const samples = await fixed.aggregate([pipeline]);
const cursor = fixed
.aggregate(pipeline)
.cursor({ batchSize: 1000 })
.exec();
res.writeHead(200,{ 'content-type': 'application/json' });
res.write('[');
await cursor.eachAsync(async (doc,i) => {
res.write(JSON.stringify(doc));
res.write(',');
});
res.write('{}]');
res.end();
但是如何将响应通过管道传送到 CSV 转换器作为 json2csv
?
上面的代码在功能上是否正确?
我必须向响应流写入额外的字符才能正确格式化 JSON,但找到的解决方案(带有最后的 {})在最终的 JSON 中引入了一个空记录(我还没有找到在每个文档之后写“,”的方法)从猫鼬游标返回,除了最后一个,为此,我不得不引入一个空记录)。
解决方法
它是一个流,这意味着您必须将该流 pipe
到 res
,如下所示:
cursor.pipe(JSONStream.stringify()).pipe(res)
有关详细信息,请参阅此 question。
,我终于用下面的代码解决了。
欢迎任何改进所提供解决方案的建议。
exports.get_FIXED_Samples_CSV = catchAsync(async (req,res,next) => {
// retrieve start and end dates
// if start is not provided then set it to current date - 30 days
const startDate =
moment(req.query.start).isValid() && req.query.start
? moment(new Date(req.query.start))
: moment(new Date()).subtract(30,'d');
// if end is not provided or invalid set it to current date
const endDate =
moment(req.query.end).isValid() && req.query.end
? moment(new Date(req.query.end))
: moment(new Date());
// retrieve station name and check if valid,if not returns null
const station = [
'AQ101','AQ102','AQ103','AQ104','AQ105','AQ106','AQ107',].includes(req.query.station)
? req.query.station
: null;
// eslint-disable line no-unused-vars
const pipeline = [
// sort by date DESC adn station.name ASC
{
$sort: {
'station.name': 1,date: -1,},// unwind by samples array adding num_sample counting
{
$unwind: {
path: '$samples',// The name of a new field to hold the array index of the element.
includeArrayIndex: 'num_sample',preserveNullAndEmptyArrays: true,// ~~~~~~~~~~~~ set limit to returning docs ~~~~~~~~~~~~~~~~~~~
{
$limit: 216000,// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// select fields to show
{
$project: {
_id: 0,t: '$samples.t',station: '$station.name',loc: '$station.location',data: '$samples.data',];
// add as first stage to the pipeline a match aggregation
if (station) {
// if valid station is provided
pipeline.unshift({
$match: {
'station.name': station,date: {
$gte: new Date(startDate.format('YYYY-MM-DD')),$lte: new Date(endDate.format('YYYY-MM-DD')),});
} else {
// if station is INVALID OR NOT provided
pipeline.unshift({
$match: {
date: {
$gte: new Date(startDate.format('YYYY-MM-DD')),});
}
// transform to apply to generate CSV
const custTransf = (item,strFormat = 'DD/MM/YYYY HH:mm:ss') => ({
utc: item.t,t: moment(item.t).format(strFormat),station: item.station,...item.data,});
// Unwind Samples properties and flatten arrays
const transforms = [
// flatten({ objects: false,arrays: true }),custTransf,];
// const fields = ['t','station','data'];
const opts = { transforms };
const transformOpts = { highWaterMark: 8192 };
const pipeTransf = new Transform(opts,transformOpts);
// remove data prefix from fields
const regex = /(data.)/gi;
const filename = 'FixedStations';
const strAtt = `attachment;filename=${filename}-${startDate.format(
'YYYY-MMM-DD'
)}-${endDate.format('YYYY-MMM-DD')}.csv`;
res.header('Content-Type','text/csv');
res.setHeader('Content-Type','text/csv');
res.setHeader(
'Content-Disposition',strAtt
// 'attachment;filename=download.csv'
);
res.setHeader('Cache-Control','no-cache');
res.setHeader('Pragma','no-cache');
const cursor = fixed
.aggregate(pipeline)
.allowDiskUse(true)
.cursor({ transfor: JSON.stringify,batchSize: 1000 })
.exec()
.pipe(JSONStream.stringify())
.pipe(pipeTransf)
.pipe(replace(regex,''))
.pipe(res);
});
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。