正文
前言
本文接着上文
应用限流
进行讨论。
之前谈到的限流方案只能针对于单个 JVM 有效,也就是单机应用。而对于现在普遍的分布式应用也得有一个分布式限流的方案。
基于此尝试写了这个组件:
github.com/crossoverJi…
DEMO
以下采用的是
github.com/crossoverJi…
来做演示。
在 Order 应用提供的接口中采取了限流。首先是配置了限流工具的 Bean:
@Configuration
public class RedisLimitConfig {
@Value("${redis.limit}")
private int limit;
@Autowired
private JedisConnectionFactory jedisConnectionFactory;
@Bean
public RedisLimit build() {
RedisClusterConnection clusterConnection = jedisConnectionFactory.getClusterConnection();
JedisCluster jedisCluster = (JedisCluster) clusterConnection.getNativeConnection();
RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster)
.limit(limit)
.build();
return redisLimit;
}
}
接着在 Controller 使用组件:
@Autowired
private RedisLimit redisLimit ;
@Override
@CheckReqNo
public BaseResponse<OrderNoResVO> getOrderNo(@RequestBody OrderNoReqVO orderNoReq) {
BaseResponse<OrderNoResVO> res = new BaseResponse();
//限流
boolean limit = redisLimit.limit();
if (!limit){
res.setCode(StatusEnum.REQUEST_LIMIT.getCode());
res.setMessage(StatusEnum.REQUEST_LIMIT.getMessage());
return res ;
}
res.setReqNo(orderNoReq.getReqNo());
if (null == orderNoReq.getAppId()){
throw new SBCException(StatusEnum.FAIL);
}
OrderNoResVO orderNoRes = new OrderNoResVO() ;
orderNoRes.setOrderId(DateUtil.getLongTime());
res.setCode(StatusEnum.SUCCESS.getCode());
res.setMessage(StatusEnum.SUCCESS.getMessage());
res.setDataBody(orderNoRes);
return res ;
}
为了方便使用,也提供了注解:
@Override
@ControllerLimit
public BaseResponse<OrderNoResVO> getOrderNoLimit(@RequestBody OrderNoReqVO orderNoReq) {
BaseResponse<OrderNoResVO> res = new BaseResponse();
// 业务逻辑
return res ;
}
该注解拦截了 http 请求,会再请求达到阈值时直接返回。
普通方法也可使用:
@CommonLimit
public void doSomething(){}
会在调用达到阈值时抛出异常。
为了模拟并发,在
User
应用中开启了 10 个线程调用 Order(
限流次数为5
) 接口(也可使用专业的并发测试工具 JMeter
等)。
@Override
public BaseResponse<UserResVO> getUserByFeign(@RequestBody UserReqVO userReq) {
//调用远程服务
OrderNoReqVO vo = new OrderNoReqVO();
vo.setAppId(1L);
vo.setReqNo(userReq.getReqNo());
for (int i = 0; i < 10; i++) {
executorService.execute(new Worker(vo, orderServiceClient));
}
UserRes userRes = new UserRes();
userRes.setUserId(123);
userRes.setUserName("张三");
userRes.setReqNo(userReq.getReqNo());
userRes.setCode(StatusEnum.SUCCESS.getCode());
userRes.setMessage("成功");
return userRes;
}
private static class Worker implements Runnable {
private OrderNoReqVO vo;
private OrderServiceClient orderServiceClient;
public Worker(OrderNoReqVO vo, OrderServiceClient orderServiceClient) {
this.vo = vo;
this.orderServiceClient = orderServiceClient;
}
@Override
public void run() {
BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNoCommonLimit(vo);
logger.info("远程返回:" + JSON.toJSONString(orderNo));
}
}
为了验证分布式效果启动了两个 Order 应用。
效果如下:
实现原理
实现原理其实很简单。既然要达到分布式全局限流的效果,那自然需要一个第三方组件来记录请求的次数。
其中 Redis 就非常适合这样的场景。
-
每次请求时将当前时间(精确到秒)作为 Key 写入到 Redis 中,超时时间设置为 2 秒,Redis 将该 Key 的值进行自增。
-
当达到阈值时返回错误。
-
写入 Redis 的操作用 Lua 脚本来完成,利用 Redis 的单线程机制可以保证每个 Redis 请求的原子性。
Lua 脚本如下:
--lua 下标从 1 开始
-- 限流 key
local key = KEYS[1]
-- 限流大小
local limit = tonumber(ARGV[1])
-- 获取当前流量大小
local curentLimit = tonumber(redis.call('get', key) or "0")
if curentLimit + 1 > limit then
-- 达到限流大小 返回
return 0;
else
-- 没有达到阈值 value + 1
redis.call("INCRBY", key, 1)
redis.call("EXPIRE", key, 2)
return curentLimit + 1
end
Java 中的调用逻辑:
public boolean limit() {
String key = String.valueOf(System.currentTimeMillis() / 1000);
Object result = null;
if (jedis instanceof Jedis) {
result = ((Jedis) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
} else if (jedis instanceof JedisCluster) {
result = ((JedisCluster) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
} else {
//throw new RuntimeException("instance is error") ;
return false;
}
if (FAIL_CODE != (Long) result) {
return true;
} else {
return false;
}
}
所以只需要在需要限流的地方调用该方法对返回值进行判断即可达到限流的目的。
当然这只是利用 Redis 做了一个粗暴的计数器,如果想实现类似于上文中的令牌桶算法可以基于 Lua 自行实现。
Builder 构建器
在设计这个组件时想尽量的提供给使用者清晰、可读性、不易出错的 API。