目前的定时任务框架已经很成熟,从QuartZ
到xxl-job
,再到近几年出现的PowerJob
,既然有这么多的好的实现,为什么还是选择重写一个定时任务框架呢?
开发中遇到这样的场景,业务层面需要频繁的创建修改定时任务,在考虑分布式的架构下,对于目前可以实现该功能的框架中:
- redis的过期策略需要保存太久的key且可能会有BigKey
- xxljob没有原生的openAPI,其基于数据库锁的调度只是实现server的高可用而不是高性能;
- powerjob的openAPI又是基于http的同步阻塞调度,并且对于server的负载均衡,由于其分组隔离设计,需要开发者手动配置,在高并发下的定时任务操作下,并不能很好的调度server集群。
主流框架往往为了适配更多的场景,支持足够多的功能,往往体积大,且不易动态扩展,为了对项目有最大的控制,在解决以上业务场景的前提下,进行部分功能的修剪,也希望能更好的从中学习主流框架的设计思想,于是决定重写一个定时任务框架。
本文章主要介绍该项目相对于目前主流定时任务框架的特性,对于定时任务调度和发现的详细可以见源码,文章末尾也给出了流程图方便理解
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
- 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
- 视频教程:https://doc.iocoder.cn/video/
这是一个基于 PowerJob 的重写和重构版本,修改和扩展了原始项目的功能,以更好地适配业务需求。
- 支持定时任务频繁创建和任务参数频繁动态变动的场景(提供轻量API,并使用内置消息队列异步处理)
- 支持大量定时任务并发执行的场景,实现负载均衡(分组隔离+应用级别的锁实现)
- 主要针对小型任务 ,无需过多配置,不对任务实例进行操作
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
- 项目地址:https://github.com/YunaiV/yudao-cloud
- 视频教程:https://doc.iocoder.cn/video/
通信 : gRPC(基于netty的nio)
序列化 :Protobuf编码格式编解码
负载均衡 :自己实现的注册中心NameServer
|___ 策略 : 服务端最小调度次数策略
|___ 交互 :pull+push
消息队列 : 自己实现的简易消息队列
|___ 消息发送 : 异步+超时重试
|___ 持久化 :mmap+同步刷盘策略
|___ 消息重试 :多级延时队列+死信队列
定时调度 : 时间轮算法
├── LICENSE
├── k-job-common // 各组件的公共依赖,开发者无需感知
├── k-job-nameServer // server和worker的注册中心,提供负载均衡
├── k-job-producer //普通Jar包,提供 OpenAPI,内置消息队列的异步发送
├── k-job-server // 基于SpringBoot实现的调度服务器
├── k-job-worker-boot-starter // kjob-worker 的 spring-boot-starter ,spring boot 应用可以通用引入该依赖一键接入 kjob-server
├── k-job-worker // 普通Jar包,接入kjob-server的应用需要依赖该Jar包
└── pom.xml
对于worker的负载均衡策略有许多且已经由较好的解决(轮询,健康值等),但是,我们目前的系统存在大量的定时任务,考虑server层面,可能会存在以下情况:
- server一次调度从DB中获取太多任务,可能会OOM
- 发起调度请求是由线程池负责,可能会有性能瓶颈,我们的系统对时间是敏感的,对时间精度高要求
- 我们的OpenAPI同样也不希望大量请求落在同一个server上
在分布式系统下,解决定时任务并发执行往往考虑server集群的负载均衡(这里的负载均衡特指server集群能够根据自身负载,动态调度worker集群),但是对于定时任务框架,需要关注集群下的任务重复调度问题,目前的定时任务框架大都为了解决该问题而不能较好实现负载均衡。
通过查看源码,xxljob的调度,在每次查询数据库获取任务前,通过数据库行锁进行了全局加锁,保证同一时刻只有一个server在进行调度来避免重复调度,但是无法发挥集群server的调度能力
对于powerjob的调度,通过分组隔离机制(详细可以看官方文档)避免了重复调度,但是同样带来了问题:同一app下的worker集群只能被一台server调度,如果该server的任务太多了呢?如果只有一个业务对应的app,如何用server集群来负载均衡呢?
基于以上问题,增加了一个注册中心nameServer
模块来负责负载均衡:
最小调度次数策略:NameServer记录server集群状态并维护各个server的分配任务次数,由于server是否调度某个worker由表中数据决定,worker会在每次pull判断是否发起请求更新server中的调度关系表,并将目前分组交由最小调度次数的server来调度,当且仅当以下发生:
- 同一app分组下的
workerNum > threshold
- 该分组对应的server的
scheduleTimes > minServerScheduleTime x 2
考虑到server的地理位置,通信效率等因素,后续可以考虑增加每个server的权重来更优分配
关键代码如下:
public ReBalanceInfo getServerAddressReBalanceList(String serverAddress, String appName) {
// first req, serverAddress is empty
if(serverAddress.isEmpty()){
ReBalanceInfo reBalanceInfo = new ReBalanceInfo();
reBalanceInfo.setSplit(false);
reBalanceInfo.setServerIpList(new ArrayList(serverAddressSet));
reBalanceInfo.setSubAppName("");
return reBalanceInfo;
}
ReBalanceInfo reBalanceInfo = new ReBalanceInfo();
// get sorted scheduleTimes serverList
List newServerIpList = serverAddress2ScheduleTimesMap.keySet().stream().sorted(new Comparator() {
@Override
public int compare(String o1, String o2) {
return (int) (serverAddress2ScheduleTimesMap.get(o1) - serverAddress2ScheduleTimesMap.get(o2));
}
}).collect(Collectors.toList());
// see if split
if(!appName2WorkerNumMap.isEmpty() && appName2WorkerNumMap.get(appName) > maxWorkerNum && appName2WorkerNumMap.get(appName) % maxWorkerNum == 1){
// return new serverIpList
reBalanceInfo.setSplit(true);
reBalanceInfo.setChangeServer(false);
reBalanceInfo.setServerIpList(newServerIpList);
reBalanceInfo.setSubAppName(appName + ":" + appName2WorkerNumMap.size());
return reBalanceInfo;
}
// see if need change server
Long lestScheduleTimes = serverAddress2ScheduleTimesMap.get(newServerIpList.get(newServerIpList.size() - 1));
Long comparedScheduleTimes = lestScheduleTimes == 0 ? 1 : lestScheduleTimes;
if(serverAddress2ScheduleTimesMap.get(serverAddress) / comparedScheduleTimes > 2){
reBalanceInfo.setSplit(false);
reBalanceInfo.setChangeServer(true);
// first server is target lest scheduleTimes server
reBalanceInfo.setServerIpList(newServerIpList);
reBalanceInfo.setSubAppName("");
return reBalanceInfo;
}
// return default list
reBalanceInfo.setSplit(false);
reBalanceInfo.setServerIpList(new ArrayList(serverAddressSet));
reBalanceInfo.setSubAppName("");
return reBalanceInfo;
}
实现功能:
- app组自动拆分: 可以为app设置组内worker数量阈值,超过阈值自动拆分subApp并分配负载均衡后的server
- worker动态分配: 对于每一个subApp,当触发pull时,根据最小调度次数策略,可以分配至负载均衡后的server,开发者无需感知subApp
以上,解决PowerJob
中同一worker
分组只能被一个server调度问题,且subApp分组可以根据server的负载,实现动态依附至不同server
,对于可能的重复调度问题,我们只需加上App级别的锁,相对于xxl-job的全局加锁性能更好。
其实一开始用powerjob
作为项目中的中间件,业务中的任务操作使用其openAPI。过程中感受最大的就是,我的业务只是根据任务id修改了任务参数,并不需要server的响应,为什么要同步阻塞?可靠性应由server保证而不是客户端的大量重试及等待。对于业务中频繁创建定时任务和改动,更应是异步操作。
一开始的想法是,使用grpc的futureStub
进行异步发送,请求由Reactor
线程监听事件,当事件可读时分配给业务线程池进行处理(gRPC内部已经实现)。所以需要做的似乎只是做一个Producer
服务,并把stub全换成Future类型,对于jobId,我们用雪花算法拿到一个全局id就可以,无需server分配。
但是以上设计有一个致命的问题------阻塞在BlockingQueue
的请求无法ack,且server宕机存在消息丢失的可能!这违背了消息队列的设计(入队--ack--持久化--消费
),意味着只有被分配到线程(消费者)消费时,才能被ack,而活跃的线程数并不多。故不能仅仅依赖gRPC的内部实现,需要自己实现消息队列
以rocketMQ为例,producer
的消息会先到达broker中的队列后返回ack,consumer
再轮询从broker中pull重平衡处理后的消息消费。
考虑到本项目的设计无需路由,所有的server都可以接受消息,于是不再设计broker,将server和broker结合,每个server维护自己的队列,且消费自己队列的消息,这样还能减少一次通信。
这样可靠消息的解决就变成了:
- producer到server的消息丢失------失败或者超时则依次遍历所有的server,一定能保证消息抵达,不再阐述
- server的队列消息丢失(机器宕机)------持久化,采用同步刷盘策略,百分之百的可靠
持久化:同步刷盘机制借鉴了rocketMQ的mmap和commitLog/consumerQueue
设计,将磁盘的文件映射到内存进行读写,每次消息进来先存到buffer后触发刷盘,成功后执行写响应的回调;用consumerQueue
文件作为队列,server定时pull消费消息,详细见k-job-server.consumer.DefaultMessageStore
,有详细注释
// 和rocketMQ一样,读写都是用mmap,因为内存buffer就是文件的映射,只是有刷盘机制
private MappedByteBuffer commitLogBuffer; // 映射到内存的commitlog文件
private MappedByteBuffer consumerQueueBuffer; // 映射到内存的consumerQueue文件
private final AtomicLong commitLogBufferPosition = new AtomicLong(0);// consumerLog的buffer的位置,同步刷盘的情况下与consumerLog文件的位置保持一致
private final AtomicLong commitLogCurPosition = new AtomicLong(0);// consumerLog文件的目前位置,每次刷盘后就等于buffer位置
private final AtomicLong lastProcessedOffset = new AtomicLong(0);// consumerQueue的buffer拉取commitLog的位置,与commitLog相比,重启时就是consumerQueue文件最后一条消息的索引位置
private final AtomicLong currentConsumerQueuePosition = new AtomicLong(0); // consumerQueue文件的目前位置
private final AtomicLong consumerPosition = new AtomicLong(0); // 记录消费者在consumerQueue中的消费位置,这个只在目前的系统中有,类似于rocketMQ通过pull远程拉取
对于producer,前面提到,为了应对大量定时任务的场景,对于任务的操作,应全部是异步的,我们引入超时机制即可,当超过一定的时间未收到ack,或者返回错误响应,选择下一个server发起重试
对于consumer(server)
,使用多级延时队列,当某个消息消费失败后,投递至下一级延迟更久的延时队列,若全都消费失败则进入死信队列,需要人工干预
private static final Deque deadMessageQueue = new ArrayDeque<>();
private static final List> delayQueueList = new ArrayList<>(2);
/**
* 逆序排序,因为重试次数到0则不再重试
*/
private static List delayTimes = Lists.newArrayList(10000L, 5000L);
public static void init(Consumer consumer) {
delayQueueList.add(new DelayQueue<>());
delayQueueList.add(new DelayQueue<>());
Thread consumerThread1 = new Thread(() -> {
try {
while (true) {
// 从延时队列中取出消息(会等待直到消息到期)
DelayQueue delayQueue = delayQueueList.get(0);
if(!delayQueue.isEmpty()) {
DelayedMessage message = delayQueue.take();
consumer.consume(message.message);
delayQueue.remove(message);
System.out.println("Consumed: " + message.getMessage() + " at " + System.currentTimeMillis());
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Consumer thread interrupted");
}
});
// 其他等级的延时队列
consumerThread1.start();
}
public static void reConsume(MqCausa.Message msg) {
if (msg.getRetryTime() == 0) {
log.error("msg : {} is dead", msg);
deadMessageQueue.add(msg);
return;
}
MqCausa.Message build = msg.toBuilder().setRetryTime(msg.getRetryTime() - 1).build();
DelayedMessage delayedMessage = new DelayedMessage(build, delayTimes.get(build.getRetryTime()));
delayQueueList.get(msg.getRetryTime() - 1).add(delayedMessage);
}
// 定义一个延时消息类,实现 Delayed 接口
static class DelayedMessage implements Delayed {
private final MqCausa.Message message;
private final long triggerTime; // 到期时间
public DelayedMessage(MqCausa.Message message, long delayTime) {
this.message = message;
// 当前时间加上延时时间,设置消息的触发时间
this.triggerTime = System.currentTimeMillis() + delayTime;
}
// 获取剩余的延时时间
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// 比较方法,用于确定消息的顺序
@Override
public int compareTo(Delayed other) {
if (this.triggerTime return -1;
} else if (this.triggerTime > ((DelayedMessage) other).triggerTime) {
return 1;
}
return 0;
}
public MqCausa.Message getMessage() {
return message;
}
}
最终实现如图所示:
实现功能:
附上个人总结的对于worker和server之间服务发现以及调度的流程图
https://github.com/karatttt/k-job