专栏名称: MacroZheng
公众号:macrozheng
目录
相关文章推荐
掌上长春  ·  周末旅行团——余晖暖忆暮程记⏳ ·  昨天  
掌上长春  ·  周末旅行团——余晖暖忆暮程记⏳ ·  昨天  
云南省文化和旅游厅  ·  惊喜云南 | 丽江东巴谷:打卡城市喧嚣外的 ... ·  昨天  
云南省文化和旅游厅  ·  惊喜云南 | 丽江东巴谷:打卡城市喧嚣外的 ... ·  昨天  
璞缇客精品酒店  ·  1.5h直达!比婺源更浪漫的金色秘境!乘船入 ... ·  5 天前  
51好读  ›  专栏  ›  MacroZheng

连RabbitMQ的5种核心消息模式都不懂,也敢说自己会用消息队列!

MacroZheng  · 掘金  ·  · 2020-06-10 00:58

正文

阅读 208

连RabbitMQ的5种核心消息模式都不懂,也敢说自己会用消息队列!

SpringBoot实战电商项目mall(35k+star)地址: github.com/macrozheng/…

摘要

以前看过的关于RabbitMQ核心消息模式的文章都是基于JavaAPI的,最近看了下官方文档,发现这些核心消息模式都可以通过Spring AMQP来实现。于是总结了下RabbitMQ的实用技巧,包括RabbitMQ在Windows和Linux下的安装、5种核心消息模式的Spring AMQP实现,相信对于想要学习和回顾RabbitMQ的朋友都会有所帮助。

简介

RabbitMQ是最受欢迎的开源消息中间件之一,在全球范围内被广泛应用。RabbitMQ是轻量级且易于部署的,能支持多种消息协议。RabbitMQ可以部署在分布式系统中,以满足大规模、高可用的要求。

相关概念

我们先来了解下RabbitMQ中的相关概念,这里以5种消息模式中的 路由模式 为例。

标志 中文名 英文名 描述
P 生产者 Producer 消息的发送者,可以将消息发送到交换机
C 消费者 Consumer 消息的接收者,从队列中获取消息并进行消费
X 交换机 Exchange 接收生产者发送的消息,并根据路由键发送给指定队列
Q 队列 Queue 存储从交换机发来的消息
type 交换机类型 type 不同类型的交换机转发消息方式不同
fanout 发布/订阅模式 fanout 广播消息给所有绑定交换机的队列
direct 路由模式 direct 根据路由键发送消息
topic 通配符模式 topic 根据路由键的匹配规则发送消息

安装及配置

接下来我们介绍下RabbitMQ的安装和配置,提供Windows和Linux两种安装方式。

Windows下的安装

  • 安装完成后,进入RabbitMQ安装目录下的sbin目录;

  • 在地址栏输入cmd并回车启动命令行,然后输入以下命令启动管理功能。
rabbitmq-plugins enable rabbitmq_management
复制代码

Linux下的安装

  • 下载 rabbitmq 3.7.15 的Docker镜像;
docker pull rabbitmq:3.7.15
复制代码
  • 使用Docker命令启动服务;
docker run -p 5672:5672 -p 15672:15672 --name rabbitmq \
-d rabbitmq:3.7.15
复制代码
  • 进入容器并开启管理功能;
docker exec -it rabbitmq /bin/bash
rabbitmq-plugins enable rabbitmq_management
复制代码

  • 开启防火墙便于外网访问。
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload
复制代码

访问及配置

  • 访问RabbitMQ管理页面地址,查看是否安装成功(Linux下使用服务器IP访问即可): http://localhost:15672/

  • 输入账号密码并登录,这里使用默认账号密码登录:guest guest

  • 创建帐号并设置其角色为管理员:mall mall

  • 创建一个新的虚拟host为:/mall

  • 点击mall用户进入用户配置页面;

  • 给mall用户配置该虚拟host的权限;

  • 至此,RabbitMQ的配置完成。

5种消息模式

这5种消息模式是构建基于RabbitMQ的消息应用的基础,一定要牢牢掌握它们。学过RabbitMQ的朋友应该了解过这些消息模式的Java实现,这里我们使用Spring AMQP的形式来实现它们。

简单模式

简单模式是最简单的消息模式,它包含一个生产者、一个消费者和一个队列。生产者向队列里发送消息,消费者从队列中获取消息并消费。

模式示意图

Spring AMQP实现

  • 首先需要在 pom.xml 中添加Spring AMQP的相关依赖;
<!--Spring AMQP依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
复制代码
  • 然后修改 application.yml ,添加RabbitMQ的相关配置;
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /mall
    username: mall
    password: mall
    publisher-confirms: true #消息发送到交换器确认
    publisher-returns: true #消息发送到队列确认
复制代码
  • 添加 简单模式 相关Java配置,创建一个名为 simple.hello 的队列、一个生产者和一个消费者;
/**
 * Created by macro on 2020/5/19.
 */
@Configuration
public class SimpleRabbitConfig {

	@Bean
	public Queue hello() {
		return new Queue("simple.hello");
	}

	@Bean
	public SimpleSender simpleSender(){
		return new SimpleSender();
	}

	@Bean
	public SimpleReceiver simpleReceiver(){
		return new SimpleReceiver();
	}

}
复制代码
  • 生产者通过 send方法 向队列 simple.hello 中发送消息;
/**
 * Created by macro on 2020/5/19.
 */
public class SimpleSender {

	private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSender.class);

	@Autowired
	private RabbitTemplate template;

	private static final String queueName="simple.hello";

	public void send() {
		String message = "Hello World!";
		this.template.convertAndSend(queueName, message);
		LOGGER.info(" [x] Sent '{}'", message);
	}

}
复制代码
  • 消费者从队列 simple.hello 中获取消息;
/**
 * Created by macro on 2020/5/19.
 */
@RabbitListener(queues = "simple.hello")
public class SimpleReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleReceiver.class);

    @RabbitHandler
    public void receive(String in) {
        LOGGER.info(" [x] Received '{}'", in);
    }

}
复制代码
  • 在controller中添加测试接口,调用该接口开始发送消息;
/**
 * Created by macro on 2020/5/19.
 */
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private SimpleSender simpleSender;

    @ApiOperation("简单模式")
    @RequestMapping(value = "/simple", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult simpleTest() {
        for(int i=0;i<10;i++){
            simpleSender.send();
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}
复制代码
  • 运行后结果如下,可以发现生产者往队列中发送消息,消费者从队列中获取消息并消费。

工作模式

工作模式是指向多个互相竞争的消费者发送消息的模式,它包含一个生产者、两个消费者和一个队列。两个消费者同时绑定到一个队列上去,当消费者获取消息处理耗时任务时,空闲的消费者从队列中获取并消费消息。

模式示意图

Spring AMQP实现

  • 添加 工作模式 相关Java配置,创建一个名为 work.hello 的队列、一个生产者和两个消费者;
/**
 * Created by macro on 2020/5/19.
 */
@Configuration
public class WorkRabbitConfig {

    @Bean
    public Queue workQueue() {
        return new Queue("work.hello");
    }

    @Bean
    public WorkReceiver workReceiver1() {
        return new WorkReceiver(1);
    }

    @Bean
    public WorkReceiver workReceiver2() {
        return new WorkReceiver(2);
    }

    @Bean
    public WorkSender workSender() {
        return new WorkSender();
    }

}
复制代码
  • 生产者通过 send方法 向队列 work.hello 中发送消息,消息中包含一定数量的 . 号;
/**
 * Created by macro on 2020/5/19.
 */
public class WorkSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkSender.class);

    @Autowired
    private RabbitTemplate template;

    private static final String queueName = "work.hello";

    public void send(int index) {
        StringBuilder builder = new StringBuilder("Hello");
        int limitIndex = index % 3+1;
        for (int i = 0; i < limitIndex; i++) {
            builder.append('.');
        }
        builder.append(index+1);
        String message = builder.toString();
        template.convertAndSend(queueName, message);
        LOGGER.info(" [x] Sent '{}'", message);
    }

}
复制代码
  • 两个消费者从队列 work.hello 中获取消息,名称分别为 instance 1 instance 2 ,消息中包含 . 号越多,耗时越长;
/**
 * Created by macro on 2020/5/19.
 */
@RabbitListener(queues = "work.hello")
public class WorkReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkReceiver.class);

    private final int instance;

    public WorkReceiver(int i) {
        this.instance = i;
    }

    @RabbitHandler
    public void receive(String in) {
        StopWatch watch = new StopWatch();
        watch.start();
        LOGGER.info("instance {} [x] Received '{}'", this.instance, in);
        doWork(in);
        watch.stop();
        LOGGER.info("instance {} [x] Done in {}s", this.instance, watch.getTotalTimeSeconds());
    }

    private void doWork(String in) {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                ThreadUtil.sleep(1000);
            }
        }
    }

}
复制代码
  • 在controller中添加测试接口,调用该接口开始发送消息;
/**
 * Created by macro on 2020/5/19.
 */
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {
    
    @Autowired
    private WorkSender workSender;

    @ApiOperation("工作模式")
    @RequestMapping(value = "/work", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult workTest() {
        for(int i=0;i<10;i++){
            workSender.send(i);
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}
复制代码
  • 运行后结果如下,可以发现生产者往队列中发送包含不同数量 . 号的消息, instance 1 instance 2 消费者互相竞争,分别消费了一部分消息。

发布/订阅模式

发布/订阅模式是指同时向多个消费者发送消息的模式(类似广播的形式),它包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列绑定到交换机上去,生产者通过发送消息到交换机,所有消费者接收并消费消息。

模式示意图

Spring AMQP实现

  • 添加 发布/订阅模式 相关Java配置,创建一个名为 exchange.fanout 的交换机、一个生产者、两个消费者和两个匿名队列,将两个匿名队列都绑定到交换机;
/**
 * Created by macro on 2020/5/19.
 */
@Configuration
public class FanoutRabbitConfig {

    @Bean
    public FanoutExchange fanout() {
        return new FanoutExchange("exchange.fanout");
    }

    @Bean
    public Queue fanoutQueue1() {
        return new AnonymousQueue();
    }

    @Bean
    public Queue fanoutQueue2() {
        return new AnonymousQueue();
    }

    @Bean
    public Binding fanoutBinding1(FanoutExchange fanout, Queue fanoutQueue1) {
        return BindingBuilder.bind(fanoutQueue1).to(fanout);
    }

    @Bean
    public Binding fanoutBinding2(FanoutExchange fanout, Queue fanoutQueue2) {
        return BindingBuilder.bind(fanoutQueue2).to(fanout);
    }

    @Bean
    public FanoutReceiver fanoutReceiver() {
        return new FanoutReceiver();
    }

    @Bean
    public FanoutSender fanoutSender() {
        return new FanoutSender();
    }

}
复制代码
  • 生产者通过 send方法 向交换机 exchange.fanout 中发送消息,消息中包含一定数量的 . 号;
/**
 * Created by macro on 2020/5/19.
 */






请到「今天看啥」查看全文