专栏名称: 狗厂
目录
相关文章推荐
51好读  ›  专栏  ›  狗厂

Reactive编程(三):一个简单的HTTP服务

狗厂  · 掘金  ·  · 2018-03-27 02:53

正文

海思

Reactive编程(三):一个简单的HTTP服务

书接上文 Reactive编程 ,我们已经了解了基础的API,现在我们开始编写实际的应用。Reactive对并发编程进行了很好的抽象,也有很多底层的特性需要我们去关注。当使用这些特性时,我们可以对之前隐藏在容器、平台、框架中的细节进行控制。

Spring MVC由阻塞转向Reactive

Reactive要求我们以不同的思路来看待问题。区别于传统的request->response模式,所有的数据都是发布为一个序列 (Publisher) 然后进行订阅(Subscriber)。区别于同步等待返回结果,改为注册一个回调。只要我们习惯了这种方式,就不会觉得很复杂。但是没办法让整个环境都同时变为Reactive模式,所以避免不了要跟老式的阻塞API打交道。

假设我们有一个返回HttpStatus的阻塞方法:

private RestTemplate restTemplate = new RestTemplate();

private HttpStatus block(int value) {
    return this.restTemplate.getForEntity("http://example.com/{value}", String.class, value)
            .getStatusCode();
}

我们需要传递不同的参数来重复调用这个方法,并对返回的结果进行处理。这是一个典型的 “scatter-gather”应用场景。例如从多个页面中提取前N条数据。

这是一个采用错误方式的例子:

Flux.range(1, 10) (1)
    .log()
    .map(this::block) (2)
    .collect(Result::new, Result::add) (3)
    .doOnSuccess(Result::stop) (4)
  1. 调用10次接口
  2. 产生阻塞
  3. 将结果进行汇总后放入一个对象
  4. 最后结束处理 (结果是一个 Mono<Result> )

不要采用这种方式来编写代码。这是一种错误的实现方式,这样会阻塞住调用线程,这跟循环调用block()没什么区别。好的实现应该是把 block() 的调用放到工作线程中。我们可以采用一个返回 Mono<HttpStatus> 的方法:

private Mono<HttpStatus> fetch(int value) {
    return Mono.fromCallable(() -> block(value)) (1)
        .subscribeOn(this.scheduler);            (2)
}
  1. 将阻塞调用放到一个 ```Callable`` 中
  2. 在工作线程中进行订阅

scheduler 单独定义为一个共享变量:


  Scheduler scheduler = Schedulers.parallel()

然后用 flatMap() 代替 map()


Flux.range(1, 10)
    .log()
    .flatMap(                             (1)
        this::fetch, 4)                   (2)
    .collect(Result::new, Result::add)
    .doOnSuccess(Result::stop)

  1. 在新的publisher中并行处理
  2. flatMap的并行参数

嵌入 Non-Reactive 服务

如果想将上面的代码放入一个servlet这种的Non-Reactive的服务中,可以使用Spring MVC:


@RequestMapping("/parallel")
public CompletableFuture<Result> parallel() {
    return Flux.range(1, 10)
      ...
      .doOnSuccess(Result::stop)
      .toFuture();
}

在阅读了 @RequestMapping 的javadoc以后,我们会发现这个方法会返回一个 CompletableFuture ,应用会选择在单独的线程中返回值。在我们的例子中这个单独的线程由 scheduler 提供。

没有免费的午餐

利用工作线程进行 scatter-gather 计算是一个好的模式,但是也不完美 - 没有阻塞调用方,但还是阻塞了一些东西,只不过是把问题转移了。我们有一个非阻塞IO的 HTTP 服务,将处理放入线程池,一个请求一个线程 - 这是servlet容器的机制(例如tomcat)。请求是异步处理的,因此tomcat内部的工作线程没有被阻塞,我们的 scheduler 会创建4个线程。在处理10个请求时,理论上处理性能会提高4倍。简单来说,如果我们在单个线程中顺序处理10个请求要花1000ms,我们所采用的方式只需250ms。

我们可以通过增加线程来进一步提升性能(分配16个线程):


private Scheduler scheduler = Schedulers.newParallel("sub", 16);

Tomcat 默认会分配100个线程来处理请求,当所有的请求同时处理时,我们的 scheduler 线程池会成为一个瓶颈。我们的 scheduler 线程池数量远小于 Tomcat 的线程池数量。这说明性能调优不是一个简单的事情,需要考虑各个参数和资源的匹配情况。

相比固定数量的线程池,我们可以采用更灵活的线程池,可以根据需要动态调整线程数量。Reactor 已经提供了这种机制,使用 Schedulers.elastic() 后可以看到当请求增多时,线程数量会随之增加。

全面采用 Reactive

从阻塞调用到reactive的桥接是一种有效的模式,并且用Spring MVC的技术很容易实现。接下来我们将完全弃用阻塞模式,采用新的API和新的工具。最终我们实现全栈Reactive。







请到「今天看啥」查看全文