专栏名称: macrozheng
专注Java技术分享,解析优质开源项目。涵盖SpringBoot、SpringCloud、Docker、K8S等实用技术,作者Github开源项目mall(50K+Star)。
目录
相关文章推荐
禽报网  ·  鸭报•2-7\\苗继续下跌,仍有议价;市场走 ... ·  15 小时前  
禽报网  ·  行业\\担心对中国征收进口关税 ... ·  昨天  
宛央女子  ·  关于大S,大家到底在意难平什么? ·  昨天  
宛央女子  ·  用永远天真无畏的杉菜送别大S ·  3 天前  
51好读  ›  专栏  ›  macrozheng

SpringBoot 整合 Websocket 轻松实现IM及时通讯!

macrozheng  · 公众号  ·  · 2024-09-26 10:32

主要观点总结

本文介绍了Springboot如何集成Websocket,IM及时通讯所需的模块,开发和部署过程中遇到的问题,以及实现小型IM及时通讯的代码。文章包含方案实践、小型及时通讯的核心模块、遇到的问题及解决方法和完整代码示例。

关键观点总结

关键观点1: Springboot集成Websocket

介绍如何在Springboot项目中集成Websocket,包括添加依赖、配置类和消息核心类的步骤。

关键观点2: IM及时通讯的模块

阐述IM及时通讯需要的核心模块,包括消息对象模型、消息存储模块、消息发送模块、消息推送模块等。

关键观点3: 开发和部署过程中的问题

列出在开发和部署过程中可能遇到的问题,包括连接自动断开、Session无法被序列化、对象无法自动注入、分布式场景消息如何发给客户端、部署时Nginx配置问题等,并提供相应的解决方案。

关键观点4: 实现小型IM及时通讯的代码示例

提供页面效果、代码结构、代码地址,分享实现小型IM及时通讯的完整代码示例。

关键观点5: 总结和其他推荐内容

对文章内容进行总结,并推荐相关的视频教程和实战项目,如mall-swarm视频教程、基于Spring Boot 3+JDK17的实战项目等。


正文

微服务项目学习: cloud.macrozheng.com

项目中碰到需要及时通讯的场景,使用Springboot集成Websocket,即可瞬间破局。本文介绍Springboot如何集成Websocket、IM及时通讯需要哪些模块、开发和部署过程中遇到的问题、以及实现小型IM及时通讯的代码。

一、方案实践

集成分为三步:添加依赖、增加配置类和消息核心类、前端集成。

1.1、添加依赖

<dependency>
    <groupId>org.springframework.bootgroupId>
    <artifactId>spring-boot-starter-websocketartifactId>
    <version>2.1.13.RELEASEversion>
dependency>

1.2、增加WebSocket配置类

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * WebSocket配置
 */

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

这或许是一个对你有用的开源项目 ,mall项目是一套基于 SpringBoot3 + JDK 17 + Vue 实现的电商系统( Github标星60K ),采用Docker容器化部署,后端支持多模块和微服务架构。包括前台商城项目和后台管理系统,能支持完整的订单流程!涵盖商品、订单、购物车、权限、优惠券、会员、支付等功能!

  • Boot项目: https://github.com/macrozheng/mall
  • Cloud项目: https://github.com/macrozheng/mall-swarm
  • 视频教程: https://www.macrozheng.com/video/

项目演示:

1.3、增加消息核心类WebSocketServer

@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketServer {
    // 消息存储
    private static MessageStore messageStore;
    // 消息发送
    private static MessageSender messageSender;

    public static void setMessageStore(MessageStore messageStore) {
        WebSocketServer.messageStore = messageStore;
    }

    public static void setMessageSender(MessageSender messageSender) {
        WebSocketServer.messageSender = messageSender;
    }

    /**
     * 连接建立成功调用的方法
     */

    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        messageStore.saveSession(session);
    }

    /**
     * 连接关闭调用的方法
     */

    @OnClose
    public void onClose(Session session, @PathParam("userId") String userId) {
        messageStore.deleteSession(session);
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @ Param message 客户端发送过来的消息
     */

    @OnMessage
    public void onMessage(String message, Session session) throws Exception {
        log.warn("=========== 收到来自窗口" + session.getId() + "的信息:" + message);
        handleTextMessage(session, new TextMessage(message));
    }

    /**
     * @param session
     * @param error
     */

    @OnError
    public void onError(Session session, @PathParam("userId") String userId, Throwable error) {
        log.error("=========== 发生错误");
        error.printStackTrace();
//        msgStore.deleteSession(session);
    }

    public void handleTextMessage(Session session, TextMessage message) throws Exception {
        log.warn("=========== Received message: {}", message.getPayload());
    }
}

1.4、前端页面加入socket

html>
<html xmlns="http://www.w3.org/1999/html">
  <head>
    <title>WebSocket Exampletitle>
  head>
  <body>
    登录用户ID:<input type="text" id="sendUserId" />br>
    接受用户ID:<input type="text" id="receivedUserId" />br>

    发送消息内容:"text" id="messageInput" /></br>
    接受消息内容:></br>
    
button>

    <script>
      var socket = new WebSocket("ws://localhost:8080/websocket/aaa" );
      var roomId = "123456";
      // 随机产出六位数字
      var sendUserId = Math.floor(Math.random() * 1000000);

      document.getElementById("sendUserId").value = sendUserId;
      var messageReceive = document.getElementById("messageReceive");


      socket.onopen = function (event{
        console.log("WebSocket is open now.");
        let loginInfo = {
          msgType2//登录消息
          sendUserId: sendUserId,
          bizType1//业务类型
          bizOptModule1//业务模块
          roomId: roomId,
          msgBody: {},
        };
        socket.send(JSON.stringify(loginInfo));
      };

      socket.onmessage = function (event{
        var message = event.data;
        console.log("Received message: " + message);
        messageReceive.value = message;
      };

      socket.onclose = function (event{
        console.log("WebSocket is closed now.");
      };

      function sendMessage({
        var message = document.getElementById("messageInput").value;
        var receivedUserId = document.getElementById("receivedUserId").value;
        let operateInfo = {
          msgType100//业务消息
          sendUserId: sendUserId,
          bizType1//业务类型
          bizOptModule1//业务模块
          roomId: roomId,
          receivedUserId: receivedUserId,
          msgBody: {
            operateType1//操作类型:禁言
            operateContent"1",
          },
        };
        socket.send(JSON.stringify(operateInfo));
      }

      setInterval(() => {
        socket.send("ping");
      }, 30000);
    
script>

  </body>
html>

二、小型及时通讯包含的模块

以上只是集成了Websocket框架,实现了基本的全双工通信,服务器和客户端都可以同时发送和接收数据。要想实现一些小型完整的及时通讯,还需要具备以下几个核心模块。架构图如下:

2.1、架构图

2.2、消息对象模型

组织消息内容,比如消息类型、发送者用户ID、接受者用户ID、具体的消息体等。比如:

public class SocketMsg<T{

    /**
     * 消息类型:1心跳  2登录 3业务操作
     */

    private Integer msgType;

    /**
     * 发送者用户ID
     */

    private String sendUserId;
    /**
     * 接受者用户ID
     */

    private String receivedUserId;

    /**
     * 业务类型
     */

    private Integer bizType;

    /**
     * 业务操作模块
     */

    private Integer bizOptModule;

    /**
     * 消息内容
     */

    private T msgBody;
}

2.3、消息存储模块

负责存储消息内容、用户ID和sessionID的关系,防止数据丢失或者服务器重启等。

2.4、消息发送模块

功能开发完毕,一般部署到分布式集群环境,所以通讯时session会分布在多台服务器。比如用户A的session在机器1,用户B的session在机器2,此时A发送给B,就无法单独通过机器1完成。

因为机器1拿不到机器2里的session,所以消息发不过去。此时只能借助别的中间件来实现,比如借助消息中间件kafka实现。

机器1将消息发送给kafka,然后机器1和机器2都监听kafka,然后查看用户对应的session是否在本机,如果在本机则发送出去。

2.5、消息推送模块

模块3提到的消息发送流程中,消息发送给 消息中间件,然后服务器消费到消费,在通过本机的session推送给客户端。

三、遇到的几个问题

3.1、连接自动断开

WebSocket连接之后,发现一个问题:每隔一段时间如果不传送数据的话,与前端的连接就会自动断开。可以采用心跳消息的方式来解决这个问题。比如客服端每隔30秒自动发送ping消息给服务端,服务端返回pong。

3.2、Session无法被序列化

分布式场景会存在这样的问题:当一次请求负载到第一台服务器时,session在第一台服务器线程上,第二次请求,负载到第二台服务器上,此时通过userId查找当前用户的session时,是查找不到的。

本来想着把session存入到redis中,就可以从redis获取用户的session,希望用这种方式来解决分布式场景下消息发送的问题。但是会出现如下错误:

The remote endpoint was in state [STREAM_WRITING] which is an invalid state for called method

翻看了session源码,发现session无法被序列化。所以这个方案只能放弃。解决方案请看下面的 问题5 或者上面的 架构图

3.3、对象无法自动注入

使用了 @ServerEndpoint 注解的类中使用 @Resource @Autowired 注入对象都会失败,并且报空指针异常。

原因是 WebSocket 服务是线程安全的,那么当我们去发起一个 ws 连接时,就会创建一个端点对象。 WebSocket 服务是多对象的,不是单例的。而我们的 Spring Bean 默认就是单例的,在非单例类中注入一个单例的 Bean 是冲突的。

或者说:

Spring管理采用单例模式( singleton ),而 WebSocket 是多对象的,即每个客户端对应后台的一个 WebSocket 对象,也可以理解成 new 了一个 WebSocket,这样当然是不能获得自动注入的对象了,因为这两者刚好冲突。

@Autowired 注解注入对象操作是在启动时执行的,而不是在使用时,而 WebSocket 是只有连接使用时才实例化对象,且有多个连接就有多个对象。所以我们可以得出结论,这个 Service 根本就没有注入到 WebSocket 当中。

如何解决呢?

使用静态对象,并且对外暴露set方法,这样在对象初始化的时候,将其注入到 WebSocketServer 中。比如说这样:

@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketServer {
  private static MessageStore messageStore;
  private static MessageSender messageSender;

  public static void setMessageStore(MessageStore messageStore) {
      WebSocketServer.messageStore = messageStore;
  }

  public static void setMessageSender(MessageSender messageSender) {
      WebSocketServer.messageSender = messageSender;
  }
}


@Slf4j
@Service
public class MessageStore {
    @Autowired
    private RedisTemplate redisTemplate;

    @PostConstruct
    public void init() {
        WebSocketServer.setMessageStore(this);
    }
}

3.4、分布式场景消息如何发给客户端

问题2 中提到了分布式场景下存在的session不在本机的问题,这种场景可以通过发送消息中间件的方式解决。具体这样解决:

每次连接时,都将userId和对应的session存入到本机,发送消息时,直接发送给MQ-Broker,然后每台应用负载都去消费这个消息,拿到消息之后,判断在本机能根据userId是否能找到session,找到session则推送到客户端。

3.5、部署时Nginx配置问题

代码开发完毕之后,本机跑通后,然后部署到服务器之后,还差很重要的一步:配置nginx代理。

3.5.1、给后端应用部署独立域名

要给后端应用部署独立域名,nginx代理直接转发到应用的独立域名,不要走微服务的gateway网关转发过去。

3.5.2、多层nginx转发问题

当只有一层nginx的时候,配置比较简单,如下:

location ~* ^/api/websocket/* {






请到「今天看啥」查看全文