专栏名称: 唤之
目录
相关文章推荐
OSC开源社区  ·  谈谈DeepSeek-R1满血版推理部署和优化 ·  2 天前  
51CTO官微  ·  DeepSeek爆火!我们整理了80余页宝典 ... ·  3 天前  
OSC开源社区  ·  AI的三岔路口:专业模型和个人模型 ·  4 天前  
OSC开源社区  ·  DS豆包通义BTY王炸组合,我做了个元宵AI ... ·  3 天前  
程序员的那些事  ·  马斯克开团豪掷 974 ... ·  4 天前  
51好读  ›  专栏  ›  唤之

Java NIO分析(10): JVM堆外内存利用改进: DirectBuffer详解

唤之  · 掘金  · 程序员  · 2018-07-23 08:10

正文

前面我们详细讲了 Java NIO分析(8): 高并发核心Selector详解 Java NIO分析(9): 从BSD socket到SocketChannel , 分别是NIO的事件分发器和非阻塞处理器.

为了支持 Channel 的双向读写和 Scatter/Gather 操作,我们还需要 Buffer ,将I/O数据存储备用。普通的Buffer都是JVM堆内的Buffer, 比较好理解.

接下来我们聊聊JVM使用堆外内存的沧桑历史以及为什么要设计出 DirectBuffer

首先我们要从没有NIO的前JDK1.4时代开始说起,那会儿大家使用的是 SocketInputStream SocketOutputStream .

1. Java传统的Socket是如何收发数据的?

SocketOutputStream 为例,继承类图如下:

其实大家应该比较熟悉, java.io 包里最根本的抽象就是 InputStream OutputStream ,其余的类抽象和实现都是一堆 装饰器
所以我们要跟踪的就是 write方法

// java.io.SocketOutputStream
public void write(byte b[], int off, int len) throws IOException {
    // 委托给本类的socketWrite方法
    socketWrite(b, off, len);
}

    /**
     * Writes to the socket with appropriate locking of the
     * FileDescriptor.
     * @param b the data to be written
     * @param off the start offset in the data
     * @param len the number of bytes that are written
     * @exception IOException If an I/O error has occurred.
     */
private void socketWrite(byte b[], int off, int len) throws IOException {
    ...省略非关键代码
    FileDescriptor fd = impl.acquireFD();
    try {
        // 上面就是做了一些参数判断,然后委托给socketWrite0
        socketWrite0(fd, b, off, len);
    } catch (SocketException se) {
    ...
    } finally {
        impl.releaseFD();
    }
}

    /**
     * Writes to the socket.
     * @param fd the FileDescriptor
     * @param b the data to be written
     * @param off the start offset in the data
     * @param len the number of bytes that are written
     * @exception IOException If an I/O error has occurred.
     */
private native void socketWrite0(FileDescriptor fd, byte[] b, int off,
                                 int len) throws IOException;

最后调用的是一个native方法 socketWrite0 ,打开 jdk/src/solaris/native/java/net/SocketOutputStream.c

/*
 * Class:     java_net_SocketOutputStream
 * Method:    socketWrite0
 * Signature: (Ljava/io/FileDescriptor;[BII)V
 */
JNIEXPORT void JNICALL
Java_java_net_SocketOutputStream_socketWrite0(JNIEnv *env, jobject this,
                                              jobject fdObj,
                                              jbyteArray data,
                                              jint off, jint len) {
    char *bufP;
    char BUF[MAX_BUFFER_LEN];
    int buflen;
    int fd;

    if (IS_NULL(fdObj)) {
        JNU_ThrowByName(env, "java/net/SocketException", "Socket closed");
        return;
    } else {
        fd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
        /* Bug 4086704 - If the Socket associated with this file descriptor
         * was closed (sysCloseFD), the the file descriptor is set to -1.
         */
        if (fd == -1) {
            JNU_ThrowByName(env, "java/net/SocketException", "Socket closed");
            return;
        }

    }

    if (len <= MAX_BUFFER_LEN) {
        bufP = BUF;
        buflen = MAX_BUFFER_LEN;
    } else {
        buflen = min(MAX_HEAP_BUFFER_LEN, len);
        // 初始化一块直接内存
        bufP = (char *)malloc((size_t)buflen);

        /* if heap exhausted resort to stack buffer */
        if (bufP == NULL) {
            bufP = BUF;
            buflen = MAX_BUFFER_LEN;
        }
    }

    while(len > 0) {
        int loff = 0;
        int chunkLen = min(buflen, len);
        int llen = chunkLen;
        // 将堆内的data复制到刚才初始化的内存bufP里
        (*env)->GetByteArrayRegion(env, data, off, chunkLen, (jbyte *)bufP);

        while(llen > 0) {
            // 发送数据
            int n = NET_Send(fd, bufP + loff, llen, 0);
            if (n > 0) {
                llen -= n;
                loff += n;
                continue;
            }
            if (n == JVM_IO_INTR) {
                JNU_ThrowByName(env, "java/io/InterruptedIOException", 0);
            } else {
                if (errno == ECONNRESET) {
                    JNU_ThrowByName(env, "sun/net/ConnectionResetException",
                        "Connection reset");
                } else {
                    NET_ThrowByNameWithLastError(env, "java/net/SocketException",
                        "Write failed");
                }
            }
            if (bufP != BUF) {
                free(bufP);
            }
            return;
        }
        len -= chunkLen;
        off += chunkLen;
    }

    if (bufP != BUF) {
        free(bufP);
    }
}

这段比较短,就没有省略代码了。
大致分3步:

  1. 申请一块直接内存bufP, 长度为len
  2. 将堆内的data复制到刚才初始化的内存bufP里
  3. 调用Net_send函数发送数据

Net_send 是一个宏,定义在 net_util_md.h

// net_util_md.h
#define NET_Send        JVM_Send

// jvm.cpp
JVM_LEAF(jint, JVM_Send(jint fd, char *buf, jint nBytes, jint flags))
  JVMWrapper2("JVM_Send (0x%x)", fd);
  //%note jvm_r6
  return os::send(fd, buf, (size_t)nBytes, (uint)flags);
JVM_END

// os_linux.inline.hpp
inline int os::send(int fd, char* buf, size_t nBytes, uint flags) {
  RESTARTABLE_RETURN_INT(::send(fd, buf, nBytes, flags));
}

这个宏替换的是 JVM_Send ,是一个cpp封装方法,最后会调用 os::send ,这个在 os_linux.inline.hpp 里,可以看到,最后调用的是一个全局函数 send

send 是我们的老朋友了,属于POSIX的标准 Socket API , 如果你在类Unix系统上,都可以通过 man send 来翻看它的文档。它的函数签名如下:

#include <sys/socket.h>

 ssize_t
 send(int socket, const void *buffer, size_t length, int flags);

这个函数用来将首地址为buffer, 长度为length的数据发送到 socket fd .

所以传统的Java Socket编程每次发送数据的时候,都会申请一块直接内存(堆外),然后从堆内复制到堆外,最后在调用send发送

为什么要把数据从堆内复制到堆外呢?因为 堆内的对象地址会随着gc改变,在send的时候会崩

在高并发场景下,这是非常费内存的,假如每个链接发送的数据是1k, 那么堆内有1K的数据,堆外还要申请1K的数据,还要做数据拷贝,百万链接就需要2T的内存(极端场景), 无疑是瓶颈之一。

2. NIO的解决方案: DirectBuffer

既然Socket Api一定要用堆外的内存,一个解决思路就是 复用这块内存 ,这样就 不必每次都申请一块新的内存,减少系统调用损耗

计算机世界就是解决一个问题的同时带来新的问题 ,如果复用了这块内存,如何在不用的时候进行回收就是一个问题了,因为这块 内存在堆外,JVM的gc管不到 ,放着不管又迟早触发Linux的 OOM Killer .

jdk1.4以后的NIO带来了解决方案: DirectBuffer

2.1 DirectBuffer

JVM堆内的对象是带gc的,那么将JVM堆内的对象关联到堆外,在回收堆内对象的时候触发一个回收堆外内存的操作,就可以解决这个问题了。

这个设计思路的常用实现就是 DirectByteBuffer , 打开其代码


// Primary constructor
//
DirectByteBuffer(int cap) {                   // package-private
    super(-1, 0, cap, cap);
    boolean pa = VM.isDirectMemoryPageAligned();
    int ps = Bits.pageSize();
    long size = Math.max(1L, (long)cap + (pa ? ps : 0));
    // 主要是记录jdk已经使用的直接内存的数量,当分配直接内存时,需要进行增加,当释放时,需要减少
    Bits.reserveMemory(size, cap);

    long base = 0;
    try {
        // 分配直接内存
        base = unsafe.allocateMemory(size);
    } catch (OutOfMemoryError x) {
        Bits.unreserveMemory(size, cap);
        throw x;
    }
    // 内存清零
    unsafe.setMemory(base, size, (byte) 0);
    if (pa && (base % ps != 0)) {
        // Round up to page boundary
        address = base + ps - (base & (ps - 1));
    } else {
        address = base;
    }
    // 创建Cleaner对象
    cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
    att = null;
}

private ByteBuffer putShort(long a, short x) {
    if (unaligned) {
        short y = (x);
        unsafe.putShort(a, (nativeByteOrder ? y : Bits.swap(y)));
    } else {
        Bits.putShort(a, x, bigEndian);
    }
    return this;
}

在构造器里就会申请一块直接内存,大小就是 ByteBuffer.allocateDirect 的时候指定的。

在调用 DirectByteBuffer PutXXX 的时候,都是写入到这块直接内存的,无需再去 malloc .

所以 SocketChannel 使用 DirectBuffer 进行读写的时候,性能远比 SocketOutputStream 高,需要的内存也没有它多, 对 SocketChannel 有疑问的可以看看前面分析的 Java NIO分析(9): 从BSD socket到SocketChannel

2.2 何时回收直接内存?

上面的构造器最后一行是使用了 Cleaner 对象, 这个类是个 虚引用 类,继承自 PhantomReference .

// 创建Cleaner对象
cleaner = Cleaner.create(this,
        new Deallocator(base, size, cap));

create第一个参数是要观察的引用,第二个参数是引用被回收触发的操作。

在JVM中,虚引用是为了实现 更细粒度的内存控制的手段 ,在创建虚引用的时候必须传入一个 引用队列(ReferenceQueue) ,在一个对象的finalize函数被调用之后,这个对象的虚引用会被加入 引用队列 , 通过检查队列就可以知道对象是不是要被回收了。

sun.misc.Cleaner 就是一个自带 ReferenceQueue 的类, 在 create 的时候会将 DirectBuffer 的引用加入观察,一旦引用被回收,JVM将会通知Cleaner去执行回收操作。

public class Cleaner
    extends PhantomReference<Object>
{

    // 引用队列
    private static final ReferenceQueue<Object> dummyQueue = new ReferenceQueue<>();

    ...
    private Cleaner(Object referent, Runnable thunk) {
        super(referent, dummyQueue);
        this.thunk = thunk;
    }
    ...
}

2.3 如何回收直接内存?

DirectBuffer 中有个内部类 Deallocator







请到「今天看啥」查看全文