公众号后台回复“
学习
”,获取作者独家秘制精品资料
扫描下方海报二维码,试听课程:
本文来自公众号读者王码农的投稿
感谢王码农同学的技术分享
简介
如果大家对RPC有一些了解的话,或多或者都会听到过一些大名鼎鼎的RPC框架,比如Dobbo、gRPC。但是大部分人对于他们底层的实现原理其实不甚了解。
有一种比较好的学习方式:就是如果你想要了解一个框架的原理,你可以尝试去写一个简易版的框架出来,就比如如果你想理解Spring IOC的思想,最好的方式就是自己实现一个小型的IOC容器,自己慢慢体会。
所以本文尝试带领大家去设计一个小型的RPC框架,同时对于框架会保持一些拓展点。
通过阅读本文,你可以收获:
-
理解RPC框架最核心的理念
-
学习在设计框架的时候,如何保持拓展性
本文会依赖一些组件,他们是实现RPC框架必要的一些知识,文中会尽量降低这些知识带来的障碍。但是,最好期望读者有以下知识基础:
RPC框架应该长什么样子
我们首先来看一下:一个RPC框架是什么东西?
我们最直观的感觉就是:
集成了RPC框架之后,通过配置一个注册中心的地址。一个应用(称为服务提供者)将某个接口(interface)“暴露”出去,另外一个应用(称为服务消费者)通过“引用”这个接口(interface),然后调用了一下,就很神奇的可以调用到另外一个应用的方法了
给我们的感觉就好像调用了一个本地方法一样。即便两个应用不是在同一个JVM中甚至两个应用都不在同一台机器中。
那他们是如何做到的呢?
其实啊,当我们的服务消费者调用某个RPC接口的方法之后,它的底层会通过动态代理,然后经过网络调用,去到服务提供者的机器上,然后执行对应的方法。
接着方法的结果通过网络传输返回到服务消费者那里,然后就可以拿到结果了。
整个过程如下图:
那么这个时候,可能有人会问了:
服务消费者怎么知道服务提供者在哪台机器的哪个端口呢?
这个时候,就需要“
注册中心
”登场了,具体来说是这样子的:
这样一来,服务消费者就有了一份服务提供者所在的机器列表了
“服务消费者”拿到了“服务提供者”的机器列表就可以通过网络请求来发起请求了。
网络客户端,我们应该采用什么呢?有几种选择:
-
使用JDK原生BIO(也就是ServerSocket那一套)。阻塞式IO方法,无法支撑高并发。
-
使用JDK原生NIO(Selector、SelectionKey那一套)。非阻塞式IO,可以支持高并发,但是自己实现复杂,需要处理各种网络问题。
-
使用大名鼎鼎的NIO框架Netty,天然支持高并发,封装好,API易用。
作为一个有追求的程序员,我们要求开发出来的框架要求支持高并发、又要求简单、还要快。当然是选择Netty来实现了,使用Netty的一些很基本的API就能满足我们的需求。
网络协议定义
当然了,既然我们要使用网络传输数据。我们首先要定义一套网络协议出来。
你可能又要问了,
啥叫网络协议?
网络协议,通俗理解,意思就是说我们的客户端发送的数据应该长什么样子,服务端可以去解析出来知道要做什么事情。话不多说,上代码:
假设我们现在服务提供者有两个类:
public interface HelloService {
String sayHello(TestBean testBean);
}
public class TestBean {
private String name;
private Integer age;
public TestBean(String name, Integer age) {
this.name = name;
this.age = age;
}
}
现在我要调用HelloService.sayHello(TestBean testBean)这个方法
作为“服务消费者”,应该怎么定义我们的请求,从而让服务端知道我是要调用这个方法呢?
这需要我们将这个接口信息产生一个唯一的标识: 这个标识会记录了接口名、具体是那个方法、然后具体参数是什么!
然后将这些信息组织起来发送给服务端,我这里的方式是将信息保存为一个JSON格式的字符串来传输。
比如上面的接口我们传输的数据大概是这样的:
{
"interfaces": "interface=com.study.rpc.test.producer.HelloService&method=sayHello&
parameter=com.study.rpc.test.producer.TestBean",
"requestId": "3",
"parameter": {
"com.study.rpc.test.producer.TestBean": {
"age": 20,
"name": "张三"
}
}
}
嗯,我这里用一个JSON来标识这次调用是调用哪个接口的哪个方法,其中interface标识了唯一的类,parameter标识了里面具体有哪些参数, 其中key就是参数的类全限定名,value就是这个类的JSON信息。
可能看到这里,大家可能有意见了: 数据不一定用JSON格式传输啊,而且使用JSON也不一定性能最高啊。
你使用JDK的Serializable配合Netty的ObjectDecoder来实现,这当然也可以,其实这里是一个拓展点,我们应该要提供多种序列化方式来供用户选择
但是这里选择了JSON的原因是因为它比较直观,对于写文章来说比较合理。
开发服务提供者
嗯,搞定了网络协议之后,我们开始开发“服务提供者”了。对于服务提供者,因为我们这里是写一个简单版本的RPC框架,为了保持简洁。
我们不会引入类似Spring之类的容器框架,所以我们需要定义一个服务提供者的配置类,它用于定义这个服务提供者是什么接口,然后它具体的实例对象是什么:
public class ServiceConfig{
public Class type;
public T instance;
public ServiceConfig(Classtype, T instance) {
this.type = type;
this.instance = instance;
}
public ClassgetType() {
return type;
}
public void setType(Classtype) {
this.type = type;
}
public T getInstance() {
return instance;
}
public void setInstance(T instance) {
this.instance = instance;
}
}
有了这个东西之后,我们就知道需要暴露哪些接口了。
为了框架有一个统一的入口,我定义了一个类叫做
ApplicationContext
,可以认为这是一个应用程序上下文,他的构造函数,接收2个参数,代码如下:
public ApplicationContext(String registryUrl, ListserviceConfigs){
this.serviceConfigs = serviceConfigs == null ? new ArrayList<>() : serviceConfigs;
initRegistry(registryUrl);
RegistryInfo registryInfo = null;
InetAddress addr = InetAddress.getLocalHost();
String hostname = addr.getHostName();
String hostAddress = addr.getHostAddress();
registryInfo = new RegistryInfo(hostname, hostAddress, port);
doRegistry(registryInfo);
if (!this.serviceConfigs.isEmpty()) {
nettyServer = new NettyServer(this.serviceConfigs, interfaceMethods);
nettyServer.init(port);
}
}
注册中心设计
这里分为几个步骤,首先保存了接口配置,接着初始化注册中心,因为注册中心可能会提供多种来供用户选择,所以这里需要定义一个注册中心的接口:
public interface
Registry {
void register(Class clazz, RegistryInfo registryInfo) throws Exception;
}
这里我们提供一个注册的方法,这个方法的语义是将clazz对应的接口注册到注册中心。接收两个参数,一个是接口的class对象,另一个是注册信息,
里面包含了本机的一些基本信息,如下:
public class RegistryInfo {
private String hostname;
private String ip;
private Integer port;
public RegistryInfo(String hostname, String ip, Integer port) {
this.hostname = hostname;
this.ip = ip;
this.port = port;
}
}
好了,定义好注册中心,回到之前的实例化注册中心的地方,代码如下:
private Registry registry;
private void initRegistry(String registryUrl) {
if (registryUrl.startsWith("zookeeper://")) {
registryUrl = registryUrl.substring(12);
registry = new ZookeeperRegistry(registryUrl);
} else if (registryUrl.startsWith("multicast://")) {
registry = new MulticastRegistry(registryUrl);
}
}
这里逻辑也非常简单,就是根据url的schema来判断是那个注册中心
注册中心这里实现了2个实现类,分别使用zookeeper作为注册中心,另外一个是使用广播的方式作为注册中心。
广播注册中心这里仅仅是做个示范,内部没有实现。我们主要是实现了zookeeper的注册中心。
(当然了,如果有兴趣,可以实现更多的注册中心供用户选择,比如redis之类的,这里只是为了保持“拓展点”)
那么实例化完注册中心之后,回到上面的代码:
注册服务提供者
RegistryInfo registryInfo = null;
InetAddress addr = InetAddress.getLocalHost();
String hostname = addr.getHostName();
String hostAddress = addr.getHostAddress();
registryInfo = new RegistryInfo(hostname, hostAddress, port);
doRegistry(registryInfo);
这里逻辑很简单,就是获取本机的的基本信息构造成RegistryInfo,然后调用了doRegistry方法:
private MapinterfaceMethods = new ConcurrentHashMap<>();
private void doRegistry(RegistryInfo registryInfo) throws Exception {
for (ServiceConfig config : serviceConfigs) {
Class type = config.getType();
registry.register(type, registryInfo);
Method[] declaredMethods = type.getDeclaredMethods();
for (Method method : declaredMethods) {
String identify = InvokeUtils.buildInterfaceMethodIdentify(type, method);
interfaceMethods.put(identify, method);
}
}
}
这里做了2件事情:
下面分别分析这两件事情,首先是注册方法:
因为我们用到了zookeeper,为了方便,引入了zookeeper的客户端框架curator
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-recipesartifactId>
<version>2.3.0version>
dependency>
接着看代码:
public class ZookeeperRegistry implements Registry {
private CuratorFramework client;
public ZookeeperRegistry(String connectString) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
client.start()
;
try {
Stat myRPC = client.checkExists().forPath("/myRPC");
if (myRPC == null) {
client.create()
.creatingParentsIfNeeded()
.forPath("/myRPC");
}
System.out.println("Zookeeper Client初始化完毕......");
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void register(Class clazz, RegistryInfo registryInfo) throws Exception {
Method[] declaredMethods = clazz.getDeclaredMethods();
for (Method method : declaredMethods) {
String key = InvokeUtils.buildInterfaceMethodIdentify(clazz, method);
String path = "/myRPC/" + key;
Stat stat = client.checkExists().forPath(path);
ListregistryInfos;
if (stat != null) {
byte[] bytes = client.getData().forPath(path);
String data = new String(bytes, StandardCharsets.UTF_8);
registryInfos = JSONArray.parseArray(data, RegistryInfo.class);
if (registryInfos.contains(registryInfo)) {
System.out.println("地址列表已经包含本机【" + key + "】,不注册了");
} else {
registryInfos.add(registryInfo);
client.setData().forPath(path, JSONArray.toJSONString(registryInfos).getBytes());
System.out.println("注册到注册中心,路径为:【" + path + "】 信息为:" + registryInfo);
}
} else {
registryInfos = new ArrayList<>();
registryInfos.add(registryInfo);
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path, JSONArray.toJSONString(registryInfos).getBytes());
System.out.println("注册到注册中心,路径为:【" + path + "】 信息为:" + registryInfo);
}
}
}
}
zookeeper注册中心在初始化的时候,会建立好连接。然后注册的时候,针对clazz接口的每一个方法,都会生成一个唯一标识
这里使用了InvokeUtils.buildInterfaceMethodIdentify方法:
public static String buildInterfaceMethodIdentify(Class clazz, Method method) {
Map<String, String> map = new LinkedHashMap<>()
map.put("interface", clazz.getName())
map.put("method", method.getName())
Parameter[] parameters = method.getParameters()
if (parameters.length > 0) {
StringBuilder param = new StringBuilder();
for (int i = 0
Parameter p = parameters[i]
param.append(p.getType().getName())
if (i < parameters.length - 1) {
param.append(",")
}
}
map.put("parameter", param.toString())
}
return map2String(map)
}
public static String map2String(Map<String, String> map) {
StringBuilder sb = new StringBuilder();
IteratorString>> iterator = map.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, String> entry = iterator.next();
sb.append(entry.getKey() + "=" + entry.getValue())
if (iterator.hasNext()) {
sb.append("&")
}
}
return sb.toString()
}
其实就是对接口的方法使用他们的限定名和参数来组成一个唯一的标识,比如
HelloService#sayHello(TestBean)
生成的大概是这样的:
interface=com.study.rpc.test.producer.HelloService&method=sayHello&
parameter=com.study.rpc.test.producer.TestBean
接下来的逻辑就简单了,在Zookeeper中的/myRPC路径下面建立临时节点,节点名称为我们上面的接口方法唯一标识,数据内容为机器信息。
之所以采用临时节点是因为:如果机器宕机了,连接断开之后,消费者可以通过zookeeper的watcher机制感知到
大概看起来是这样的:
/myRPC/interface=com.study.rpc.test.producer.HelloService
&method=sayHello&
parameter=com.study.rpc.test.producer.TestBean
[
{
"hostname":peer1,
"port":8080
},
{
"hostname":peer2,
"port":8081
}
]
通过这样的方式,在服务消费的时候就可以拿到这样的注册信息,然后知道可以调用那台机器的那个端口。
好了,注册中心弄完了之后,我们回到前面说的注册方法做的第二件事情,我们将每一个接口方法标识的方法放入了一个map中:
private Map<String, Method> interfaceMethods = new ConcurrentHashMap<>();
这个的原因是因为,我们在收到网络请求的时候,需要调用反射的方式调用method对象,所以存起来。
启动网络服务端接受请求
接下来我们就可以看第四步了:
if (!this.serviceConfigs.isEmpty()) {
nettyServer = new NettyServer(this.serviceConfigs, interfaceMethods);
nettyServer.init(port);
}
因为这里使用Netty来做的所以需要引入Netty的依赖:
<dependency>
<groupId>io.nettygroupId>
<artifactId>netty-allartifactId>
<version>4.1.30.Finalversion>
dependency>
接着来分析:
public class NettyServer {
private RpcInvokeHandler rpcInvokeHandler;
public NettyServer(ListserverConfigs, MapinterfaceMethods) throws InterruptedException {
this.rpcInvokeHandler = new RpcInvokeHandler(serverConfigs, interfaceMethods);
}
public int init(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$$");
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast().addLast(rpcInvokeHandler);
}
});
ChannelFuture sync = b.bind(port).sync();
System.out.println("启动NettyService,端口为:" + port);
return port;
}
}
这部分主要的都是netty的api,我们不做过多的说明,就简单的说一下:
public class RpcInvokeHandler extends ChannelInboundHandlerAdapter {
private Map<String, Method> interfaceMethods;
private MapObject> interfaceToInstance;
private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10,
50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),
new ThreadFactory() {
AtomicInteger m = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "IO-thread-" + m.incrementAndGet());
}
});
public RpcInvokeHandler(ListserviceConfigList,
Map<String, Method> interfaceMethods) {
this.interfaceToInstance = new ConcurrentHashMap<>();
this.interfaceMethods = interfaceMethods;
for (ServiceConfig config : serviceConfigList) {
interfaceToInstance.put(config.getType(), config.getInstance());
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
String
message = (String) msg;
System.out.println("接收到消息:" + msg);
RpcRequest request = RpcRequest.parse(message, ctx);
threadPoolExecutor.execute(new RpcInvokeTask(request));
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("发生了异常..." + cause);
cause.printStackTrace();
ctx.close();
}
public class RpcInvokeTask implements Runnable {
private RpcRequest rpcRequest;
RpcInvokeTask(RpcRequest rpcRequest) {
this.rpcRequest = rpcRequest;
}
@Override
public void run() {
try {
String interfaceIdentity = rpcRequest.getInterfaceIdentity();
Method method = interfaceMethods.get(interfaceIdentity);
Map<String, String> map = string2Map(interfaceIdentity);
String interfaceName = map.get("interface");
Class interfaceClass = Class.forName(interfaceName);
Object o = interfaceToInstance.get(interfaceClass);
String parameterString = map.get("parameter");
Object result;
if (parameterString != null) {
String[] parameterTypeClass = parameterString.split(",");
Map<String, Object> parameterMap = rpcRequest.getParameterMap();
Object[] parameterInstance = new Object[parameterTypeClass.length];
for (int i = 0; i < parameterTypeClass.length; i++) {
String parameterClazz = parameterTypeClass[i];
parameterInstance[i] = parameterMap.get(parameterClazz);
}
result = method.invoke(o, parameterInstance);
} else {
result = method.invoke(o);
}
ChannelHandlerContext ctx = rpcRequest.getCtx();
String requestId = rpcRequest.getRequestId();
RpcResponse response = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity,
requestId);
String s = JSONObject.toJSONString(response) + "$$";
ByteBuf byteBuf = Unpooled.copiedBuffer(s.getBytes());
ctx.writeAndFlush(byteBuf);
System.out.println("响应给客户端:" + s);
} catch (Exception e) {
e.printStackTrace();
}
}
public static Map<String, String> string2Map(String str) {
String[] split = str.split("&");
Map<String, String> map = new HashMap<>(16);
for (String s : split) {
String[] split1 = s.split("=");
map.put(split1[0], split1[1]);
}
return map;
}
}
}
这里说明一下上面的逻辑:
channelRead方法用于接收消息,接收到的就是我们前面分析的那个JSON格式的数据,接着我们将消息解析成RpcRequest
public class RpcRequest {
private String interfaceIdentity;
private Map<String, Object> parameterMap = new HashMap<>();
private ChannelHandlerContext ctx;
private String requestId;
public static RpcRequest parse(String message, ChannelHandlerContext ctx) throws ClassNotFoundException {
JSONObject jsonObject = JSONObject.parseObject(message);
String interfaces = jsonObject.getString("interfaces");
JSONObject parameter = jsonObject.getJSONObject("parameter");
Set<String> strings = parameter.keySet();
RpcRequest request = new RpcRequest();
request.setInterfaceIdentity(interfaces);
Map<String, Object> parameterMap = new HashMap<>(16);
String requestId = jsonObject.getString("requestId");
for (String key : strings) {
if (key.equals("java.lang.String")) {
parameterMap.put(key, parameter.getString(key));
} else {
Class clazz = Class.forName(key);
Object object = parameter.getObject(key, clazz);
parameterMap.put(key, object);
}
}
request.setParameterMap(parameterMap);
request.setCtx(ctx);
request.setRequestId(requestId);
return request;
}
}
接着从request中解析出来需要调用的接口,然后通过反射调用对应的接口,得到结果后我们将响应封装成PrcResponse写回给客户端:
public
class RpcResponse {
private String result;
private String interfaceMethodIdentify;
private String requestId;
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
public static RpcResponse create(String result, String interfaceMethodIdentify, String requestId) {
RpcResponse response = new RpcResponse();
response.setResult(result);
response.setInterfaceMethodIdentify(interfaceMethodIdentify);
response.setRequestId(requestId);
return response;
}
}
里面包含了请求的结果JSON串,接口方法唯一标识,请求ID。数据大概看起来这个样子:
{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&method=sayHello&
parameter=com.study.rpc.test.producer.TestBean","requestId":"3",
"result":"\"牛逼,我收到了消息:TestBean{name='张三', age=20}\""}
通过这样的信息,客户端就可以通过响应结果解析出来。
测试服务提供者
既然我们代码写完了,现在需要测试一把:
首先我们先写一个HelloService的实现类出来:
public class HelloServiceImpl implements HelloService {
@Override
public String sayHello(TestBean testBean) {
return "牛逼,我收到了消息:" + testBean;
}
}
接着编写服务提供者代码:
public class TestProducer {
public static void main(String[] args) throws Exception {
String connectionString = "zookeeper://localhost1:2181,localhost2:2181,localhost3:2181";
HelloService service = new HelloServiceImpl();
ServiceConfig config = new ServiceConfig<>(HelloService.class, service);
ListserviceConfigList = new ArrayList<>();
serviceConfigList.add(config);
ApplicationContext ctx = new ApplicationContext(connectionString, serviceConfigList,
null, 50071);
}
}
接着启动起来,看到日志:
Zookeeper Client初始化完毕......
注册到注册中心,路径为:【/myRPC/interface=com.study.rpc.test.producer.HelloService&
method=sayHello¶meter=com.study.rpc.test.producer.TestBean】
信息为:RegistryInfo{hostname='localhost', ip='192.168.16.7', port=50071}
启动NettyService,端口为:50071
这个时候,我们期望用NettyClient发送请求:
{
"interfaces": "interface=com.study.rpc.test.producer.HelloService&
method=sayHello¶meter=com.study.rpc.test.producer.TestBean",
"requestId": "3",
"parameter": {
"com.study.rpc.test.producer.TestBean": {
"age": 20,
"name": "张三"
}
}
}
得到的响应应该是:
{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&method=sayHello&
parameter=com.study.rpc.test.producer.TestBean","requestId":"3",
"result":"\"牛逼,我收到了消息:TestBean{name='张三', age=20}\""}
那么,可以编写一个测试程序(这个程序仅仅用于中间测试用,读者不必理解):
public class NettyClient {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
try
{
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new NettyClientHandler());
}
});
ChannelFuture sync = b.connect("127.0.0.1", 50071).sync();
sync.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
private static class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
JSONObject jsonObject = new JSONObject();
jsonObject.put("interfaces", "interface=com.study.rpc.test.producer" +
".HelloService&method=sayHello¶meter=com.study.rpc.test.producer.TestBean");
JSONObject param = new JSONObject();
JSONObject bean = new JSONObject();
bean.put("age", 20);
bean.put("name", "张三");
param.put("com.study.rpc.test.producer.TestBean", bean);
jsonObject.put("parameter", param);
jsonObject.put("requestId", 3);
System.out.println("发送给服务端JSON为:" + jsonObject.toJSONString());
String msg = jsonObject.toJSONString() + "$$";
ByteBuf byteBuf = Unpooled.buffer(msg.getBytes().length);
byteBuf.writeBytes(msg.getBytes());
ctx.writeAndFlush(byteBuf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到消息:" + msg);
}
}
}
启动之后,看到控制台输出:
发送给服务端JSON为:{"interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello&
parameter=com.study.rpc.test.producer.TestBean","requestId":3,
"parameter":{"com.study.rpc.test.producer.TestBean":{"name":"张三","age":20}}}
收到消息:{"interfaceMethodIdentify":"interface=com.study.rpc.test.producer.HelloService&
method=sayHello¶meter=com.study.rpc.test.producer.TestBean","requestId":"3",
"result":"\"牛逼,我收到了消息:TestBean{name='张三', age=20}\""}
bingo,完美实现了RPC的服务提供者。接下来我们只需要实现服务消费者就完成了。
开发服务消费者
服务消费者是同样的处理,我们同样要定义一个消费者的配置:
public class ReferenceConfig{
private Class type;
public ReferenceConfig(Classtype) {
this.type = type;
}
public ClassgetType() {
return type;
}
public void setType(Classtype) {
this.type = type;
}
}
然后我们是统一入口,在ApplicationContext中修改代码:
public ApplicationContext(String registryUrl, ListserviceConfigs,
ListreferenceConfigs, int port) throws Exception {
this.serviceConfigs = serviceConfigs == null ? new ArrayList<>() : serviceConfigs;
this.referenceConfigs = referenceConfigs == null ? new ArrayList<>() : referenceConfigs;
}
private void doRegistry(RegistryInfo registryInfo) throws Exception {
for (ServiceConfig config : serviceConfigs) {
Class type = config.getType();
registry.register(type, registryInfo);
Method[] declaredMethods = type.getDeclaredMethods();
for (Method method : declaredMethods) {
String identify = InvokeUtils.buildInterfaceMethodIdentify(type, method);
interfaceMethods.put(identify, method);
}
}
for (ReferenceConfig config : referenceConfigs) {
ListregistryInfos = registry.fetchRegistry(config.getType());
if (registryInfos != null) {
interfacesMethodRegistryList.put(config.getType(), registryInfos);
initChannel(registryInfos);
}
}
}
在注册的时候,我们需要将需要消费的接口,通过注册中心抓取出来,所以注册中心要增加一个接口方法:
public interface Registry {
void register(Class clazz, RegistryInfo registryInfo) throws Exception;
ListfetchRegistry
(Class clazz) throws Exception;
}
获取服务提供者的机器列表
具体在Zookeeper中的实现如下:
@Override
public ListfetchRegistry(Class clazz) throws Exception {
Method[] declaredMethods = clazz.getDeclaredMethods();
ListregistryInfos = null;
for (Method method : declaredMethods) {
String key = InvokeUtils.buildInterfaceMethodIdentify(clazz, method);
String path = "/myRPC/" + key;
Stat stat = client.checkExists()
.forPath(path);
if (stat == null) {
System.out.println("警告:无法找到服务接口:" + path);
continue;
}
if (registryInfos == null) {
byte[] bytes = client.getData().forPath(path);
String data = new String(bytes, StandardCharsets.UTF_8);
registryInfos = JSONArray.parseArray(data, RegistryInfo.class);
}
}
return registryInfos;
}
其实就是去zookeeper获取节点中的数据,得到接口所在的机器信息,获取到的注册信息诸侯,就会调用以下代码:
if (registryInfos != null) {
interfacesMethodRegistryList.put(config.getType(), registryInfos);
initChannel(registryInfos);
}
private void initChannel(ListregistryInfos) throws InterruptedException {
for (RegistryInfo info : registryInfos) {
if (!channels.containsKey(info)) {
System.out.println("开始建立连接:" + info.getIp() + ", " + info.getPort());
NettyClient client = new NettyClient(info.getIp(), info.getPort());
client.setMessageCallback(message -> {
RpcResponse response = JSONObject.parseObject(message, RpcResponse.class);
responses.offer(response);
synchronized (ApplicationContext.this) {
ApplicationContext.this.notifyAll();
}
});
ChannelHandlerContext ctx = client.getCtx();
channels.put(info, ctx);
}
}
}
我们会针对每一个唯一的RegistryInfo建立一个连接,然后有这样一段代码:
client.setMessageCallback(message -> {
RpcResponse response = JSONObject.parseObject(message, RpcResponse.class);
responses.offer(response);
synchronized (ApplicationContext.this) {
ApplicationContext.this.notifyAll();
}
});
设置一个callback,用于收到消息的时候,回调这里的代码,这部分我们后面再分析。
然后在client.getCtx()的时候,同步阻塞直到连接完成,建立好连接后通过,NettyClient的代码如下:
public class NettyClient {
private ChannelHandlerContext ctx;
private MessageCallback messageCallback;
public NettyClient(String ip, Integer port) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$$".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new NettyClientHandler());
}
});
ChannelFuture sync = b.connect(ip, port).sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public void setMessageCallback(MessageCallback callback) {
this.messageCallback = callback;
}
public ChannelHandlerContext getCtx() throws InterruptedException {
System.out.println("等待连接成功...");
if (ctx == null) {
synchronized (this) {
wait();
}
}
return ctx;
}
private class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
String message = (String) msg;
if (messageCallback != null) {
messageCallback.onMessage(message);
}
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
NettyClient.this.ctx = ctx;
System.out.println("连接成功:" + ctx);
synchronized (NettyClient.this) {
NettyClient.this.notifyAll();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
}
}
public interface MessageCallback {
void onMessage(String message);
}
}
这里主要是用了wait()和notifyAll()来实现同步阻塞等待连接建立。
建立好连接后,我们保存到集合中:
ChannelHandlerContext ctx = client.getCtx();
channels.put(info, ctx);
发送请求
好了,到了这里我们为每一个需要消费的接口建立了网络连接,接下来要做的事情就是提供一个接口给用户获取服务提供者实例:
我把这个方法写在ApplicationContext中:
private LongAdder requestIdWorker = new LongAdder();
@SuppressWarnings("unchecked")
publicT getService(Classclazz) {
return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
if ("equals".equals(methodName) || "hashCode".equals(methodName)) {
throw new IllegalAccessException("不能访问" + methodName + "方法");
}
if ("toString".equals(methodName)) {
return clazz.getName() + "#" + methodName;
}
ListregistryInfos = interfacesMethodRegistryList.get(clazz);
if (registryInfos == null) {
throw new RuntimeException("无法找到服务提供者