专栏名称: 互联网后端架构
主要介绍Java后端架构。其中也会掺杂一些前端、GO、Python、Linux,目标:全栈工程师!---好像很牛叉的样子 ^-^
目录
相关文章推荐
51好读  ›  专栏  ›  互联网后端架构

RabbitMQ保障消息 100% 投递成功方案

互联网后端架构  · 公众号  · 架构  · 2019-05-15 21:00

正文

请到「今天看啥」查看全文


扫描二维码 加入架构集结群


对技术感兴趣的同学可进群(备注:Java)

什么是生产端的可靠性投递

  • 保障消息的成功发出

  • 保障MQ节点的成功接收

  • 发送端收到MQ节点(Broker) 确认应答

  • 完善的消息补偿机制

如果想保障消息百分百投递成功,只做到前三步不一定能够保障。有些时候或者说有些极端情况,比如生产端在投递消息时可能就失败了,或者说生产端投递了消息,MQ也收到了,MQ在返回确认应答时,由于网络闪断导致生产端没有收到应答,此时这条消息就不知道投递成功了还是失败了,所以针对这些情况我们需要做一些补偿机制。

互联网大厂的解决方案

  1. 消息落库,对消息状态进行打标

  2. 消息的延迟投递,做二次确认,回调检查

具体使用哪种要根据业务场景和并发量、数据量大小来决定

消息信息落库,对消息状态进行打标


  1. 进行数据的入库
    比如我们要发送一条订单消息,首先把业务数据也就是订单信息进行入库,然后生成一条消息,把消息也进行入库,这条消息应该包含消息状态属性,并设置初始值比如为0,表示消息创建成功正在发送中,这种方式缺陷在于我们要对数据库进行持久化两次。

  2. 首先要保证第一步消息都存储成功了,没有出现任何异常情况,然后生产端再进行消息发送。如果失败了就进行快速失败机制。

  3. MQ把消息收到的结果应答 (confirm) 给生产端

  4. 生产端有一个 Confirm Listener ,去异步的监听 Broker 回送的响应,从而判断消息是否投递成功,如果成功,去数据库查询该消息,并将消息状态更新为1,表示消息投递成功。

假设第二步OK了,在第三步回送响应时,网络突然出现了闪断,导致生产端的Listener就永远收不到这条消息的confirm应答了,也就是说这条消息的状态就一直为0了。

  1. 此时我们需要设置一个规则,比如说消息在入库时候设置一个临界值timeout,5分钟之后如果还是0的状态那就需要把消息抽取出来。这里我们使用的是分布式定时任务,去定时抓取DB中距离消息创建时间超过5分钟的且状态为0的消息。

  2. 把抓取出来的消息进行重新投递 (Retry Send) ,也就是从第二步开始继续往下走

  3. 当然有些消息可能就是由于一些实际的问题无法路由到Broker,比如routingKey设置不对,对应的队列被误删除了,那么这种消息即使重试多次也仍然无法投递成功,所以需要对重试次数做限制,比如限制3次,如果投递次数大于三次,那么就将消息状态更新为2,表示这个消息最终投递失败。

针对这种情况如何去做补偿呢,可以有一个补偿系统去查询这些最终失败的消息,然后给出失败的原因,当然这些可能都需要人工去操作。

第一种可靠性投递,在高并发的场景下是否适合?

对于第一种方案,我们需要做两次数据库的持久化操作,在高并发场景下显然数据库存在着性能瓶颈。其实在我们的核心链路中只需要对业务进行入库就可以了,消息就没必要先入库了,我们可以做消息的延迟投递,做二次确认,回调检查。

当然这种方案不一定能保障百分百投递成功,但是基本上可以保障大概99.9%的消息是OK的,有些特别极端的情况只能是人工去做补偿了,或者使用定时任务去做都可以。
使用第二种方式主要目的是为了减少数据库操作,提高并发量。

消息的延迟投递,做二次确认,回调检查


Upstream Service 上游服务也就是生产端, Downstream service 下游服务也就是消费端, Callback service 就是回调服务。

  1. 先将业务消息进行入库,然后生产端将消息发送出去,注意一定是等数据库操作完成以后再去发送消息。

  2. 在发送消息之后,紧接着生产端再次发送一条消息 (Second Send Delay Check) ,即延迟消息投递检查,这里需要设置一个延迟时间,比如5分钟之后进行投递。

  3. 消费端去监听指定队列,将收到的消息进行处理。

  4. 处理完成之后,发送一个 confirm 消息,也就是回送响应,但是这里响应不是正常的ACK,而是重新生成一条消息,投递到MQ中。

  5. 上面的 Callback service 是一个单独的服务,其实它扮演了第一种方案的存储消息的DB角色,它通过MQ去监听下游服务发送的 confirm 消息,如果 Callback service 收到 confirm 消息,那么就对消息做持久化存储,即将消息持久化到DB中。

  6. 5分钟之后延迟消息发送到MQ了,然后 Callback service 还是去监听延迟消息所对应的队列,收到Check消息后去检查DB中是否存在消息,如果存在,则不需要做任何处理,如果不存在或者消费失败了,那么 Callback service 就需要主动发起RPC通信给上游服务,告诉它延迟检查的这条消息我没有找到,你需要重新发送,生产端收到信息后就会重新查询业务消息然后将消息发送出去。

这么做的目的是少做了一次DB的存储,在高并发场景下,最关心的不是消息100%投递成功,而是一定要保证性能,保证能抗得住这么大的并发量。所以能节省数据库的操作就尽量节省,可以异步的进行补偿。

其实在主流程里面是没有这个Callback service的,它属于一个补偿的服务,整个核心链路就是生产端入库业务消息,发送消息到MQ,消费端监听队列,消费消息。其他的步骤都是一个补偿机制。

第二种方案也是互联网大厂更为经典和主流的解决方案。


点好看的同学,今年加薪^-^↓








请到「今天看啥」查看全文