首先是老规矩,推荐一下我的企鹅交流群:
有兴趣交流springboot进行快速开发的同学可以加一下下面的企鹅群。

普通线程
-
1.实现:继承Thread或者实现Runnable接口
- 1.继承Thread,仅仅只能单继承
- 2.实现Runnable接口(可实现内部资源共享),接口可以多实现
- 3.经典问题:窗口卖票
- 2.实例化对象
- 3.执行任务
- 4.销毁线程回收资源
思考:
当多个资源需要开启线程来处理的时候,我们怎么办?是否一直在重复下面的流程:
create -> run -> destroy
复制代码
我们知道计算机的每次运行都是需要大量的资源消耗,5个线程的操作可能没有影响,5w个呢? 五万次创建和销毁才有仅仅五万次的执行吗?执行任务可能花费了大量的时间来处理这些创建和销毁。
线程池
特点
- 1.解决处理器单元内多个线程的执行问题
- 2.减少处理器单元闲置时间
- 3.增加了处理器单元工作时间内的吞吐能力(为什么这么说?我们减少了多个任务每次线程的创建和销毁浪费,提高了任务执行效率)
组成
- 1.线程池管理器(ThreadPool):负责创建、管理、销毁线程池,以及添加任务
- 2.工作线程(PoolWorker):无任务则等待,可循环、重复执行任务
- 3.任务接口(Task):每个任务必须实现接口,工作线程负责调度任务的执行,规定了任务的入口,以及任务完成后的收尾工作以及任务执行状态等等
- 4.任务队列(TaskQueue):存放没有处理的任务,提供任务缓冲机制
eg:超市结账:收营员服务组,单个收营员,收银工作,等待被收银的人群
JDK线程池类:java.util.concurrent.Executors和JDK线程池执行器接口:java.util.concurrent.Executor
在Executors中,jdk提供了一下相关的线程池,如下:
静态方法 | 创建的线程池类型 | 返回值的实际实现 |
---|---|---|
newFixedThreadPool(int) | 固定线程池 | ThreadPoolExecutor |
newWorkStealingPool() | 处理器核心数的并行线程池 | ForkJoinPool |
newSingleThreadExecutor() | 一个线程的单独线程池 | FinalizableDelegatedExecutorService |
newCachedThreadPool() | 缓存线程池 | ThreadPoolExecutor |
newSingleThreadScheduledExecutor() | 单独线程定时线程池 | DelegatedScheduledExecutorService |
newScheduledThreadPool(int) | 定时线程池 | ScheduledThreadPoolExecutor |
newSingleThreadExecutor() 一个线程的线程池
为什么这里我要拿一个线程的线程池来说明呢?其实我们把简单的搞定复杂的也是演变过来的。先上码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
复制代码
我们可以看到上面方法的返回值都是ExecutorService,但实际上实例化的是FinalizableDelegatedExecutorService,我们进去看看源码,如下:
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
//构造方法
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
//对象销毁的时候调用
protected void finalize() {
super.shutdown();
}
}
复制代码
上面的代码我们可以明显的看到FinalizableDelegatedExecutorService仅仅是对DelegatedExecutorService的封装,唯一实现的就是在对象销毁的时候将ExecutorService结束。
到这里我们就应该返回来分析DelegatedExecutorService,以及上面的方法中的具体代码。
我们看看默认的单线程线程池的实现,如下:
new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
//此处的代码实现了一个ExecutorService,分别有几个参数?何解?
//
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
}
//我们可以看到几个参数的字面意思分别是:
//corePoolSize 核心线程数量,包括空闲线程
//maximumPoolSize 最大线程数量
//keepAliveTime 保持活跃时间(参照后续源码,这里应该是:当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间)
//unit keepAliveTime 参数的时间单位
//workQueue 执行前用于保持任务的队列。此队列仅保持由 execute方法提交的 Runnable任务
//Executors.defaultThreadFactory() 默认线程工厂
//defaultHandler 超出线程和任务队列的任务的处理程序,实现为:new AbortPolicy(),当然这里默认是没有处理的,需要我们手动实现
//这里,我们接着看默认的线程工厂,毕竟线程池核心是需要线程来执行任务,所以此处先看线程来源。
static class DefaultThreadFactory implements ThreadFactory {
//池数量,指定原子操作
private static final AtomicInteger poolNumber = new AtomicInteger(1);
//线程组
private final ThreadGroup group;
//线程数量,指定原子操作
private final AtomicInteger threadNumber = new AtomicInteger(1);
//线程名称前缀
private final String namePrefix;
DefaultThreadFactory() {
//获取系统安全管理器
SecurityManager s = System.getSecurityManager();
//创建线程组,由是否获取系统安全管理器决定
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
//构造线程名称
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
//创建线程
public Thread newThread(Runnable r) {
//将线程组、Runnable接口(线程实际执行代码块)、线程名、线程所需要的堆栈大小为0
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
//如果为守护线程,取消守护状态,必须在线程执行前调用这个setDaemon方法
if (t.isDaemon())
t.setDaemon(false);
//默认任务优先级,值为5
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
//上面的默认线程工厂,提供给了我们一个非守护线程的线程,由原子操作保证线程唯一,任务优先级默认(最低1,最高10,默认5,此处优先级为5)
复制代码
看了上面这些我们可以总结一下:单线程线程池,默认只有一个线程和一个线程池,等待新任务时间为0,添加了原子操作来绑定线程。
是不是到这里就完了? 当然没有,我们现在需要看看更加具体的ThreadPoolExecutor,才能更加深入明白线程池。
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
*所有的构造方法均指向这里,所以我们看一下这个就足够
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//参数检查,说明线程池不能线程=0,也不能最大线程数量不大于0切最大线程数量不能少于核心线程数量,等待任务最长时间不能小于0
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
//等待任务队列、线程工厂、超任务队列的处理程序
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
//上面的判断,可以看做是一种防御式编程,所有的问题预先处理,后续无需考虑类似问题
//构造线程池相关设定阈值
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
//到了这里其实我们不必先追究具体的实现,还是先看看AbstractExecutorService吧。
//抽象的执行服务
public abstract class AbstractExecutorService implements ExecutorService {
//执行方法
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
//获取任务数量
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
//任务集合
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
//执行完成服务
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
// 记录异常
ExecutionException ee = null;
//超时时间线
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//使用迭代器获取任务
Iterator<? extends Callable<T>> it = tasks.iterator();
// 确定开始一项任务
futures.add(ecs.submit(it.next()));
//任务数量减少
--ntasks;
//正在执行任务标志
int active = 1;
//循环执行任务
for (;;) {
//获取任务队列中第一个任务
Future<T> f = ecs.poll();
//任务为空,如果还有任务则执行任务(任务数量减1,提交任务到执行队列,正在执行任务数量+1)
//正在执行任务数为0,说明任务执行完毕,中断任务循环
//若有超时检查,则执行超时检查机制
//上述情况都不满足,则取出任务队列头,并将其从队列移除
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
//任务不为空
if (f != null) {
//正在执行标志-1
--active;
try {
//返回执行结果
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
//取消所有任务
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
//执行方法
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
//和上面类似,这里也是创建任务队列
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
//迭代进行任务执行
try {
//创建任务,并添加到任务队列
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
//设置超时时间标记
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
//在执行器没有多少多并行性的情况下,交替执行时间检查和调用。
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
//任务超时,返回任务队列
if (nanos <= 0L)
return futures;
}
//遍历任务并返回任务执行结果
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
//超时
if (nanos <= 0L)
return futures;
try {
//给定执行时间等待任务完成并返回结果
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
//未完成则取消执行
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
/**
*创建任务队列
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return