微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在Web应用程序中使用RxJava Observables不可思议的缺乏性能改进

正在进行一些测试,以评估在使用基于Observables的反应式API中是否存在真正的优势,而不是阻止传统API.

整个例子是available on Githug

令人惊讶的是,结果显示,输出结果是:

>最好的:返回一个包含阻塞操作的Callable / DeferredResult的REST Services.
>不错:阻止REST服务.
>最糟糕的是:返回DeferredResult的REST Services,其结果由RxJava Observable设置.

这是我的Spring WebApp:

应用:

@SpringBootApplication
public class SpringNioRestApplication {

   @Bean
    public ThreadPoolTaskExecutor executor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        return executor;
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringNioRestApplication.class,args);
    }
}

SyncController:

@RestController("SyncRestController")
@Api(value="",description="Synchronous data controller")
public class SyncRestController {

    @Autowired
    private DataService dataService;

    @RequestMapping(value="/sync/data",method=RequestMethod.GET,produces="application/json")
    @ApiOperation(value = "Gets data",notes="Gets data synchronously")
    @ApiResponses(value={@ApiResponse(code=200,message="OK")})
    public List<Data> getData(){
        return dataService.loadData();
    }
}

AsyncController:具有原始Callable和Observable端点

@RestController
@Api(value="",description="Synchronous data controller")
public class AsyncRestController {

    @Autowired
    private DataService dataService;

    private Scheduler scheduler;

    @Autowired
    private TaskExecutor executor;

     @postconstruct
    protected void initializeScheduler(){
        scheduler = Schedulers.from(executor);
    }

    @RequestMapping(value="/async/data",notes="Gets data asynchronously")
    @ApiResponses(value={@ApiResponse(code=200,message="OK")})
    public Callable<List<Data>> getData(){
        return ( () -> {return dataService.loadData();} );
    }

    @RequestMapping(value="/observable/data",produces="application/json")
     @ApiOperation(value = "Gets data through Observable",notes="Gets data asynchronously through Observable")
     @ApiResponses(value={@ApiResponse(code=200,message="OK")})
     public DeferredResult<List<Data>> getDataObservable(){
         DeferredResult<List<Data>> dr = new DeferredResult<List<Data>>();
         Observable<List<Data>> dataObservable = dataService.loadDataObservable();
         dataObservable.subscribeOn(scheduler).subscribe( dr::setResult,dr::setErrorResult);
         return dr;
     }
}

DataServiceImpl

@Service
public class DataServiceImpl implements DataService{

    @Override
    public List<Data> loadData() {
        return generateData();
    }

    @Override
    public Observable<List<Data>> loadDataObservable() {
        return Observable.create( s -> {
            List<Data> dataList = generateData();
            s.onNext(dataList);
            s.onCompleted();
        });
    }

    private List<Data> generateData(){
        List<Data> dataList = new ArrayList<Data>();
        for (int i = 0; i < 20; i++) {
            Data data = new Data("key"+i,"value"+i);
            dataList.add(data);
        }
        //Processing time simulation
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printstacktrace();
        }
        return dataList;
    }
}

我设置了一个Thread.sleep(500)延迟来增加服务响应时间.

负载测试的结果是:

与Callable异步:700 rps,没有错误

>>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/async/data    
...
Requests: 0,requests per second: 0,mean latency: 0 ms
Requests: 2839,requests per second: 568,mean latency: 500 ms
Requests: 6337,requests per second: 700,mean latency: 500 ms
Requests: 9836,mean latency: 500 ms
...
Completed requests:  41337
Total errors:        0
Total time:          60.002348360999996 s
Requests per second: 689
Total time:          60.002348360999996 s

阻塞:大约404 rps,但会产生错误

>>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/sync/data    
...
Requests: 7683,requests per second: 400,mean latency: 7420 ms
Requests: 9683,mean latency: 9570 ms
Requests: 11680,requests per second: 399,mean latency: 11720 ms
Requests: 13699,requests per second: 404,mean latency: 13760 ms
...
Percentage of the requests served within a certain time
  50%      8868 ms
  90%      22434 ms
  95%      24103 ms
  99%      25351 ms
 100%      26055 ms (longest request)

 100%      26055 ms (longest request)

   -1:   7559 errors
Requests: 31193,requests per second: 689,mean latency: 14350 ms
Errors: 1534,accumulated errors: 7559,24.2% of total requests

与可观察的异步:不超过20 rps,并早日得到错误

>>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/observable/data
Requests: 0,mean latency: 0 ms
Requests: 90,requests per second: 18,mean latency: 2250 ms
Requests: 187,requests per second: 20,mean latency: 6770 ms
Requests: 265,requests per second: 16,mean latency: 11870 ms
Requests: 2872,requests per second: 521,mean latency: 1560 ms
Errors: 2518,accumulated errors: 2518,87.7% of total requests
Requests: 6373,mean latency: 1590 ms
Errors: 3401,accumulated errors: 5919,92.9% of total requests

Observable执行的corePoolSize为10,但是将其增加到50也没有改善任何东西.

什么可以解释?

更新:根据akarnokd的建议,我进行了以下更改.从Object.create移动到服务中的Object.fromCallable,并重新使用控制器中的Scheduler,但是我仍然得到相同的结果.

解决方法

这个问题是由某些程序错误引起的.其实这个问题的例子是完美的.

一个警告,以防止其他人出现问题:注意使用Observable.just(func),func实际上是在可观察创建上调用的.所以放置在那里的任何Thread.sleep都会阻塞调用线程

@Override
public Observable<List<Data>> loadDataObservable() {
    return Observable.just(generateData()).delay(500,TimeUnit.MILLISECONDS);
}

private List<Data> generateData(){
    List<Data> dataList = new ArrayList<Data>();
    for (int i = 0; i < 20; i++) {
        Data data = new Data("key"+i,"value"+i);
        dataList.add(data);
    }
    return dataList;
}

我在RxJava Google group开始讨论,他们帮助我解决了这个问题.

原文地址:https://www.jb51.cc/html/229640.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐