正文
1. 概述
本文通过Fanout exchange(扇型交换机)实现生产者发送一个消息,这个消息同时被传送给所有消费者。Fanout exchange(扇型交换机)的定义见这篇文章
中间件系列三 RabbitMQ之交换机的四种类型和属性
本文主要内容:
-
实现上文定义的场景
-
交换机方法的声明方法、参数说明和实现
-
临时队列的作用、声明方法和实现
-
绑定(Bindings)的方法、参数说明和实现
-
测试
下文先分别介绍生产者和消费者的代码,然后对其进行测试,模拟上面的发布/订阅场景。完整的代码在文末
2. 生产者代码
主要逻辑在这个类中: Publish.java
1. 配置连接工厂
2. 建立TCP连接
3. 在TCP连接的基础上创建通道
4. 声明一个fanout交换机 (而不是声明一个队列)
5. 发送消息
这里和之前文章最大的不同是,多了声明一个fanout交换机,而不是声明队列。这里表明我们不使用默认交换机,在消费端需要执行队列和交换机的绑定操作
声明交换机方法
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException
详细参数如下:
第一个参数exchange
:交换机的名称
第二个参数type
:交换机的类型
第三个参数durable
:是否持久化,如果true,则当前RabbitMQ重启的时候,它依旧存在
第四个参数autoDelete
:当没有生成者/消费者使用此交换机时,此交换机会被自动删除。
第五个参数arguments
:其它的扩展属性
代码如下
// 声明一个fanout交换机
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT, false, false, null);
// 发送消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
完整代码
发送者代码:
Publish.java
3. 消费者代码
消费者主要逻辑在这个类中: Publish.java
1. 配置连接工厂
2. 建立TCP连接
3. 在TCP连接的基础上创建通道
4. 声明一个fanout交换机
5. 声明一个临时队列
6. 将临时队列绑定到交换机上
7. 接收消息并处理
这里和之前文章最大的不同是,多了声明一个fanout交换机,而不是声明队列。这里表明我们不使用默认交换机,在消费端需要执行队列和交换机的绑定操作
临时队列:
在本例子中,我们使用临时队列。临时队列有以下特点:
1. 消费者每次连接时,都会创建一个新的队列,队列名称随机生成
2. 当消费者断开连接时,队列会自动变删除
在RabbitMQ中,随机使用的队列名称类似amq.gen-
**
声明方法:
Queue.DeclareOk queueDeclare() throws IOException;
此方法等价于创建一个
Queue.DeclareOk queueDeclare(String queue="自动生成类似amq.gen-******的随机名称", boolean durable=false, boolean exclusive=true, boolean autoDelete=true, Map<String, Object> arguments=null) throws IOException;
代码:创建临时队列,得到队列名称,用于后面的绑定使用
String queueName = channel.queueDeclare().getQueue();
绑定(Bindings)