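I am running some tests to evaluate whether a reactive API based on Observables offers a real advantage over a traditional blocking API.
The whole example is available on GitHub.
>Best: REST services that return a Callable / DeferredResult wrapping the blocking operation.
>Not bad: blocking REST services.
>Worst: REST services that return a DeferredResult whose result is set by an RxJava Observable.
This is my Spring WebApp:
Application: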
- @SpringBootApplication
- public class SpringNioRestApplication {
- @Bean
- public ThreadPoolTaskExecutor executor(){
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setCorePoolSize(10);
- executor.setMaxPoolSize(20);
- return executor;
- }
- public static void main(String[] args) {
- SpringApplication.run(SpringNioRestApplication.class,args);
- }
- }
SyncController:
- @RestController("SyncRestController")
- @Api(value="",description="Synchronous data controller")
- public class SyncRestController {
- @Autowired
- private DataService dataService;
- @RequestMapping(value="/sync/data",method=RequestMethod.GET,produces="application/json")
- @ApiOperation(value = "Gets data",notes="Gets data synchronously")
- @ApiResponses(value={@ApiResponse(code=200,message="OK")})
- public List<Data> getData(){
- return dataService.loadData();
- }
- }
AsyncController: with raw Callable and Observable endpoints
- @RestController
- @Api(value="",description="Asynchronous data controller")
- public class AsyncRestController {
- @Autowired
- private DataService dataService;
- private Scheduler scheduler;
- @Autowired
- private TaskExecutor executor;
- @PostConstruct
- protected void initializeScheduler(){
- scheduler = Schedulers.from(executor);
- }
- @RequestMapping(value="/async/data",method=RequestMethod.GET,produces="application/json")
- @ApiOperation(value = "Gets data",notes="Gets data asynchronously")
- @ApiResponses(value={@ApiResponse(code=200,message="OK")})
- public Callable<List<Data>> getData(){
- return ( () -> {return dataService.loadData();} );
- }
- @RequestMapping(value="/observable/data",produces="application/json")
- @ApiOperation(value = "Gets data through Observable",notes="Gets data asynchronously through Observable")
- @ApiResponses(value={@ApiResponse(code=200,message="OK")})
- public DeferredResult<List<Data>> getDataObservable(){
- DeferredResult<List<Data>> dr = new DeferredResult<List<Data>>();
- Observable<List<Data>> dataObservable = dataService.loadDataObservable();
- dataObservable.subscribeOn(scheduler).subscribe( dr::setResult,dr::setErrorResult);
- return dr;
- }
- }
DataServiceImpl
- @Service
- public class DataServiceImpl implements DataService{
- @Override
- public List<Data> loadData() {
- return generateData();
- }
- @Override
- public Observable<List<Data>> loadDataObservable() {
- return Observable.create( s -> {
- List<Data> dataList = generateData();
- s.onNext(dataList);
- s.onCompleted();
- });
- }
- private List<Data> generateData(){
- List<Data> dataList = new ArrayList<Data>();
- for (int i = 0; i < 20; i++) {
- Data data = new Data("key"+i,"value"+i);
- dataList.add(data);
- }
- //Processing time simulation
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return dataList;
- }
- }
I added a Thread.sleep(500) delay to increase the service response time.
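As a back-of-the-envelope aid for reading the numbers: when every request blocks a worker thread for a fixed time, a pool of N workers cannot serve more than N divided by that time per second. The sketch below only does that arithmetic; the class and method names are made up for illustration, and it says nothing about which executor Spring actually uses for each endpoint.

```java
final class ThroughputCeiling {
    // Upper bound on requests per second when each request blocks one
    // worker thread for blockingMillis and `threads` workers are available.
    static double maxRequestsPerSecond(int threads, long blockingMillis) {
        return threads * 1000.0 / blockingMillis;
    }

    public static void main(String[] args) {
        // 500 ms matches the Thread.sleep(500) in generateData();
        // 10 and 20 are the core and max pool sizes of the executor bean.
        System.out.println(maxRequestsPerSecond(10, 500)); // prints 20.0
        System.out.println(maxRequestsPerSecond(20, 500)); // prints 40.0
    }
}
```

This is only a ceiling for a fully blocking workload. Queueing, timeouts, and the servlet container's own thread pool all affect the measured values.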
The results of the load tests are:
Async with Callable: 700 rps, no errors
- >>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/async/data
- ...
- Requests: 0,requests per second: 0,mean latency: 0 ms
- Requests: 2839,requests per second: 568,mean latency: 500 ms
- Requests: 6337,requests per second: 700,mean latency: 500 ms
- Requests: 9836,mean latency: 500 ms
- ...
- Completed requests: 41337
- Total errors: 0
- Total time: 60.002348360999996 s
- Requests per second: 689
Blocking (sync): about 400 rps, with errors
- >>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/sync/data
- ...
- Requests: 7683,requests per second: 400,mean latency: 7420 ms
- Requests: 9683,mean latency: 9570 ms
- Requests: 11680,requests per second: 399,mean latency: 11720 ms
- Requests: 13699,requests per second: 404,mean latency: 13760 ms
- ...
- Percentage of the requests served within a certain time
- 50% 8868 ms
- 90% 22434 ms
- 95% 24103 ms
- 99% 25351 ms
- 100% 26055 ms (longest request)
- -1: 7559 errors
- Requests: 31193,requests per second: 689,mean latency: 14350 ms
- Errors: 1534,accumulated errors: 7559,24.2% of total requests
Async with Observable: no more than 20 rps, and errors appear early
- >>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/observable/data
- Requests: 0,mean latency: 0 ms
- Requests: 90,requests per second: 18,mean latency: 2250 ms
- Requests: 187,requests per second: 20,mean latency: 6770 ms
- Requests: 265,requests per second: 16,mean latency: 11870 ms
- Requests: 2872,requests per second: 521,mean latency: 1560 ms
- Errors: 2518,accumulated errors: 2518,87.7% of total requests
- Requests: 6373,mean latency: 1590 ms
- Errors: 3401,accumulated errors: 5919,92.9% of total requests
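The Observable runs on an executor with a corePoolSize of 10, but increasing it to 50 did not improve anything either.
What could explain this?
UPDATE: As suggested by akarnokd, I made the following changes: I moved from Observable.create to Observable.fromCallable in the service and reused the Scheduler in the controller, but I still get the same results.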
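For context, the intent behind wrapping the work in a callable and subscribing on a scheduler is that the blocking call runs on a pool thread rather than on the thread that handles the request. Here is a minimal plain-JDK sketch of that idea, using CompletableFuture as a stand-in (this is not RxJava code; loadData, runOnPool, and the "worker" thread name are hypothetical):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DeferredOnPool {
    // Hypothetical stand-in for the service call: reports which thread ran it.
    static String loadData() {
        return Thread.currentThread().getName();
    }

    // Submits loadData() to the pool and returns the name of the executing thread.
    static String runOnPool(ExecutorService pool) {
        return CompletableFuture.supplyAsync(DeferredOnPool::loadData, pool).join();
    }

    public static void main(String[] args) {
        // Every pool thread is named "worker" so the output is easy to read.
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "worker");
            t.setDaemon(true);
            return t;
        });
        try {
            System.out.println("caller: " + Thread.currentThread().getName());
            System.out.println("work ran on: " + runOnPool(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

If the work shows up on the caller thread instead, it was executed before it ever reached the pool.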
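Solution
The problem was caused by a programming error at some point. The example in the question actually works perfectly.
One warning to keep others from running into trouble: beware of using Observable.just(func), because func is actually called when the Observable is created. Any Thread.sleep placed there will therefore block the calling thread.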
- @Override
- public Observable<List<Data>> loadDataObservable() {
- return Observable.just(generateData()).delay(500,TimeUnit.MILLISECONDS);
- }
- private List<Data> generateData(){
- List<Data> dataList = new ArrayList<Data>();
- for (int i = 0; i < 20; i++) {
- Data data = new Data("key"+i,"value"+i);
- dataList.add(data);
- }
- return dataList;
- }
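The warning about Observable.just comes down to how Java evaluates arguments: the argument expression is computed before the method is called, so the method only ever sees the finished value. The sketch below shows this with plain JDK types. The just and fromCallable helpers are hypothetical stand-ins that mimic the shape of the RxJava factories; they are not the library's implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class EagerVsDeferred {
    // Counts how many times generateData() has actually run.
    static final AtomicInteger calls = new AtomicInteger();

    static String generateData() {
        calls.incrementAndGet();
        return "data";
    }

    // Stand-in for just(value): receives a value that is already computed.
    static <T> Supplier<T> just(T value) {
        return () -> value;
    }

    // Stand-in for fromCallable(...): stores the computation and runs it on demand.
    static <T> Supplier<T> fromCallable(Supplier<T> computation) {
        return computation;
    }

    public static void main(String[] args) {
        Supplier<String> eager = just(generateData());
        System.out.println(calls.get()); // prints 1: ran while building the argument

        Supplier<String> deferred = fromCallable(EagerVsDeferred::generateData);
        System.out.println(calls.get()); // prints 1: nothing has run yet

        deferred.get();
        System.out.println(calls.get()); // prints 2: ran only when requested

        eager.get();
        System.out.println(calls.get()); // prints 2: the stored value is reused
    }
}
```

In the eager case, any slow or blocking code inside generateData() is charged to whichever thread builds the pipeline, regardless of the scheduler chosen later.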
I started a discussion on the RxJava Google group, where they helped me work it out.