专栏名称: 芋道源码
纯 Java 源码分享公众号,目前有「Dubbo」「SpringCloud」「Java 并发」「RocketMQ」「Sharding-JDBC」「MyCAT」「Elastic-Job」「SkyWalking」「Spring」等等
目录
相关文章推荐
芋道源码  ·  新来个阿里 P7,仅花 2 ... ·  17 小时前  
芋道源码  ·  如何应对消息堆积? ·  昨天  
芋道源码  ·  为了DDD 熬夜撸了一套 IDEA 插件) ·  2 天前  
51好读  ›  专栏  ›  芋道源码

新来个阿里 P7,仅花 2 小时,撸出一个多线程永动任务,看完直接跪了,真牛逼!

芋道源码  · 公众号  · Java  · 2025-03-21 09:44

正文

👉 这是一个或许对你有用 的社群

🐱 一对一交流/面试小册/简历优化/求职解惑,欢迎加入 芋道快速开发平台 知识星球。 下面是星球提供的部分资料:

👉 这是一个或许对你有用的开源项目

国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。

功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号、CRM 等等功能:

  • Boot 仓库:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 仓库:https://gitee.com/zhijiantianya/yudao-cloud
  • 视频教程:https://doc.iocoder.cn
【国内首批】支持 JDK 21 + SpringBoot 3.2.2、JDK 8 + Spring Boot 2.7.18 双版本

来源:楼仔

大家好,今天教大家撸一个 Java 的多线程永动任务, 这个示例的原型是公司自研的多线程异步任务项目 ,我把里面涉及到多线程的代码抽离出来,然后进行一定的改造。

里面涉及的知识点非常多,特别适合有 一定工作经验 的同学学习,或者可以直接拿到项目中使用。

文章结构非常简单:

1. 功能说明

做这个多线程异步任务,主要是因为我们有很多永动的异步任务,什么是永动呢?就是 任务跑起来后,需要一直跑下去。

比如消息 Push 任务,因为一直有消息过来,所以需要一直去消费 DB 中的未推送消息,就需要整一个 Push 的永动异步任务。

我们的需求其实不难,简单总结一下:

  1. 能同时执行多个永动的异步任务
  2. 每个异步任务,支持开 多个线程 去消费这个任务的数据;
  3. 支持永动异步任务的 优雅关闭 ,即关闭后,需要把所有的数据消费完毕后,再关闭。

完成上面的需求,需要注意几个点:

  1. 每个 永动任务 ,可以开一个线程去执行;
  2. 每个 子任务 ,因为需要支持并发,需要用线程池控制;
  3. 永动任务的关闭,需要通知子任务的并发线程,并 支持永动任务和并发子任务的优雅关闭

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

2. 多线程任务示例

2.1 线程池

对于子任务,需要支持并发,如果每个并发都开一个线程,用完就关闭,对资源消耗太大,所以引入线程池:

public class TaskProcessUtil {
    // 每个任务,都有自己单独的线程池
    privatestatic Map executors = new ConcurrentHashMap<>();

    // 初始化一个线程池
    private static ExecutorService init(String poolName, int poolSize) {
        returnnew ThreadPoolExecutor(poolSize, poolSize,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue(),
                new ThreadFactoryBuilder().setNameFormat("Pool-" + poolName).setDaemon(false).build(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // 获取线程池
    public static ExecutorService getOrInitExecutors(String poolName,int poolSize) {
        ExecutorService executorService = executors.get(poolName);
        if (null == executorService) {
            synchronized (TaskProcessUtil.class{
                executorService = executors.get(poolName);
                if (null == executorService) {
                    executorService = init(poolName, poolSize);
                    executors.put(poolName, executorService);
                }
            }
        }
        return executorService;
    }

    // 回收线程资源
    public static void releaseExecutors(String poolName) {
        ExecutorService executorService = executors.remove(poolName);
        if (executorService != null) {
            executorService.shutdown();
        }
    }
}

这是一个线程池的工具类,这里初始化线程池和回收线程资源很简单,我们主要讨论获取线程池。

获取线程池可能会存在并发情况,所以需要加一个 synchronized 锁,然后锁住后,需要对 executorService 进行二次判空校验。

2.2 单个任务

为了更好讲解单个任务的实现方式,我们的任务主要就是把 Cat 的数据打印出来,Cat 定义如下:

@Data
@Service
public class Cat {
    private String catName;
    public Cat setCatName(String name) {
        this.catName = name;
        return this;
    }
}

单个任务主要包括以下功能:

  • 获取永动任务数据 :这里一般都是扫描 DB,我直接就简单用 queryData() 代替。
  • 多线程执行任务 :需要把数据拆分成 4 份,然后分别由多线程并发执行,这里可以通过线程池支持;
  • 永动任务优雅停机 :当外面通知任务需要停机,需要执行完剩余任务数据,并回收线程资源,退出任务;
  • 永动执行 :如果未收到停机命令,任务需要一直执行下去。

直接看代码:

public class ChildTask {

    privatefinalint POOL_SIZE = 3// 线程池大小
    privatefinalint SPLIT_SIZE = 4// 数据拆分大小
    private String taskName;

    // 接收jvm关闭信号,实现优雅停机
    protectedvolatileboolean terminal = false;

    public






请到「今天看啥」查看全文