废话少说,直接进入正题。
相信大家对
XXL-JOB
都很了解,故本文对源码不进行过多介绍,侧重的是
看源码过程中想到的几个知识点
,不一定都对,请大神们批评指正。
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/YunaiV/ruoyi-vue-pro
视频教程:https://doc.iocoder.cn/video/
XXL-JOB
是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
XXL-JOB
分为调度中心、执行器、数据中心,调度中心负责任务管理及调度、执行器管理、日志管理等,执行器负责任务执行及执行结果回调。
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/YunaiV/yudao-cloud
视频教程:https://doc.iocoder.cn/video/
时间轮出自
Netty
中的
HashedWheelTimer
,是一个环形结构,可以用时钟来类比,钟面上有很多
bucket
,每一个
bucket
上可以存放多个任务,使用一个
List
保存该时刻到期的所有任务,同时一个指针随着时间流逝一格一格转动,并执行对应
bucket
上所有到期的任务。任务通过
取模
决定应该放入哪个
bucket
。和
HashMap
的原理类似,
newTask
对应
put
,使用
List
来解决 Hash 冲突。
以上图为例,假设一个
bucket
是1秒,则指针转动一轮表示的时间段为8s,假设当前指针指向 0,此时需要调度一个3s后执行的任务,显然应该加入到(0+3=3)的方格中,指针再走3s次就可以执行了;如果任务要在10s后执行,应该等指针走完一轮零2格再执行,因此应放入2,同时将
round(1)
保存到任务中。检查到期任务时只执行
round
为0的,
bucket
上其他任务的
round
减1。
当然,还有优化的“分层时间轮”的实现,请参考
https://cnkirito.moe/timer/
。
XXL-JOB中的调度方式从
Quartz
变成了自研调度的方式,很像时间轮,可以理解为有60个
bucket
且每个
bucket
为1秒,但是没有了
round
的概念。
XXL-JOB中负责任务调度的有两个线程,分别为
ringThread
和
scheduleThread
,其作用如下。
❝
1、scheduleThread:对任务信息进行读取,预读未来
5s
即将触发的任务,放入时间轮。2、ringThread:对当前
bucket
和前一个
bucket
中的任务取出并执行。
下面结合源代码看下,为什么说是“类时间轮”,关键代码附上了注解,请大家留意观看。
// 环状结构 private volatile static Map> ringData = new ConcurrentHashMap<>();// 任务下次启动时间(单位为秒) % 60 int ringSecond = (int )((jobInfo.getTriggerNextTime()/1000 )%60 );// 任务放进时间轮 private void pushTimeRing (int ringSecond, int jobId) { // push async ring List ringItemData = ringData.get(ringSecond); if (ringItemData == null ) { ringItemData = new ArrayList(); ringData.put(ringSecond, ringItemData); } ringItemData.add(jobId); }// 同时取两个时间刻度的任务 List ringItemData = new ArrayList<>();int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度; for (int i = 0 ; i 2; i++) { List tmpData = ringData.remove( (nowSecond+60 -i)%60 ); if (tmpData != null ) { ringItemData.addAll(tmpData); } }// 运行 for (int jobId: ringItemData) { JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1 , null , null ); }
大家也知道,
XXL-JOB
在执行任务时,任务具体在哪个执行器上运行是根据路由策略来决定的,其中有一个策略是一致性Hash策略(源码在ExecutorRouteConsistentHash.java),自然而然想到了
一致性Hash算法
。
一致性Hash算法
是为了解决分布式系统中负载均衡的问题时候可以使用Hash算法让固定的一部分请求落到同一台服务器上,这样每台服务器固定处理一部分请求(并维护这些请求的信息),起到负载均衡的作用。
普通的余数hash(hash(比如用户id)%服务器机器数)算法伸缩性很差,当新增或者下线服务器机器时候,用户id与服务器的映射关系会大量失效。一致性hash则利用hash环对其进行了改进。
一致性Hash算法
在实践中,当服务器节点比较少的时候会出现上节所说的一致性hash倾斜的问题,一个解决方法是多加机器,但是加机器是有成本的,那么就加
虚拟节点
。
具体原理请参考https://www.jianshu.com/p/e968c081f563。
下图为带有虚拟节点的Hash环,其中ip1-1是ip1的虚拟节点,ip2-1是ip2的虚拟节点,ip3-1是ip3的虚拟节点。
可见
,一致性Hash算法的关键在于
Hash算法
,保证
虚拟节点
及
Hash结果
的均匀性,而均匀性可以理解为
减少Hash冲突
,Hash冲突的知识点本文暂不扩展,历史文章中有。或者将来我再抽时间写。
XXL-JOB中的一致性Hash的Hash函数如下。
// jobId转换为md5 // 不直接用hashCode() 是因为扩大hash取值范围,减少冲突 byte [] digest = md5.digest();// 32位hashCode long hashCode = ((long ) (digest[3 ] & 0xFF ) <24) | ((long ) (digest[2 ] & 0xFF ) <16) | ((long ) (digest[1 ] & 0xFF ) <8) | (digest[0 ] & 0xFF );long truncateHashCode = hashCode & 0xffffffffL ;
看到上图的Hash函数,让我想到了
HashMap
的Hash函数
f(key) = hash(key) & (table.length - 1 ) // 使用>>> 16的原因,hashCode()的高位和低位都对f(key)有了一定影响力,使得分布更加均匀,散列冲突的几率就小了。 hash(key) = (h = key.hashCode()) ^ (h >>> 16 )
同理,将jobId的md5编码的高低位都对Hash结果有影响,使得Hash冲突的概率减小。
XXL-JOB的分片任务实现了任务的分布式执行,其实是笔者调研的重点,日常开发中很多定时任务都是单机执行,对于后续数据量大的任务最好有一个分布式的解决方案。
分片任务的路由策略,源代码作者提出了
分片广播
的概念,刚开始还有点摸不清头脑,看了源码逐渐清晰了起来。
想必看过源码的也遇到过这么一个小插曲,路由策略咋没实现?如下图所示。
public enum ExecutorRouteStrategyEnum { FIRST(I18nUtil.getString("jobconf_route_first" ), new ExecutorRouteFirst()), LAST(I18nUtil.getString("jobconf_route_last" ), new ExecutorRouteLast()), ROUND(I18nUtil.getString("jobconf_route_round" ), new ExecutorRouteRound()), RANDOM(I18nUtil.getString("jobconf_route_random" ), new ExecutorRouteRandom()), CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash" ), new ExecutorRouteConsistentHash()), LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu" ), new ExecutorRouteLFU()), LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru" ), new ExecutorRouteLRU()), FAILOVER(I18nUtil.getString("jobconf_route_failover" ), new ExecutorRouteFailover()), BUSYOVER(I18nUtil.getString("jobconf_route_busyover" ), new ExecutorRouteBusyover()), // 说好的实现呢???竟然是null SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard" ), null );
再继续追查得到了结论,待我慢慢道来,首先分片任务执行参数传递的是什么?看
XxlJobTrigger.trigger
函数中的一段代码。
...// 如果是分片路由,走的是这段逻辑 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null ) && group.getRegistryList() != null && !group.getRegistryList().isEmpty() && shardingParam == null ) { for (int i = 0 ; i // 最后两个参数,i是当前机器在执行器集群当中的index,group.getRegistryList().size()为执行器总数 processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size()); } } ...
参数经过自研RPC传递到执行器,在执行器中具体负责任务执行的
JobThread.run
中,看到了如下代码。
// 分片广播的参数比set进了ShardingUtil ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal())); ...// 将执行参数传递给jobHandler执行 handler.execute(triggerParamTmp.getExecutorParams())
接着看
ShardingUtil
,才发现了其中的奥秘,请看代码。