专栏名称: 芋道源码
纯 Java 源码分享公众号,目前有「Dubbo」「SpringCloud」「Java 并发」「RocketMQ」「Sharding-JDBC」「MyCAT」「Elastic-Job」「SkyWalking」「Spring」等等
目录
相关文章推荐
芋道源码  ·  字节一面:kafka为什么这么快? ·  2 天前  
芋道源码  ·  张雪峰公司今年的年终奖... ·  2 天前  
芋道源码  ·  用了 6 年的Spring Boot ... ·  3 天前  
芋道源码  ·  放弃Websocket,使用 SSE ... ·  3 天前  
51好读  ›  专栏  ›  芋道源码

自己写一个分布式定时任务框架+负载均衡+OpenAPI异步调用!!

芋道源码  · 公众号  · Java  · 2025-01-26 12:48

正文

👉 这是一个或许对你有用的社群

🐱 一对一交流/面试小册/简历优化/求职解惑,欢迎加入芋道快速开发平台知识星球。下面是星球提供的部分资料: 

👉这是一个或许对你有用的开源项目

国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。

功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号、ERPCRMAI 大模型等等功能:

  • Boot 多模块架构:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 微服务架构:https://gitee.com/zhijiantianya/yudao-cloud
  • 视频教程:https://doc.iocoder.cn
【国内首批】支持 JDK 17/21 + SpringBoot 3.3、JDK 8/11 + Spring Boot 2.7 双版本 

来源:juejin.cn/post/
7436925608943419411


项目背景

目前的定时任务框架已经很成熟,从QuartZxxl-job,再到近几年出现的PowerJob,既然有这么多的好的实现,为什么还是选择重写一个定时任务框架呢?

开发中遇到这样的场景,业务层面需要频繁的创建修改定时任务,在考虑分布式的架构下,对于目前可以实现该功能的框架中:

  • MQ的延时队列无法动态调整任务参数;
  • redis的过期策略需要保存太久的key且可能会有BigKey
  • xxljob没有原生的openAPI,其基于数据库锁的调度只是实现server的高可用而不是高性能;
  • powerjob的openAPI又是基于http的同步阻塞调度,并且对于server的负载均衡,由于其分组隔离设计,需要开发者手动配置,在高并发下的定时任务操作下,并不能很好的调度server集群。

主流框架往往为了适配更多的场景,支持足够多的功能,往往体积大,且不易动态扩展,为了对项目有最大的控制,在解决以上业务场景的前提下,进行部分功能的修剪,也希望能更好的从中学习主流框架的设计思想,于是决定重写一个定时任务框架。

本文章主要介绍该项目相对于目前主流定时任务框架的特性,对于定时任务调度和发现的详细可以见源码,文章末尾也给出了流程图方便理解

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

定位

这是一个基于 PowerJob 的重写和重构版本,修改和扩展了原始项目的功能,以更好地适配业务需求。

  • 支持定时任务频繁创建和任务参数频繁动态变动的场景(提供轻量API,并使用内置消息队列异步处理)
  • 支持大量定时任务并发执行的场景,实现负载均衡(分组隔离+应用级别的锁实现)
  • 主要针对小型任务 ,无需过多配置,不对任务实例进行操作

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

技术选型

通信 : gRPC(基于netty的nio)
序列化 :Protobuf编码格式编解码
负载均衡 :自己实现的注册中心NameServer
    |___ 策略 : 服务端最小调度次数策略
    |___ 交互 :pull+push
消息队列 : 自己实现的简易消息队列
    |___ 消息发送 : 异步+超时重试
    |___ 持久化 :mmap+同步刷盘策略
    |___ 消息重试 :多级延时队列+死信队列
定时调度 : 时间轮算法

项目结构

├── LICENSE
├── k-job-common // 各组件的公共依赖,开发者无需感知
├── k-job-nameServer // server和worker的注册中心,提供负载均衡
├── k-job-producer //普通Jar包,提供 OpenAPI,内置消息队列的异步发送
├── k-job-server // 基于SpringBoot实现的调度服务器
├── k-job-worker-boot-starter // kjob-worker 的 spring-boot-starter ,spring boot 应用可以通用引入该依赖一键接入 kjob-server 
├── k-job-worker // 普通Jar包,接入kjob-server的应用需要依赖该Jar包
└── pom.xml

特性

负载均衡(解决大量定时任务并发执行场景)

对于worker的负载均衡策略有许多且已经由较好的解决(轮询,健康值等),但是,我们目前的系统存在大量的定时任务,考虑server层面,可能会存在以下情况:

  • server一次调度从DB中获取太多任务,可能会OOM
  • 发起调度请求是由线程池负责,可能会有性能瓶颈,我们的系统对时间是敏感的,对时间精度高要求
  • 我们的OpenAPI同样也不希望大量请求落在同一个server上

在分布式系统下,解决定时任务并发执行往往考虑server集群的负载均衡(这里的负载均衡特指server集群能够根据自身负载,动态调度worker集群),但是对于定时任务框架,需要关注集群下的任务重复调度问题,目前的定时任务框架大都为了解决该问题而不能较好实现负载均衡。

通过查看源码,xxljob的调度,在每次查询数据库获取任务前,通过数据库行锁进行了全局加锁,保证同一时刻只有一个server在进行调度来避免重复调度,但是无法发挥集群server的调度能力

对于powerjob的调度,通过分组隔离机制(详细可以看官方文档)避免了重复调度,但是同样带来了问题:同一app下的worker集群只能被一台server调度,如果该server的任务太多了呢?如果只有一个业务对应的app,如何用server集群来负载均衡呢?

基于以上问题,增加了一个注册中心nameServer模块来负责负载均衡:

最小调度次数策略:NameServer记录server集群状态并维护各个server的分配任务次数,由于server是否调度某个worker由表中数据决定,worker会在每次pull判断是否发起请求更新server中的调度关系表,并将目前分组交由最小调度次数的server来调度,当且仅当以下发生:

  • 同一app分组下的workerNum > threshold
  • 该分组对应的server的scheduleTimes > minServerScheduleTime x 2

考虑到server的地理位置,通信效率等因素,后续可以考虑增加每个server的权重来更优分配

关键代码如下:

 public ReBalanceInfo getServerAddressReBalanceList(String serverAddress, String appName) {
        // first req, serverAddress is empty
        if(serverAddress.isEmpty()){
            ReBalanceInfo reBalanceInfo = new ReBalanceInfo();
            reBalanceInfo.setSplit(false);
            reBalanceInfo.setServerIpList(new ArrayList(serverAddressSet));
            reBalanceInfo.setSubAppName("");
            return reBalanceInfo;
        }
        ReBalanceInfo reBalanceInfo = new ReBalanceInfo();
        // get sorted scheduleTimes serverList
        List newServerIpList = serverAddress2ScheduleTimesMap.keySet().stream().sorted(new Comparator() {
            @Override
            public int compare(String o1, String o2) {
                return (int) (serverAddress2ScheduleTimesMap.get(o1) - serverAddress2ScheduleTimesMap.get(o2));
            }
        }).collect(Collectors.toList());

        // see if split
        if(!appName2WorkerNumMap.isEmpty() && appName2WorkerNumMap.get(appName) > maxWorkerNum && appName2WorkerNumMap.get(appName) % maxWorkerNum == 1){
            // return new serverIpList
            reBalanceInfo.setSplit(true);
            reBalanceInfo.setChangeServer(false);
            reBalanceInfo.setServerIpList(newServerIpList);
            reBalanceInfo.setSubAppName(appName + ":" + appName2WorkerNumMap.size());
            return reBalanceInfo;
        }
        // see if need change server
        Long lestScheduleTimes = serverAddress2ScheduleTimesMap.get(newServerIpList.get(newServerIpList.size() - 1));
        Long comparedScheduleTimes = lestScheduleTimes == 0 ? 1 : lestScheduleTimes;
        if(serverAddress2ScheduleTimesMap.get(serverAddress) / comparedScheduleTimes > 2){
            reBalanceInfo.setSplit(false);
            reBalanceInfo.setChangeServer(true);
            // first server is target lest scheduleTimes server
            reBalanceInfo.setServerIpList(newServerIpList);
            reBalanceInfo.setSubAppName("");
            return reBalanceInfo;
        }
        // return default list
        reBalanceInfo.setSplit(false);
        reBalanceInfo.setServerIpList(new ArrayList(serverAddressSet));
        reBalanceInfo.setSubAppName("");
        return reBalanceInfo;

    }

实现功能:

  • app组自动拆分: 可以为app设置组内worker数量阈值,超过阈值自动拆分subApp并分配负载均衡后的server
  • worker动态分配: 对于每一个subApp,当触发pull时,根据最小调度次数策略,可以分配至负载均衡后的server,开发者无需感知subApp

以上,解决PowerJob中同一worker分组只能被一个server调度问题,且subApp分组可以根据server的负载,实现动态依附至不同server,对于可能的重复调度问题,我们只需加上App级别的锁,相对于xxl-job的全局加锁性能更好。

消息队列(解决任务大量创建和频繁更改场景)

其实一开始用powerjob作为项目中的中间件,业务中的任务操作使用其openAPI。过程中感受最大的就是,我的业务只是根据任务id修改了任务参数,并不需要server的响应,为什么要同步阻塞?可靠性应由server保证而不是客户端的大量重试及等待。对于业务中频繁创建定时任务和改动,更应是异步操作。

一开始的想法是,使用grpc的futureStub进行异步发送,请求由Reactor线程监听事件,当事件可读时分配给业务线程池进行处理(gRPC内部已经实现)。所以需要做的似乎只是做一个Producer服务,并把stub全换成Future类型,对于jobId,我们用雪花算法拿到一个全局id就可以,无需server分配。

但是以上设计有一个致命的问题------阻塞在BlockingQueue的请求无法ack,且server宕机存在消息丢失的可能!这违背了消息队列的设计(入队--ack--持久化--消费),意味着只有被分配到线程(消费者)消费时,才能被ack,而活跃的线程数并不多。故不能仅仅依赖gRPC的内部实现,需要自己实现消息队列

  • 可靠消息

以rocketMQ为例,producer的消息会先到达broker中的队列后返回ack,consumer再轮询从broker中pull重平衡处理后的消息消费。

考虑到本项目的设计无需路由,所有的server都可以接受消息,于是不再设计broker,将server和broker结合,每个server维护自己的队列,且消费自己队列的消息,这样还能减少一次通信。

这样可靠消息的解决就变成了:

  • producer到server的消息丢失------失败或者超时则依次遍历所有的server,一定能保证消息抵达,不再阐述
  • server的队列消息丢失(机器宕机)------持久化,采用同步刷盘策略,百分之百的可靠

持久化:同步刷盘机制借鉴了rocketMQ的mmap和commitLog/consumerQueue设计,将磁盘的文件映射到内存进行读写,每次消息进来先存到buffer后触发刷盘,成功后执行写响应的回调;用consumerQueue文件作为队列,server定时pull消费消息,详细见k-job-server.consumer.DefaultMessageStore,有详细注释

// 和rocketMQ一样,读写都是用mmap,因为内存buffer就是文件的映射,只是有刷盘机制
private MappedByteBuffer commitLogBuffer;  // 映射到内存的commitlog文件
private MappedByteBuffer consumerQueueBuffer; // 映射到内存的consumerQueue文件
private final AtomicLong commitLogBufferPosition = new AtomicLong(0);// consumerLog的buffer的位置,同步刷盘的情况下与consumerLog文件的位置保持一致
private final AtomicLong commitLogCurPosition = new AtomicLong(0);// consumerLog文件的目前位置,每次刷盘后就等于buffer位置
private final AtomicLong lastProcessedOffset = new AtomicLong(0);// consumerQueue的buffer拉取commitLog的位置,与commitLog相比,重启时就是consumerQueue文件最后一条消息的索引位置
private final AtomicLong currentConsumerQueuePosition = new AtomicLong(0); // consumerQueue文件的目前位置
private final AtomicLong consumerPosition = new AtomicLong(0); // 记录消费者在consumerQueue中的消费位置,这个只在目前的系统中有,类似于rocketMQ通过pull远程拉取
  • 消息重试

对于producer,前面提到,为了应对大量定时任务的场景,对于任务的操作,应全部是异步的,我们引入超时机制即可,当超过一定的时间未收到ack,或者返回错误响应,选择下一个server发起重试

对于consumer(server),使用多级延时队列,当某个消息消费失败后,投递至下一级延迟更久的延时队列,若全都消费失败则进入死信队列,需要人工干预

 private static final Deque deadMessageQueue = new ArrayDeque<>();

    private static final List> delayQueueList = new ArrayList<>(2);
    /**
     * 逆序排序,因为重试次数到0则不再重试
     */

    private static List delayTimes = Lists.newArrayList(10000L5000L);
    public static void init(Consumer consumer) {
        delayQueueList.add(new DelayQueue<>());
        delayQueueList.add(new DelayQueue<>());
        Thread consumerThread1 = new Thread(() -> {
            try {
                while (true) {
                    // 从延时队列中取出消息(会等待直到消息到期)
                    DelayQueue delayQueue = delayQueueList.get(0);
                    if(!delayQueue.isEmpty()) {
                        DelayedMessage message = delayQueue.take();
                        consumer.consume(message.message);
                        delayQueue.remove(message);
                        System.out.println("Consumed: " + message.getMessage() + " at " + System.currentTimeMillis());
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Consumer thread interrupted");
            }
        });
     //  其他等级的延时队列

        consumerThread1.start();
    }
    public static void reConsume(MqCausa.Message msg) {
        if (msg.getRetryTime() == 0) {
            log.error("msg : {} is dead", msg);
            deadMessageQueue.add(msg);
            return;
        }
        MqCausa.Message build = msg.toBuilder().setRetryTime(msg.getRetryTime() - 1).build();
        DelayedMessage delayedMessage = new DelayedMessage(build, delayTimes.get(build.getRetryTime()));
        delayQueueList.get(msg.getRetryTime() - 1).add(delayedMessage);
    }


// 定义一个延时消息类,实现 Delayed 接口
static class DelayedMessage implements Delayed {
    private final MqCausa.Message message;
    private final long triggerTime; // 到期时间

    public DelayedMessage(MqCausa.Message message, long delayTime) {
        this.message = message;
        // 当前时间加上延时时间,设置消息的触发时间
        this.triggerTime = System.currentTimeMillis() + delayTime;
    }
    // 获取剩余的延时时间
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    // 比较方法,用于确定消息的顺序
    @Override
    public int compareTo(Delayed other) {
        if (this.triggerTime             return -1;
        } else if (this.triggerTime > ((DelayedMessage) other).triggerTime) {
            return 1;
        }
        return 0;
    }
    public MqCausa.Message getMessage() {
        return message;
    }
}

最终实现如图所示:

实现功能:

  • 对于任务操作请求的异步发送
  • 轮询策略实现消费的负载均衡

其他

附上个人总结的对于worker和server之间服务发现以及调度的流程图

服务发现

调度

项目地址

https://github.com/karatttt/k-job


欢迎加入我的知识星球,全面提升技术能力。

👉 加入方式,长按”或“扫描”下方二维码噢

星球的内容包括:项目实战、面试招聘、源码解析、学习路线。

文章有帮助的话,在看,转发吧。

谢谢支持哟 (*^__^*)