公众号后台回复 “
资料
”
获取作者独家秘制学习资料
这篇文章,我们从源码级别来聊聊线程池的运行原理,帮大家彻底吃透线程池的底层!
ThreadPoolExecutor
在深入源码之前先来看看J.U.C包中的线程池类图:
它们的最顶层是一个Executor接口,它只有一个方法:
public interface Executor {
void execute(Runnable command);
}
它提供了一个运行新任务的简单方法,Java线程池也称之为Executor框架。
ExecutorService扩展了Executor,添加了操控线程池生命周期的方法,如shutDown(),shutDownNow()等。
此外,它扩展了可异步跟踪执行任务生成返回值Future的方法,如submit()等方法。
ThreadPoolExecutor继承自AbstractExecutorService,同时实现了ExecutorService接口,也是Executor框架默认的线程池实现类,也是这篇文章重点分析的对象
一般我们使用线程池,如没有特殊要求,直接创建ThreadPoolExecutor,初始化一个线程池
如果需要特殊的线程池,则直接继承ThreadPoolExecutor,并实现特定的功能。
比如ScheduledThreadPoolExecutor,它是一个具有定时执行任务的线程池。
接下来,我们就开始ThreadPoolExecutor的源码分析(以下源码为JDK8版本)
ctl变量
ctl是一个Integer值,它是对线程池运行状态和线程池中有效线程数量进行控制的字段
Integer值一共有32位,
其中高3位表示"线程池状态",低29位表示"线程池中的任务数量"
我们看看Doug Lea大神是如何实现的:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 通过位运算获取线程池运行状态
private
static int runStateOf(int c) { return c & ~CAPACITY; }
// 通过位运算获取线程池中有效的工作线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 初始化ctl变量值
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池一共有状态5种状态,分别是:
1.
Running
:线程池初始化时默认的状态,表示线程正处于运行状态,能够接受新提交的任务,同时也能够处理阻塞队列中的任务;
2.
SHUTDOWN
:调用shutdown()方法会使线程池进入到该状态,该状态下不再继续接受新提交的任务,但是还会处理阻塞队列中的任务;
3.
STOP
:调用shutdownNow()方法会使线程池进入到该状态,该状态下不再继续接受新提交的任务,同时不再处理阻塞队列中的任务;
4.
TIDYING
:如果线程池中workerCount=0,即有效线程数量为0时,会进入该状态;
5.
TERMINATED
:在terminated()方法执行完后进入该状态,只不过terminated()方法需要我们自行实现。
我们再来看看位运算:
-
COUNT_BITS
表示ctl变量中表示有效线程数量的位数,这里COUNT_BITS=29;
-
CAPACITY
表示最大有效线程数,根据位运算得出;
-
COUNT_MASK
=11111111111111111111111111111,折算成十进制大约是5亿,在设计之初就已经想到不会开启超过5亿条线程,所以完全够用了;
线程池状态的位运算得到以下值:
RUNNING
:高三位值1112.
SHUTDOWN
:高三位值0003.
STOP
:高三位值0014.
TIDYING:
高三位值0105.
TERMINATED:
高三位值011
这里简单解释一下Doug Lea大神为什么使用一个Integer变量表示两个值:
很多人会想,一个变量表示两个值,就节省了存储空间。
但是这里很显然不是为了节省空间而设计的,即使将这两个值拆分成两个Integer值,一个线程池也就多了4个字节而已
为了这4个字节而去大费周章地设计一通,显然不是Doug Lea大神的初衷。
在多线程的环境下,运行状态和有效线程数量往往需要保证统一,不能出现一个改而另一个没有改的情况。
如果将他们放在同一个AtomicInteger中,利用AtomicInteger的原子操作,就可以保证这两个值始终是统一的。
Worker
Worker类继承了AQS,并实现了Runnable接口,它有两个重要的成员变量:
firstTask
和
thread
firstTask用于保存第一次新建的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。
如何在线程池中添加任务?
线程池要执行任务,那么必须先添加任务,execute()虽说是执行任务的意思,但里面也包含了添加任务的步骤在里面。
来看下面源码:
java.util.concurrent.ThreadPoolExecutor#execute:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
我在这里画一下execute执行任务的流程图:
继续往下看,addWorker添加任务,方法源码有点长,我按照逻辑拆分成两部分讲解:
java.util.concurrent.ThreadPoolExecutor#addWorker:
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
这里特别强调:
firstTask是开启线程执行的首个任务,之后常驻在线程池中的线程执行的任务都是从阻塞队列中取出的,需要注意。
以上for循环代码主要作用是判断 ctl 变量当前的状态是否可以添加任务,
特别说明了如果线程池处于SHUTDOWN状态时,可以继续执行阻塞队列中的任务,但不能继续往线程池中添加任务了;
同时增加工作线程数量使用了AQS作同步,如果同步失败,则继续循环执行。
// 任务是否已执行
boolean workerStarted = false;
// 任务是否已添加
boolean workerAdded = false;
// 任务包装类,我们的任务都需要添加到Worker中
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
以上源码主要的作用是创建一个Worker对象,并将新的任务装进Worker中,开启同步将Worker添加进workers中
这里需要注意workers的数据结构为HashSet,非线程安全,所以操作workers需要加同步锁
。
添加步骤做完后就启动线程来执行任务了,继续往下看。
如何执行任务?
我们注意到上面的代码中:
// 启动线程执行任务
if (workerAdded) {
t.start();
workerStarted = true;
}
这里的t是w.thread得到的,即是Worker中用于执行任务的线程,该线程由ThreadFactory创建,我们再看看生成Worker的构造方法:
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
newThread传的参数是Worker本身,而Worker实现了Runnable接口
所以当我们执行t.start()时,执行的是Worker的run()方法
,找到入口了:
java.util.concurrent.ThreadPoolExecutor.Worker#run:
public void run() {
runWorker(this);
}
java.util.concurrent.ThreadPoolExecutor#runWorker:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}