专栏名称: 纯洁的微笑
分享微服务实践与Java技术干货、偶尔讲讲故事。在人工智能的时代,一起学习微服务架构演进和大数据治理。
目录
相关文章推荐
中国能建  ·  闻令而动!中国能建紧急驰援宜宾筠连 ·  昨天  
中国电信  ·  突发山体滑坡!紧急驰援! ·  2 天前  
爱鸢都  ·  最新!山东组建一家新国企 ·  2 天前  
爱鸢都  ·  最新!山东组建一家新国企 ·  2 天前  
中国电信  ·  基建狂魔,挺进=͟͟͞͞中=͟͟͞͞国=͟͟ ... ·  3 天前  
葛洲坝国际  ·  新的Flag已上线~ ·  3 天前  
葛洲坝国际  ·  新的Flag已上线~ ·  3 天前  
51好读  ›  专栏  ›  纯洁的微笑

【真实案例】消息消费失败如何处理?

纯洁的微笑  · 公众号  ·  · 2021-03-15 12:12

正文

每天早上 七点三十 ,准时推送干货

一、介绍

在介绍消息中间件 MQ 之前,我们先来简单的了解一下,为何要引用消息中间件。

例如,在电商平台中,常见的用户下单,会经历以下几个流程。

当用户下单时,创建完订单之后,会调用第三方支付平台,对用户的账户金额进行扣款,如果平台支付扣款成功,会将结果通知到对应的业务系统,接着业务系统会更新订单状态,同时调用仓库接口,进行减库存,通知物流进行发货!

试想一下,从订单状态更新、到扣减库存、通知物流发货都在一个方法内 同步完成 ,假如用户支付成功、订单状态更新也成功,但是在扣减库存或者通知物流发货步骤失败了,那么就会造成一个问题,用户已经支付成功了,只是在仓库扣减库存方面失败,从而导致整个交易失败!

一单失败,老板可以假装看不见,但是如果上千个单子都因此失败,那么因系统造成的业务损失,将是巨大的,老板可能坐不住了!

因此,针对这种业务场景,架构师们引入了 异步通信 技术方案,从而保证服务的高可用,大体流程如下:

当订单系统收到支付平台发送的扣款结果之后,会将订单消息发送到 MQ 消息中间件,同时也会更新订单状态。

在另一端,由仓库系统来异步监听订单系统发送的消息,当收到订单消息之后,再操作扣减库存、通知物流公司发货等服务!

在优化后的流程下,即使扣减库存服务失败,也不会影响用户交易。

正如《人月神话》中所说的,软件工程,没有银弹

当引入了 MQ 消息中间件之后,同样也会带来另一个问题,假如 MQ 消息中间件突然宕机了,导致消息无法发送出去,那仓库系统就无法接受到订单消息,进而也无法发货!

针对这个问题,业界主流的解决办法是采用 集群部署,一主多从模式 ,从而实现服务的高可用,即使一台机器突然宕机了,也依然能保证服务可用,在服务器故障期间,通过运维手段,将服务重新启动,之后服务依然能正常运行!

但是还有另一个问题,假如仓库系统已经收到订单消息了,但是业务处理异常,或者服务器异常,导致当前商品库存并没有扣减,也没有发货!

这个时候又改如何处理呢?

今天我们所要介绍的正是这种场景,假如消息消费失败,我们应该如何处理?

二、解决方案

针对消息消费失败的场景,我们一般会通过如下方式进行处理:

  • 当消息消费失败时,会对消息进行重新推送
  • 如果重试次数超过最大值,会将异常消息存储到数据库,然后人工介入排查问题,进行手工重试

当消息在客户端消费失败时,我们会将异常的消息加入到一个消息重试对象中,同时设置最大重试次数,并将消息重新推送到 MQ 消息中间件里,当重试次数超过最大值时,会将异常的消息存储到 MongoDB 数据库中,方便后续查询异常的信息。

基于以上系统模型,我们可以编写一个公共重试组件,话不多说,直接干!

三、代码实践

本次补偿服务采用 rabbitmq 消息中间件进行处理,其他消息中间件处理思路也类似!

3.1、创建一个消息重试实体类

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class MessageRetryDTO implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 原始消息body
     */

    private String bodyMsg;

    /**
     * 消息来源ID
     */

    private String sourceId;

    /**
     * 消息来源描述
     */

    private String sourceDesc;

    /**
     * 交换器
     */

    private String exchangeName;

    /**
     * 路由键
     */

    private String routingKey;

    /**
     * 队列
     */

    private String queueName;

    /**
     * 状态,1:初始化,2:成功,3:失败
     */

    private Integer status = 1;

    /**
     * 最大重试次数
     */

    private Integer maxTryCount = 3;

    /**
     * 当前重试次数
     */

    private Integer currentRetryCount = 0;

    /**
     * 重试时间间隔(毫秒)
     */

    private Long retryIntervalTime = 0L;

    /**
     * 任务失败信息
     */

    private String errorMsg;

    /**
     * 创建时间
     */

    private Date createTime;

    @Override
    public String toString() {
        return "MessageRetryDTO{" +
                "bodyMsg='" + bodyMsg + '\'' +
                ", sourceId='" + sourceId + '\'' +
                ", sourceDesc='" + sourceDesc + '\'' +
                ", exchangeName='" + exchangeName + '\'' +
                ", routingKey='" + routingKey + '\'' +
                ", queueName='" + queueName + '\'' +
                ", status=" + status +
                ", maxTryCount=" + maxTryCount +
                ", currentRetryCount=" + currentRetryCount +
                ", retryIntervalTime=" + retryIntervalTime +
                ", errorMsg='" + errorMsg + '\'' +
                ", createTime=" + createTime +
                '}';
    }

    /**
     * 检查重试次数是否超过最大值
     *
     * @return
     */

    public boolean checkRetryCount() {
        retryCountCalculate();
        //检查重试次数是否超过最大值
        if (this.currentRetryCount this.maxTryCount) {
            return true;
        }
        return false;
    }

    /**
     * 重新计算重试次数
     */

    private void retryCountCalculate() {
        this.currentRetryCount = this.currentRetryCount + 1;
    }

}

3.2、编写服务重试抽象类

public abstract class CommonMessageRetryService {

    private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MongoTemplate mongoTemplate;


    /**
     * 初始化消息
     *
     * @param message
     */

    public void initMessage(Message message) {
        log.info("{} 收到消息: {},业务数据:{}"this.getClass().getName(), message.toString(), new String(message.getBody()));
        try {
            //封装消息
            MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message);
            if (log.isInfoEnabled()) {
                log.info("反序列化消息:{}", messageRetryDto.toString());
            }
            prepareAction(messageRetryDto);
        } catch (Exception e) {
            log.warn("处理消息异常,错误信息:", e);
        }
    }

    /**
     * 准备执行
     *
     * @param retryDto
     */

    protected void prepareAction(MessageRetryDTO retryDto) {
        try {
            execute(retryDto);
            doSuccessCallBack(retryDto);
        } catch (Exception e) {
            log.error("当前任务执行异常,业务数据:" + retryDto.toString(), e);
            //执行失败,计算是否还需要继续重试
            if (retryDto.checkRetryCount()) {
                if (log.isInfoEnabled()) {
                    log.info("重试消息:{}", retryDto.toString());
                }
                retrySend(retryDto);
            } else {
                if (log.isWarnEnabled()) {
                    log.warn("当前任务重试次数已经到达最大次数,业务数据:" + retryDto.toString(), e);
                }
                doFailCallBack(retryDto.setErrorMsg(e.getMessage()));
            }
        }
    }

    /**
     * 任务执行成功,回调服务(根据需要进行重写)
     *
     * @param messageRetryDto
     */

    private void doSuccessCallBack(MessageRetryDTO messageRetryDto) {
        try {
            successCallback(messageRetryDto);
        } catch (Exception e) {
            log.warn("执行成功回调异常,队列描述:{},错误原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());
        }
    }

    /**
     * 任务执行失败,回调服务(根据需要进行重写)
     *
     * @param messageRetryDto
     */

    private void doFailCallBack(MessageRetryDTO messageRetryDto) {
        try {
            saveMessageRetryInfo(messageRetryDto.setErrorMsg(messageRetryDto.getErrorMsg()));
            failCallback(messageRetryDto);
        } catch (Exception e) {
            log.warn("执行失败回调异常,队列描述:{},错误原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());
        }
    }

    /**
     * 执行任务
     *
     * @param messageRetryDto
     */

    protected abstract void execute(MessageRetryDTO messageRetryDto);

    /**
     * 成功回调
     *
     * @param messageRetryDto
     */

    protected abstract void successCallback(MessageRetryDTO messageRetryDto);

    /**
     * 失败回调
     *
     * @param messageRetryDto
     */

    protected abstract void failCallback(MessageRetryDTO messageRetryDto);

    /**
     * 构建消息补偿实体
     * @param message
     * @return
     */

    private MessageRetryDTO buildMessageRetryInfo(Message message){
        //如果头部包含补偿消息实体,直接返回
        Map messageHeaders = message.getMessageProperties().getHeaders();
        if(messageHeaders.containsKey("message_retry_info")){
            Object retryMsg = messageHeaders.get("message_retry_info");
            if(Objects.nonNull(retryMsg)){
                return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class);
            }
        }
        //自动将业务消息加入补偿实体
        MessageRetryDTO messageRetryDto = new MessageRetryDTO();
        messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));
        messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange());
        messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
        messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue());
        messageRetryDto.setCreateTime(new Date());
        return messageRetryDto;
    }

    /**
     * 异常消息重新入库
     * @param retryDto
     */

    private void retrySend(MessageRetryDTO retryDto){
        //将补偿消息实体放入头部,原始消息内容保持不变
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setHeader("message_retry_info", JSONObject.toJSON(retryDto));
        Message message = new Message(retryDto.getBodyMsg().getBytes(), messageProperties);
        rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message);
    }



    /**
     * 将异常消息存储到mongodb中
     * @param retryDto
     */

    private void saveMessageRetryInfo(MessageRetryDTO retryDto){
        try {
            mongoTemplate.save(retryDto, "message_retry_info");
        } catch (Exception e){
            log.error("将异常消息存储到mongodb失败,消息数据:" + retryDto.toString(), e);
        }
    }
}

3.3、编写监听服务类

在消费端应用的时候,也非常简单,例如,针对扣减库存操作,我们可以通过如下方式进行处理!

@Component
public class OrderServiceListener extends CommonMessageRetryService {

    private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class);

    /**
     * 监听订单系统下单成功消息
     * @param message
     */

    @RabbitListener(queues = "mq.order.add")
    public void consume(Message message) {
        log.info("收到订单下单成功消息: {}", message.toString());
        super.initMessage(message);
    }


    @Override
    protected void execute(MessageRetryDTO messageRetryDto) {
        //调用扣减库存服务,将业务异常抛出来
    }

    @Override
    protected






请到「今天看啥」查看全文