正文
海思
Reactive编程(三):一个简单的HTTP服务
书接上文
Reactive编程
,我们已经了解了基础的API,现在我们开始编写实际的应用。Reactive对并发编程进行了很好的抽象,也有很多底层的特性需要我们去关注。当使用这些特性时,我们可以对之前隐藏在容器、平台、框架中的细节进行控制。
Spring MVC由阻塞转向Reactive
Reactive要求我们以不同的思路来看待问题。区别于传统的request->response模式,所有的数据都是发布为一个序列 (Publisher) 然后进行订阅(Subscriber)。区别于同步等待返回结果,改为注册一个回调。只要我们习惯了这种方式,就不会觉得很复杂。但是没办法让整个环境都同时变为Reactive模式,所以避免不了要跟老式的阻塞API打交道。
假设我们有一个返回HttpStatus的阻塞方法:
private RestTemplate restTemplate = new RestTemplate();
private HttpStatus block(int value) {
return this.restTemplate.getForEntity("http://example.com/{value}", String.class, value)
.getStatusCode();
}
我们需要传递不同的参数来重复调用这个方法,并对返回的结果进行处理。这是一个典型的 “scatter-gather”应用场景。例如从多个页面中提取前N条数据。
这是一个采用错误方式的例子:
Flux.range(1, 10) (1)
.log()
.map(this::block) (2)
.collect(Result::new, Result::add) (3)
.doOnSuccess(Result::stop) (4)
-
调用10次接口
-
产生阻塞
-
将结果进行汇总后放入一个对象
-
最后结束处理 (结果是一个
Mono<Result>
)
不要采用这种方式来编写代码。这是一种错误的实现方式,这样会阻塞住调用线程,这跟循环调用block()没什么区别。好的实现应该是把
block()
的调用放到工作线程中。我们可以采用一个返回
Mono<HttpStatus>
的方法:
private Mono<HttpStatus> fetch(int value) {
return Mono.fromCallable(() -> block(value)) (1)
.subscribeOn(this.scheduler); (2)
}
-
将阻塞调用放到一个 ```Callable`` 中
-
在工作线程中进行订阅
scheduler
单独定义为一个共享变量:
Scheduler scheduler = Schedulers.parallel()
然后用
flatMap()
代替
map()
Flux.range(1, 10)
.log()
.flatMap( (1)
this::fetch, 4) (2)
.collect(Result::new, Result::add)
.doOnSuccess(Result::stop)
-
在新的publisher中并行处理
-
flatMap的并行参数
嵌入 Non-Reactive 服务
如果想将上面的代码放入一个servlet这种的Non-Reactive的服务中,可以使用Spring MVC:
@RequestMapping("/parallel")
public CompletableFuture<Result> parallel() {
return Flux.range(1, 10)
...
.doOnSuccess(Result::stop)
.toFuture();
}
在阅读了
@RequestMapping
的javadoc以后,我们会发现这个方法会返回一个
CompletableFuture
,应用会选择在单独的线程中返回值。在我们的例子中这个单独的线程由
scheduler
提供。
没有免费的午餐
利用工作线程进行 scatter-gather 计算是一个好的模式,但是也不完美 - 没有阻塞调用方,但还是阻塞了一些东西,只不过是把问题转移了。我们有一个非阻塞IO的 HTTP 服务,将处理放入线程池,一个请求一个线程 - 这是servlet容器的机制(例如tomcat)。请求是异步处理的,因此tomcat内部的工作线程没有被阻塞,我们的
scheduler
会创建4个线程。在处理10个请求时,理论上处理性能会提高4倍。简单来说,如果我们在单个线程中顺序处理10个请求要花1000ms,我们所采用的方式只需250ms。
我们可以通过增加线程来进一步提升性能(分配16个线程):
private Scheduler scheduler = Schedulers.newParallel("sub", 16);
Tomcat 默认会分配100个线程来处理请求,当所有的请求同时处理时,我们的 scheduler 线程池会成为一个瓶颈。我们的 scheduler 线程池数量远小于 Tomcat 的线程池数量。这说明性能调优不是一个简单的事情,需要考虑各个参数和资源的匹配情况。
相比固定数量的线程池,我们可以采用更灵活的线程池,可以根据需要动态调整线程数量。Reactor 已经提供了这种机制,使用
Schedulers.elastic()
后可以看到当请求增多时,线程数量会随之增加。
全面采用 Reactive
从阻塞调用到reactive的桥接是一种有效的模式,并且用Spring MVC的技术很容易实现。接下来我们将完全弃用阻塞模式,采用新的API和新的工具。最终我们实现全栈Reactive。