专栏名称: 亿级流量网站架构
开涛技术点滴
目录
相关文章推荐
程序员的那些事  ·  国产 DeepSeek V3 ... ·  4 天前  
OSC开源社区  ·  继V3之后,沐曦GPU再完成DeepSeek ... ·  3 天前  
码农翻身  ·  漫画 | ... ·  2 天前  
程序猿  ·  清晰的、模块化的编码风格 ·  4 天前  
码农翻身  ·  为何 Linus ... ·  3 天前  
51好读  ›  专栏  ›  亿级流量网站架构

netty的Future异步回调难理解?手写个带回调异步框架就懂了

亿级流量网站架构  · 公众号  · 程序员  · 2020-02-17 09:13

正文

netty是一个经典的网络框架,提供了基于NIO、AIO的方式来完成少量线程支持海量用户请求连接的模型。netty里面充斥了大量的非阻塞回调模式,主要是靠Future/Promise异步模型来实现的。

Future是java.util.concurrent.Future,是Java提供的接口,可以用来做异步执行的状态获取,它避免了异步任务在调用者那里阻塞等待,而是让调用者可以迅速得到一个Future对象,后续可以通过Future的方法来获取执行结果。



Jdk的Future不便之处

Java的Future有一个比较尴尬的问题,就是当你想获取异步执行结果时,要通过future.get()方法,这一步还是阻塞的!而且我们无法确定到底异步任务何时执行完毕,提前get了,就还是阻塞,get晚了,可能会漏掉执行结果,写个死循环,不停去轮询是否执行完毕,又浪费资源。所以,这个Future并不好用。

先来看一下Java的future使用:

import java.util.concurrent.*; /** * @author wuweifeng wrote on 2019-12-10 * @version 1.0 */public class Test {    public static void main(String[] args) throws ExecutionException, InterruptedException {        //创建线程池        ExecutorService executor = Executors.newCachedThreadPool();        Future future = executor.submit(new Task());        //这一步get会阻塞当前线程        System.out.println(future.get());         executor.shutdown();    }     private static class Task implements Callable<Integer> {         @Override        public Integer call() throws Exception {            System.out.println("子线程在进行计算");            Thread.sleep(2000);            return 1




    
;        }     } }


代码很简单,就是将一个Runnable、Callable的实例放到一个线程池里,就会返回一个Future对象。后续通过future.get()取得执行结果,但事实上代码并没有达到异步回调的结果,而是get时阻塞了。



Netty future无法单独抽出来使用

理想状态其实是netty的future,可以添加Listener,当异步任务执行完毕后,主动回调一下自己就可以了,不必在那苦等get()方法的执行结果。


看一个netty的回调的小例子:

EventLoopGroup group = new NioEventLoopGroup();try {    Bootstrap bootstrap = new Bootstrap();     bootstrap.group(group).channel(NioSocketChannel.class)            .remoteAddress(host, port)            .option(ChannelOption.SO_KEEPALIVE, true)            .option(ChannelOption.TCP_NODELAY, true)            .handler(new ChannelInitializer() {                @Override                protected void initChannel(SocketChannel ch) throws Exception {                    ch.pipeline()                            .addLast(new StringDecoder())                            .addLast(new StringEncoder())                            //10秒没消息时,就发心跳包过去                            .addLast(new IdleStateHandler(10, 0, 0), new NettyClientHandler())                    ;                }            });    ChannelFuture channelFuture = bootstrap.connect().sync().addListener(new ChannelFutureListener() {        @Override




    
        public void operationComplete(ChannelFuture channelFuture) throws Exception {            //do your job        }    });

我们可以理解为bootstrap.connect这一步是一个耗时操作,我不想在那等待它执行完毕,而是希望它执行完毕后主动给我一个回调即可。所以,在connect后面有个addListener,当connect完成后,会回调operationComplete方法。

可以看到netty的这种回调方式比较优雅,不像java的future那样需要阻塞get。整个netty里面大量充斥着类似的回调,但是如果我们要用,仅仅是针对一个或多个异步任务,希望能有个类似的回调,netty就帮不上忙了。

打开netty的源码,想搞明白future、promise的逻辑,一眼看去,心情是这样的:



如何实现一个简单带回调异步任务

netty是为特定的场景设计的,里面的各种逻辑也是为了服务于netty本身。 当看不懂,或难以理解它的工作逻辑时,我们可以考虑自己实现一个对任意异步线程进行回调的框架。

首先我们来拆分一下需求,我有N个耗时任务,可能是一次网络请求,可能是一个耗时文件IO,可能是一堆复杂的逻辑,我在主线程里发起这个任务的调用,但不希望它阻塞主线程,而期望它执行完毕(成功\失败)后,来发起一次回调,最好还有超时、异常的回调控制。

据此,我们拆分出几个角色,master主线程,调度器(发起异步调用),worker(异步工作线程)。然后就是将他们组合起来,完成各种异步回调,以及每个worker的正常、异常、超时等的回调。


下面来看一下worker的定义:

/** * @author wuweifeng wrote on 2019-12-13 * @version 1.0 */public interface Worker {    String action(Object object);}


一个worker,它需要有个方法,来代表这个worker将来做什么,action就可以理解为一个耗时任务。action可以接收一个参数。

再看一下回调器的定义:


/** * @author wuweifeng wrote on 2019-12-13 * @version 1.0 */public interface Listener {    void result(Object result)




    
;}


这个listener用来做为回调,将worker的执行结果,放到result的参数里。

此外,我们还需要一个包装器Wrapper,来将worker和回调器包装一下。


public class Wrapper {    private Object param;    private Worker worker;    private Listener listener;     public Object getParam() {        return param;    }     public void setParam(Object param) {        this.param = param;    }     public Worker getWorker() {        return worker;    }     public void setWorker(Worker worker) {        this.worker = worker;    }     public Listener getListener() {        return listener;    }     public void addListener(Listener listener) {        this.listener = listener;    }}


OK,下面就是主逻辑了。


/** * @author wuweifeng wrote on 2019-12-13 * @version 1.0 */public class Bootstrap {     public static void main(String[] args) {        Bootstrap bootstrap = new Bootstrap();         Worker worker = bootstrap.newWorker();                 Wrapper wrapper = new Wrapper();        wrapper.setWorker(worker);        wrapper.setParam("hello");                 bootstrap.doWork(wrapper).addListener(new Listener() {            @Override            public void result(Object result) {                System.out.println(Thread.currentThread().getName());                System.out.println(result);            }        });         System.out.println(Thread.currentThread().getName());     }     private Wrapper doWork(Wrapper wrapper) {        new Thread(() -> {            Worker worker = wrapper.getWorker();            String result = worker.action(wrapper.getParam());            wrapper.getListener().result(result);        }).start();         return wrapper;    }     




    
    private Worker newWorker() {        return new Worker() {            @Override            public String action(Object object) {                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }                return object + " world";            }        };    } }


执行结果如下:



可以看到主线程没有被耗时的线程阻塞掉,耗时线程在执行完毕后,进行了回调。

这就是一个简单的设计模式——“监听器模式”,再来认识一下这种设计模式的三个要素:事件源(被监听的对象)、事件对象(事件完毕这个动作)、监听器(我们的Listener)。

完成了这样的小demo,立马从netty的复杂中恢复了过来,心情变成了这样:




实现一个简单带回调、超时异步任务

public class BootstrapNew {     public static void main(String[] args) {        BootstrapNew bootstrap = new BootstrapNew();         Worker worker = bootstrap.newWorker();         Wrapper wrapper = new Wrapper();        wrapper.setWorker(worker);        wrapper.setParam("hello");        //添加结果回调器        wrapper.addListener(new Listener() {            @Override            public void result(Object result) {                System.out.println(result);            }        });         CompletableFuture future = CompletableFuture.supplyAsync(() -> bootstrap.doWork(wrapper));        try {            future.get(800, TimeUnit.MILLISECONDS);        } catch (InterruptedException | TimeoutException | ExecutionException e) {            //超时了            wrapper.getListener().result("time out exception");        }






请到「今天看啥」查看全文