专栏名称: hryou0922
目录
相关文章推荐
深圳发布  ·  原来,是深圳的树啊🥰! ·  22 小时前  
深圳特区报  ·  请看,今天的深圳特区报 ·  21 小时前  
深圳特区报  ·  起猛了,在深圳看见机器人跑步、做早餐…… ·  昨天  
深圳图书馆  ·  穿越千年的衣橱:探索中国服饰的美学密码 ·  2 天前  
深圳大件事  ·  横贯东西!深圳这条城际线有望明年通车→ ·  2 天前  
51好读  ›  专栏  ›  hryou0922

中间件系列七 RabbitMQ之header exchange(头交换机)用法

hryou0922  · 掘金  ·  · 2018-02-06 02:08

正文

1. 概述

header exchange(头交换机)和主题交换机有点相似,但是不同于主题交换机的路由是基于路由键,头交换机的路由值基于消息的header数据。
主题交换机路由键只有是字符串,而头交换机可以是整型和哈希值
header Exchange类型用的比较少,但还是知道一点好

2. 本文实现功能说明:

用到的队列说明:

队列A:绑定交换机参数是:format=pdf,type=report,x-match=all,
队列B: 绑定交换机参数是:format=pdf,type=log,x-match=any,
队列C:绑定交换机参数是:format=zip,type=report,x-match=all,

测试场景:

消息1发送交换机的头参数是:format=pdf,type=reprot则消息传送到队列A
消息2发送交换机的头参数是:format=pdf则消息传送到队列A和队列B
消息3发送交换机的头参数是:format=zip,type=log则消息没有匹配队列,此消息会被丢弃

消息header数据里有一个特殊值”x-match”,它有两个值:

    all: 默认值。一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机
    any: 一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机

3. 生产者代码

主要业务逻辑如下:
1. 配置连接工厂
2. 建立TCP连接
3. 在TCP连接的基础上创建通道
4. 声明一个headers交换机 
5. 设置要发送消息的headers值(此值由外部传入)
6. 发送消息

第4,5,6步代码如下:

 // 声明一个headers交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
String message = "headers-" + System.currentTimeMillis();

// 生成发送消息的属性
AMQP.BasicProperties props = new AMQP.BasicProperties
        .Builder()
        .headers(headers)
        .build();

// 发送消息,并配置消息
channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes("UTF-8"));

完整代码
发送者代码: HeaderSend.java

4. 消费者代码

主要业务逻辑如下:

1. 配置连接工厂
2. 建立TCP连接
3. 在TCP连接的基础上创建通道
4. 声明一个headers交换机 
5. 声明一个临时队列
6. 将队列绑定到指定交换机上,并设置header的参数(此值由外部传入)
7. 接收消息并处理

第4,5,6,7步代码如下:

// 声明一个headers交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);

// 声明一个临时队列
String queueName = channel.queueDeclare().getQueue();
// 将队列绑定到指定交换机上
channel.queueBind(queueName, EXCHANGE_NAME, "", myHeaders);

System.out.println(" [HeaderRecv ["+ myHeaders +"]] Waiting for messages.");

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [HeaderRecv ["+ myHeaders +"] ] Received '" + properties.getHeaders() + "':'" + message + "'");
    }
};
// 接收消息
channel.basicConsume(queueName, true, consumer);

完整代码
消费者代码: HeaderRecv.java

5. 测试:

BasicTest 启动上文例子中的3个消费者,再发送3个测试消息。结果符合上文的预期

@Test
public void header() throws InterruptedException {

    // 消费者1:绑定 format=pdf,type=report
    executorService.submit(() -> {
        Map<String,Object> headers = new HashMap();
        headers.put("format","pdf");
        headers.put("type","report");
        headers.put("x-match","all");
        HeaderRecv.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);
    });

    // 消费者2:绑定  format=pdf,type=log
    executorService.submit(() -> {
        Map<String,Object> headers = new HashMap();
        headers.put("format","pdf");
        headers.put("type","log");
        headers.put("x-match","any");
        HeaderRecv.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);
    });

    // 消费者3:绑定  format=zip,type=report
    executorService.submit(() -> {
        Map<String,Object> headers = new HashMap();
        headers.put("format","zip");
        headers.put("type","report");
        headers.put("x-match","all");
     //   headers.put("x-match","any");
        HeaderRecv.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);
    });

    Thread.sleep(2* 1000);
    System.out.println("=============消息1===================");
    // 生产者1 : format=pdf,type=reprot,x-match=all
    executorService.submit(() -> {
        Map<String,Object> headers = new HashMap();
        headers.put("format","pdf");
        headers.put("type","report");
   //     headers.put("x-match","all");
        HeaderSend.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);
    });

    Thread.sleep(5* 100);
    System.out.println("=============消息2===================");
    // 生产者2 : format=pdf,x-match=any
    executorService.submit(() -> {
        Map<String,Object> headers = new HashMap();
        headers.put("format","pdf");
   //     headers.put("x-match","any");
        HeaderSend.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);
    });

    Thread.sleep(5* 100);
    System.out.println("=============消息3===================");
    // 生产者3 : format=zip,type=log,x-match=all
    executorService.submit(() -> {
        Map<String,Object> headers = new HashMap();
        headers.put("format","zip");
        headers.put("type","log");
  //      headers.put("x-match","all");
        HeaderSend.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, headers);
    });

    // sleep 10s
    Thread.sleep(10 * 1000);
}

输出结果







请到「今天看啥」查看全文