微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

围绕异步 API 的反应式 Java 应用程序设计

如何解决围绕异步 API 的反应式 Java 应用程序设计

我正在开发一个简单的 JavaFX 应用程序,该应用程序使用 IB TWS API 显示选项链。反应式程序设计对我来说是新的,我想问你我是否正确地设计了这个小部分。作为使用反应器的替代方案,我考虑 CompletableFuture。据我了解 CompletableFuture 更容易使用,也许这里的反应器有点矫枉过正。

首先它应该请求合约详细信息,当异步接收详细信息时,可以请求期权链和市场数据。期权链和市场数据结果也是异步的。这个简单的场景显示在下面的图表中。 request 方法是起点。所有发布商的构建方式都相同,我将在此处仅展示其中之一。

enter image description here

入口点 MainController.request

  @FXML
  public void request(ActionEvent event) {
    if (reqId != -1) {
      ibApi.cancelMktData(reqId);
      reqId = -1;
    }
    String symbolStr = this.symbol.getText();
    if (symbolStr.isEmpty())
      return;
    instrument = new Instrument(symbolStr);
    Mono<List<ContractDetails>> detailsList = detailsPublisher.requestContractDetails(instrument.contract)
        .collectList();
    Mono<ContractDetails> details = detailsList.handle((contractDetailsList,sink) -> {
      if (contractDetailsList.size() == 1) {
        sink.next(contractDetailsList.get(0));
        sink.complete();
      } else {
        //Todo
        logger.error("more then one contract found");
      }
    });
    details.subscribe(contractDetails -> {
      Platform.runLater(() -> name.setText(contractDetails.longName()));
      logger.debug("set name");
    });
    details.subscribe(contractDetails -> {
      requestOptChain(contractDetails);
      requestMarketData(contractDetails.contract());
      logger.debug("request option chain");
    });
    logger.debug("setup is done");
  }

MainController.requestOptChain 方法

  private void requestOptChain(ContractDetails contractDetails) {
    Flux<OptChain> optChainFlux = optChainPublisher.requestOptChain(contractDetails);
    Mono<OptChain> optChain = optChainFlux.filter(oc -> oc.getExchange().equals("SMART")).collectList()
        .handle((optChainsList,sink) -> {
          if (optChainsList.size() == 1) {
            sink.next(optChainsList.get(0));
            sink.complete();
          } else {
            sink.error(new RuntimeException("More then one option chain list on SMART exchange"));
          }
        });
    optChain.subscribe(oc -> {
      instrument.expirations = oc.getExpirations();
      instrument.strikes = oc.getStrikes();
      Platform.runLater(() -> {
        instrument.expirations.forEach(exp -> addExpirationsPane(exp,instrument.strikes));
      });
    });
  }

DetailsPublisher.requestContractDetails 方法

  @Autowired
  private IBApi ibApi;

  public Flux<ContractDetails> requestContractDetails(Contract contract) {
    return Flux.<ContractDetails>create(sink -> {
      DefaultDetailsListener listener = new DefaultDetailsListener() {
        @Override
        public void contractDetails(ContractDetails contractDetails) {
          sink.next(contractDetails);
        }

        @Override
        public void contractDetailsEnd() {
          sink.complete();
        }

        @Override
        public void error(int errorCode,String errorMsg) {
          sink.error(new RuntimeException(errorMsg));
        }
      };
      logger.debug("requesting contract details for {}",contract.symbol());
      ibApi.reqContractDetails(listener,contract);
    });
  }

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。