专栏名称: hryou0922
目录
相关文章推荐
CEO管理秘籍  ·  DeepSeek统计最暴利的10个行业1:殡 ... ·  15 小时前  
CEO管理秘籍  ·  DeepSeek统计最暴利的10个行业1:殡 ... ·  15 小时前  
金融街老裘  ·  累了,困了 ·  2 天前  
51好读  ›  专栏  ›  hryou0922

Spring Boot系列22 Spring Websocket实现websocket集群方案的Demo

hryou0922  · 掘金  ·  · 2018-07-31 02:25

正文

阅读 20

Spring Boot系列22 Spring Websocket实现websocket集群方案的Demo

概述

上一篇文章 Spring Boot系列21 Spring Websocket实现websocket集群方案讨论 里详细介绍了WebSocket集群的三种方案,并得出结论第三个方案是最好的,本文我们实现第三个方案。

第三个方案如下图

这里写图片描述

在方案一的基础进行如下修改,新的架构图流程如下:

  1. 服务A增加WS模块,当websocket连接过来时,将此用户的连接信息(主要是websocket sesionId值)存储redis中
  2. 消息生产者发送消息到的交换机,这些服务不直接推送服务A/B
  3. 增加新的模块dispatch,此模块接收推送过来的信息,并从redis中读取消息接收用户对应的websocket sesionId值,然后根据上面的规则计算出用户对应的路由键,然后将消息发送到用户订阅的队列上
  4. 前端接收消息

详细实现的代码

工程名称:mvc 本文在 Spring Boot系列20 Spring Websocket实现向指定的用户发送消息 的基础进行修改。

在pom.xml中引入redis,rabbitmq相关的jar

<!--  webscoekt 集群 需要 引入支持RabbitMQ, redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
复制代码

rabbitmq, redis的配置

application-wscluster.properties

# websocket集群需要配置RabbitMQ
spring.rabbitmq.host:192.168.21.3
spring.rabbitmq.virtual-host: /icc-local
spring.rabbitmq.username: icc-dev
spring.rabbitmq.password: icc-dev

# 配置redis
spring.redis.database=0
spring.redis.host=192.168.21.4
# spring.redis.password=
spring.redis.port=7001
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0  
spring.redis.pool.max-active=8  
spring.redis.pool.max-wait=-1
复制代码

IRedisSessionService及实现

接口IRedisSessionService定义了对redis的操作 IRedisSessionService实现类将用户名称和websocket sessionId的关系存储到redis,提供添加、删除、查询 IRedisSessionService

public interface IRedisSessionService {
    void add(String name, String wsSessionId);
    boolean del(String name);
    String get(String name);
}
复制代码

SimulationRedisSessionServiceImpl 将用户名称和websocket sessionId的关系存储到redis,提供添加、删除、查询

@Component
public class SimulationRedisSessionServiceImpl implements IRedisSessionService {

    @Autowired
    private RedisTemplate<String, String> template;

    // key = 登录用户名称, value=websocket的sessionId
    private ConcurrentHashMap<String,String> redisHashMap = new ConcurrentHashMap<>(32);

    /**
     * 在缓存中保存用户和websocket sessionid的信息
     * @param name
     * @param wsSessionId
     */
    public void add(String name, String wsSessionId){
        BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name);
        boundValueOperations.set(wsSessionId,24 * 3600, TimeUnit.SECONDS);
    }

    /**
     * 从缓存中删除用户的信息
     * @param name
     */
    public boolean del(String name){
        return template.execute(new RedisCallback<Boolean>() {

            @Override
            public Boolean doInRedis(RedisConnection connection)
                    throws DataAccessException {
                byte[] rawKey = template.getStringSerializer().serialize(name);
                return connection.del(rawKey) > 0;
            }
        }, true);
    }

    /**
     * 根据用户id获取用户对应的sessionId值
     * @param name
     * @return
     */
    public String get(String name){
        BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name);
        return






请到「今天看啥」查看全文