文章介绍了Java虚拟线程的发展、实现和优化,并着重讲述了在AJDK21.0.5及以上版本中虚拟线程的实现。文章从虚拟线程的基本概念开始,详细描述了Java虚拟线程与操作系统线程、Java线程的关系,以及虚拟线程在提升Java应用性能方面的作用。同时,还解释了虚拟线程pin问题及其解决方案,并介绍了AJDK21.0.5中的解决方案。此外,文章还介绍了虚拟线程诊断工具的使用和效果,并最后强调了虚拟线程在企业级应用中的实际应用和调度管理。
Java虚拟线程是轻量级的线程,由Oracle发起Loom项目引入,旨在提高Java应用的并发性能。虚拟线程基于协程实现,减少线程创建和上下文切换,提高应用性能。
AJDK21.0.5及以上版本实现了基于Loom项目的虚拟线程,优化了性能,减少了死锁问题。虚拟线程与操作系统线程、Java线程的关系,以及虚拟线程在提升Java应用性能方面的作用。
虚拟线程pin问题是由于Java虚拟线程在某些情况下无法从当前的Carrier Thread移除,导致应用无法继续运行。AJDK21.0.5中提供了解决方案,如修改VM内部对om相关的处理,以及使用JavaCall主动触发FJP补偿线程的逻辑。
AJDK21.0.5增强了虚拟线程的诊断工具,可以输出非挂载运行中的虚拟线程的栈信息,为开发者提供调试帮助。
虚拟线程在企业级应用中可用于定时处理、文件处理、报表生成等任务,通过SchedulerX等工具可以实施企业级定时任务的调度与管理,提供高效、稳定和灵活的解决方案。
这篇文章主要介绍了Java虚拟线程的发展及其在AJDK中的实现和优化。
阅前声明:本文介绍的内容基于AJDK21.0.5[1]以及以上的版本,想要用更加稳定的Java虚拟线程还请升级哦:-)
作为开发者,想必大家对协程/纤程的概念并不陌生。在Java的Loom项目(虚拟线程)出现之前,协程已经在其他编程语言中得到了广泛应用,比如Go语言就以其内置的轻量级线程——goroutines而闻名。
在AJDK11/AJDK8/Dragonwell11/Dragonwell8中,我们JVM团队则是自研了wisp特性,其实现原理与其他语言的协程类似,通过减少线程创建和上下文切换帮助Java应用提升性能。
Java Loom项目是由Oracle发起的一个旨在为Java平台引入轻量级线程(也称为纤程,Fiber[2])的开发项目。在Java中我们使用的名词——虚拟线程、协程、纤程指代的都是这个概念,其目标是让开发者能够更容易地编写出高并发的应用程序,而无需担心传统线程所带来的资源消耗和复杂性问题。
出于与上游同步一致的目的,从JDK21开始,我们不再支持wisp,而是采用loom项目的实现来继续优化、研发Java虚拟线程特性。目前,AJDK21.0.5包含了openjdk21中loom的所有内容,并在这基础上进一步优化了虚拟线程,以减少用户在使用虚拟线程时遇到死锁问题(也称为虚拟线程pin问题)的情况。
稳定性:去年双十一期间,tpp已经灰度上线,目前没有出现问题反馈。当前版本已经在tpp大规模应用,并即将在polardb上线。
性能提升:以下提供一些实验数据供用户参考。
表格中以ajdk21不使用协程为baseline,对比了ajdk21协程、ajdk11以及ajdk11开启wisp协程的情况。(csw表示context switch)
虽然Java虚拟线程使用起来是很简单的,但是对比于其他只要开一个选项的特性来说,它还是需要开发者做一点适配改动的。此前很多开发者可能会被“需要修改所有的synchronized的代码块”劝退,但对使用AJDK21.0.5的开发者来说,现在已经可以从修改synchronized的工作中解脱出来啦!开发者只需要去修改应用中有关创建线程的部分,就可以享受到JDK21虚拟线程带来的提升。
Thread javaThread = new Thread(()->{
});
Thread virtualThread = Thread.ofVirtual().start(()->{
});
线程池中变成虚拟线程池的核心:将线程池的工厂转成虚拟线程工厂
Thread.ofVirtual().factory()
。
用户可以找到自己应用代码中的线程池进行相应的修改,以下给出两种示例。
ExecutorService es = Executors.newVirtualThreadPerTaskExecutor();
ThreadFactory factory = Thread.ofVirtual().factory();
ExecutorService executorService =
new ThreadPoolExecutor(MAX_WORKER_THREADS, MAX_WORKER_THREADS,
10L, TimeUnit.MINUTES,
new LinkedBlockingQueue(),
factory);
官方推荐方式的隐患
虽然openjdk官方推荐使用
Executors.newVirtualThreadPerTaskExecutor()
;
且明确不推荐使用虚拟线程池,但这里必须要提醒用户:过多的虚拟线程任务有可能会导致持续不断地FGC,甚至OOM应用退出。这种现象后的根因是,被切出的虚拟线程都会在GC中视为根对象,这些虚拟线程引用的所有对象都必须继续保留在堆上。
以下是一个简单的用例以说明这种情况,感兴趣的同学可以尝试一下。(实验中堆设置成4G大小)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestFGCVT {
public static void main(String[] args){
ThreadFactory factory = Thread.ofVirtual().factory();
ExecutorService es = Executors.newThreadPerTaskExecutor(factory);
for(int i = 0 ; i < 1024 * 1024; i++) {
System.out.println("execute: " + i);
es.execute(new
Task(i));
}
try {
es.shutdown();
while(!es.awaitTermination(10, TimeUnit.SECONDS)) {
System.out.println("still waiting...");
}
es.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
class Task implements Runnable {
int num;
Task(int i) {
num = i;
}
@Override
public void run() {
Integer[] largeInt = new Integer[1024];
for(int j = 0 ; j < largeInt.length; j++) {
largeInt[j] = j * 100;
}
try {
Thread.sleep(30_000);
} catch (Exception e) {
e.printStackTrace();
}
int sum = 0;
for(int j = 0 ; j < largeInt.length; j++) {
sum += largeInt[j];
}
System.out.println(num + ":" + sum);
}
}
虚拟线程本身无需开任何参数就可以使用,但是对于Java应用来说,还是需要关心关于虚拟线程的两个核心参数,以控制底层实际工作线程数量,主要参数如下:
1.
-Djdk.virtualThreadScheduler.parallelism=N
这个参数是并行线程数的设置,虚拟机会在大部分情况下维持的工作线程数量与N一致,这个参数需要用户根据自己的机器决定,一般情况下与机器的CPU核数一致(或略小于机器CPU核数)。
2.
-Djdk.virtualThreadScheduler.maxPoolSize=M
这个参数是最大允许创建的线程数量,实际应用中可以将M设置比较大,(M>>N,比如设置成M=1024),遇到pin情况(在第4小节中会详细介绍)后触发的补偿机制在当前线程数量未超过M时都会生效。在补偿结束后,线程池会尽量恢复到N个线程。这个补偿机制主要的目的是解决虚拟线程中的pin问题,这种问题遇到的概率本身非常低,且处理此情况的补偿机制带来的开销几乎可以忽略不计,因此用户无需担心该参数带来的负面影响。
此外,
-Djdk.virtualThreadScheduler.minRunnable
已经开放,默认是
max(parallelism / 2, 1)
。其余的参数暂时没有开放给用户。目前虚拟线程的工作线程池默认的corePoolSize实际与parallelism一致,keepAliveTime是30s。
如果不使用
Thread.ofVirtual().factory()
而选择自行创建虚拟线程工厂,需要用户对当前的虚拟线程机制有较高的理解。这种工厂
创建线程的调用链
上使用到的类(
)以及对象初始化操作(
),
需要用户自行确保全部提前加载和初始化
,否则可能会导致JVM崩溃或者应用死锁。以下给出一个自主创建虚拟线程池的例子。
public static void main(String[] args) {
ForkJoinPool.ForkJoinWorkerThreadFactory forkJoinWorkerThreadFactory = new ForkJoinPool.ForkJoinWorkerThreadFactory() {
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new CarrierThread(pool);
}
};
ForkJoinPool scheduler = new ForkJoinPool(4, forkJoinWorkerThreadFactory, (t, e) -> { }, true,
4, 10, 1, pool -> true, 30, TimeUnit.SECONDS);
Thread.Builder builder = virtualThreadBuilder(scheduler);
builder.start(()->{});
ExecutorService es = Executors.newThreadPerTaskExecutor(builder.factory());
}
public static Thread.Builder.OfVirtual virtualThreadBuilder(Executor scheduler) {
try {
Class> clazz = Class.forName("java.lang.ThreadBuilders$VirtualThreadBuilder");
Constructor> ctor = clazz.getDeclaredConstructor(Executor.class);
ctor.setAccessible(true);
return (Thread.Builder.OfVirtual) ctor.newInstance(scheduler);
} catch (InvocationTargetException e) {
Throwable cause = e.getCause();
if (cause instanceof RuntimeException re) {
throw re;
}
throw new RuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
上述例子只是一个示范,但是我们不推荐使用。用户应该要理解,这种独立创建虚拟线程工厂的方式下,底层工作线程数量是由用户自行编码控制的,不受到
-Djdk.virtualThreadScheduler.parallelism
和
-Djdk.virtualThreadScheduler.maxPoolSize=M
的影响。因此,如果混用自行创建的虚拟线程工厂和默认的虚拟线程工厂,底层的工作线程数量应当是这两种工厂的工作线程之和。这会影响到虚拟线程使用的效果。
四、jdk21虚拟线程pin问题以及AJDK的解决方案
在此之前使用过jdk21虚拟线程的开发者可能会有这种体会:应用升级到21并使用虚拟线程后确实变快了,但是总是遇到小概率机器“卡死”的现象,更严重的则是百分之百会遇到一段时间后应用不再响应。
这就是前文中提到过的虚拟线程pin问题。如果不想了解这个问题原因,而只想确定自己的应用是否是这种情况的,可以直接尝试AJDK21.0.5版本进行解决。想继续了解原理的同学,可以继续阅读此小节。
首先,我们在介绍虚拟线程pin问题前,我们先介绍一下Java虚拟线程提升性能的原理。
Java虚拟线程(Virtual Thread),Java线程(Carrier Thread),以及操作系统线程(OS Thread)是 N:1:1的关系。在虚拟线程调度器的配合下,Java虚拟线程会被调度到任意一个空闲的Carrier Thread上。当Carrier Thread遇到应该让渡出资源的事件时(比如Thread.sleep,nio read等等),会在调度器的帮助下更换虚拟线程任务。由于这种切换不需要在用户态与内核态切换,因此更加高效。此外,创建一个虚拟线程所需要的资源远远小于Java线程,因此使用资源的方式也更加高效。
下图是虚拟线程池/线程池的原理示意图。灰色代表虚拟线程,黑色代表Java线程,黄色代表暂时补偿的Java线程/虚拟线程。
虚拟线程pin则是指,当前的Virtual Thread在遇到需要被调度出去的事件时,由于JVM内部实现的原因无法从当前的Carrier Thread移除,而是选择一直占用当前的Carrier Thread。也就是上图中的灰色可执行任务一直占用黑色线程,而调度器无法将之切换到其他的灰色可执行任务。虚拟线程pin问题的场景就是,所有的Carrier Thread都被某些Virtual Thread占用了,从而没有任何任务能够继续进行下去。
典型的虚拟线程pin问题主要由以下几个原因导致:
1.
遇到了类加载、初始化事件
2.
遇到了synchronized代码块
3.
应用中调用了native c代码
这些事件导致Virtual Thread无法移除的原理其实是一致的——这些当前Carrier Thread的线程栈是Java Frame和C/C++ native Frame混杂的,含有C/C++ native Frame的虚拟线程不可移除。其不可移除的原因是当前Loom Virtual Thread的冻结、恢复设计中,其线程栈的地址不是原封不动的。比如原来在0x1000的栈,恢复后可能是从0x2000的地址开始,而栈中的内容和之前完全一致。C/C++代码中会有一些操作(比如取地址)没有办法保证恢复后的正确性。
为了保证程序的正确性,这些虚拟线程就被pin在的底层的Carrier Thread上。当所有的Carrier Thread都被这种虚拟线程占用,同时没有任何事件能被满足从而继续运行下去的话,Java应用就卡死了。下面我们将具体分析几个pin的场景,并介绍AJDK21.0.5的解决方案。
(AJDK21.0.5已经做的工作会用成橙色标记出来。)
4.2. synchronized导致的pin问题
只说理论还是很抽象,有没有什么更直观具体的例子呢?接下来,我们将结合
AJDK21.0.5对synchronized代码块的解决方案
以及一些小的测试用例来说明。在此前,开发者需要将这个代码块修改成使用juc lock来减少被pin的情况。
关注openjdk issue的同学可能知道,最新上游中的JEP491[3]是解决了synchronized问题的。但这其实只是LOOM开发分支中的一部分代码,其只解决了使用LM_LIGHTWEIGHT模式下的问题,如果使用LM_LEGACY锁模式,这个问题依旧存在。AJDK21.0.5则是包含了更多的LOOM项目中的解决方案代码,并额外增加了解决类加载、类初始化事件导致pin问题的临时方案。
4.2.1 运行synchronized模块中的代码时无法被切换(yield失败)—— pin when hold om
现象
当虚拟线程在执行synchronized中的代码时,遇到需要等待资源的情况时,不会将线程资源让出给其他协程执行任务。
synchronized(object) {
try {
semaphore.acquire();
} catch (Exception e) {
}
}
现在我们假设
1.
能够进行semaphore.release操作的是一个新建的协程任务
2.
目前所有的线程资源(简单起见,可以假设只有一个线程)都被上述代码pin住,停在第三行
那么,程序就会完全停止运行。
实际的应用情况会比这个更复杂,会有多种pin problem组合导致应用程序无法继续运行。
原因
概括来说,当前的JVM中加锁其绑定的是线程资源,而不是协程,因此如果切换协程,其锁的持有信息会出现问题。
TestSynchronizedPinCase1.java
import java.util.concurrent.*;
public class TestSynchronizedPinCase1 {
public static void main(String[] args) throws Exception{
Object object = new Object();
Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
Thread t1 = Thread.ofVirtual().start(() -> {
System.out.println("thread1 is reaching synchronzed, running on " + Thread.currentThread());
synchronized(object) {
try{
System.out.println("thread1 is acquiring semaphore");
semaphore.acquire();
System.out.println("thread1 acquired");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
System.out.println("thread1 released");
}
}
});
System.out.println("sleep 1000ms to ensure the t1 reaches `semaphore.acquire();`");
Thread.sleep(1000);
System.out.println("sleep finished");
Thread t2 = Thread.ofVirtual().start(()->{
System.out.println("thread2 is running on " + Thread.currentThread());
semaphore.release();
System.out.println("thread2 released the semaphore");
});
t1.join();
t2.join();
}
}
这个小示例模拟了这种情况下的pin问题。执行用例时,请注意设置参数
-Djdk.virtualThreadScheduler.parallelism=1
,其保证了底层的Carrier Thread数量为1。(如果用户使用最新的openjdk(包含JEP491)是可以正常运行的,但是如果额外指定LM_LEGACY模式(-XX:LockingMode=1)下会失败,感兴趣的同学可以尝试;-))
解决方案
修改VM内部对om相关的处理,将om的owner记录为virtual thread。
锁的所有者记录修正后,不含有C/C++ Frame的CarrierThread就可以顺利选择新的虚拟线程任务切换运行了。
4.2.2 进入synchronized失败时无法移除 —— pin when enter om
现象
当虚拟线程在进入synchronized时,如果获取om失败,会直接选择等待,不会将线程资源让出给其他协程执行任务。
现在我们假设
1.
虚拟线程A成功获取object om,但是需要等待某种资源,因此pin住;
2.
其余许多虚拟线程在等待object om,占用了剩余的线程,因此pin住;
3.
能够释放某种资源的虚拟线程任务没有资源能够执行,无法唤醒虚拟线程A;
那么,程序就会完全停止。
这里的某种资源可以是juc,object monitor,或者是触发了类加载逻辑中需要获取的锁等等。这个问题和1.1的pin problem组合在一起就是当前synchronized关键字会带来的虚拟线程卡死问题。
原因
进入synchronized失败后无法yield的原因,主要是ObjectMonitor::enter其相关逻辑是vm中的C/C++所写的逻辑,其栈是C native frame,含有这种栈的协程不能够进行上下文切换。
TestSynchronizedPinCase2.java
import java.util.concurrent.*;
public class TestSynchronizedPinCase2 {
public static void main(String[] args) throws Exception{
Object object = new Object();
Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
Thread t1 = Thread.ofVirtual().start(() -> {
System.out.println("thread1 is reaching synchronzed, running on " + Thread.currentThread());
synchronized(object) {
try{
System.out.println("thread1 is holding object monitor");
semaphore.acquire();
System.out.println("thread1 is goinng to releasing object monitor");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
System.out.println("thread1 released");
}
}
System.out.println("thread1 released object monitor");
});
System.out.println("sleep 1000ms");
Thread.sleep(1000);
System.out.println("sleep finished");
Thread t2 = Thread.ofVirtual().start(() -> {
System.out.println("thread2 is reaching synchronzed, running on " + Thread.currentThread());
synchronized(object) {
System.out.println("thread2 is holding object monitor");
semaphore.release();
System.out.println("thread2 released the semaphore");
System.out.println("thread2 is goinng to releasing object monitor");
}
System.out.println("thread2 released object monitor");
});
System.out.println("sleep 1000ms");
Thread.sleep(1000);
System.out.println("sleep finished");
Thread t3 = Thread.ofVirtual().start(() -> {
System.out.println("thread3 is running on " + Thread.currentThread());
semaphore.release();
System.out.println("thread3 released object monitor");
});
System.out.println("before join()");
t1.join();
t2.join();
t3.join();
System.out.println("after join()");
System.out.println("reach end");
}
}
同样,我们给出一个小示例。它模拟了4.2.1和4.2.2两种pin问题。执行这条命令式注意设置参数