专栏名称: 石杉的架构笔记
专注原创、用心雕琢!十余年BAT一线大厂架构经验倾囊相授
目录
相关文章推荐
军武次位面  ·  IDEX ... ·  昨天  
中国民兵  ·  想成为“兵王”,这一步非常重要!! ·  昨天  
上海证券报  ·  今夜,巴菲特,大超预期 ·  2 天前  
中国证券报  ·  适时增加权益投资!这家险企今年这样干 ·  2 天前  
中国兵器工业集团  ·  兵器工业集团召开2025年度质量安全工作会 ·  3 天前  
51好读  ›  专栏  ›  石杉的架构笔记

吃透本文,面试再被问到线程池底层原理,“狠狠”喷就是了!

石杉的架构笔记  · 公众号  ·  · 2019-05-01 11:00

正文

公众号后台回复 “ 资料

获取作者独家秘制学习资料


这篇文章,我们从源码级别来聊聊线程池的运行原理,帮大家彻底吃透线程池的底层!


ThreadPoolExecutor

在深入源码之前先来看看J.U.C包中的线程池类图:


它们的最顶层是一个Executor接口,它只有一个方法:

public interface Executor {    void execute(Runnable command);}

它提供了一个运行新任务的简单方法,Java线程池也称之为Executor框架。


ExecutorService扩展了Executor,添加了操控线程池生命周期的方法,如shutDown(),shutDownNow()等。


此外,它扩展了可异步跟踪执行任务生成返回值Future的方法,如submit()等方法。


ThreadPoolExecutor继承自AbstractExecutorService,同时实现了ExecutorService接口,也是Executor框架默认的线程池实现类,也是这篇文章重点分析的对象


一般我们使用线程池,如没有特殊要求,直接创建ThreadPoolExecutor,初始化一个线程池


如果需要特殊的线程池,则直接继承ThreadPoolExecutor,并实现特定的功能。


比如ScheduledThreadPoolExecutor,它是一个具有定时执行任务的线程池。


接下来,我们就开始ThreadPoolExecutor的源码分析(以下源码为JDK8版本)


ctl变量

ctl是一个Integer值,它是对线程池运行状态和线程池中有效线程数量进行控制的字段


Integer值一共有32位, 其中高3位表示"线程池状态",低29位表示"线程池中的任务数量"


我们看看Doug Lea大神是如何实现的:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl// 通过位运算获取线程池运行状态private static int runStateOf(int c) { return c & ~CAPACITY; }// 通过位运算获取线程池中有效的工作线程数private static int workerCountOf(int c) { return c & CAPACITY; }// 初始化ctl变量值private static int ctlOf(int rs, int wc) { return rs | wc; }


线程池一共有状态5种状态,分别是:

1. Running :线程池初始化时默认的状态,表示线程正处于运行状态,能够接受新提交的任务,同时也能够处理阻塞队列中的任务;

2. SHUTDOWN :调用shutdown()方法会使线程池进入到该状态,该状态下不再继续接受新提交的任务,但是还会处理阻塞队列中的任务;

3. STOP :调用shutdownNow()方法会使线程池进入到该状态,该状态下不再继续接受新提交的任务,同时不再处理阻塞队列中的任务;

4. TIDYING :如果线程池中workerCount=0,即有效线程数量为0时,会进入该状态;

5. TERMINATED :在terminated()方法执行完后进入该状态,只不过terminated()方法需要我们自行实现。


我们再来看看位运算:

  • COUNT_BITS 表示ctl变量中表示有效线程数量的位数,这里COUNT_BITS=29;

  • CAPACITY 表示最大有效线程数,根据位运算得出;

  • COUNT_MASK =11111111111111111111111111111,折算成十进制大约是5亿,在设计之初就已经想到不会开启超过5亿条线程,所以完全够用了;


线程池状态的位运算得到以下值:

RUNNING :高三位值1112.

SHUTDOWN :高三位值0003.

STOP :高三位值0014.

TIDYING: 高三位值0105.

TERMINATED: 高三位值011


这里简单解释一下Doug Lea大神为什么使用一个Integer变量表示两个值: 很多人会想,一个变量表示两个值,就节省了存储空间。


但是这里很显然不是为了节省空间而设计的,即使将这两个值拆分成两个Integer值,一个线程池也就多了4个字节而已


为了这4个字节而去大费周章地设计一通,显然不是Doug Lea大神的初衷。


在多线程的环境下,运行状态和有效线程数量往往需要保证统一,不能出现一个改而另一个没有改的情况。


如果将他们放在同一个AtomicInteger中,利用AtomicInteger的原子操作,就可以保证这两个值始终是统一的。


Worker

Worker类继承了AQS,并实现了Runnable接口,它有两个重要的成员变量: firstTask thread


firstTask用于保存第一次新建的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。


如何在线程池中添加任务?

线程池要执行任务,那么必须先添加任务,execute()虽说是执行任务的意思,但里面也包含了添加任务的步骤在里面。


来看下面源码:

java.util.concurrent.ThreadPoolExecutor#execute:

public void execute(Runnable command) {  // 如果添加订单任务为空,则空指针异常  if (command == null)    throw new NullPointerException();  // 获取ctl值  int c = ctl.get();  // 1.如果当前有效线程数小于核心线程数,调用addWorker执行任务(即创建一条线程执行该任务)  if (workerCountOf(c) < corePoolSize) {    if (addWorker(command, true))      return;    c = ctl.get();  }  // 2.如果当前有效线程大于等于核心线程数,并且当前线程池状态为运行状态,则将任务添加到阻塞队列中,等待空闲线程取出队列执行  if (isRunning(c) && workQueue.offer(command)) {    int recheck = ctl.get();    if (! isRunning(recheck) && remove(command))      reject(command);    else if (workerCountOf(recheck) == 0)      addWorker(null, false);  }  // 3.如果阻塞队列已满,则调用addWorker执行任务(即创建一条线程执行该任务)  else if (!addWorker(command, false))    // 如果创建线程失败,则调用线程拒绝策略    reject(command);}


我在这里画一下execute执行任务的流程图:


继续往下看,addWorker添加任务,方法源码有点长,我按照逻辑拆分成两部分讲解:

java.util.concurrent.ThreadPoolExecutor#addWorker:

retry:for (;;) {  int c = ctl.get();  // 获取线程池当前运行状态  int rs = runStateOf(c);
// 如果rs大于SHUTDOWN,则说明此时线程池不在接受新任务了 // 如果rs等于SHUTDOWN,同时满足firstTask为空,且阻塞队列如果有任务,则继续执行任务 // 也就说明了如果线程池处于SHUTDOWN状态时,可以继续执行阻塞队列中的任务,但不能继续往线程池中添加任务了 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;
for (;;) { // 获取有效线程数量 int wc = workerCountOf(c); // 如果有效线程数大于等于线程池所容纳的最大线程数(基本不可能发生),不能添加任务 // 或者有效线程数大于等于当前限制的线程数,也不能添加任务 // 限制线程数量有任务是否要核心线程执行决定,core=true使用核心线程执行任务 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 使用AQS增加有效线程数量 if (compareAndIncrementWorkerCount(c)) break retry; // 如果再次获取ctl变量值 c = ctl.get(); // Re-read ctl // 再次对比运行状态,如果不一致,再次循环执行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop }}


这里特别强调: firstTask是开启线程执行的首个任务,之后常驻在线程池中的线程执行的任务都是从阻塞队列中取出的,需要注意。


以上for循环代码主要作用是判断 ctl 变量当前的状态是否可以添加任务, 特别说明了如果线程池处于SHUTDOWN状态时,可以继续执行阻塞队列中的任务,但不能继续往线程池中添加任务了;


同时增加工作线程数量使用了AQS作同步,如果同步失败,则继续循环执行。

// 任务是否已执行




    
boolean workerStarted = false;// 任务是否已添加boolean workerAdded = false;// 任务包装类,我们的任务都需要添加到Worker中Worker w = null;try {  // 创建一个Worker  w = new Worker(firstTask);  // 获取Worker中的Thread值  final Thread t = w.thread;  if (t != null) {    // 操作workers HashSet 数据结构需要同步加锁    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {      // Recheck while holding lock.      // Back out on ThreadFactory failure or if      // shut down before lock acquired.      // 获取当前线程池的运行状态      int rs = runStateOf(ctl.get());      // rs < SHUTDOWN表示是RUNNING状态;      // 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。      // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务      // rs是RUNNING状态时,直接创建线程执行任务      // 当rs等于SHUTDOWN时,并且firstTask为空,也可以创建线程执行任务,也说说明了SHUTDOWN状态时不再接受新任务      if (rs < SHUTDOWN ||          (rs == SHUTDOWN && firstTask == null)) {        if (t.isAlive()) // precheck that t is startable          throw new IllegalThreadStateException();        workers.add(w);        int s = workers.size();        if (s > largestPoolSize)          largestPoolSize = s;        workerAdded = true;      }    } finally {      mainLock.unlock();    }    // 启动线程执行任务    if (workerAdded) {      t.start();      workerStarted = true;    }  }} finally {  if (! workerStarted)    addWorkerFailed(w);}return workerStarted;}


以上源码主要的作用是创建一个Worker对象,并将新的任务装进Worker中,开启同步将Worker添加进workers中


这里需要注意workers的数据结构为HashSet,非线程安全,所以操作workers需要加同步锁


添加步骤做完后就启动线程来执行任务了,继续往下看。


如何执行任务?

我们注意到上面的代码中:

// 启动线程执行任务if (workerAdded) {  t.start();  workerStarted = true;}


这里的t是w.thread得到的,即是Worker中用于执行任务的线程,该线程由ThreadFactory创建,我们再看看生成Worker的构造方法:

Worker(Runnable firstTask) {  setState(-1); // inhibit interrupts until runWorker  this.firstTask = firstTask;  this.thread = getThreadFactory().newThread(this);}


newThread传的参数是Worker本身,而Worker实现了Runnable接口


所以当我们执行t.start()时,执行的是Worker的run()方法 ,找到入口了:


java.util.concurrent.ThreadPoolExecutor.Worker#run:

public void run() {  runWorker(this);}


java.util.concurrent.ThreadPoolExecutor#runWorker:

final void runWorker(Worker w) {  Thread wt = Thread.currentThread();  Runnable task = w.firstTask;  w.firstTask = null;  w.unlock(); // allow interrupts  boolean completedAbruptly = true;  try {    // 循环从workQueue阻塞队列中获取任务并执行    while (task != null || (task = getTask()) != null) {      // 加同步锁的目的是为了防止同一个任务出现多个线程执行的问题      w.lock();      // 如果线程池正在关闭,须确保中断当前线程      if ((runStateAtLeast(ctl.get(), STOP) ||           (Thread.interrupted() &&            runStateAtLeast(ctl.get(), STOP))) &&          !wt.isInterrupted())        wt.interrupt();      try {        // 执行任务前可以做一些操作        beforeExecute(wt, task);        Throwable thrown = null;        try {          // 执行任务          task.run();        } catch (RuntimeException x) {          thrown = x; throw x;        } catch (Error x) {          thrown = x; throw x;        } catch (Throwable x) {          thrown = x; throw new Error(x);        } finally {          // 执行任务后可以做一些操作          afterExecute(task, thrown);        }      } finally {        // 将task置为空,让线程自行调用getTask()方法从workQueue阻塞队列中获取任务        task = null;        // 记录Worker执行了多少次任务        w.completedTasks++;        w.unlock();      }    }






请到「今天看啥」查看全文