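1. Overview
The earlier article "Spring Boot Series 13: Integrating RabbitMQ with Spring Boot" showed how to use RabbitMQ in Spring Boot. This article walks through the source code to analyze how Spring Boot integrates RabbitMQ.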
2. Entry point
The spring.factories file in spring-boot-autoconfigure.jar contains the following definition, which means that RabbitAutoConfiguration is initialized when Spring Boot starts:
…
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration,\
…
3. RabbitProperties
The application_*.yml properties file:
spring:
  # RabbitMQ configuration
  rabbitmq:
    host: 10.240.80.134
    username: spring-boot
    password: spring-boot
    virtual-host: spring-boot-vhost
These properties are bound to the RabbitProperties class:
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
…
}
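4. RabbitAutoConfiguration
4.1. Class-level annotations
This is a configuration class. On startup it initializes the RabbitProperties object described above, and it imports another configuration class, RabbitAnnotationDrivenConfiguration, which deals with message listening and is covered later. The class has three inner classes, all of them configuration classes, which conditionally create the beans RabbitMQ needs.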
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
// Initializes RabbitProperties
@EnableConfigurationProperties(RabbitProperties.class)
// Imports the @Configuration class RabbitAnnotationDrivenConfiguration
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
    …
}
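4.2. Inner class RabbitConnectionFactoryCreator
The inner class RabbitConnectionFactoryCreator creates a CachingConnectionFactory instance (an implementation of ConnectionFactory) from the parameters configured in RabbitProperties. This instance caches the connections to RabbitMQ.
CachingConnectionFactory wraps the official RabbitMQ client's com.rabbitmq.client.ConnectionFactory and com.rabbitmq.client.Channel and caches connections and channels. It has two cache modes:
1. With CacheMode#CHANNEL, every call to createConnection() returns the same Connection. By default only one Connection is created, and the number of cached Channels is governed by the channel cache size setting. Multiple Channels can therefore exist, but they all share the same Connection.
2. With CacheMode#CONNECTION, both the number of Connections and the number of Channels can be configured. A call to createConnection() takes an available Connection from the cache; if none is available and the limit has not been reached, a new Connection is created. Channels are handled the same way.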
@Configuration
@ConditionalOnMissingBean(ConnectionFactory.class)
protected static class RabbitConnectionFactoryCreator {
    @Bean
    public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config)
            throws Exception {
        // Configure the RabbitMQ connection factory from RabbitProperties
        RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
        if (config.determineHost() != null) {
            factory.setHost(config.determineHost());
        }
        …
        factory.afterPropertiesSet();
        // The caching connection factory
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
                factory.getObject());
        connectionFactory.setAddresses(config.determineAddresses());
        connectionFactory.setPublisherConfirms(config.isPublisherConfirms());
        connectionFactory.setPublisherReturns(config.isPublisherReturns());
        …
        return connectionFactory;
    }
}
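To make the CHANNEL cache mode described in this section more concrete, here is a minimal plain-Java model of the idea: one shared connection plus a bounded cache of idle channels. It is only a sketch and not Spring AMQP code. The names ChannelCacheSketch, FakeConnection, FakeChannel, acquireChannel(), and releaseChannel() are hypothetical and exist only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, hypothetical model of CHANNEL cache mode (not the Spring AMQP implementation).
class ChannelCacheSketch {

    // Stand-in for a broker connection that can open channels.
    static final class FakeConnection {
        private int channelsOpened = 0;

        FakeChannel openChannel() {
            channelsOpened++;
            return new FakeChannel(this, channelsOpened);
        }

        int channelsOpened() {
            return channelsOpened;
        }
    }

    // Stand-in for a channel opened on a connection.
    static final class FakeChannel {
        final FakeConnection connection;
        final int id;

        FakeChannel(FakeConnection connection, int id) {
            this.connection = connection;
            this.id = id;
        }
    }

    private final FakeConnection sharedConnection = new FakeConnection();
    private final Deque<FakeChannel> idleChannels = new ArrayDeque<>();
    private final int channelCacheSize;

    ChannelCacheSketch(int channelCacheSize) {
        this.channelCacheSize = channelCacheSize;
    }

    // Every caller receives the same connection object.
    FakeConnection createConnection() {
        return sharedConnection;
    }

    // Reuse an idle channel if one is cached, otherwise open a new one on the shared connection.
    FakeChannel acquireChannel() {
        FakeChannel cached = idleChannels.pollFirst();
        return cached != null ? cached : sharedConnection.openChannel();
    }

    // Return a channel to the cache; channels beyond the cache size are simply dropped in this model.
    void releaseChannel(FakeChannel channel) {
        if (idleChannels.size() < channelCacheSize) {
            idleChannels.addFirst(channel);
        }
    }

    public static void main(String[] args) {
        ChannelCacheSketch cache = new ChannelCacheSketch(1);
        FakeChannel a = cache.acquireChannel();
        FakeChannel b = cache.acquireChannel();
        cache.releaseChannel(a);
        cache.releaseChannel(b); // the cache is already full, so b is not kept
        FakeChannel c = cache.acquireChannel();
        System.out.println("reused cached channel: " + (c == a));
        System.out.println("same connection: " + (cache.createConnection() == cache.createConnection()));
    }
}
```

In this model every channel carries a reference to the single shared connection, which mirrors the point made above: several Channels can exist, but they all share one Connection. A CONNECTION-mode model would add a second bounded cache for connections in the same style.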
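4.3. Inner class RabbitTemplateConfiguration
The constructor of the inner class RabbitTemplateConfiguration stores the RabbitProperties parameters and the MessageConverter in member fields. The rabbitTemplate() and amqpAdmin() methods then use the CachingConnectionFactory instance created by RabbitConnectionFactoryCreator to create the RabbitTemplate and the RabbitAdmin.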
@Configuration
// Imports RabbitConnectionFactoryCreator
@Import(RabbitConnectionFactoryCreator.class)
protected static class RabbitTemplateConfiguration {
    private final ObjectProvider<MessageConverter> messageConverter;
    private final RabbitProperties properties;
    // Injects MessageConverter and RabbitProperties
    public RabbitTemplateConfiguration(
            ObjectProvider<MessageConverter> messageConverter,
            RabbitProperties properties) {
        this.messageConverter = messageConverter;
        this.properties = properties;
    }
    // Initializes the RabbitTemplate
    @Bean
    @ConditionalOnSingleCandidate(ConnectionFactory.class)
    @ConditionalOnMissingBean(RabbitTemplate.class)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        // Create the RabbitTemplate
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        MessageConverter messageConverter = this.messageConverter.getIfUnique();
        if (messageConverter != null) {
            // Configure the MessageConverter
            rabbitTemplate.setMessageConverter(messageConverter);
        }
        // Other parameter configuration omitted
        …
        return rabbitTemplate;
    }
    // Initializes the AmqpAdmin
    @Bean
    @ConditionalOnSingleCandidate(ConnectionFactory.class)
    @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
    @ConditionalOnMissingBean(AmqpAdmin.class)
    public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        // Create the RabbitAdmin
        return new RabbitAdmin(connectionFactory);
    }
}
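4.4. Inner configuration class MessagingTemplateConfiguration
The rabbitMessagingTemplate() method of the inner configuration class MessagingTemplateConfiguration receives the RabbitTemplate instance created above and uses it to create a RabbitMessagingTemplate.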
@Configuration
@ConditionalOnClass(RabbitMessagingTemplate.class)
@ConditionalOnMissingBean(RabbitMessagingTemplate.class)
// Imports the RabbitTemplateConfiguration configuration class
@Import(RabbitTemplateConfiguration.class)
protected static class MessagingTemplateConfiguration {
    // Creates the RabbitMessagingTemplate; the RabbitTemplate is instantiated by RabbitTemplateConfiguration
    @Bean
    @ConditionalOnSingleCandidate(RabbitTemplate.class)
    public RabbitMessagingTemplate rabbitMessagingTemplate(
            RabbitTemplate rabbitTemplate) {
        return new RabbitMessagingTemplate(rabbitTemplate);
    }
}
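The configuration above completes the initialization of the sender-side RabbitMQ beans: RabbitTemplate can now be used to send messages, and RabbitAdmin to declare queues, exchanges, and bindings. Listening for RabbitMQ messages requires the additional configuration below, which is more complex.
5. RabbitAnnotationDrivenConfiguration
RabbitAutoConfiguration imports this class, which creates the beans related to message listening. Let us look at it in detail.
5.1. Constructor
The constructor receives the MessageConverter, MessageRecoverer, and RabbitProperties instances needed for listening and stores them as member fields.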
@Configuration
@ConditionalOnClass(EnableRabbit.class)
class RabbitAnnotationDrivenConfiguration {
    private final ObjectProvider<MessageConverter> messageConverter;
    private final ObjectProvider<MessageRecoverer> messageRecoverer;
    private final RabbitProperties properties;
    RabbitAnnotationDrivenConfiguration(ObjectProvider<MessageConverter> messageConverter,
            ObjectProvider<MessageRecoverer> messageRecoverer,
            RabbitProperties properties) {
        this.messageConverter = messageConverter;
        this.messageRecoverer = messageRecoverer;
        this.properties = properties;
    }
    …
}
5.2. The rabbitListenerContainerFactoryConfigurer() method
Creates the SimpleRabbitListenerContainerFactoryConfigurer object, which holds the MessageConverter, MessageRecoverer, and RabbitProperties instances needed to configure the listener container factory.
// Create the SimpleRabbitListenerContainerFactoryConfigurer and set its MessageConverter, MessageRecoverer, and RabbitMQ properties
@Bean
@ConditionalOnMissingBean
public SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() {
    SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer();
    configurer.setMessageConverter(this.messageConverter.getIfUnique());
    configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
    configurer.setRabbitProperties(this.properties);
    return configurer;
}
5.3. The rabbitListenerContainerFactory() method
Creates a SimpleRabbitListenerContainerFactory instance (an implementation of RabbitListenerContainerFactory). The SimpleRabbitListenerContainerFactoryConfigurer comes from the method above, and the ConnectionFactory comes from RabbitAutoConfiguration, as explained earlier.
@Bean
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        SimpleRabbitListenerContainerFactoryConfigurer configurer,
        ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    return factory;
}
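5.4. Enabling @EnableRabbit
The important part of this inner class is its @EnableRabbit annotation, which uses the RabbitListenerContainerFactory, creates the other related beans, and starts listening for messages. @EnableRabbit is described in detail in the next section.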
@EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableRabbitConfiguration {
}
6. @EnableRabbit
Imports the configuration class RabbitBootstrapConfiguration.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
// Imports the configuration class RabbitBootstrapConfiguration
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}
7. RabbitBootstrapConfiguration
This configuration class creates the RabbitListenerAnnotationBeanPostProcessor and the RabbitListenerEndpointRegistry.
@Configuration
public class RabbitBootstrapConfiguration {
    // Creates the RabbitListenerAnnotationBeanPostProcessor; received messages are dispatched to the methods annotated with @RabbitListener or @RabbitHandler
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
        return new RabbitListenerAnnotationBeanPostProcessor();
    }
    // Creates the RabbitListenerEndpointRegistry, with which listener endpoints are registered
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }
}
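8. RabbitListenerAnnotationBeanPostProcessor
This class implements BeanPostProcessor. After Spring creates a bean, it picks up every method annotated with @RabbitListener or @RabbitHandler.
8.1. The afterSingletonsInstantiated() method
This method runs once the singleton beans have been instantiated and performs the following key initialization steps:
1. Obtains the RabbitListenerEndpointRegistry instance and sets it on the RabbitListenerEndpointRegistrar.
2. Sets containerFactoryBeanName on the RabbitListenerEndpointRegistrar; the name is rabbitListenerContainerFactory.
3. Calls RabbitListenerEndpointRegistrar.afterPropertiesSet() to complete the initialization; this method is covered later.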
// Create the registrar instance
private final RabbitListenerEndpointRegistrar registrar = new RabbitListenerEndpointRegistrar();
@Override
public void afterSingletonsInstantiated() {
    …
    // Obtain the RabbitListenerEndpointRegistry instance and set it on the RabbitListenerEndpointRegistrar
    if (this.registrar.getEndpointRegistry() == null) {
        if (this.endpointRegistry == null) {
            Assert.state(this.beanFactory != null,
                    "BeanFactory must be set to find endpoint registry by bean name");
            this.endpointRegistry = this.beanFactory.getBean(
                    RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                    RabbitListenerEndpointRegistry.class);
        }
        this.registrar.setEndpointRegistry(this.endpointRegistry);
    }
    // Set containerFactoryBeanName (rabbitListenerContainerFactory) on the RabbitListenerEndpointRegistrar
    if (this.containerFactoryBeanName != null) {
        this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
    }
    // Set the custom handler method factory once resolved by the configurer
    MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
    if (handlerMethodFactory != null) {
        this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
    }
    // Actually register all listeners: initializes the RabbitListenerEndpointRegistrar
    this.registrar.afterPropertiesSet();
}
8.2. The postProcessAfterInitialization() method
postProcessAfterInitialization() runs after a bean has been initialized and picks up every method annotated with @RabbitListener or @RabbitHandler.
1. If @RabbitListener is placed on a method, processAmqpListener() is called and the method is wrapped in a MethodRabbitListenerEndpoint.
2. If @RabbitListener is placed on a class and the class has methods annotated with @RabbitHandler, processMultiMethodListeners() is called and the methods are wrapped in a MultiMethodRabbitListenerEndpoint.
MultiMethodRabbitListenerEndpoint is a subclass of MethodRabbitListenerEndpoint, and both are RabbitListenerEndpoint implementations.
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    …
    // Process every method annotated with @RabbitListener
    for (ListenerMethod lm : metadata.listenerMethods) {
        for (RabbitListener rabbitListener : lm.annotations) {
            processAmqpListener(rabbitListener, lm.method, bean, beanName);
        }
    }
    // Process every method annotated with @RabbitHandler
    if (metadata.handlerMethods.length > 0) {
        processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
    }
    return bean;
}
private void processMultiMethodListeners(RabbitListener[] classLevelListeners, Method[] multiMethods,
        Object bean, String beanName) {
    …
    for (RabbitListener classLevelListener : classLevelListeners) {
        // Create the endpoint for a class with several handler methods
        MultiMethodRabbitListenerEndpoint endpoint = new MultiMethodRabbitListenerEndpoint(checkedMethods, bean);
        endpoint.setBeanFactory(this.beanFactory);
        processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
    }
}
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
    Method methodToUse = checkProxy(method, bean);
    // Create the endpoint for a single listener method
    MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
    endpoint.setMethod(methodToUse);
    endpoint.setBeanFactory(this.beanFactory);
    processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}
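The dispatch rule in this section (one endpoint per annotated method, or one multi-method endpoint per annotated class) can be illustrated with a small reflection sketch in plain Java. This is not the Spring AMQP implementation: the annotations @Listener and @Handler are hypothetical stand-ins for @RabbitListener and @RabbitHandler, and ListenerScanSketch, OrderService, EventService, and scan() are names invented for this example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how a post-processor could classify listener beans.
class ListenerScanSketch {

    // Stand-in for @RabbitListener: allowed on classes and on methods.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @interface Listener {
        String queue();
    }

    // Stand-in for @RabbitHandler: allowed on methods only.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Handler {
    }

    // Method-level listener: one endpoint per annotated method.
    static class OrderService {
        @Listener(queue = "orders")
        public void onOrder(String body) {
        }
    }

    // Class-level listener: one endpoint for the class, dispatching to its @Handler methods.
    @Listener(queue = "events")
    static class EventService {
        @Handler
        public void onText(String body) {
        }

        @Handler
        public void onNumber(Integer body) {
        }
    }

    // Returns a description of each endpoint that would be registered for the bean.
    static List<String> scan(Object bean) {
        Class<?> type = bean.getClass();
        List<String> endpoints = new ArrayList<>();
        int handlerCount = 0;
        for (Method method : type.getDeclaredMethods()) {
            Listener listener = method.getAnnotation(Listener.class);
            if (listener != null) {
                endpoints.add("single:" + listener.queue() + ":" + method.getName());
            }
            if (method.isAnnotationPresent(Handler.class)) {
                handlerCount++;
            }
        }
        Listener classLevel = type.getAnnotation(Listener.class);
        if (classLevel != null && handlerCount > 0) {
            endpoints.add("multi:" + classLevel.queue() + ":" + handlerCount);
        }
        return endpoints;
    }

    public static void main(String[] args) {
        System.out.println(scan(new OrderService())); // prints [single:orders:onOrder]
        System.out.println(scan(new EventService())); // prints [multi:events:2]
    }
}
```

The sketch only classifies beans; in the real post-processor the corresponding step builds an endpoint object and hands it to processListener().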
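8.3. The processListener() method
Both processAmqpListener() and processMultiMethodListeners() end up in processListener(), which does the following:
1. Uses the parameters of the @RabbitListener annotation on the listener to set the queues, priority, exclusivity, and so on of the MethodRabbitListenerEndpoint.
2. Obtains the RabbitAdmin instance and sets it on the MethodRabbitListenerEndpoint.
3. Looks up the RabbitListenerContainerFactory named by the containerFactory() attribute of @RabbitListener; the attribute is empty by default.
4. Calls the helper class RabbitListenerEndpointRegistrar to register the RabbitListenerEndpoint with the RabbitListenerEndpointRegistry. The RabbitListenerEndpointRegistrar class is explained later.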
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
        Object adminTarget, String beanName) {
    // The queues, priority, exclusivity, and so on of the MethodRabbitListenerEndpoint are set here
    …
    // Obtain the RabbitAdmin instance and set it on the MethodRabbitListenerEndpoint
    String rabbitAdmin = resolve(rabbitListener.admin());
    if (StringUtils.hasText(rabbitAdmin)) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
        try {
            endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
        }
        catch (NoSuchBeanDefinitionException ex) {
            throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
                    adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +
                    rabbitAdmin + "' was found in the application context", ex);
        }
    }
    // Look up the RabbitListenerContainerFactory named by the containerFactory() attribute of @RabbitListener
    RabbitListenerContainerFactory<?> factory = null;
    String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
    if (StringUtils.hasText(containerFactoryBeanName)) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
        try {
            factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
        }
        catch (NoSuchBeanDefinitionException ex) {
            throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
                    adminTarget + "] for bean " + beanName + ", no " +
                    RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
                    containerFactoryBeanName + "' was found in the application context", ex);
        }
    }
    // Use the helper class RabbitListenerEndpointRegistrar to register the RabbitListenerEndpoint with the RabbitListenerEndpointRegistry; RabbitListenerEndpointRegistrar is explained below
    this.registrar.registerEndpoint(endpoint, factory);
}
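The container factory lookup in step 3 can be modeled in a few lines of plain Java. The sketch below uses a Map as a hypothetical stand-in for the bean factory; FactoryLookupSketch, resolveExplicit(), and resolveForRegistration() are invented names. The fallback to the default bean name rabbitListenerContainerFactory when no explicit factory is given is an assumption based on the containerFactoryBeanName set in section 8.1, not a copy of the registrar's code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of resolving a listener container factory by bean name.
class FactoryLookupSketch {

    // Default bean name for the listener container factory mentioned in section 8.1.
    static final String DEFAULT_FACTORY_BEAN_NAME = "rabbitListenerContainerFactory";

    // Stand-in for a bean factory: bean name -> bean instance.
    private final Map<String, Object> beans = new HashMap<>();

    void register(String name, Object bean) {
        beans.put(name, bean);
    }

    // Mirrors step 3: a non-blank name must resolve to a bean, a blank name yields null.
    Object resolveExplicit(String containerFactoryName) {
        if (containerFactoryName == null || containerFactoryName.trim().isEmpty()) {
            return null;
        }
        Object factory = beans.get(containerFactoryName);
        if (factory == null) {
            throw new IllegalStateException(
                    "No container factory with id '" + containerFactoryName + "' was found");
        }
        return factory;
    }

    // Assumed fallback: when no explicit factory was resolved, use the default bean name.
    Object resolveForRegistration(String containerFactoryName) {
        Object explicit = resolveExplicit(containerFactoryName);
        return explicit != null ? explicit : beans.get(DEFAULT_FACTORY_BEAN_NAME);
    }

    public static void main(String[] args) {
        FactoryLookupSketch sketch = new FactoryLookupSketch();
        sketch.register(DEFAULT_FACTORY_BEAN_NAME, "default-factory");
        sketch.register("batchFactory", "batch-factory");
        System.out.println(sketch.resolveForRegistration(""));             // prints default-factory
        System.out.println(sketch.resolveForRegistration("batchFactory")); // prints batch-factory
    }
}
```

As in processListener(), a name that is set but cannot be resolved fails fast with an exception, while an empty name defers the choice of factory to a later step.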
9. RabbitListenerEndpointRegistrar
A helper class that registers the RabbitListenerEndpoint described above with the RabbitListenerEndpointRegistry.