专栏名称: 亿级流量网站架构
开涛技术点滴
目录
相关文章推荐
OSC开源社区  ·  100%国产AI新成员:壁仞科技成功适配De ... ·  2 天前  
OSC开源社区  ·  谷歌安卓系统“假开源、真垄断”? ·  2 天前  
OSC开源社区  ·  漫谈DeepSeek及其背后的核心技术 ·  昨天  
程序员小灰  ·  这款AI编程工具,将会取代Cursor! ·  4 天前  
51好读  ›  专栏  ›  亿级流量网站架构

spring-retry重试与熔断详解—《亿级流量》内容补充

亿级流量网站架构  · 公众号  · 程序员  · 2017-05-01 11:01

正文

本文是《亿级流量》第6章 超时与重试机制补充内容。


spring-retry项目实现了重试和熔断功能,目前已用于SpringBatch、Spring Integration等项目。


RetryOperations定义了重试的API,RetryTemplate提供了模板实现,线程安全的,同于Spring 一贯的API风格,RetryTemplate将重试、熔断功能封装到模板中,提供健壮和不易出错的API供大家使用。

首先,RetryOperations接口API:

public interface RetryOperations {
<
T , E extends Throwable> T execute(RetryCallback< T , E >retryCallback) throws E ;
<
T , E extends Throwable> T execute(RetryCallback< T , E >retryCallback, RecoveryCallback< T > recoveryCallback) throws E ;
<
T , E extends Throwable> T execute(RetryCallback< T , E >retryCallback, RetryState retryState) throws E , ExhaustedRetryException;
<
T , E extends Throwable> T execute(RetryCallback< T , E >retryCallback, RecoveryCallback< T > recoveryCallback, RetryStateretryState)
throws E ;
}


通过RetryCallback定义需重试的业务服务,当重试超过最大重试时间或最大重试次数后可以调用RecoveryCallback进行恢复,比如返回假数据或托底数据。


那什么时候需重试?spring-retry是当抛出相关异常后执行重试策略,定义重试策略时需要定义需重试的异常(如因远程调用失败的可以重试、而因入参校对失败不应该重试)。只读操作可以重试,幂等写操作可以重试,但是非幂等写操作不能重试,重试可能导致脏写,或产生重复数据。


重试策略有哪些呢?spring-retry提供了如下重试策略。

RetryPolicy提供了如下策略实现:

  • NeverRetryPolicy:只允许调用RetryCallback一次,不允许重试;

  • AlwaysRetryPolicy:允许无限重试,直到成功,此方式逻辑不当会导致死循环;

  • SimpleRetryPolicy:固定次数重试策略,默认重试最大次数为3次,RetryTemplate默认使用的策略;

  • TimeoutRetryPolicy:超时时间重试策略,默认超时时间为1秒,在指定的超时时间内允许重试;

  • CircuitBreakerRetryPolicy:有熔断功能的重试策略,需设置3个参数openTimeout、resetTimeout和delegate,稍后详细介绍该策略;

  • CompositeRetryPolicy:组合重试策略,有两种组合方式,乐观组合重试策略是指只要有一个策略允许重试即可以,悲观组合重试策略是指只要有一个策略不允许重试即可以,但不管哪种组合方式,组合中的每一个策略都会执行。


重试时的退避策略是什么?是立即重试还是等待一段时间后重试,比如是网络错误,立即重试将导致立即失败,最好等待一小段时间后重试,还要防止很多服务同时重试导致DDos。

BackOffPolicy 提供了如下策略实现:

  • NoBackOffPolicy:无退避算法策略,即当重试时是立即重试;

  • FixedBackOffPolicy:固定时间的退避策略,需设置参数sleeper和backOffPeriod,sleeper指定等待策略,默认是Thread.sleep,即线程休眠,backOffPeriod指定休眠时间,默认1秒;

  • UniformRandomBackOffPolicy:随机时间退避策略,需设置sleeper、minBackOffPeriod和maxBackOffPeriod,该策略在[minBackOffPeriod,maxBackOffPeriod之间取一个随机休眠时间,minBackOffPeriod默认500毫秒,maxBackOffPeriod默认1500毫秒;

  • ExponentialBackOffPolicy:指数退避策略,需设置参数sleeper、initialInterval、maxInterval和multiplier,initialInterval指定初始休眠时间,默认100毫秒,maxInterval指定最大休眠时间,默认30秒,multiplier指定乘数,即下一次休眠时间为当前休眠时间*multiplier;

  • ExponentialRandomBackOffPolicy:随机指数退避策略,引入随机乘数,之前说过固定乘数可能会引起很多服务同时重试导致DDos,使用随机休眠时间来避免这种情况。


到此基本的概念就讲完了。接下来先看下RetryTemplate主要流程实现:

protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
      RecoveryCallback<
T> recoveryCallback, RetryState state)
     
throws E, ExhaustedRetryException {
   //重试策略
   RetryPolicy retryPolicy = this.retryPolicy;

   //退避策略
   BackOffPolicy backOffPolicy = this.backOffPolicy;
   //重试上下文,当前重试次数等都记录在上下文中
   RetryContext context = open(retryPolicy, state);
  
try {

      //拦截器模式,执行RetryListener#open
      boolean running = doOpenInterceptors(retryCallback, context);

      //判断是否可以重试执行
      while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
        
try {//执行RetryCallback回调
            return retryCallback.doWithRetry(context);
         }
catch (Throwable e) {//异常时,要进行下一次重试准备

            //遇到异常后,注册该异常的失败次数
            registerThrowable(retryPolicy, state, context, e);

            //执行RetryListener#onError
            doOnErrorInterceptors(retryCallback, context, e);

            //如果可以重试,执行退避算法,比如休眠一小段时间后再重试
            if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
               backOffPolicy.backOff(backOffContext);
            }

            //state != null && state.rollbackFor(context.getLastThrowable())

            //在有状态重试时,如果是需要执行回滚操作的异常,则立即抛出异常
            if (shouldRethrow(retryPolicy, context, state)) {
              
throw RetryTemplate.<E>wrapIfNecessary(e);
            }
         }

         //如果是有状态重试,且有GLOBAL_STATE属性,则立即跳出重试终止;当抛出的异常是非需要执行回滚操作的异常时,才会执行到此处,CircuitBreakerRetryPolicy会在此跳出循环;
         if (state != null && context.hasAttribute(GLOBAL_STATE)) {
           
break;
         }
      }

      //重试失败后,如果有RecoveryCallback,则执行此回调,否则抛出异常
      return handleRetryExhausted(recoveryCallback, context, state);
   }
catch (Throwable e) {
     
throw RetryTemplate.<E>wrapIfNecessary(e);
   }
finally {

      //清理环境
      close(retryPolicy, context, state, lastException == null || exhausted);

      //执行RetryListener#close,比如统计重试信息
      doCloseInterceptors(retryCallback, context, lastException);
   }
}

有状态or无状态

无状态重试,是在一个循环中执行完重试策略,即重试上下文保持在一个线程上下文中,在一次调用中进行完整的重试策略判断。


非常简单的情况,如远程调用某个查询方法时是最常见的无状态重试。

RetryTemplate template = new RetryTemplate();
//重试策略:次数重试策略
RetryPolicy retryPolicy = new SimpleRetryPolicy(3);
template.setRetryPolicy(retryPolicy);
//退避策略:指数退避策略
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(
100);
backOffPolicy.setMaxInterval(
3000);
backOffPolicy.setMultiplier(
2);
backOffPolicy.setSleeper(
new ThreadWaitSleeper());
template.setBackOffPolicy(backOffPolicy);

//当重试失败后,抛出异常
String result = template.execute(new RetryCallback() {
   
@Override
   
public String doWithRetry(RetryContext context) throws RuntimeException {
       
throw new RuntimeException("timeout");
    }
});
//当重试失败后,执行RecoveryCallback
String result = template.execute(new RetryCallback() {
   
@Override
   
public String doWithRetry(RetryContext context) throws RuntimeException {
        System.
out.println("retry count:" + context.getRetryCount());
       
throw new RuntimeException("timeout");
    }
},
new RecoveryCallback() {
   
@Override
   
public String recover(RetryContext context) throws Exception {
       
return "default";
    }
});

有状态重试,有两种情况需要使用有状态重试,事务操作需要回滚或者熔断器模式。

事务操作需要回滚场景时,当整个操作中抛出的是数据库异常DataAccessException,则不能进行重试需要回滚,而抛出其他异常则可以进行重试,可以通过RetryState实现:

// 当前状态的名称,当把状态放入缓存时,通过该key查询获取

Object key = "mykey";
//是否每次都重新生成上下文还是从缓存中查询,即全局模式(如熔断器策略时从缓存中查询)
boolean isForceRefresh = true;
//对DataAccessException进行回滚
BinaryExceptionClassifier rollbackClassifier =
       
new BinaryExceptionClassifier(Collections.extends Throwable>>singleton(DataAccessException.class));
RetryState state =
new DefaultRetryState(key, isForceRefresh, rollbackClassifier);

String result = template.execute(
new RetryCallback() {
   
@Override
   
public String doWithRetry(RetryContext context) throws RuntimeException {
        System.
out.println("retry count:" + context.getRetryCount());
       
throw new TypeMismatchDataAccessException("");
    }
},
new RecoveryCallback() {
   
@Override
   
public String recover(RetryContext context) throws Exception {
       
return "default";
    }
}, state);

RetryTemplate中在有状态重试时,回滚场景时直接抛出异常处理代码:

//state != null && state.rollbackFor(context.getLastThrowable())

// 在有状态重试时,如果是需要执行回滚操作的异常,则立即抛出异常
if (shouldRethrow(retryPolicy,context, state)) {
throw RetryTemplate.< E > wrapIfNecessary (e);
}

熔断器场景。在有状态重试时,且是全局模式,不在当前循环中处理重试,而是全局重试模式(不是线程上下文),如熔断器策略时测试代码如下所示。

RetryTemplate template = new RetryTemplate();
CircuitBreakerRetryPolicy retryPolicy =
       
new CircuitBreakerRetryPolicy(new SimpleRetryPolicy(3));
retryPolicy.setOpenTimeout(
5000);
retryPolicy.setResetTimeout(
20000);
template.setRetryPolicy(retryPolicy);

for (int i = 0; i < 10; i++) {
   
try {
        Object key =
"circuit";
       
boolean isForceRefresh = false;
        RetryState state =
new DefaultRetryState(key, isForceRefresh);
        String result = template.execute(
new RetryCallback() {
           
@Override
           
public String doWithRetry(RetryContext context) throws RuntimeException {
                System.
out.println("retry count:" + context.getRetryCount());
               
throw new RuntimeException("timeout");
            }
        },
new RecoveryCallback() {
           
@Override
           
public String recover(RetryContext context) throws Exception {
               
return "default";
            }
        }, state);
        System.
out.println(result);
    }
catch (Exception e) {
        System.
out.println(e);
    }
}

为什么说是全局模式呢?我们配置了isForceRefresh为false,则在获取上下文时是根据key “circuit”从缓存中获取,从而拿到同一个上下文。

Object key = "circuit" ;
boolean isForceRefresh = false ;
RetryState state =
new DefaultRetryState(key,isForceRefresh);

如下RetryTemplate代码说明在有状态模式下,不会在循环中进行重试。

if (state != null && context.hasAttribute(GLOBAL_STATE)) {
  
break;
}


熔断器策略配置代码,CircuitBreakerRetryPolicy需要配置三个参数:

  • delegate:是真正判断是否重试的策略,当重试失败时,则执行熔断策略;

  • openTimeout:openWindow,配置熔断器电路打开的超时时间,当超过openTimeout之后熔断器电路变成半打开状态(主要有一次重试成功,则闭合电路);

  • resetTimeout:timeout,配置重置熔断器重新闭合的超时时间。


判断熔断器电路是否打开的代码:

public boolean isOpen() {
  
long time = System.currentTimeMillis() - this.start;
  
boolean retryable = this.policy.canRetry(this.context);
  






请到「今天看啥」查看全文