如何解决导出数据时如何在RxJS 6和节点中expand
将数据库输出写入多个CSV文件。使用postgres光标。我正在
error is TypeError: You provided 'undefined' where a stream was expected. You can provide an Observable,Promise,Array,or Iterable
读取后续游标迭代时。返回错误的东西。使用节点15,RxJS 6和
const Pool = require('pg-pool');
const SQL = require('sql-template-strings');
const Cursor = require('pg-cursor');
const {map,mergeMap,tap,observeOn,expand,concatAll,concatMap,catchError,takeWhile,takeUntil} = require('rxjs / operators');
班级
class HandleQuery {
constructor(installations,weeks,client,ws) {
this.processing = 0;
this.installations = installations;
this.weeks = weeks;
this.client = client;
this.csvStream = format({headers: true});
return this;
}
runAll() {
console.log('run all');
const installationQueue = from(this.installations);
installationQueue.subscribe(i => {
console.log(i.installation);
if (this.weeks.rows.length > 0) {
this.runWeek(this.weeks.rows,i);
}
});
}
createCsvPipe(week,installation) {
const fileDate = week.readingdate.toISOString().split('T')[0];
const fileName = 'creps_' + installation.installation + '_' + fileDate + '.csv';
console.log('Writing to ' + fileName);
const ws = fs.createWriteStream(fileName,{autoClose: false});
return this.csvStream.pipe(ws);
}
runWeek(weeks,installation) {
let latterWeek = {readingdate: new Date()};
if (weeks.length > 1) {
latterWeek = weeks.pop();
}
from(weeks).subscribe(beginningWeek => {
console.log('week');
this.processing++;
const csv = this.createCsvPipe(beginningWeek,installation);
const cursor = weekDataQuery(this.client,installation.installation,beginningWeek.readingdate,latterWeek.readingdate);
latterWeek = beginningWeek;
const observableCursor = bindNodeCallback(cursor.read.bind(cursor));
observableCursor(BATCH_SIZE)
.pipe(map(rows => {
console.log(JSON.stringify(rows));
rows.forEach(row => {
console.log(JSON.stringify(row));
if (!(row instanceof Array)) {
// csv.write(row);
}
});
}))
.pipe(expand(() => {
console.log('expanding');
console.log(typeof observableCursor);
observableCursor(BATCH_SIZE)
.pipe(map(meme => {
return meme
}));
})
)
// .pipe(takeUntil(rows => rows.length === 0))
.subscribe((data) => {
if (data) {
console.log('data is ' + JSON.stringify(data));
}
},err => {
console.log('error is ' + err);
},() => {
console.log('complete');
cursor.close();
csv.end();
this.ws.close();
}
);
});
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。