专栏名称: hryou0922
目录
相关文章推荐
封面新闻  ·  刚刚,中国乒协发声! ·  昨天  
封面新闻  ·  刚刚,中国乒协发声! ·  昨天  
掌上铜山  ·  禁止!禁止!刚刚,乒协发声 ·  昨天  
掌上铜山  ·  禁止!禁止!刚刚,乒协发声 ·  昨天  
Python开发者  ·  北京大学出的第四份 DeepSeek ... ·  2 天前  
天津日报  ·  被禁赛10年!徐克声明—— ·  2 天前  
天津日报  ·  被禁赛10年!徐克声明—— ·  2 天前  
51好读  ›  专栏  ›  hryou0922

中间件系列十 RabbitMQ之消费者端的消息确认机制

hryou0922  · 掘金  ·  · 2018-03-13 02:02

正文

概述

在RabbitMQ中,即使将queue,exchange, message等都设置了持久化之后,还是不能保证100%保证数据不丢失了。为了实现消息不丢失,我们需要从Consumer端和Productor端同时进行处理。本篇文章先介绍Consumer端,在AMPQ-0-9-1中有定义从消费者到RabbitMQ的消息确认机制,通过此机制可以保证消息能够从RabbitMQ正确到达消费者端。本文介绍在RabbitMQ中如何实现消费者端的消息确认机制,包括如下内容

  • 1 消费者的实现机制
  • 2 消费者端的代码实现
  • 3 使用wireshark对消息确认的关键包进行转包,并进行分析
  • 4 在使用消息确认机制的注意点

消费者端投递确认机制

在消费者端确认的方式

RabbitMQ中的两种确认方式:

  • 1 自动确认方式:RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递

    在自动确认模式下,消息发送后即被认为成功投递,又称为”fire-and-forget”
    优点 :这种模式下吞吐量非常高。
    缺点 :A. 有可能出现投递丢失的情况,不同于手动确认模式,如果消费者的TCP连接或通道在消息成功交互之前关闭,则此消息会丢失 B. 消费者端过载的问题。在手动确认模式中,可以设置一次最多同时处理多少消息,而自动模式不能设置此值。因此,消费者有可能因为消息无法及时处理,堆积中内存中,内存耗尽而奔溃 C. 此种模式只推荐在消费者可以快速且稳定处理投递的消息的场景中使用

  • 2 手动处理方式:消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功

    手动消息确认方法有:
    § basic.ack 用于肯定确认
    § basic.nack 用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)
    § basic.reject 用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息
    消费者端以上的3个方法都表示消息已经被正确投递,但是basic.ack表示消息已经被正确处理,但是basic.nack,basic.reject表示没有被正确处理,但是RabbitMQ中仍然需要删除这条消息。
    手动的确认模式的投递效率略低于自动,但是可以弥补自动确认模式的不足。

批量手动投递确认

消息手动除了一次确认一条,也可以一次确认多条。为了减少网络流量,可以批量手动确认。在应答时,设置basic.nack的multiple 字段为true,可以同时对delivery_tag和比delivery_tag值小的投递消息进行确认 例如,假设在通道上没有确认消息的delivery_tag是5,6,7和8,当basic.nack中delivery_tag被设置为8并且multiple 被设置为true时,方法执行成功后,从5到8的所有消息将被确认。 如果multiple 设置为false,那么交货5,6和7仍然是未确认的。

投递唯一码: Delivery Tags

当消费者向RabbitMQ注册后,RabbitMQ使用basic.deliver向消费者投递消息时,消息体上会带上delivery tag,这个值会唯一标识本次投递,在同一通道上,此值是唯一的。delivery tag值有64位长度,值从1开始,每发送一次消息值递增1,最大值为9223372036854775807。消费者端在应答消息时,带上此参数,告诉RabbitMQ某次投递已经正确应答。

消费者端消息投递确认代码

工程:

工程: 代码路径:

消费者代码:

代码:关键点
a. channel.basicConsume设置接收非自动确认
b. 在处理完消息后,调用channel.basicAck进行手动消息确认

// 默认消费者实现
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
            throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [ConsumerConfirmRecv] Received '" + message + "'");
        // 消息正向确认
        channel.basicAck(envelope.getDeliveryTag(),true);
         // 消息否定确认: 如果设置multiple=false,requeue值启作用,如果设置multiple=true,则requeue无论设置什么值,后台统一处理成true
        // channel.basicNack(envelope.getDeliveryTag(),false, false);
    }
};
// 接收消息:设置非自动确认
channel.basicConsume(QUEUE_NAME, false, consumer);

消费者端抓包分析

执行以上的代码,并进行抓包分析其中关键包。和一般的消息包不同点时,一般的消费者端的包分析见 中间件系列九 RabbmtiMQ 通过wireshark抓包学习AMQP协议

正向确认-Basic.ack

和一般的消息包不同点时,一般的消费者端的包分析见本文,只是多了Basic.ack包
这里写图片描述







请到「今天看啥」查看全文