专栏名称: 架构师社区
架构师小秘圈,聚集10万架构师的小圈子!不定期分享技术干货,行业秘闻,汇集各类奇妙好玩的话题和流行动向!禁止截图,阅后即焚!
目录
相关文章推荐
每日商报  ·  傅园慧,有新身份! ·  20 小时前  
每日商报  ·  傅园慧,有新身份! ·  20 小时前  
笔吧评测室  ·  MNT Reform Next ... ·  2 天前  
每日人物  ·  如何让一个老人不再孤独? ·  2 天前  
笔吧评测室  ·  华硕灵耀景德镇陶瓷鉴赏会官宣 1 月 13 ... ·  3 天前  
环球人物  ·  花20万教农民弹钢琴,她美出圈! ·  4 天前  
51好读  ›  专栏  ›  架构师社区

分析 JDK 源码丨Java NIO

架构师社区  · 公众号  ·  · 2019-11-06 11:43

正文

作者:MobMsg,资深全端工程师一枚,架构师社区合伙人!


Java IO

Java 中传统的 IO 包基于流模型实现,交互方式为同步、阻塞,当发生读取或写入操作时,线程会阻塞在此,直到操作完成。编码时采用这种方式虽然源码较直观易维护,但容易产生应用性能下降问题,且 IO 效率及其拓展性存在较大局限


InputStream/OutputStream vs Reader/Writer:IO 中的输入/输出流(InputStream/OutputStream)用于读写字节,直接操作文件本身。Reader/Writer 用于操作字符,处理的是如文本文件内包含的信息内容


使用 io 相关工具类,切记进行资源释放,否则资源将被占用且无法释放,释放的方式可选 try-finall 机制。io 除了用于对文件的操作,网络编程中的 Socket、ServerSocket、HttpURLConnection 也是 io 操作,因为网络通信也属 io 行为


Java NIO

Java 1.4 开始引入 NIO 框架,提供了 Channel(通道)、Selector(IO复用器/选择器)、Buffer(缓冲区),可构建多路复用、同步非阻塞的 IO 程序,同时在数据操作方式方面更接近操作系统底层所以性能更高

在 Java 7 中,NIO 再次改进,引入异步非阻塞 IO 方式,基于事件和回调机制,也就是处理开始时不阻塞,处理完成后系统通知对应线程继续后续工作。这种方式也称为 NIO 2 或 AIO(Asynchronous IO)


NIO Channel、Buffer、Selector

在 IO 中数据操作基于字节流或字符流,在 NIO 中数据基于 Channel 和 Buffer 进行操作

Channel(通道):在缓冲区和位于通道另一侧的服务之间进行数据传输,支持单向或双向传输,支持阻塞或非阻塞模式

Buffer(缓冲区):高效数据容器。本质上是一块内存区,用于数据的读写,NIO Buffer 将其包裹并提供开发时常用的接口,便于数据操作

Selector(IO复用器/选择器):多路复用的重要组成部分,检查一个或多个Channel(通道)是否是可读、写状态,实现单线程管理多Channel(通道),优于使用多线程或线程池产生的系统资源开销


NIO Channel

Channel 在缓冲区和位于通道另一侧的服务之间进行数据传输,支持单向或双向传输,支持阻塞或非阻塞模式

几种主要的 Channel 类型(FileChannel、DatagramChannel、SocketChannel、ServerSocketChannel)已覆盖了我们日常开发常见场景,如不同网络传输IO、文件IO。Channel 中的数据支持以异步方式存、取至 Buffer


NIO Buffer

Buffer 是高效数据容器。它本质上是一块内存区,基于数组实现存储,用于数据的读写,NIO Buffer 将其包裹并提供开发时常用的接口,便于数据操作

Client 向 Buffer 写入数据后,调用 flip() 将 Buffer 由写模式更改为读模式,此时 Channel 可以读取 Buffer 内数据,读取完成后可调用 clear()compact() 重置缓冲区并允许数据写入


NIO Selector

Selector 是 NIO 多路复用的重要组成部分。它负责检查一个或多个Channel(通道)是否是可读、写状态,实现单线程管理多 Channel(通道),优于使用多线程或线程池产生的系统资源开销

向 Selector 注册 Channel 时可指定的类型有四种:OP_READOP_WRITEOP_CONNECTOP_ACCEPT

// 读
public static final int OP_READ = 1 <0; 

// 写
public static final int OP_WRITE = 1 <2; 

// 连接;SocketChannel 独有,其它类型 channel 不支持
public static final int OP_CONNECT = 1 <3; 

 // 接收;ServerSocketChannel 独有,其它类型 channel 不支持
public static final int OP_ACCEPT = 1 <4;

注册时可以为同一通道注册多个感兴趣的事件,互相之间使用操作符 * | * 位或连接,如:

Channel.register(Selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE);

当 Channel(通道)  注册至 Selector 内后,便会产生一个对应的 SelectionKey,存储与此 Channel 相关的数据


NIO SelectionKey

SelectionKey 是 channel 在 Selector 内注册的标识,每个 channel 对应一个 SelectionKey, SelectionKey 内包含有如下属性:

interest Set:兴趣集合,当前 channel 感兴趣的操作
ready Set:就绪集合,此 SelectionKey  已经准备就绪的操作集合
Channel:通道,获取此 SelectionKey 对应的 channel
Selector:选择器,管理此 channel 的 Selector
Attach:附加对象,向 SelectionKey 中添加更多的信息,方便之后的数据操作判断或获取


FileChannel 主要方法

java.io.RandomAccessFile.getChannel()

FileChannel 类型通道实例从输入流中获取,如:FileInputStream.getChannel()RandomAccessFile.getChannel()

java.nio.channels.FileChannel.read(buffer)

从 FileChannel 类通道中读取数据并存入指定的 buffer 中

java.nio.channels.FileChannel.write(buffer)

从指定的 buffer 中读取数据并向 FileChannel 类通道写入

java.nio.channels.FileChannel.position()

获取当前 position 位置

java.nio.channels.FileChannel.position(long newPosition)

从指定位置开始写入操作

java.nio.channels.FileChannel.truncate(long size)

指定长度并截取文件

java.nio.channels.spi.AbstractInterruptibleChannel.close()

关闭此通道


DatagramChannel 主要方法

java.nio.channels.DatagramChannel.open()

打开一个 DatagramChannel 类型通道

java.nio.channels.DatagramChannel.socket().bind(SocketAddress addr)

接收指定端口中的UDP协议数据

java.nio.channels.DatagramChannel.receive(buffer)

将数据写入到指定 buffer 中

java.nio.channels.DatagramChannel.send(ByteBuffer src, SocketAddress target)

将指定的 buffer 内的数据发送给指定的 IP地址+端口号

java.nio.channels.spi.AbstractInterruptibleChannel.close()

关闭此通道


SocketChannel 主要方法

java.nio.channels.SocketChannel.open()

打开一个 SocketChannel 类型通道

java.nio.channels.SocketChannel.connect(SocketAddress remote)

通过 Socket 方式连接至指定的IP地址+端口号

java.nio.channels.SocketChannel.read(buffet)

从 SocketChannel 类通道中读取数据并存入指定的 buffer 中

java.nio.channels.SocketChannel.write(buffer)

从指定的 buffer 中读取数据并向 SocketChannel 类通道写入

java.nio.channels.spi.AbstractSelectableChannel.configureBlocking(boolean block)

设置阻塞模式,false 为非阻塞

java.nio.channels.spi.AbstractInterruptibleChannel.close()

关闭此通道


ServerSocketChannel 主要方法

java.nio.channels.ServerSocketChannel.open()

打开一个 ServerSocketChannel 类型通道

java.nio.channels.ServerSocketChannel.bind(SocketAddress local)

监听指定端口下的TCP连接

ServerSocketChannel.accept()

监听新连接。通常用 while(true){} 方式循环监听,获取到新 channel 后根据其事件做对应操作

java.nio.channels.spi.AbstractSelectableChannel.configureBlocking(boolean block)

设置阻塞模式,false 为非阻塞

java.nio.channels.spi.AbstractInterruptibleChannel.close()

关闭此通道


Buffer 主要方法

java.nio.****Buffer.allocate(int capacity)

创建缓冲区并设定容量

java.nio.****Buffer..fwrap(byte[] array)

根据存入数组大小新建/更新缓冲区并重设容量为 array.length

java.nio.Buffer.flip()

将 Buffer 由写模式更改为读模式

java.nio.Buffer.clear()java.nio.Buffer.compact()

两者均为允许数据写入(由读模式更改为写模式)并重置缓冲区标识(capacitypositionlimit),但对待标识参数的具体处理不同,决定了后续操作对数据的影响也不同

capacity 表示缓冲区的最大容量。position 在读模式下指定读取开始位置的索引,由写模式切换过来时 position 会被置0。在写模式下指定写入元素下标,最大可用下标为 capacity -1,可用下标从 0 开始。limit 在读模式下代表本缓冲区内最多可读数据量。写模式下代表本缓冲区最多可用空间

clear()  与 compact() 的区别在于对 Buffer 内现存数据的后续使用造成的影响不同。clear() 会做如下操作: position = 0; limit = capacity; mark = -1; 也就是说数据虽未被删除,但当我们之后再次写入时,将不再关心是否保留它们而直接覆盖。而 compact() 会把所有未读数据拷贝至起始处,将position设为最后一个未读元素后,将limit设置为capacity,也就是说再次写入数据时不会覆盖未读数据,这些数据将被继续缓存并等待之后的使用


Selector 主要方法

java.nio.channels.Selector.select()

准备一组已准备好进行 I/O 操作的 channel 。以阻塞线程的方式,直到返回至少一个符合要求的 channel

java.nio.channels.Selector.select(long timeout)

准备一组已准备好进行 I/O 操作的 channel 。以阻塞线程的方式,直到返回至少一个符合要求的 channel 或者给定的超时时间到期

java.nio.channels.Selector.selectNow()

准备一组已准备好进行 I/O 操作的 channel 。以非阻塞线程的方式,如果没有符合要求的通道,则直接返回 0

java.nio.channels.Selector.selectedKeys()

准备好可进行 I/O 操作的 channel 后,调用此方法获取已就绪的 channel,之后遍历并判断 channel 对应的事件即可


SelectionKey 主要方法

java.nio.channels.SelectionKey.channel();

返回当前 SelectionKey 对应的 Channel ,即使其已关闭,也同样返回

java.nio.channels.SelectionKey.selector();

返回管理此 SelectionKey 的 Selectoe,即使其已关闭,也同样返回 Selectoe

java.nio.channels.SelectionKey.isValid();

返回当前 SelectionKey 是否有效的状态。SelectionKey  在创建时有效并持续保持,在被取消cancel()或删除removeKey(key)后变为无效,在 Channel 或 Seelctor 被关闭后同样变为无效

java.nio.channels.SelectionKey.cancel();

取消此 key 对应的 channel 在 selector 内取消。valid 也将更新为 false,此 key 将被添加至 cancelledkey 集合内

java.nio.channels.SelectionKey.interestOps();

获取 SelectionKey 中包含的 interest set, 存储的是我们设定的事件

java.nio.channels.SelectionKey.interestOps(int ops)

将此 key 对应的interst设置为指定值,此操作会对 ops 和 channel.validOps 进行校验,如果此ops不被当前channel支持,将抛出异常

java.nio.channels.SelectionKey.readyOps();

获取 SelectionKey 中包含的 ready set,存储的是准备就绪的事件。每次 select() 时,选择器都会对 ready set 进行更新,外部程序无法修改此集合.

java.nio.channels.SelectionKey.isReadable();

检测此键是否为"read"事件,等效于:k.,readyOps() & OP_READ != 0;还有isWritable(),isConnectable(),isAcceptable()

java.nio.channels.SelectionKey.attach(Object ob)

添加附件。若需要向 SelectionKey 添加更多数据信息,方便之后的操作,可通过 attach(Object ob) 或 register channel 时存入

java.nio.channels.SelectionKey.attachment();

获取附件。附件可在 Channe 生命周期中共享,但不可作为 socket 数据实现网络传输


应用源码实例

服务端

/**
 * NIO Server
 * @author liyongli 20191029
 * */

public class NIOServerDemo {

    // IO复用器丨选择器
    private Selector NIOServerSelector;

    // 数据通道
    private ServerSocketChannel NIOServerSocketChannel;

    public static void main(String[] args{
        // 先运行本类代码,再运行客户端代码
        NIOServerDemo NIOServer = new NIOServerDemo();

        // 初始化服务器配置
        NIOServer.initServerSocketChannel("localhost"8081);

        // 启动监听
        NIOServer.startNIOServerSelectorListener();
    }

    /**
     * 注册 Channel
     * @param hostname 主机地址
     * @param port 端口号
     * */

    private void initServerSocketChannel(String hostname, int port){
        try {
            // 初始化IO复用器丨选择器
            NIOServerSelector = Selector.open();

            // 打开通道
            NIOServerSocketChannel = ServerSocketChannel.open();

            // 调整模式为非阻塞
            NIOServerSocketChannel.configureBlocking(false);

            // 设置端口
            NIOServerSocketChannel.socket().bind(new InetSocketAddress(hostname, port));

            // 注册此通道
            NIOServerSocketChannel.register(NIOServerSelector, SelectionKey.OP_ACCEPT);
            System.out.println("服务端准备就绪");

        } catch (IOException e) {
            System.out.println("服务端初始化失败");
            e.printStackTrace();
        }
    }

    /**
     * 启动 Selector
     * */

    private void startNIOServerSelectorListener(){
        while(true){
            try {
                // 选中通道
                NIOServerSelector.select();

                // 获取所有 key
                Iterator NIOServerSelectorIterator = NIOServerSelector.selectedKeys().iterator();

                while(NIOServerSelectorIterator.hasNext()){

                    // 获取 key
                    SelectionKey selectionKey = NIOServerSelectorIterator.next();

                    // 判断当前 channel 是否可接收 socket 连接
                    if(selectionKey.isAcceptable()){
                        // 复用
                        SocketChannel socketChannel = NIOServerSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        socketChannel.register(NIOServerSelector, SelectionKey.OP_READ);

                        // 判断当前通道是否可读取
                    }else if(selectionKey.isReadable()){
                        callClient(selectionKey);
                    }

                    // 删除已处理完成的 key 
                    NIOServerSelectorIterator.remove();
                }

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 自动回复 Client
     * */

    private void callClient(SelectionKey selectionKey){
        try {
            // 获取对应通道
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

            // 新建缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

            // 读取通道数据并存入缓冲区
            int index = socketChannel.read(byteBuffer);

            // 若通道内有数据
            if(index != -1){
                System.out.println("服务端接收:" + new String(byteBuffer.array()));

                // 自动回复(此处可添加对应业务逻辑)
                socketChannel.write(ByteBuffer.wrap("hello client,im waiting for you!".getBytes()));
                System.out.println("服务端回复:" + "hello client,im waiting for you!");

            }else{
                // 通道内无数据
                socketChannel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客户端

/**
 * NIO Client
 * @author liyongli 20191029
 * */

public class NIOClientDemo {

    // 定义通道
    private SocketChannel socketChannel;

    public static void main(String[] args){
        // 先运行服务端代码,再运行本类
        NIOClientDemo NIOClient = new NIOClientDemo();

        // 初始化连接服务器配置
        NIOClient.initClientChannel("localhost"8081);

        // 发送消息
        NIOClient.callServer("hello server,what are you doing?");
    }

    /**
     * 初始化客户端 NIO Channel
     * */

    public void initClientChannel(String hostname, int port){
        try {
            // 初始化 socket
            InetSocketAddress inetSocketAddress = new InetSocketAddress(hostname, port);

            // 建立通道
            socketChannel = SocketChannel.open(inetSocketAddress);
            System.out.println("客户端准备就绪");

        } catch (IOException e) {
            System.out.println("初始化客户端失败");
            e.printStackTrace();
        }
    }

    /**
     * 通信 Server
     * */

    public void callServer(String callStr){
        // 将字符串转换为 byte 数组,便于稍后传输
        byte[] requestByte = new String(callStr).getBytes();

        // 创建一个1024 容量的 ByteBuffer 
        ByteBuffer byteBuffer = ByteBuffer.wrap(requestByte);
        System.out.println("客户端发送:" + new String(byteBuffer.array()));

        if(null != socketChannel){
            try {
                // 向通道写入数据
                socketChannel.write(byteBuffer);

                // 清空缓冲区(数据并未被删除,但位置、标记、限制被重置)
                byteBuffer.clear();

                // 读取被服务器更新的数据
                socketChannel.read(byteBuffer);
                System.out.println("客户端接收:" + new String(byteBuffer.array()));

                // 关闭通道
                socketChannel.close();

            } catch (IOException e) {
                System.out.println("通信出错");
                e.printStackTrace();
            }
        }else{
            System.out.println("请初始化客户端");
        }
    }
}

运行结果

服务端准备就绪
服务端接收:hello server,what are you doing?
服务端回复:hello client,im waiting for you!
客户端准备就绪
客户端发送:hello server,what are you doing?
客户端接收:hello client,im waiting for you!

本篇将持续更新 Java NIO 相关知识,一起查漏补缺学个痛快!欢迎点赞留香丨留言鼓励丨指出不足!

长按订阅更多精彩▼