专栏名称: Java基基
一个苦练基本功的 Java 公众号,所以取名 Java 基基
目录
相关文章推荐
张栋伟  ·  百度文心4.5定档316, ... ·  16 小时前  
张栋伟  ·  百度文心4.5定档316, ... ·  16 小时前  
宝鸡市场监管  ·  养老机器人国际标准来了!我国牵头制定 ·  昨天  
宝鸡市场监管  ·  养老机器人国际标准来了!我国牵头制定 ·  昨天  
AIGC开放社区  ·  GPU效率暴涨!DeepSeek开源Deep ... ·  3 天前  
AIGC开放社区  ·  GPU效率暴涨!DeepSeek开源Deep ... ·  3 天前  
51好读  ›  专栏  ›  Java基基

SpringBoot+Redis自定义注解实现发布订阅

Java基基  · 公众号  · 科技自媒体  · 2024-11-23 11:55

主要观点总结

文章介绍了关于社群交流、开源项目、Redis消息组件以及后台管理系统等方面的内容。

关键观点总结

关键观点1: 社群交流内容

文章提到一个社群,提供一对一交流、面试小册、简历优化、求职解惑等服务,并提供了知识星球的加入方式。

关键观点2: 开源项目介绍

文章介绍了一个破10w+的开源项目,包括前端和后端,涵盖多种功能如RBAC权限、SaaS多租户等,并提供了两个仓库地址和视频教程。

关键观点3: Redis消息组件的设计和实现

文章详细描述了自定义Redis消息组件的设计和实现过程,包括生产消息和消费消息的方法,以及如何通过注解实现消息流转。

关键观点4: 后台管理系统的设计和实现

文章介绍了基于Spring Cloud Alibaba等技术实现的后台管理系统,支持多种功能如RBAC动态权限等,并提供了项目地址和视频教程。


正文

👉 这是一个或许对你有用 的社群

🐱 一对一交流/面试小册/简历优化/求职解惑,欢迎加入 芋道快速开发平台 知识星球。 下面是星球提供的部分资料:

👉 这是一个或许对你有用的开源项目

国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。

功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号等等功能:

  • Boot 仓库:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 仓库:https://gitee.com/zhijiantianya/yudao-cloud
  • 视频教程:https://doc.iocoder.cn
【国内首批】支持 JDK 21 + SpringBoot 3.2.2、JDK 8 + Spring Boot 2.7.18 双版本

来源:juejin.cn/post/
7265624177775558716


前言

最近开发了一个内部消息组件,逻辑大体是通过定义注解 @MessageHub ,在启动时扫描全部bean中有使用了该注解的方法后台创建一个常驻线程代理消费数据,当线程消费到数据就回写到对应加了注解的方法里。

@Slf4j
@Service
public class RedisConsumerDemo {
    @MessageHub (topic = "${uptown.topic}", type = "REDIS_PUBSUB")
    public void consumer(Object message) {
        log.info("pubsub info {} ", message);
    }   
}

实现redis的队列、stream方式实现都很简单,唯独发布订阅方式,网上的demo全都是一个固定套路,通过redis容器注入监听器,而且回写非常死板。那么如何将这块的逻辑统一呢。

之前总结过消息组件的代码设计,这里贴一下链接:

https://juejin.cn/post/7204113113699729463

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

常规写法

常规实现reids的发布订阅模式写法一共三步

1.创建消息监听器

@Bean 
public MessageListenerAdapter smsExpirationListener(TestSubscriber messageListener) {
    return new MessageListenerAdapter(messageListener, "onMessage");
}

2.创建订阅器

@Component
public class TestSubscriber implements MessageListener {
 
    @Override
    public void onMessage(Message message, byte[] pattern) {
        log.info("get data :{}", msg);
    }
 
}

3.向redis容器中添加消息监听器

@Configuration
public class RedisConfig {

    @Bean
    public RedisMessageListenerContainer container(
        RedisConnectionFactory redisConnectionFactory,
        MessageListenerAdapter smsExpirationListener)
 
{
    
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        container.addMessageListener(smsExpirationListener, new PatternTopic("test"));
        return container;
    }
}

这样定义非常简单明了,但是有个问题是太代码僵硬了,创建监听者很不灵活,只能指定内部的onMessage方法,那么怎么才能融入到我们的内部消息流转中间件里呢。

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

自定义注解实现

我们内部组件抽象了两个方法,生产和消费,但这两个方法逻辑截然不同,生产方法是暴露给serverice层接口调用,调用方在调用生产方法后能直接知道生产了几条数据和成功与否。而消费方法是配合Spring生命周期函数服务启动时建立常驻消费线程的。

/**
 * 生产消息
 */

Integer producer(MessageForm messageForm);

/**
 * 消费消息
 */

void consumer(ConsumerAdapterForm adapterForm);

生产消息当然很容易实现,只需要调用已经封装好的 convertAndSend 方法。

stringRedisTemplate.convertAndSend(messageForm.getTopic(), messageForm.getMessage());

消费方法就有说法了,动态生成监听者的场景下使用redis容器用代码挨个注册已经满足不了了,但仔细过一遍源代码就会发现,监听类的构造方法的入参只有两个,第一个需要回调的代理类,第二个消费到数据后回调的方法。

/**
 * Create a new {@link MessageListenerAdapter} for the given delegate.
 *
 * @param delegate the delegate object
 * @param defaultListenerMethod method to call when a message comes
 * @see #getListenerMethodName
 */

public MessageListenerAdapter(Object delegate, String defaultListenerMethod) {
   this(delegate);
   setDefaultListenerMethod(defaultListenerMethod);
}

那么好了好了,方案有了,本质上就是把 RedisMessageListenerContainer 注入进来之后,扫描项目里所有加了 @MessageHub 的bean,包装成监听类加载到容器里就完事了。

怎么扫描的代码就不再赘述了,实现Spring的生命周期函数 BeanPostProcessor#postProcessAfterInitialization ,在这里用 AnnotationUtils 判断是否标注了注解。

MessageHub annotation = AnnotationUtils.findAnnotation(method, MessageHub.class);
if (annotation == null) {
    continue;
}

标注了后判断如果是发布订阅,进入发布订阅的实现类。

@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
@Service("redisPubSubProcessor")
public class RedisPubSubProcessor extends MessageHubServiceImpl {

    @Resource
    RedisMessageListenerContainer redisPubSubContainer;






请到「今天看啥」查看全文