大家好,今天教大家撸一个 Java 的多线程永动任务,
这个示例的原型是公司自研的多线程异步任务项目
,我把里面涉及到多线程的代码抽离出来,然后进行一定的改造。
里面涉及的知识点非常多,特别适合有
一定工作经验
的同学学习,或者可以直接拿到项目中使用。
文章结构非常简单:
做这个多线程异步任务,主要是因为我们有很多永动的异步任务,什么是永动呢?就是
任务跑起来后,需要一直跑下去。
比如消息 Push 任务,因为一直有消息过来,所以需要一直去消费 DB 中的未推送消息,就需要整一个 Push 的永动异步任务。
我们的需求其实不难,简单总结一下:
-
-
每个异步任务,支持开
多个线程
去消费这个任务的数据;
-
支持永动异步任务的
优雅关闭
,即关闭后,需要把所有的数据消费完毕后,再关闭。
完成上面的需求,需要注意几个点:
-
-
每个
子任务
,因为需要支持并发,需要用线程池控制;
-
永动任务的关闭,需要通知子任务的并发线程,并
支持永动任务和并发子任务的优雅关闭
。
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
-
项目地址:https://gitee.com/zhijiantianya/ruoyi-vue-pro
-
视频教程:https://doc.iocoder.cn/video/
对于子任务,需要支持并发,如果每个并发都开一个线程,用完就关闭,对资源消耗太大,所以引入线程池:
public class TaskProcessUtil {
// 每个任务,都有自己单独的线程池
privatestatic Map executors = new ConcurrentHashMap<>();
// 初始化一个线程池
private static ExecutorService init(String poolName, int poolSize) {
returnnew ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(),
new ThreadFactoryBuilder().setNameFormat("Pool-" + poolName).setDaemon(false).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
// 获取线程池
public static ExecutorService getOrInitExecutors(String poolName,int poolSize) {
ExecutorService executorService = executors.get(poolName);
if (null == executorService) {
synchronized (TaskProcessUtil.class) {
executorService = executors.get(poolName);
if (null == executorService) {
executorService = init(poolName, poolSize);
executors.put(poolName, executorService);
}
}
}
return executorService;
}
// 回收线程资源
public static void releaseExecutors(String poolName) {
ExecutorService executorService = executors.remove(poolName);
if (executorService != null) {
executorService.shutdown();
}
}
}
这是一个线程池的工具类,这里初始化线程池和回收线程资源很简单,我们主要讨论获取线程池。
获取线程池可能会存在并发情况,所以需要加一个 synchronized 锁,然后锁住后,需要对 executorService 进行二次判空校验。
为了更好讲解单个任务的实现方式,我们的任务主要就是把 Cat 的数据打印出来,Cat 定义如下:
@Data
@Service
public class Cat {
private String catName;
public Cat setCatName(String name) {
this.catName = name;
return this;
}
}
单个任务主要包括以下功能:
-
获取永动任务数据
:这里一般都是扫描 DB,我直接就简单用 queryData() 代替。
-
多线程执行任务
:需要把数据拆分成 4 份,然后分别由多线程并发执行,这里可以通过线程池支持;
-
永动任务优雅停机
:当外面通知任务需要停机,需要执行完剩余任务数据,并回收线程资源,退出任务;
-
永动执行
:如果未收到停机命令,任务需要一直执行下去。
直接看代码:
public class ChildTask {
privatefinalint POOL_SIZE = 3; // 线程池大小
privatefinalint SPLIT_SIZE = 4; // 数据拆分大小
private String taskName;
// 接收jvm关闭信号,实现优雅停机
protectedvolatileboolean terminal = false;
public