Dubbo 自 2011 年 10 月 27 日开源后,已被许多非阿里系的公司使用,其中既有当当网、网易考拉等互联网公司,也不乏中国人寿、青岛海尔等大型传统企业。
自去年 12 月开始,
Dubbo 3.0 便已
正式进入开发阶段,并
备受社区和广大 Dubbo 用户的关注,
本文将为您详细解读
3.0 预览版的新特性和新功能。
下面先解答一下两个有意思的与 Dubbo 相关的疑问。
Dubbo 3.0 预览版的要点
Dubbo 3.0 在设计和功能上的新增支持和改进,主要是以下四方面:
Dubbo 内核之 Filter 链的异步化
这里要指出的是,3.0 中规划的异步去阻塞和 2.7 中提供的异步是两个层面的特性。2.7 中的异步是建立在传统 RPC 中 request – response 会话模型上的,而 3.0 中的异步将会从通讯协议层面由下向上构建,关注的是跨进程、全链路的异步问题。通过底层协议开始支持 streaming 方式,不单单可以支持多种会话模型,还可以在协议层面开始支持反压、限流等特性,使得整个分布式体系更具有弹性。综上所述,2.7 关注的异步更局限在点对点的异步(一个 consumer 调用一个 provider),3.0 关注的异步化,宽度上则关注整个调用链上的异步,高度上则向上又可以包装成 Rx 的编程模型。有趣的是,Spring 5.0 发布了对 Flux 的支持,随后开始解决跨进程的异步问题。
功能方面是 reactive(响应式)支持
最近几年,
reactive programming
这个词语的热度迅速提升,Wikipedia 上的 reactive programming 解释是 reactive programming is a programming paradigm oriented around data flows and the propagation of change. Dubbo3.0会实现Reactive Stream 的 rx 接口,从而能让用户享受到RP带来的响应性提升,甚至面向 RP 的架构升级。当然,我们希望 reactive 不单单能够带来事件(event)驱动的应用集成方式的升级,也希望在 Load Balance(选择最优的服务节点),fault tolerance(限流降级时最好做到自适应)等方面发挥其积极价值。
云原生/ ServiceMesh 方向的探索
我们定下的策略是进入 Envoy 社区来实现 Dubbo 融入 mesh 的理念思想,目前 Dubbo 协议已经被 Envoy 支持。当然,Dubbo Mesh 离真正可用还有很长一段距离,其在选址、负载均衡和服务治理方面的工作需要继续在数据面建设,另外,控制面板的建设在社区也没有提上日程。
融合并支持阿里内部
Dubbo 3.0 定下了内外融合的策略,也就是说 3.0 的核心最终会在阿里巴巴的生产系统中部署,相信通过大流量、大规模的考验,Dubbo 用户可以获得一个性能、稳定、服务治理实践各方面俱佳的核心,用户在生产系统中采用 3.0 也会更加放心。这一点也是 Dubbo 3.0 最重要的使命。
Filter 链的异步化设计
Dubbo 最强大的一处设计是其在 Filter 链上的抽象设计,通过其扩展机制的开放性支持,用户可以对 Dubbo 做功能增强,并允许各个扩展点被定制来是否保留。
Dubbo 的 Filter 定义如下:
@SPI
public interface Filter {
/**
* do invoke filter.
*
*
* // before filter
* Result result = invoker.invoke(invocation);
* // after filter
* return result;
*
*
* @param invoker service
* @param invocation invocation.
* @return invoke result.
* @throws RpcException
* @see org.apache.dubbo.rpc.Invoker#invoke(Invocation)
*/
Result invoke( Invoker > invoker, Invocation invocation) throws RpcException ;
}
按照“调用一个远程服务的方法就像调用本地的方法一样”这种说法,这个直接返回 Result 响应的方式是非常好的,用起来是简单直接,问题是时代变换到了需要关注体验,需要走 Reactive 响应式的时代,也回到基本点:invoke一个 invocation 需要经过网络在不同的进程处理,天然就是异步的过程,也就是发送请求(invocation)与接收响应(Result)本身是两个不同的事件,是需要两个过程方法来在 Filter 链处理。那么如何改造这个关键的 SPI 呢?有两种方案:
第一种,把 invoke 的返回值改成 CompletableFuture
, 好处是一目了然,Result 不在建议同步获取了;但基础接口的签名一改会导致代码改造量巨大,同时也会让原有的 SPI 扩展不在支持。
第二种,Result 接口直接继承 CompletationStage,是代表了响应的异步计算。这样能进避免第一种的劣势。所以,3.0.0 Preview 版本对内部调用链路实现做了一次重构:基于 CompletableFuture 实现了框架内部的全异步调用,而在外围编程上,同时支持同步、异步调用模式。
值得注意的是,此次重构仅限于框架内部实现,对使用方没有任何影响即接口上保持完全兼容。要了解 Dubbo 异步 API 如何使用,请参考《如何基于 Dubbo 实现全异步的调用链》
(地址:http://dubbo.apache.org/zh-cn/blog/dubbo-new-async.html)
,这篇文章将着重对实现思路和原理做一些简单介绍。此次重构的要点有:
基本工作流程
首先我们来看一个通用的跨网络异步调用的线程模型:
通信框架异步发送请求消息,请求消息发送成功后,返回代表业务结果的 CompletableFuture 给业务线程。之后对于 Future 的处理,根据调用类型会有所区别:
对于同步请求(如上图体现的场景),业务线程会调用 future.get 同步阻塞等待结果,当收到网络层返回的业务结果后,future.get 返回并最终将结果传递给调用发起方。
对于异步请求,业务线程不会调用 future.get,而是将 future 保存在调用上下文或者直接返回给调用者,同时会为 future 注册回调监听器,以便当真正的业务结果从通信层返回时监听器可以对结果做进一步的处理。
接下来具体看一下一次异步 Dubbo RPC 请求的调用流程:
消费方面向 Proxy 代理编程,发出调用请求,请求经过 Filter 链向下传递。
Invoker.invoke() 将请求异步转发给网络层,并收到代表返回结果的 Future。
Future 被包装到 Result,转而由 Result 代表这次远程调用的结果(由于 Result 的异步属性,此时它可能并不包含真正的返回值)。
Result 继续沿着调用链返回,在经过每个 Filter 时,Filter 可选择注册 Listener 监听器,以便在业务结果返回时执行结果预处理。
最终 Proxy 调用 result.recreate() 将结果返回给消费者:
6.
调用方在拿到代表异步业务结果的 Future 后,可选择注册回调监听器,以监听真正的业务结果返回。
同步调用和异步调用基本上是一致的,并且也是走的回调模式,只是在链路返回之前做了一次阻塞 get 调用,以确保在收到实际结果时再返回。Filter 在注册 Listener 时由于 Future 已处于 complete 状态,因此会同时触发回调 onResponse()/onError()。
关于流程图中提到的 Result,Result 在 Dubbo 的一次 RPC 调用中代表返回结果,在 3.0 中 Result 自身增加了代表状态的接口,类似 Future 现在 Result 可以代表一次未完成的调用。
要让 Result 具备代表异步返回结果的能力,有两中方式来实现:
1. Result is a Future,在 Java 8 中更合理的方式是继承 CompletionStage 接口。
public interface Result extends CompletionStage {
}
2. 让 Result 实例持有 Future 实例,与 1 的区别即是设计中选用“继承”还是“组合”。
public class AsyncRpcResult implements Result {
private CompletableFuture < RpcResult > resultFuture;
}
同时,为了让 Result 更直观的体现其异步结果的特性,也为了方便面向 Result 接口编程,我们可以考虑为Result增加一些异步接口:
public interface Result extends Serializable {
Result thenApplyWithContext( Function
< Result , Result > fn);
CompletableFuture thenApply( Function < Result , ? extends U> fn);
Result get() throws InterruptedException , ExecutionException ;
}
Filter SPI
Filter 是 Dubbo 预置的拦截器扩展 SPI,用来做请求的预处理、结果的后处理,框架本身内置了一些拦截器实现,而从用户层面,我相信这个 SPI 也应该是被扩展最多的一个。在 3.0 版本中,Filter 回归单一职责的设计模式,将回调接口单独提取到 Listener 中。
@SPI
public interface Filter {
Result invoke( Invoker > invoker, Invocation invocation) throws RpcException ;
interface Listener {
void onResponse( Result result, Invoker > invoker, Invocation invocation);
void onError( Throwable t, Invoker > invoker, Invocation
invocation);
}
}
以上是 Filter 的 SPI 定义,Filter 的核心定义中只有一个 invoke() 方法用来传递调用请求。
同时,增加了一个新的回调接口 Listener,每个 Filter 实现可以定义自己的 Listenr 回调器,从而实现对返回结果的异步监听,参考以下是为 MonitorFilter 增加的 Listener 回调实现:
class MonitorListener implements Listener {
@Override
public void onResponse( Result result, Invoker > invoker, Invocation invocation) {
if (invoker.getUrl().hasParameter( Constants .MONITOR_KEY)) {
collect(invoker, invocation, result, RpcContext .getContext().getRemoteHost(), Long .valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false );
getConcurrent(invoker, invocation).decrementAndGet(); // count down
}
}
@Override
public void onError( Throwable t, Invoker > invoker, Invocation invocation) {
if (invoker.getUrl().hasParameter( Constants .MONITOR_KEY)) {
collect(invoker, invocation, null , RpcContext .getContext().getRemoteHost(), Long .valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true );
getConcurrent(invoker, invocation).decrementAndGet(); // count down
}
}
}
泛化调用异步接口支持
为了更直观的做异步调用,泛化接口新增了
CompletableFuture
$invokeAsync(Stringmethod,String[]parameterTypes,Object[]args)
接口:
public interface GenericService {
/**
* Generic invocation
*
* @param method Method name, e.g. findPerson. If there are overridden methods, parameter info is
* required, e.g. findPerson(java.lang.String)
* @param parameterTypes Parameter types
* @param args Arguments
* @return invocation return value
* @throws GenericException potential exception thrown from the invocation
*/
Object $invoke( String
method, String [] parameterTypes, Object [] args) throws GenericException ;
default CompletableFuture < Object > $invokeAsync( String method, String [] parameterTypes, Object [] args) throws GenericException {
Object object = $invoke(method, parameterTypes, args);
if (object instanceof CompletableFuture ) {
return ( CompletableFuture < Object >) object;
}
return CompletableFuture .completedFuture(object);
}
}
这样,当我们想做异步调用时,就可以直接这样使用:
CompletableFuture < Object > genericService.$invokeAsync(method, parameterTypes, args);
更具体用例请参见《泛化调用示例》
https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-generic/dubbo-samples-generic-call
异步与性能
组要注意的是,框架内部的异步实现本身并不能提高单次调用的性能,相反,由于线程切换和回调逻辑的存在,异步反而可能会导致单次调用性能的下降,但是异步带来的优势是能减少对资源的占用,提升整个系统的并发程度和吞吐量,这点对于 RPC 这种需要处理网络延迟的场景非常适用。更多关于异步化设计的好处,请参考其他异步化原理介绍相关文章。
响应式编程支持
响应式编程让开发者更方便地编写高性能的异步代码,很可惜,在之前很长一段时间里,dubbo 并不支持响应式编程,简单来说,dubbo 不支持在 rpc 调用时使用 Mono/Flux 这种流对象(reative-stream 里流的概念),给用户使用带来了不便。
(关于响应式编程更详细的信息请参见这里:http://reactivex.io/)
。
RSocket 是一个开源的支持 reactive-stream 语义的网络通信协议,他将 reative 语义的复杂逻辑封装起来了,使得上层可以方便实现网络程序。
(RSocket详细资料请参见这里:http://rsocket.io/)
。
dubbo 在 3.0.0-SNAPSHOT 版本里基于 RSocket 对响应式编程进行了简单的支持,用户可以在请求参数和返回值里使用 Mono 和 Flux 类型的对象。下面我们给出使用范例,
(范例源码可以在这里获取:https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-rsocket)
。
首先定义接口如下:
public interface DemoService {
Mono < String > requestMonoWithMonoArg( Mono < String > m1, Mono < String > m2);
Flux < String > requestFluxWithFluxArg( Flux < String > f1, Flux < String > f2);
}
然后实现该 demo 接口:
public class DemoServiceImpl implements DemoService {
@Override
public Mono < String > requestMonoWithMonoArg(
Mono < String > m1, Mono < String > m2) {
return m1.zipWith(m2, new BiFunction < String , String , String >() {
@Override
public String apply( String s, String s2) {
return s+ " " +s2;
}
});
}
@Override
public Flux < String > requestFluxWithFluxArg( Flux < String > f1, Flux < String > f2) {
return f1.zipWith(f2, new BiFunction < String , String , String
>() {
@Override
public String apply( String s, String s2) {
return s+ " " +s2;
}
});
}
}
然后配置并启动服务端,注意协议名字填写 rsocket:
xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo = "http://dubbo.apache.org/schema/dubbo"
xmlns = "http://www.springframework.org/schema/beans"
xsi:schemaLocation = "http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd" >
name = "demo-provider" />
address = "zookeeper://127.0.0.1:2181" />
name = "rsocket" port = "20890" />
id = "demoService" class = "org.apache.dubbo.samples.basic.impl.DemoServiceImpl" />
interface = "org.apache.dubbo.samples.basic.api.DemoService" ref = "demoService" />
public class RsocketProvider {
public static void main( String [] args) throws
Exception {
new EmbeddedZooKeeper ( 2181 , false ).start();
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext ( new String []{ "spring/rsocket-provider.xml" });
context.start();
System .in.read(); // press any key to exit
}
}
然后配置并启动消费者消费者如下, 注意协议名填写 rsocket:
xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo = "http://dubbo.apache.org/schema/dubbo"
xmlns = "http://www.springframework.org/schema/beans"
xsi:schemaLocation = "http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd" >
name = "demo-consumer" />
address = "zookeeper://127.0.0.1:2181" />
id = "demoService" check = "true" interface = "org.apache.dubbo.samples.basic.api.DemoService" />
public class RsocketConsumer {