专栏名称: 阿里开发者
阿里巴巴官方技术号,关于阿里的技术创新均将呈现于此
目录
相关文章推荐
白鲸出海  ·  PixVerse完成新一轮融资、官宣MAU突 ... ·  14 小时前  
白鲸出海  ·  休闲游戏,逆势疯狂融资? ·  14 小时前  
阿里开发者  ·  满血上阵!DeepSeek x ... ·  昨天  
滴滴招聘  ·  滴滴2025春招Q 合集来啦! ·  3 天前  
51好读  ›  专栏  ›  阿里开发者

JDK21有没有什么稳定、简单又强势的特性?

阿里开发者  · 公众号  · 科技公司  · 2025-03-05 08:30

主要观点总结

文章介绍了Java虚拟线程的发展、实现和优化,并着重讲述了在AJDK21.0.5及以上版本中虚拟线程的实现。文章从虚拟线程的基本概念开始,详细描述了Java虚拟线程与操作系统线程、Java线程的关系,以及虚拟线程在提升Java应用性能方面的作用。同时,还解释了虚拟线程pin问题及其解决方案,并介绍了AJDK21.0.5中的解决方案。此外,文章还介绍了虚拟线程诊断工具的使用和效果,并最后强调了虚拟线程在企业级应用中的实际应用和调度管理。

关键观点总结

关键观点1: 虚拟线程的概念和发展

Java虚拟线程是轻量级的线程,由Oracle发起Loom项目引入,旨在提高Java应用的并发性能。虚拟线程基于协程实现,减少线程创建和上下文切换,提高应用性能。

关键观点2: 虚拟线程的实现和优化

AJDK21.0.5及以上版本实现了基于Loom项目的虚拟线程,优化了性能,减少了死锁问题。虚拟线程与操作系统线程、Java线程的关系,以及虚拟线程在提升Java应用性能方面的作用。

关键观点3: 虚拟线程pin问题及解决方案

虚拟线程pin问题是由于Java虚拟线程在某些情况下无法从当前的Carrier Thread移除,导致应用无法继续运行。AJDK21.0.5中提供了解决方案,如修改VM内部对om相关的处理,以及使用JavaCall主动触发FJP补偿线程的逻辑。

关键观点4: 虚拟线程诊断工具

AJDK21.0.5增强了虚拟线程的诊断工具,可以输出非挂载运行中的虚拟线程的栈信息,为开发者提供调试帮助。

关键观点5: 虚拟线程在企业级应用中的应用和调度管理

虚拟线程在企业级应用中可用于定时处理、文件处理、报表生成等任务,通过SchedulerX等工具可以实施企业级定时任务的调度与管理,提供高效、稳定和灵活的解决方案。


正文

阿里妹导读


这篇文章主要介绍了Java虚拟线程的发展及其在AJDK中的实现和优化。

阅前声明:本文介绍的内容基于AJDK21.0.5[1]以及以上的版本,想要用更加稳定的Java虚拟线程还请升级哦:-)

一、基本介绍

作为开发者,想必大家对协程/纤程的概念并不陌生。在Java的Loom项目(虚拟线程)出现之前,协程已经在其他编程语言中得到了广泛应用,比如Go语言就以其内置的轻量级线程——goroutines而闻名。

在AJDK11/AJDK8/Dragonwell11/Dragonwell8中,我们JVM团队则是自研了wisp特性,其实现原理与其他语言的协程类似,通过减少线程创建和上下文切换帮助Java应用提升性能。

 Java Loom项目是由Oracle发起的一个旨在为Java平台引入轻量级线程(也称为纤程,Fiber[2])的开发项目。在Java中我们使用的名词——虚拟线程、协程、纤程指代的都是这个概念,其目标是让开发者能够更容易地编写出高并发的应用程序,而无需担心传统线程所带来的资源消耗和复杂性问题。

出于与上游同步一致的目的,从JDK21开始,我们不再支持wisp,而是采用loom项目的实现来继续优化、研发Java虚拟线程特性。目前,AJDK21.0.5包含了openjdk21中loom的所有内容,并在这基础上进一步优化了虚拟线程,以减少用户在使用虚拟线程时遇到死锁问题(也称为虚拟线程pin问题)的情况。

二、性能参考

稳定性:去年双十一期间,tpp已经灰度上线,目前没有出现问题反馈。当前版本已经在tpp大规模应用,并即将在polardb上线。

性能提升:以下提供一些实验数据供用户参考。

表格中以ajdk21不使用协程为baseline,对比了ajdk21协程、ajdk11以及ajdk11开启wisp协程的情况。(csw表示context switch)

三、使用虚拟线程

虽然Java虚拟线程使用起来是很简单的,但是对比于其他只要开一个选项的特性来说,它还是需要开发者做一点适配改动的。此前很多开发者可能会被“需要修改所有的synchronized的代码块”劝退,但对使用AJDK21.0.5的开发者来说,现在已经可以从修改synchronized的工作中解脱出来啦!开发者只需要去修改应用中有关创建线程的部分,就可以享受到JDK21虚拟线程带来的提升。 


3.1. 单一线程转为虚拟线程

// 传统 java thread,每一个线程都对应于底层操作系统的一个线程Thread javaThread = new Thread(()->{    // some tasks});// 轻量的虚拟线程 virtual thread,它会被底层一个线程携带运行Thread virtualThread = Thread.ofVirtual().start(()->{    // some tasks});

3.2. 线程池转为虚拟线程池

线程池中变成虚拟线程池的核心:将线程池的工厂转成虚拟线程工厂 Thread.ofVirtual().factory()

用户可以找到自己应用代码中的线程池进行相应的修改,以下给出两种示例。

// 1. 官方推荐的每一个任务一个虚拟线程的使用方式//   (因为创建虚拟线程所需要的资源很少,虚拟线程不需要被缓存,但是没有限流处理可能会加剧GC(因为所有被切出去的虚拟线程都是GC Root)ExecutorService es = Executors.newVirtualThreadPerTaskExecutor();// 2. 传统线程池改造成虚拟线程池//    官方并不推荐这种使用方式,但如果想要为应用限流,可以通过虚拟线程池的方式使用,实际应用升级也有很多采用这种方式ThreadFactory factory = Thread.ofVirtual().factory();ExecutorService executorService =                new ThreadPoolExecutor(MAX_WORKER_THREADS, MAX_WORKER_THREADS,                                       10L, TimeUnit.MINUTES,                                       new LinkedBlockingQueue(),                                       factory);

官方推荐方式的隐患

虽然openjdk官方推荐使用 Executors.newVirtualThreadPerTaskExecutor() ; 且明确不推荐使用虚拟线程池,但这里必须要提醒用户:过多的虚拟线程任务有可能会导致持续不断地FGC,甚至OOM应用退出。这种现象后的根因是,被切出的虚拟线程都会在GC中视为根对象,这些虚拟线程引用的所有对象都必须继续保留在堆上。

以下是一个简单的用例以说明这种情况,感兴趣的同学可以尝试一下。(实验中堆设置成4G大小)

import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.ThreadFactory;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class TestFGCVT {    public static void main(String[] args){        // 1024 * 1024 * (1024 * 8) bytes -> 8G        ThreadFactory factory = Thread.ofVirtual().factory();        // 不限流的方式可能会造成连续不断地FGC,甚至OOM退出        ExecutorService es = Executors.newThreadPerTaskExecutor(factory);        // 限流,最多有1024*100个虚拟线程任务        // ExecutorService es = new ThreadPoolExecutor(1024*100, 1024*100, // 1024 * 100 * 1024 * 8 = 800M        //                                        10L, TimeUnit.MINUTES,        //                                        new LinkedBlockingDeque(),        //                                        factory);        for(int i = 0 ; i < 1024 * 1024; i++) {            System.out.println("execute: " + i);            es.execute(new




    
 Task(i));        }        try {            es.shutdown();            while(!es.awaitTermination(10, TimeUnit.SECONDS)) {                System.out.println("still waiting...");            }            es.close();        } catch (Exception e) {            e.printStackTrace();        }    }}class Task implements Runnable {    int num;    Task(int i) {        num = i;    }    @Override    public void run() {        Integer[] largeInt = new Integer[1024];        for(int j = 0 ; j < largeInt.length; j++) {            largeInt[j] = j * 100;        }        try {            Thread.sleep(30_000); // yield this vthread 这里虽然线程被切出去了,但是持有的超大Integer数组中的所有Integer对象都会一直存活在堆上,至少占用1024 * 8 bytes        } catch (Exception e) {            e.printStackTrace();        }        int sum = 0;        for(int j = 0 ; j < largeInt.length; j++) {            sum += largeInt[j];        }        System.out.println(num + ":" + sum);    }}

3.3. 重要参数介绍

虚拟线程本身无需开任何参数就可以使用,但是对于Java应用来说,还是需要关心关于虚拟线程的两个核心参数,以控制底层实际工作线程数量,主要参数如下:

1. -Djdk.virtualThreadScheduler.parallelism=N 这个参数是并行线程数的设置,虚拟机会在大部分情况下维持的工作线程数量与N一致,这个参数需要用户根据自己的机器决定,一般情况下与机器的CPU核数一致(或略小于机器CPU核数)。

2. -Djdk.virtualThreadScheduler.maxPoolSize=M 这个参数是最大允许创建的线程数量,实际应用中可以将M设置比较大,(M>>N,比如设置成M=1024),遇到pin情况(在第4小节中会详细介绍)后触发的补偿机制在当前线程数量未超过M时都会生效。在补偿结束后,线程池会尽量恢复到N个线程。这个补偿机制主要的目的是解决虚拟线程中的pin问题,这种问题遇到的概率本身非常低,且处理此情况的补偿机制带来的开销几乎可以忽略不计,因此用户无需担心该参数带来的负面影响。

此外, -Djdk.virtualThreadScheduler.minRunnable 已经开放,默认是 max(parallelism / 2, 1) 。其余的参数暂时没有开放给用户。目前虚拟线程的工作线程池默认的corePoolSize实际与parallelism一致,keepAliveTime是30s。



3.4. 不推荐的做法

如果不使用 Thread.ofVirtual().factory() 而选择自行创建虚拟线程工厂,需要用户对当前的虚拟线程机制有较高的理解。这种工厂 创建线程的调用链 上使用到的类( )以及对象初始化操作( ), 需要用户自行确保全部提前加载和初始化 ,否则可能会导致JVM崩溃或者应用死锁。以下给出一个自主创建虚拟线程池的例子。

public static void main(String[] args) {    ForkJoinPool.ForkJoinWorkerThreadFactory forkJoinWorkerThreadFactory = new ForkJoinPool.ForkJoinWorkerThreadFactory() {    @Override    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {         return new CarrierThread(pool);    }    // newThread这个调用链上的所有类和初始化都需要用户保证遇到pin问题前全部加载和初始化。    // 用户可以选择在运行任务前主动触发一次newThread。如果newThread中有选择逻辑,用户需要保证路径全覆盖。    // 这里的newThread最佳使用jdk.internal.misc.CarrierThread,否则无法在遇到pinned问题时生成多余线程。    // 当然,使用其余的普通线程编码上并不会出现问题。};    ForkJoinPool scheduler = new ForkJoinPool(4, forkJoinWorkerThreadFactory, (t, e) -> { }, true,    4, 10, 1, pool -> true, 30, TimeUnit.SECONDS);    // 4 parallelism, 4 corePoolSize, 10 maximumPoolSize, 1 minimumRunnable, 30s keepAliveTime    Thread.Builder builder = virtualThreadBuilder(scheduler);    // 单个任务用法和Thread.ofVirtual()一致    builder.start(()->{});    // es用法    ExecutorService es = Executors.newThreadPerTaskExecutor(builder.factory());}// java.lang.ThreadBuilders$VirtualThreadBuilder(Executor) 本身被JDK中标记成只为测试使用的方法,需要反射public static Thread.Builder.OfVirtual virtualThreadBuilder(Executor scheduler) {    try {        Class> clazz = Class.forName("java.lang.ThreadBuilders$VirtualThreadBuilder");        Constructor> ctor = clazz.getDeclaredConstructor(Executor.class);        ctor.setAccessible(true);        return (Thread.Builder.OfVirtual) ctor.newInstance(scheduler);    } catch (InvocationTargetException e) {        Throwable cause = e.getCause();        if (cause instanceof RuntimeException re) {            throw re;        }        throw new RuntimeException(e);    } catch (Exception e) {        throw new RuntimeException(e);    }}

上述例子只是一个示范,但是我们不推荐使用。用户应该要理解,这种独立创建虚拟线程工厂的方式下,底层工作线程数量是由用户自行编码控制的,不受到 -Djdk.virtualThreadScheduler.parallelism -Djdk.virtualThreadScheduler.maxPoolSize=M 的影响。因此,如果混用自行创建的虚拟线程工厂和默认的虚拟线程工厂,底层的工作线程数量应当是这两种工厂的工作线程之和。这会影响到虚拟线程使用的效果。 

四、jdk21虚拟线程pin问题以及AJDK的解决方案

在此之前使用过jdk21虚拟线程的开发者可能会有这种体会:应用升级到21并使用虚拟线程后确实变快了,但是总是遇到小概率机器“卡死”的现象,更严重的则是百分之百会遇到一段时间后应用不再响应。

这就是前文中提到过的虚拟线程pin问题。如果不想了解这个问题原因,而只想确定自己的应用是否是这种情况的,可以直接尝试AJDK21.0.5版本进行解决。想继续了解原理的同学,可以继续阅读此小节。 

4.1. 什么是虚拟线程pin问题

首先,我们在介绍虚拟线程pin问题前,我们先介绍一下Java虚拟线程提升性能的原理。 

Java虚拟线程(Virtual Thread),Java线程(Carrier Thread),以及操作系统线程(OS Thread)是 N:1:1的关系。在虚拟线程调度器的配合下,Java虚拟线程会被调度到任意一个空闲的Carrier Thread上。当Carrier Thread遇到应该让渡出资源的事件时(比如Thread.sleep,nio read等等),会在调度器的帮助下更换虚拟线程任务。由于这种切换不需要在用户态与内核态切换,因此更加高效。此外,创建一个虚拟线程所需要的资源远远小于Java线程,因此使用资源的方式也更加高效。

下图是虚拟线程池/线程池的原理示意图。灰色代表虚拟线程,黑色代表Java线程,黄色代表暂时补偿的Java线程/虚拟线程。

 

虚拟线程pin则是指,当前的Virtual Thread在遇到需要被调度出去的事件时,由于JVM内部实现的原因无法从当前的Carrier Thread移除,而是选择一直占用当前的Carrier Thread。也就是上图中的灰色可执行任务一直占用黑色线程,而调度器无法将之切换到其他的灰色可执行任务。虚拟线程pin问题的场景就是,所有的Carrier Thread都被某些Virtual Thread占用了,从而没有任何任务能够继续进行下去。

典型的虚拟线程pin问题主要由以下几个原因导致:

1. 遇到了类加载、初始化事件

2. 遇到了synchronized代码块

3. 应用中调用了native c代码

这些事件导致Virtual Thread无法移除的原理其实是一致的——这些当前Carrier Thread的线程栈是Java Frame和C/C++ native Frame混杂的,含有C/C++ native Frame的虚拟线程不可移除。其不可移除的原因是当前Loom Virtual Thread的冻结、恢复设计中,其线程栈的地址不是原封不动的。比如原来在0x1000的栈,恢复后可能是从0x2000的地址开始,而栈中的内容和之前完全一致。C/C++代码中会有一些操作(比如取地址)没有办法保证恢复后的正确性。 

为了保证程序的正确性,这些虚拟线程就被pin在的底层的Carrier Thread上。当所有的Carrier Thread都被这种虚拟线程占用,同时没有任何事件能被满足从而继续运行下去的话,Java应用就卡死了。下面我们将具体分析几个pin的场景,并介绍AJDK21.0.5的解决方案。 (AJDK21.0.5已经做的工作会用成橙色标记出来。) 

4.2. synchronized导致的pin问题

只说理论还是很抽象,有没有什么更直观具体的例子呢?接下来,我们将结合 AJDK21.0.5对synchronized代码块的解决方案 以及一些小的测试用例来说明。在此前,开发者需要将这个代码块修改成使用juc lock来减少被pin的情况。

关注openjdk issue的同学可能知道,最新上游中的JEP491[3]是解决了synchronized问题的。但这其实只是LOOM开发分支中的一部分代码,其只解决了使用LM_LIGHTWEIGHT模式下的问题,如果使用LM_LEGACY锁模式,这个问题依旧存在。AJDK21.0.5则是包含了更多的LOOM项目中的解决方案代码,并额外增加了解决类加载、类初始化事件导致pin问题的临时方案。 

4.2.1 运行synchronized模块中的代码时无法被切换(yield失败)—— pin when hold om

现象

当虚拟线程在执行synchronized中的代码时,遇到需要等待资源的情况时,不会将线程资源让出给其他协程执行任务。

synchronized(object) {    try {        semaphore.acquire(); // this should yield but pinned    } catch (Exception e) {        // TODO    }}

现在我们假设

1. 能够进行semaphore.release操作的是一个新建的协程任务

2. 目前所有的线程资源(简单起见,可以假设只有一个线程)都被上述代码pin住,停在第三行

那么,程序就会完全停止运行。

实际的应用情况会比这个更复杂,会有多种pin problem组合导致应用程序无法继续运行。

原因

概括来说,当前的JVM中加锁其绑定的是线程资源,而不是协程,因此如果切换协程,其锁的持有信息会出现问题。

TestSynchronizedPinCase1.java

import java.util.concurrent.*;public class TestSynchronizedPinCase1 {    public static void main(String[] args) throws Exception{        Object object = new Object();        Semaphore semaphore = new Semaphore(1);        semaphore.acquire();        Thread t1 = Thread.ofVirtual().start(() -> {            System.out.println("thread1 is reaching synchronzed, running on " + Thread.currentThread());            synchronized(object) {                try{                    System.out.println("thread1 is acquiring semaphore");                    semaphore.acquire(); // this should yield but pinned now                    System.out.println("thread1 acquired");                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    semaphore.release();                    System.out.println("thread1 released");                }            }        });        // ensure the t1 reaches semaphore.acquire();        System.out.println("sleep 1000ms to ensure the t1 reaches `semaphore.acquire();`");        Thread.sleep(1000);        System.out.println("sleep finished");                Thread t2 = Thread.ofVirtual().start(()->{            System.out.println("thread2 is running on " + Thread.currentThread());            semaphore.release();            System.out.println("thread2 released the semaphore");        });        t1.join();        t2.join();    }}

这个小示例模拟了这种情况下的pin问题。执行用例时,请注意设置参数 -Djdk.virtualThreadScheduler.parallelism=1 ,其保证了底层的Carrier Thread数量为1。(如果用户使用最新的openjdk(包含JEP491)是可以正常运行的,但是如果额外指定LM_LEGACY模式(-XX:LockingMode=1)下会失败,感兴趣的同学可以尝试;-)) 

 

解决方案

修改VM内部对om相关的处理,将om的owner记录为virtual thread。 锁的所有者记录修正后,不含有C/C++ Frame的CarrierThread就可以顺利选择新的虚拟线程任务切换运行了。

4.2.2 进入synchronized失败时无法移除 —— pin when enter om

现象

当虚拟线程在进入synchronized时,如果获取om失败,会直接选择等待,不会将线程资源让出给其他协程执行任务。

synchronized(object) { // object has been locked and need to wait    }

现在我们假设

1. 虚拟线程A成功获取object om,但是需要等待某种资源,因此pin住;

2. 其余许多虚拟线程在等待object om,占用了剩余的线程,因此pin住;

3. 能够释放某种资源的虚拟线程任务没有资源能够执行,无法唤醒虚拟线程A;

那么,程序就会完全停止。

这里的某种资源可以是juc,object monitor,或者是触发了类加载逻辑中需要获取的锁等等。这个问题和1.1的pin problem组合在一起就是当前synchronized关键字会带来的虚拟线程卡死问题。

原因

进入synchronized失败后无法yield的原因,主要是ObjectMonitor::enter其相关逻辑是vm中的C/C++所写的逻辑,其栈是C native frame,含有这种栈的协程不能够进行上下文切换。

TestSynchronizedPinCase2.java

import java.util.concurrent.*;public class TestSynchronizedPinCase2 {    public static void main(String[] args) throws Exception{        Object object = new Object();        Semaphore semaphore = new Semaphore(1);        semaphore.acquire();        Thread t1 = Thread.ofVirtual().start(() -> {            System.out.println("thread1 is reaching synchronzed, running on " + Thread.currentThread());            synchronized(object) {                try{                    System.out.println("thread1 is holding object monitor");                    semaphore.acquire(); // this should yield but pinned now                    System.out.println("thread1 is goinng to releasing object monitor");                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    semaphore.release();                    System.out.println("thread1 released");                }            }            System.out.println("thread1 released object monitor");        });        // ensure the t1 reaches semaphore.acquire();        System.out.println("sleep 1000ms");        Thread.sleep(1000);        System.out.println("sleep finished");                Thread t2 = Thread.ofVirtual().start(() -> {            System.out.println("thread2 is reaching synchronzed, running on " + Thread.currentThread());            synchronized(object) { // this should yield but pinned on contended monitorenter now                System.out.println("thread2 is holding object monitor");                semaphore.release(); // this should not be called before t1 released monitor                System.out.println("thread2 released the semaphore");                System.out.println("thread2 is goinng to releasing object monitor");            }            System.out.println("thread2 released object monitor");        });        // ensure the t2 reaches synchronized(object);        System.out.println("sleep 1000ms");        Thread.sleep(1000);        System.out.println("sleep finished");        Thread t3 = Thread.ofVirtual().start(() -> {            System.out.println("thread3 is running on " + Thread.currentThread());            semaphore.release();            System.out.println("thread3 released object monitor");        });        System.out.println("before join()");        t1.join();        t2.join();        t3.join();        System.out.println("after join()");        System.out.println("reach end");    }}

同样,我们给出一个小示例。它模拟了4.2.1和4.2.2两种pin问题。执行这条命令式注意设置参数







请到「今天看啥」查看全文