正文
概述
在RabbitMQ中,即使将queue,exchange, message等都设置了持久化之后,还是不能保证100%保证数据不丢失了。为了实现消息不丢失,我们需要从Consumer端和Productor端同时进行处理。本篇文章先介绍Consumer端,在AMPQ-0-9-1中有定义从消费者到RabbitMQ的消息确认机制,通过此机制可以保证消息能够从RabbitMQ正确到达消费者端。本文介绍在RabbitMQ中如何实现消费者端的消息确认机制,包括如下内容
-
1 消费者的实现机制
-
2 消费者端的代码实现
-
3 使用wireshark对消息确认的关键包进行转包,并进行分析
-
4 在使用消息确认机制的注意点
消费者端投递确认机制
在消费者端确认的方式
RabbitMQ中的两种确认方式:
批量手动投递确认
消息手动除了一次确认一条,也可以一次确认多条。为了减少网络流量,可以批量手动确认。在应答时,设置basic.nack的multiple 字段为true,可以同时对delivery_tag和比delivery_tag值小的投递消息进行确认 例如,假设在通道上没有确认消息的delivery_tag是5,6,7和8,当basic.nack中delivery_tag被设置为8并且multiple 被设置为true时,方法执行成功后,从5到8的所有消息将被确认。 如果multiple 设置为false,那么交货5,6和7仍然是未确认的。
当消费者向RabbitMQ注册后,RabbitMQ使用basic.deliver向消费者投递消息时,消息体上会带上delivery tag,这个值会唯一标识本次投递,在同一通道上,此值是唯一的。delivery tag值有64位长度,值从1开始,每发送一次消息值递增1,最大值为9223372036854775807。消费者端在应答消息时,带上此参数,告诉RabbitMQ某次投递已经正确应答。
消费者端消息投递确认代码
工程:
工程: 代码路径:
消费者代码:
代码:关键点
a. channel.basicConsume设置接收非自动确认
b. 在处理完消息后,调用channel.basicAck进行手动消息确认
// 默认消费者实现
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [ConsumerConfirmRecv] Received '" + message + "'");
// 消息正向确认
channel.basicAck(envelope.getDeliveryTag(),true);
// 消息否定确认: 如果设置multiple=false,requeue值启作用,如果设置multiple=true,则requeue无论设置什么值,后台统一处理成true
// channel.basicNack(envelope.getDeliveryTag(),false, false);
}
};
// 接收消息:设置非自动确认
channel.basicConsume(QUEUE_NAME, false, consumer);
消费者端抓包分析
执行以上的代码,并进行抓包分析其中关键包。和一般的消息包不同点时,一般的消费者端的包分析见
中间件系列九 RabbmtiMQ 通过wireshark抓包学习AMQP协议
正向确认-Basic.ack
和一般的消息包不同点时,一般的消费者端的包分析见本文,只是多了Basic.ack包