概述
上一篇文章 Spring Boot系列21 Spring Websocket实现websocket集群方案讨论 里详细介绍了WebSocket集群的三种方案,并得出结论第三个方案是最好的,本文我们实现第三个方案。
第三个方案如下图
在方案一的基础进行如下修改,新的架构图流程如下:
- 服务A增加WS模块,当websocket连接过来时,将此用户的连接信息(主要是websocket sesionId值)存储redis中
- 消息生产者发送消息到的交换机,这些服务不直接推送服务A/B
- 增加新的模块dispatch,此模块接收推送过来的信息,并从redis中读取消息接收用户对应的websocket sesionId值,然后根据上面的规则计算出用户对应的路由键,然后将消息发送到用户订阅的队列上
- 前端接收消息
详细实现的代码
工程名称:mvc 本文在 Spring Boot系列20 Spring Websocket实现向指定的用户发送消息 的基础进行修改。
在pom.xml中引入redis,rabbitmq相关的jar
<!-- webscoekt 集群 需要 引入支持RabbitMQ, redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
复制代码
rabbitmq, redis的配置
application-wscluster.properties
# websocket集群需要配置RabbitMQ
spring.rabbitmq.host:192.168.21.3
spring.rabbitmq.virtual-host: /icc-local
spring.rabbitmq.username: icc-dev
spring.rabbitmq.password: icc-dev
# 配置redis
spring.redis.database=0
spring.redis.host=192.168.21.4
# spring.redis.password=
spring.redis.port=7001
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0
spring.redis.pool.max-active=8
spring.redis.pool.max-wait=-1
复制代码
IRedisSessionService及实现
接口IRedisSessionService定义了对redis的操作 IRedisSessionService实现类将用户名称和websocket sessionId的关系存储到redis,提供添加、删除、查询 IRedisSessionService
public interface IRedisSessionService {
void add(String name, String wsSessionId);
boolean del(String name);
String get(String name);
}
复制代码
SimulationRedisSessionServiceImpl 将用户名称和websocket sessionId的关系存储到redis,提供添加、删除、查询
@Component
public class SimulationRedisSessionServiceImpl implements IRedisSessionService {
@Autowired
private RedisTemplate<String, String> template;
// key = 登录用户名称, value=websocket的sessionId
private ConcurrentHashMap<String,String> redisHashMap = new ConcurrentHashMap<>(32);
/**
* 在缓存中保存用户和websocket sessionid的信息
* @param name
* @param wsSessionId
*/
public void add(String name, String wsSessionId){
BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name);
boundValueOperations.set(wsSessionId,24 * 3600, TimeUnit.SECONDS);
}
/**
* 从缓存中删除用户的信息
* @param name
*/
public boolean del(String name){
return template.execute(new RedisCallback<Boolean>() {
@Override
public Boolean doInRedis(RedisConnection connection)
throws DataAccessException {
byte[] rawKey = template.getStringSerializer().serialize(name);
return connection.del(rawKey) > 0;
}
}, true);
}
/**
* 根据用户id获取用户对应的sessionId值
* @param name
* @return
*/
public String get(String name){
BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name);
return