微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

javascript – 从ORM返回的Promise序列生成RxJS Observable

我试图从sql数据库的一系列(批处理)记录生成一个Observable,我试图运行数据库中的所有记录.我在node-js上使用ORM,Sequelize返回包含在promise中的记录.

我已经定义了一个函数fetchbatch(),它获取一个批处理并返回一个Promise [Array [Record]]并将该结果平面映射到一个Observable.

根据查询是否返回没有记录,我的条件(终止)被设置为promise的then块中的全局,但是永远不会调用回调,只会无限地返回promise,因此永远不会满足终止条件.有关如何处理的任何建议?这是代码的要点.

function getAllPaginated(conditions) {
    var remaining = true;
    var batch_size = 20;
    function condition(){ return remaining; }
    function selector(promisedBatchOfRecords){
      //console.log(promisedBatchOfRecords);
      //return Observable.fromPromise(promisedBatchOfRecords[1]);
      return (promisedBatchOfRecords[1]);
    }
    function fetchBatch(batchNumberAndBatch) { // Returns [NextBatchNumber,Promise[Array[Record]]]
      //console.log(remaining);
      var batch_number = batchNumberAndBatch[0];
      var offset = (batch_number - 1) * batch_size;
      var rs = Records.findAll({where: conditions,offset: offset,limit: batch_size});
      return [batch_number + 1,rs.then(function(batch) {
                console.log(batch.length);
                if (!(batch.length > 0)){
                  remaining = false;
                };
                return batch.map(function(r){r.dataValues});
              })];
    }
    return Observable.generate(fetchBatch([1,[]]),condition,fetchBatch).flatMap(Ramda.identity/*over the promise*/).flatMap(Ramda.identity/*over the list*/);
  }
var o = getAllPaginated({where: {a: "b"}})
o.subScribeOnNext(console.log)

解决方法

你可以试试这样的东西:

const result = new Rx.Subject;
const batch_size = 3;

// Init the recursion
whileFind(0)
  .subscribe();

// Grab the result here
result
  .mergeAll()
  .map(batch => batch.dataValues)
  .subscribe(value => console.log(value));
    
// Recursion function
function whileFind(offset) {
  return Rx.Observable.fromPromise(findAll(offset))
    .concatMap(batch => {
      if (batch.length <= 0) { // Stop condition
        return Rx.Observable.of(null);
      }
      else {
        result.next(batch); // Push the chunk to the result
        return whileFind(offset + batch_size);
      }
  });
}

// Emulate Records.findAll from your BO
function findAll(offset): Promise<Object[]> {
  const data = [
    { dataValues: 1 },{ dataValues: 2 },{ dataValues: 3 },{ dataValues: 4 },{ dataValues: 5 },{ dataValues: 6 },{ dataValues: 7 },{ dataValues: 8 },{ dataValues: 9 },{ dataValues: 10 }
  ];

  return Promise.resolve(data.slice(offset,offset + batch_size));
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.12/Rx.min.js"></script>

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐