正文
1. 概述
在消息代理(如RabbitMQ),由于发送消息的AMQP协议方法不能保证消息一定到达对方或被成功处理,所以发布者和消费者都需要一个交付和处理确认的机制。 在上一篇文章
中间件系列十 RabbitMQ之消费者端的消息确认机制
,我们介绍了消费端的消息确认机制。本篇我们介绍发送端的消息确认机制
RabbitMQ在收到消息后,还需要有一段时间才能将消息存入磁盘之中。RabbitMQ并不是为每条消息都做fsync的处理,可能仅仅保存到cache中而不是物理磁盘上,在这段时间内RabbitMQ broker发生crash, 消息保存到cache但是还没来得及落盘,那么这些消息将会丢失。为了解决这个问题RabbitMQ引入发送端消息确认机制,主要通过事务和publisher Confirm机制。
本篇的主要内容如下:
1. 通过AMQP事务和publisher Confirm机制保证发送端的消息不丢失
2. 演示RabbitMQ中的事务用法,并通过抓包分析协议,最后说明事务的事务的优点和缺点
3. 演示RabbitMQ中Publisher Confirm模式的用法,并对以下3种方式通过抓包分析协议并说明其缺点和优点
- 同步方式的发送端的单个Publisher Confirm模式
- 同步方式的发送端的批量Publisher Confirm机制
- 异步方式的发送端的Publisher Confirm机制
4. 其他发送者端的注意事项: 否定确认、消息确认的时机、持久化消息的确认延迟、发送顺序
2. 事务机制保证消息不丢失
RabbitMQ支持事务(transaction),通过调用tx.select方法开启事务模式。当开启了事务模式后,只有当一个消息被所有的镜像队列保存完毕后,RabbitMQ才会调用tx.commit-ok返回给客户端。
2.1. 代码
工程名称:
rabbitmq
发送端的关键代码
TransactionalSend
:通过channel.txSelect()开启事务,发送消息,最后执行channel.txCommit()提交事务。如果发送失败,则使用channel.txRollback()回滚事务
// 开启事务
channel.txSelect();
// 发送消息
while(num-- > 0) {
// 发送一个持久化消息到特定的交换机
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println(" [TransactionalSend] Sent + [" + num + "] '" + message + "'");
}
// 不注解下面语句,可以进入channel.txRollback()逻辑
// if(true){
// throw new IOException("consumer channel.txRollback() ");
// }
// 提交事务
channel.txCommit();
}catch(IOException e){
e.printStackTrace();
// 回滚事务
channel.txRollback();
}
测试代码:使用
PublisherConfirmTest
的方法进行测试
2.2. 使用wireshark抓包分析正常的事务提交流程
使用wireshark截获以上测试时产生的包:
所有的包:
全部包如下:
496-497帧
:开启事务
496帧
:Tx.Select 客户端向RabbitMQ要求开启事务
497帧
:
Tx.Select-Ok 服务端处理事务请求并返回成功结果
498帧
:发送消息,和普通发送消息同,这里略
499-502帧
:提交事务
499帧
:Tx.Commit 客户端向RabbitMQ要求开启事务
502帧
:Tx.Commit-Ok
服务端处理事务提交并返回成功结果
2.3. 使用wireshark抓包分析执行事务失败,自动回滚
使用wireshark截获以上测试时产生的包:
如果事务执行失败,则会执行tx.rollback执行回滚操作
由于包信息比较简单,这里略,只上截图
2.4. 事务的优点和缺点
事务的实现简单,能够保证消息正确到达RabbitMQ,但是它的效率低,只有一般发送消息的效率的1/250
3. publisher confirms机制保证消息不丢失
在AMPQ-0-9-1中,有定义从消费者到RabbitMQ的处理确认机制。但是没有定义消息代理到生产者的确认机制,在RabbitMQ中对此进行扩展,叫做publisher confirms机制
在标准的AMQP 0-9-1,保证消息不会丢失的唯一方法是使用事务:在通道上开启事务,发布消息,提交事务。但是事务是非常重量级的,它使得RabbitMQ的吞吐量降低250倍。为了解决这个问题,RabbitMQ引入的Publisher Confirms机制,它是模仿AMQP协议中消费者消息确认机制
生产者端可以通过confirm.select来启用方法Publisher Confirms机制,RabbitMQ服务端根据是否设置no-wait的值,返回confirm.select-ok。一旦在通道上使用confirm.select方法,就认为它处于Publisher Confirms模式。事务通道不能进入Publisher Confirms模式
,一旦通道处于Publisher Confirms模式,不能开启事务。即事务和Publisher Confirms模式只能二选一
。
Publisher Confirm模式有以下几种使用方式:
-
同步方式的发送端的单个Publisher Confirm模式
-
同步方式的发送端的批量Publisher Confirm机制
-
异步方式的发送端的Publisher Confirm机制
4. 同步方式的发送端的Publisher Confirm模式
4.1. 测试代码
测试工程名称:
rabbitmq
关键代码:
SimpleConfirmSend
这个代码实现发送者端发送一个持久化消息到特定的交换机,然后等待服务端返回Basic.Ack后,才执行发送消息
while(num-- > 0) {
// 发送一个持久化消息到特定的交换机
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println(" [SimpleConfirmSend] Sent '" + message + "'");
// 等待服务端返回Basic.Ack后,才执行下一个循环
if(!channel.waitForConfirms()){
System.out.println("message haven't arrived broker");
// 在这里可以对发送失败的记录进行处理:如重发
}
}
4.2. 使用wireshark抓包单个确认消息确认
这里我们发送端只发送一个消息,并进行抓包,分析抓到的包,详细如下:
和普通发送最大的不同是,在执行发送消息前执行Confirm.Select,RabbitMQ在消息已经收到并处理完毕(如果消息需要,则持久化消息后,才返回Basic.Ok; 如果对应消息的镜像队列,则队列完全同步后,才返回Basic.Ok。总之,必须保证消息不会因为RabbitMQ异常丢失)后返回Basic.Ok给客户端
110 -111 帧
:
110帧 Confirm.Select: 客户端请求开启Confirm模式
111帧 Confirm.Select-Ok:服务端执行完毕,并返回成功结果
112 帧
:发送消息
这里我们发现,这个包里没有Delivery-Tag值,但是后面RabbitMQ回送的消息包里有Delivery-Tag。如果我们一次发送大量的消息,即使RabbitMQ对收到前N条消息进行确认,发送者端也不知道RabbitMQ是不是收到发送者端最先发送的N条记录,因为消息到达RabbbitMQ是无序的。只有RabbitMQ对本次发送的所有记录进行确认,我们才知道消息全部发送成功。如果不小心丢失一条消息,我们是不知道那条记录丢失的,唯一的办法是本次所有记录重发