前言
说到线程池八股文背的很熟的肯定知道无非就这几个考点:
线程池三大核心参数
corePoolSize
、
maximumPoolSize
、
workQueue
的含义
建议通过
ThreadPoolExecutor
的构造函数来声明,避免使用
Executors
创建线程池
“
以上考点作为线程池面试几乎必问的内容,大部分人应该都是如数家珍,张口就来,但是懂了面试八股文真的就不一定在实际运用中真的就会把线程池用好。且看下面这次真实生产事故还原
1、事故还原
某次一位研发同事写出了下面类似的代码:
List items = getFromDb(); List> completableFutures = items.stream().map(item -> CompletableFuture.supplyAsync(() -> { AppMapStationData data = mapper.copy(item); // 发起价格信息查询的RPC调用 data.setPriceInfo(priceApi.getPriceInfoById(item.getId())) return data; }, apiExecutor)).collect(Collectors.toList()); result = completableFutures.stream().map(e -> { return e.get(); }).filter(Objects::nonNull).collect(Collectors.toList());
上面的代码中,代码首先从数据库里面查出来一堆对象,然后对每一个对象进行模型转换,由于要获取每个对象的价格信息发起了一次RPC调用,由于RPC服务没有提供批量接口,所以代码里面用了线程池并发请求,以求得接口尽可能快的返回数据。
使用的是CompletableFuture 而且自定义了线程池,线程池指定了10个核心线程,20个最大线程,这段代码在上线后的一段时间确实没有任何问题,但是在灰度放量用户量多起来之后发现接口经常超时告警。
请问为什么上面的代码在用户量稍微大一点的时候就运行缓慢了呢?
实际代码问题出现在了这个get方法中,这个get方法没有指定超时时间,当getPriceInfoById这个接口响应变慢的时候,这个主线程的代码get又没有指定超时时间,这时候问题就来了。
由于某次业务查询查到了非常多的数据,每条数据就是个模型转换任务,这个任务就会在队列排队,get方法没有指定超时时间的情况下,其最终耗时就取决于整个线程池中执行最慢的那一个任务,所以当从DB中查出来的数据量越来越大的时候这个转换任务的最大耗时就会逐渐增加,进而引发接口超时。
所以这里改进上述问题需要做到两个点:
此外需要知道get方法设置超时时间的计算方式也需要留意,考虑下面这种场景
提交两个任务 A 和 B 到线程池,A 任务耗时 3 秒,B 任务耗时 4 秒,Future 以 2 秒为超时时间获取任务结果
代码如下:
ExecutorService executorService = Executors.newFixedThreadPool(2 ); Callable taskA = () -> { sleep(3 ); return "A" ; }; Callable taskB = () -> { sleep(4 ); return "B" ; }; List> futures = Stream.of(taskA, taskB) .map(executorService::submit) .collect(Collectors.toList());for (Future future : futures) { try { String s = future.get(2 , TimeUnit.SECONDS); System.out.println(s); } catch (Exception e) { continue ; } }
实际运行情况是第一个任务会超时但是第二个不会,看起来是不是还有点不可思议,耗时时间长的任务B反而没超时。原因就在于
Future.get(long timeout, TimeUnit unit)
,调用 get 时才开始计时,而非任务加入线程池的时间
从图上就可以看出来,在获取B的任务执行结果的时候B任务已经执行了两秒,所以在等待两秒的情况下可以获取到结果
2、线程池不当使用举例
1)不区分业务一把梭哈,全用一个线程池
曾经有一个项目,对接多个租户,每个租户都有各自的任务需要执行,代码中不区分租户的将所有租户的任务全部丢到一个线程池中执行,结果一个租户的任务提交过多导致线程池执行缓慢,但是由于线程池是同一个,影响了所有租户接口的响应时间。如果说上面说的这个场景用一个线程池产生了租户互相影响的问题还不够严重,那么下面的这种场景就问题更大了。
曾经有一段这样的场景,因为共用线程池直接导致线程池任务永远完成不了,请看下面的这种情况:
首先向线程池中提交了一个任务,然后在这个任务的内部实现中又往同一个线程池中再次提交了一个任务,相当于父子任务在同一个线程池中执行,这时候极有可出现线程死锁也就是循环等待的情况
如上图所示,父任务全部处于执行状态,这时候子任务想要执行需要等父任务执行完成,但是父任务都执行不完,因为还有个子任务没完成,即父任务等待子任务执行完成,而子任务等待父任务释放线程池资源,这也就造成了 "死锁"
所以综上所述,在代码中应该避免各种任务都往一个线程池中投放,对每个线程池指定好线程名称,做好分类比较合适,这里在日常开发中比较推荐使用Guava的工具类,来指定线程名称前缀,这样使用jstack分析线程问题也方便排查。
ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat(threadNamePrefix + "-%d" ) .setDaemon(true ).build(); ExecutorService threadPool = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
2)@Async注解不自己定义线程池
@Async用在方法上标识这是一个异步方法,如果不自己指定线程池这个方法将直接新建一个线程执行,可以翻看spring实现源码知道这个点
@Async的实现其实非常简单就是利用AOP,容器启动的时候会扫描所有被打上@Async注解的方法,并代理这些方法的执行,在执行这个方法的时候,生成Callable任务丢到线程池中执行(核心代码位于
org.springframework.aop.interceptor.AsyncExecutionInterceptor
)
@Override @Nullable public Object invoke (final MethodInvocation invocation) throws Throwable { Class> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null ); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null ) { throw new IllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either" ); } // 将方法调用封装成 Callable 实例丢入线程池中执行 Callable task = () -> { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null ; }; return doSubmit(task, executor, invocation.getMethod().getReturnType()); }
如果不指定线程池这里就会启用默认的线程池
SimpleAsyncTaskExecutor
然后我们看下这个类的注释
那这个问题就很严重了,假定你的方法执行速度慢,而且qps大,这时候线程数就会直接爆炸,所以建议写一个类继承
AsyncConfigurer
接口并复写
getAsyncExecutor
方法,然后在使用注解的时候指定线程池的名称
//使用注解时指定线程池的Bean名称 @Async ("apiExecutor" )
3)线程池遇上ThreadLocal
线程池和 ThreadLocal 共用,可能会导致线程从 ThreadLocal 获取到的是旧值/脏数据。这是因为线程池会复用线程对象,与线程对象绑定的类的静态属性 ThreadLocal 变量也会被重用,这就导致一个线程可能获取到其他线程的 ThreadLocal 值。
比较常规的做法是在任务执行完毕之后的finally代码块里面做清理工作
Runnable runnable = () -> { try { BizThreadLocal.set("xxxx" ); // do sth } finally { BizThreadLocal.remove(); } };
但是其实finally的代码块其实也不是百分百一定执行,事实上
Thread#stop()
方法打断线程执行的时候 finally代码块中的内容就不会执行,比较推荐的还是
# TransmittableThreadLocal
可以查看这篇文章获取更多详细内容
“
https://juejin.cn/post/7286088186848591932
3、再谈线程池,几个关键要点
1)为什么默认线程池的队列长度不能动态调整?
曾经面对生产环境线程池的参数设定问题,我曾经想到一个方案,既然线程池的参数不好定,那咱们直接动态修改就行不行呢,线程池本身提供了很多的set方法可以做到参数修改,比如我们在springBoot项目往往去使用
ThreadPoolTaskExecutor
作为线程池,从下图的set方法列表中可以看出存在很多修改线程池参数的方法
然后实际使用的时候发现核心线程数和最大线程数都能动态修改 但是队列长度却不能,为什么队列长度不能调用
setQueueCapacity
方法进行动态修改呢?
首先我们可以简单理解为spring的
ThreadPoolTaskExecutor
是Java原生
ThreadPoolExecutor
的封装,观察这个类的
setMaxPoolSize
和
setQueueCapacity
代码实现我们就能发现
setQueueCapacity
实际就是一个赋值仅在第一次实例化线程池的使用到了这个参数。
public void setMaxPoolSize (int maxPoolSize) { synchronized (this .poolSizeMonitor) { if (this .threadPoolExecutor != null ) { this .threadPoolExecutor.setMaximumPoolSize(maxPoolSize); } this .maxPoolSize = maxPoolSize; } }public void setQueueCapacity (int queueCapacity) { this .queueCapacity = queueCapacity; }protected BlockingQueue createQueue (int queueCapacity) { return (BlockingQueue)(queueCapacity > 0 ? new LinkedBlockingQueue(queueCapacity) : new SynchronousQueue()); }
从上面的源码我们还可以看出spring的
ThreadPoolTaskExecutor
使用的队列是
LinkedBlockingQueue
,那么为啥线程池
ThreadPoolExecutor
不支持修改队列长度呢?这个原因就很简单了因为这个队列的capacity是final类型的,自然不能修改。
那如果我一定要修改这个队列长度应该怎么处理?那完全就可以仿照美团的方式,自定义了一个叫做
ResizableCapacityLinkedBlockIngQueue
的队列(把
LinkedBlockingQueue
的capacity 字段的final关键字修饰给去掉了,让它变为可变的)是不是也是很简单。
2)再谈核心线程数的参数设置
核心线程池的参数设置一般各种网络资料中比较推崇的是N+1和2N法,即:
CPU 密集型任务(N+1):
这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务(2N):
这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
如何判断是 CPU 密集任务还是 IO 密集任务?
CPU 密集型:
简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。
IO 密集型:
涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。