专栏名称: hryou0922
目录
相关文章推荐
清华经管学院职业发展中心  ·  招聘 | 嘉实基金2025届校招春季补录正式开启! ·  昨天  
LeaderCareer  ·  2025年英国大学QS学术声誉排名!UCL超 ... ·  2 天前  
人力葵花  ·  人才测评全流程.doc ·  2 天前  
51好读  ›  专栏  ›  hryou0922

中间件系列十一 RabbitMQ之发送者端的消息确认机制

hryou0922  · 掘金  ·  · 2018-03-13 02:06

正文

1. 概述

在消息代理(如RabbitMQ),由于发送消息的AMQP协议方法不能保证消息一定到达对方或被成功处理,所以发布者和消费者都需要一个交付和处理确认的机制。 在上一篇文章 中间件系列十 RabbitMQ之消费者端的消息确认机制 ,我们介绍了消费端的消息确认机制。本篇我们介绍发送端的消息确认机制

RabbitMQ在收到消息后,还需要有一段时间才能将消息存入磁盘之中。RabbitMQ并不是为每条消息都做fsync的处理,可能仅仅保存到cache中而不是物理磁盘上,在这段时间内RabbitMQ broker发生crash, 消息保存到cache但是还没来得及落盘,那么这些消息将会丢失。为了解决这个问题RabbitMQ引入发送端消息确认机制,主要通过事务和publisher Confirm机制。

本篇的主要内容如下:
1. 通过AMQP事务和publisher Confirm机制保证发送端的消息不丢失
2. 演示RabbitMQ中的事务用法,并通过抓包分析协议,最后说明事务的事务的优点和缺点
3. 演示RabbitMQ中Publisher Confirm模式的用法,并对以下3种方式通过抓包分析协议并说明其缺点和优点

-  同步方式的发送端的单个Publisher Confirm模式       
-  同步方式的发送端的批量Publisher Confirm机制 
-  异步方式的发送端的Publisher Confirm机制 

4. 其他发送者端的注意事项: 否定确认、消息确认的时机、持久化消息的确认延迟、发送顺序

2. 事务机制保证消息不丢失

RabbitMQ支持事务(transaction),通过调用tx.select方法开启事务模式。当开启了事务模式后,只有当一个消息被所有的镜像队列保存完毕后,RabbitMQ才会调用tx.commit-ok返回给客户端。

2.1. 代码

工程名称: rabbitmq
发送端的关键代码 TransactionalSend :通过channel.txSelect()开启事务,发送消息,最后执行channel.txCommit()提交事务。如果发送失败,则使用channel.txRollback()回滚事务

// 开启事务
channel.txSelect();
// 发送消息
while(num-- > 0) {
    // 发送一个持久化消息到特定的交换机
    channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
    System.out.println(" [TransactionalSend] Sent + [" + num + "] '" + message + "'");
}

// 不注解下面语句,可以进入channel.txRollback()逻辑
//                if(true){
//                    throw new IOException("consumer channel.txRollback() ");
//                }
// 提交事务
channel.txCommit();
}catch(IOException e){
e.printStackTrace();
// 回滚事务
channel.txRollback();
}

测试代码:使用 PublisherConfirmTest 的方法进行测试

2.2. 使用wireshark抓包分析正常的事务提交流程

使用wireshark截获以上测试时产生的包:
所有的包:
这里写图片描述

全部包如下:
496-497帧 :开启事务
496帧 :Tx.Select 客户端向RabbitMQ要求开启事务
这里写图片描述
497帧 : Tx.Select-Ok 服务端处理事务请求并返回成功结果
这里写图片描述
498帧 :发送消息,和普通发送消息同,这里略

499-502帧 :提交事务
499帧 :Tx.Commit 客户端向RabbitMQ要求开启事务
这里写图片描述
502帧 :Tx.Commit-Ok 服务端处理事务提交并返回成功结果
这里写图片描述

2.3. 使用wireshark抓包分析执行事务失败,自动回滚

使用wireshark截获以上测试时产生的包:
如果事务执行失败,则会执行tx.rollback执行回滚操作
由于包信息比较简单,这里略,只上截图
这里写图片描述

2.4. 事务的优点和缺点

事务的实现简单,能够保证消息正确到达RabbitMQ,但是它的效率低,只有一般发送消息的效率的1/250

3. publisher confirms机制保证消息不丢失

在AMPQ-0-9-1中,有定义从消费者到RabbitMQ的处理确认机制。但是没有定义消息代理到生产者的确认机制,在RabbitMQ中对此进行扩展,叫做publisher confirms机制

在标准的AMQP 0-9-1,保证消息不会丢失的唯一方法是使用事务:在通道上开启事务,发布消息,提交事务。但是事务是非常重量级的,它使得RabbitMQ的吞吐量降低250倍。为了解决这个问题,RabbitMQ引入的Publisher Confirms机制,它是模仿AMQP协议中消费者消息确认机制

生产者端可以通过confirm.select来启用方法Publisher Confirms机制,RabbitMQ服务端根据是否设置no-wait的值,返回confirm.select-ok。一旦在通道上使用confirm.select方法,就认为它处于Publisher Confirms模式。事务通道不能进入Publisher Confirms模式 ,一旦通道处于Publisher Confirms模式,不能开启事务。即事务和Publisher Confirms模式只能二选一

Publisher Confirm模式有以下几种使用方式:

  1. 同步方式的发送端的单个Publisher Confirm模式
  2. 同步方式的发送端的批量Publisher Confirm机制
  3. 异步方式的发送端的Publisher Confirm机制

4. 同步方式的发送端的Publisher Confirm模式

4.1. 测试代码

测试工程名称: rabbitmq

关键代码: SimpleConfirmSend
这个代码实现发送者端发送一个持久化消息到特定的交换机,然后等待服务端返回Basic.Ack后,才执行发送消息

while(num-- > 0) {
    // 发送一个持久化消息到特定的交换机
    channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
    System.out.println(" [SimpleConfirmSend] Sent '" + message + "'");
    // 等待服务端返回Basic.Ack后,才执行下一个循环
    if(!channel.waitForConfirms()){
        System.out.println("message haven't arrived broker");
          // 在这里可以对发送失败的记录进行处理:如重发
    }
}

4.2. 使用wireshark抓包单个确认消息确认

这里我们发送端只发送一个消息,并进行抓包,分析抓到的包,详细如下:
这里写图片描述

和普通发送最大的不同是,在执行发送消息前执行Confirm.Select,RabbitMQ在消息已经收到并处理完毕(如果消息需要,则持久化消息后,才返回Basic.Ok; 如果对应消息的镜像队列,则队列完全同步后,才返回Basic.Ok。总之,必须保证消息不会因为RabbitMQ异常丢失)后返回Basic.Ok给客户端

110 -111 帧
110帧 Confirm.Select: 客户端请求开启Confirm模式
这里写图片描述
111帧 Confirm.Select-Ok:服务端执行完毕,并返回成功结果
这里写图片描述

112 帧 :发送消息
这里我们发现,这个包里没有Delivery-Tag值,但是后面RabbitMQ回送的消息包里有Delivery-Tag。如果我们一次发送大量的消息,即使RabbitMQ对收到前N条消息进行确认,发送者端也不知道RabbitMQ是不是收到发送者端最先发送的N条记录,因为消息到达RabbbitMQ是无序的。只有RabbitMQ对本次发送的所有记录进行确认,我们才知道消息全部发送成功。如果不小心丢失一条消息,我们是不知道那条记录丢失的,唯一的办法是本次所有记录重发







请到「今天看啥」查看全文