正文
1、消息的可靠投递
两种方式:confirm确认模式、return 退回模式
rabbitmq 整个消息投递的路径为:producer--->rabbitmq broker--->exchange--->queue--->consumer
消息从 producer 到 exchange 则会返回一个 confirmCallback 。
消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。
方式一:confirm确认模式
设置ConnectionFactory的publisher-confirms="true" 开启 确认模式。
使用rabbitTemplate.setConfirmCallback设置回调函数。当消息
发送到exchange后
回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
或者写成lombda
方式二:return 退回模式
设置ConnectionFactory的publisher-returns="true" 开启 退回模式。
使用rabbitTemplate.setReturnCallback设置退回函数,当消息
从exchange路由到queue
失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
将routingkey的值改为错误后,即可触发消息确认提示。
在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。
使用channel下列方法,完成事务控制:
txSelect(), 用于将当前channel设置成transaction模式
txCommit(),用于提交事务
txRollback(),用于回滚事务
2、Consumer ACK(客户端消息确认)
有三种确认方式:
自动确认:acknowledge="none"
手动确认:acknowledge="manual"
根据异常情况确认:acknowledge="auto"
自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应message 从 RabbitMQ 的消息缓存中移除。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
手动确认:acknowledge="manual"
1)编写监听器,监听队列消息
2)在配置文件中注册
3)测试
3、消费端限流
1)在<rabbit:listener-container> 中配置 prefetch属性设置消费端一次拉取多少消息。
2)消费端的确认模式一定为手动确认。acknowledge="manual"
具体实现:
1)编写监听器类
2)编写配置文件
3)结果展示
消息未确认接收
总共11条消息,9条未读,未确认消息2条
消息确认接收
每隔2秒钟进行一次消息拉取
TTL
设置
队列过期时间
使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
设置队列的过期时间:10s
消息单独过期
消息如果不在队列顶端,过期后不会立即被移除,而是它前面的所有消息被消费之后,再判断当前消息是否过期,如果过期,则移除
死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
消息成为死信的三种情况:
1. 队列消息长度到达限制;
2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
3. 原队列存在消息过期设置,消息到达超时时间未被消费;
1. 队列消息长度到达限制
2. 消费者拒接消费消息
1)设置死信队列与路由键
2)开启消费者端的手动确认,编写消费者端队列监听器
3. 原队列存在消息过期设置,消息到达超时时间未被消费;
延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
1)定义正常交换机与队列,并绑定 3)实现延迟,TTL+死信
2)定义死信交换机与队列,并绑定
3)发送消息
接收者:
1)定义监听器
2)监听死信队列
3)测试
接收者会在TTL计时结束后,收到信息
日志与监控
RabbitMQ日志
RabbitMQ默认日志存放路径:/var/log/rabbitmq/[email protected]
日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等。
rabbitmqctl管理和监控
查看队列:# rabbitmqctl list_queues
查看exchanges:# rabbitmqctl list_exchanges
查看用户:# rabbitmqctl list_users
查看连接:# rabbitmqctl list_connections
查看消费者信息:# rabbitmqctl list_consumers
查看环境变量:# rabbitmqctl environment
查看未被确认的队列:# rabbitmqctl list_queues name messages_unacknowledged
查看单个队列的内存使用:# rabbitmqctl list_queues name memory
查看准备就绪的队列:# rabbitmqctl list_queues name messages_ready
消息追踪
在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。