专栏名称: 阳春三月594
若有恒何必三更眠五更起,最无益莫过一日曝十日寒。
目录
相关文章推荐
钱江晚报  ·  突然爆发!飙涨! ·  昨天  
浙江之声  ·  海宁,猛进如潮再出发! ·  昨天  
教育之江  ·  图说 | ... ·  2 天前  
教育之江  ·  图说 | ... ·  2 天前  
51好读  ›  专栏  ›  阳春三月594

RabbitMQ 特性

阳春三月594  · 简书  ·  · 2021-01-08 11:49

正文

1、消息的可靠投递

两种方式:confirm确认模式、return  退回模式

rabbitmq 整个消息投递的路径为:producer--->rabbitmq broker--->exchange--->queue--->consumer

消息从 producer 到 exchange 则会返回一个 confirmCallback 。

消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。

方式一:confirm确认模式

设置ConnectionFactory的publisher-confirms="true" 开启 确认模式。

使用rabbitTemplate.setConfirmCallback设置回调函数。当消息 发送到exchange后 回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

或者写成lombda

方式二:return  退回模式

设置ConnectionFactory的publisher-returns="true" 开启 退回模式。

使用rabbitTemplate.setReturnCallback设置退回函数,当消息 从exchange路由到queue 失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。

将routingkey的值改为错误后,即可触发消息确认提示。



在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。

使用channel下列方法,完成事务控制:

txSelect(), 用于将当前channel设置成transaction模式

txCommit(),用于提交事务

txRollback(),用于回滚事务

2、Consumer ACK(客户端消息确认)

有三种确认方式:

自动确认:acknowledge="none"

手动确认:acknowledge="manual"

根据异常情况确认:acknowledge="auto"

自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应message 从 RabbitMQ 的消息缓存中移除。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

手动确认:acknowledge="manual"

1)编写监听器,监听队列消息

2)在配置文件中注册

3)测试

3、消费端限流

1)在<rabbit:listener-container> 中配置 prefetch属性设置消费端一次拉取多少消息。

2)消费端的确认模式一定为手动确认。acknowledge="manual"

具体实现:

1)编写监听器类

2)编写配置文件

3)结果展示

消息未确认接收

总共11条消息,9条未读,未确认消息2条

消息确认接收

每隔2秒钟进行一次消息拉取

TTL

设置 队列过期时间 使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

设置队列的过期时间:10s

消息单独过期

消息如果不在队列顶端,过期后不会立即被移除,而是它前面的所有消息被消费之后,再判断当前消息是否过期,如果过期,则移除

死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

消息成为死信的三种情况:

1. 队列消息长度到达限制;

2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

3. 原队列存在消息过期设置,消息到达超时时间未被消费;

1. 队列消息长度到达限制

2. 消费者拒接消费消息

1)设置死信队列与路由键

2)开启消费者端的手动确认,编写消费者端队列监听器

3. 原队列存在消息过期设置,消息到达超时时间未被消费;


延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

1)定义正常交换机与队列,并绑定    3)实现延迟,TTL+死信

2)定义死信交换机与队列,并绑定

3)发送消息

接收者:

1)定义监听器

2)监听死信队列

3)测试

接收者会在TTL计时结束后,收到信息


日志与监控

RabbitMQ日志

RabbitMQ默认日志存放路径:/var/log/rabbitmq/[email protected]

日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等。

rabbitmqctl管理和监控

查看队列:# rabbitmqctl list_queues

查看exchanges:# rabbitmqctl list_exchanges

查看用户:# rabbitmqctl list_users

查看连接:# rabbitmqctl list_connections

查看消费者信息:# rabbitmqctl list_consumers

查看环境变量:# rabbitmqctl environment

查看未被确认的队列:# rabbitmqctl list_queues  name messages_unacknowledged

查看单个队列的内存使用:# rabbitmqctl list_queues name memory

查看准备就绪的队列:# rabbitmqctl list_queues name messages_ready



消息追踪

在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。







请到「今天看啥」查看全文