netty是一个经典的网络框架,提供了基于NIO、AIO的方式来完成少量线程支持海量用户请求连接的模型。netty里面充斥了大量的非阻塞回调模式,主要是靠Future/Promise异步模型来实现的。
Future是java.util.concurrent.Future,是Java提供的接口,可以用来做异步执行的状态获取,它避免了异步任务在调用者那里阻塞等待,而是让调用者可以迅速得到一个Future对象,后续可以通过Future的方法来获取执行结果。
Java的Future有一个比较尴尬的问题,就是当你想获取异步执行结果时,要通过future.get()方法,这一步还是阻塞的!而且我们无法确定到底异步任务何时执行完毕,提前get了,就还是阻塞,get晚了,可能会漏掉执行结果,写个死循环,不停去轮询是否执行完毕,又浪费资源。所以,这个Future并不好用。
先来看一下Java的future使用:
import java.util.concurrent.*;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
Future future = executor.submit(new Task());
System.out.println(future.get());
executor.shutdown();
}
private static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("子线程在进行计算");
Thread.sleep(2000);
return 1
;
}
}
}
代码很简单,就是将一个Runnable、Callable的实例放到一个线程池里,就会返回一个Future对象。后续通过future.get()取得执行结果,但事实上代码并没有达到异步回调的结果,而是get时阻塞了。
理想状态其实是netty的future,可以添加Listener,当异步任务执行完毕后,主动回调一下自己就可以了,不必在那苦等get()方法的执行结果。
看一个netty的回调的小例子:
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.remoteAddress(host, port)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(new IdleStateHandler(10, 0, 0), new NettyClientHandler())
;
}
});
ChannelFuture channelFuture = bootstrap.connect().sync().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
}
});
我们可以理解为bootstrap.connect这一步是一个耗时操作,我不想在那等待它执行完毕,而是希望它执行完毕后主动给我一个回调即可。所以,在connect后面有个addListener,当connect完成后,会回调operationComplete方法。
可以看到netty的这种回调方式比较优雅,不像java的future那样需要阻塞get。整个netty里面大量充斥着类似的回调,但是如果我们要用,仅仅是针对一个或多个异步任务,希望能有个类似的回调,netty就帮不上忙了。
打开netty的源码,想搞明白future、promise的逻辑,一眼看去,心情是这样的:
netty是为特定的场景设计的,里面的各种逻辑也是为了服务于netty本身。
当看不懂,或难以理解它的工作逻辑时,我们可以考虑自己实现一个对任意异步线程进行回调的框架。
首先我们来拆分一下需求,我有N个耗时任务,可能是一次网络请求,可能是一个耗时文件IO,可能是一堆复杂的逻辑,我在主线程里发起这个任务的调用,但不希望它阻塞主线程,而期望它执行完毕(成功\失败)后,来发起一次回调,最好还有超时、异常的回调控制。
据此,我们拆分出几个角色,master主线程,调度器(发起异步调用),worker(异步工作线程)。然后就是将他们组合起来,完成各种异步回调,以及每个worker的正常、异常、超时等的回调。
下面来看一下worker的定义:
public interface Worker {
String action(Object object);
}
一个worker,它需要有个方法,来代表这个worker将来做什么,action就可以理解为一个耗时任务。action可以接收一个参数。
再看一下回调器的定义:
public interface Listener {
void result(Object result)
;
}
这个listener用来做为回调,将worker的执行结果,放到result的参数里。
此外,我们还需要一个包装器Wrapper,来将worker和回调器包装一下。
public class Wrapper {
private Object param;
private Worker worker;
private Listener listener;
public Object getParam() {
return param;
}
public void setParam(Object param) {
this.param = param;
}
public Worker getWorker() {
return worker;
}
public void setWorker(Worker worker) {
this.worker = worker;
}
public Listener getListener() {
return listener;
}
public void addListener(Listener listener) {
this.listener = listener;
}
}
OK,下面就是主逻辑了。
public class Bootstrap {
public static void main(String[] args) {
Bootstrap bootstrap = new Bootstrap();
Worker worker = bootstrap.newWorker();
Wrapper wrapper = new Wrapper();
wrapper.setWorker(worker);
wrapper.setParam("hello");
bootstrap.doWork(wrapper).addListener(new Listener() {
@Override
public void result(Object result) {
System.out.println(Thread.currentThread().getName());
System.out.println(result);
}
});
System.out.println(Thread.currentThread().getName());
}
private Wrapper doWork(Wrapper wrapper) {
new Thread(() -> {
Worker worker = wrapper.getWorker();
String result = worker.action(wrapper.getParam());
wrapper.getListener().result(result);
}).start();
return wrapper;
}
private Worker newWorker() {
return new Worker() {
@Override
public String action(Object object) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return object + " world";
}
};
}
}
执行结果如下:
可以看到主线程没有被耗时的线程阻塞掉,耗时线程在执行完毕后,进行了回调。
这就是一个简单的设计模式——“监听器模式”,再来认识一下这种设计模式的三个要素:事件源(被监听的对象)、事件对象(事件完毕这个动作)、监听器(我们的Listener)。
完成了这样的小demo,立马从netty的复杂中恢复了过来,心情变成了这样:
public class BootstrapNew {
public static void main(String[] args) {
BootstrapNew bootstrap = new BootstrapNew();
Worker worker = bootstrap.newWorker();
Wrapper wrapper = new Wrapper();
wrapper.setWorker(worker);
wrapper.setParam("hello");
wrapper.addListener(new Listener() {
@Override
public void result(Object result) {
System.out.println(result);
}
});
CompletableFuture future = CompletableFuture.supplyAsync(() -> bootstrap.doWork(wrapper));
try {
future.get(800, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException | ExecutionException e) {
wrapper.getListener().result("time out exception");
}