专栏名称: IT大咖说
大咖干货,不再错过。 让不在大会现场的程序猿、攻城狮也能体验现场的精彩瞬间。
目录
相关文章推荐
植物星球  ·  有些报春花一个省可能只有一个 ·  4 天前  
51好读  ›  专栏  ›  IT大咖说

基于Spring封装一个websocket工具类使用事件发布进行解耦和管理

IT大咖说  · 公众号  ·  · 2024-05-17 20:00

正文

最近工作中,需要将原先的Http请求换成WebSocket,故此需要使用到WebSocket与前端交互。故此这边需要研究一下WebSocket到底有何优点和不可替代性:

WebSocket优点:

WebSocket 协议提供了一种在客户端和服务器之间进行全双工通信的机制,这意味着客户端和服务器可以在任何时候互相发送消息,而不需要预先建立请求。与传统的 HTTP 轮询相比,WebSocket 有以下不可替代的优点:

1. 低延迟 : WebSocket 提供了真正的实时通信能力,因为它允许服务器在数据可用时立即将其推送到客户端。这比 HTTP 轮询的“询问-回答”模式更高效,轮询模式可能会引入不必要的延迟。

2. 减少网络流量 : 在 HTTP 轮询中,客户端需要定期发送请求以检查更新,即使没有更新也是如此。这会产生大量冗余的 HTTP 头部信息和请求响应。相比之下,WebSocket 在建立连接后,只需要非常少的控制开销就可以发送和接收消息。

3. 持久连接 : WebSocket 使用单个持久连接进行通信,而不需要为每个消息或请求重新建立连接。这减少了频繁建立和关闭连接的开销,提高了效率。

4. 双向通信 : WebSocket 支持全双工通信,客户端和服务器可以同时发送消息,而不需要等待对方的响应。这对于需要快速双向数据交换的应用程序来说是非常重要的。

5. 更好的服务器资源利用 : 由于 WebSocket 连接是持久的,服务器可以更有效地管理资源,而不是在每个轮询请求中重新初始化资源。

6. 协议开销小 : WebSocket 消息包含非常少的协议开销,相比之下,HTTP 协议的每个请求/响应都包含了完整的头部信息。

7. 支持二进制数据 : WebSocket 不仅支持文本数据,还支持二进制数据,这使得它可以用于更广泛的应用场景,如游戏、视频流和其他需要高效二进制数据传输的应用。

8. 兼容性 : 尽管是较新的技术,WebSocket 已经得到了现代浏览器的广泛支持,并且可以通过 Polyfills 在不支持的浏览器上使用。

时序图:



这个流程图展示了以下步骤:

  • 握手阶段:客户端向服务器发送 WebSocket 连接请求,服务器响应并切换协议。

  • 连接建立:WebSocket 连接建立后,客户端和服务器可以相互发送消息。

  • 通信循环:客户端和服务器在建立的 WebSocket 连接上进行消息交换。

  • 关闭握手:客户端或服务器发起关闭连接的请求,另一方响应,然后连接关闭。

因为以上优点这边将需要重新构建一套WebSocket工具类实现这边的要求:

工具类实现:

在 Spring 中封装 WebSocket 工具类通常涉及使用 Spring 提供的 WebSocket API。

WebSocketUtils

WebSocket 工具类封装示例,它使用 Spring 的 WebSocketSession 来发送消息给客户端。

  • 异常处理 : 在发送消息时,如果发生异常,我们可以添加更详细的异常处理逻辑。

  • 会话管理 : 我们可以添加同步块或使用 ConcurrentHashMap 的原子操作来确保线程安全。

  • 用户标识符管理 : 提供一个更灵活的方式来管理用户标识符和会话之间的关系。

  • 事件发布 : 使用 Spring 事件发布机制来解耦和管理 WebSocket 事件。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @Author derek_smart
* @Date 202/5/11 10:05
* @Description WebSocket 工具类
*/

@Component
public class WebSocketUtils extends TextWebSocketHandler {

private final Map sessions = new ConcurrentHashMap<>();
@Autowired
private ApplicationEventPublisher eventPublisher;


public void registerSession(String userIdentifier, WebSocketSession session) {
sessions.put(userIdentifier, session);
// Publish an event when a session is registered
eventPublisher.publishEvent(new WebSocketSessionRegisteredEvent(this, session, userIdentifier));
}

public void removeSession(String userIdentifier) {
WebSocketSession session = sessions.remove(userIdentifier);
if (session != null) {
// Publish an event when a session is removed
eventPublisher.publishEvent(new WebSocketSessionRemovedEvent(this, session, userIdentifier));
}
}

public void sendMessageToUser(String userIdentifier, String message) {
WebSocketSession session = sessions.get(userIdentifier);
if (session != null && session.isOpen()) {
try {
session.sendMessage(new TextMessage(message));
} catch (IOException e) {
// Handle the exception, e.g., logging or removing the session
handleWebSocketException(session, e);
}
}
}

public void sendMessageToAllUsers(String message) {
TextMessage textMessage = new TextMessage(message);
sessions.forEach((userIdentifier, session) -> {
if (session.isOpen()) {
try {
session.sendMessage(textMessage);
} catch (IOException e) {
// Handle the exception, e.g., logging or removing the session
handleWebSocketException(session, e);
}
}
});
}

private void handleWebSocketException(WebSocketSession session, IOException e) {
// Log the exception
// Attempt to close the session if it's still open
if (session.isOpen()) {
try {
session.close();
} catch (IOException ex) {
// Log the exception during close
}
}
// Remove the session from the map
sessions.values().remove(session);
// Further exception handling...
}

@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// This method can be overridden to handle connection established event
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
// This method can be overridden to handle connection closed event
}

// Additional methods like handleTextMessage can be overridden if needed

// Custom events
public static class WebSocketSessionRegisteredEvent extends ApplicationEvent {
private final WebSocketSession session;
private final String userIdentifier;

/**
* Create a new WebSocketSessionRegisteredEvent.
* @param source the object on which the event initially occurred (never {@code null})
* @param session the WebSocket session which has been registered
* @param userIdentifier the identifier of the user for whom the session is registered
*/

public WebSocketSessionRegisteredEvent(Object source, WebSocketSession session, String userIdentifier) {
super(source);
this.session = session;
this.userIdentifier = userIdentifier;
}

public WebSocketSession getSession() {
return session;
}

public String getUserIdentifier() {
return userIdentifier;
}
}

public static class WebSocketSessionRemovedEvent extends ApplicationEvent {

private final WebSocketSession session;
private final String userIdentifier;

/**
* Create a new WebSocketSessionRemovedEvent.
* @param source the object on which the event initially occurred (never {@code null})
* * @param session the WebSocket session which has been removed
* * @param userIdentifier the identifier of the user for whom the session was removed
* */

public WebSocketSessionRemovedEvent(Object source, WebSocketSession session, String userIdentifier) {
super(source);
this.session = session;
this.userIdentifier = userIdentifier;
}

public WebSocketSession getSession() {
return session;
}

public String getUserIdentifier() {
return userIdentifier;
}
}
}


在这个工具类中,我们使用了 ConcurrentHashMap 来存储和管理 WebSocket 会话。每个会话都与一个用户标识符相关联,这允许我们向特定用户发送消息。 使用了 ApplicationEventPublisher 来发布会话注册和移除事件,这样可以让其他组件在需要时响应这些事件。

另外,我们让 WebSocketUtils 继承了 TextWebSocketHandler,这样它可以直接作为一个 WebSocket 处理器。这意味着你可以重写
afterConnectionEstablished 和 afterConnectionClosed 方法来处理连接建立和关闭的事件,而不是在一个单独的 WebSocketHandler 中处理它们。

通过这些优化,WebSocketUtils 工具类变得更加健壮和灵活,能够更好地集成到 Spring 应用程序中

工具类提供了以下方法:

  • registerSession: 当新的 WebSocket 连接打开时,将该会话添加到映射中。

  • removeSession: 当 WebSocket 连接关闭时,从映射中移除该会话。

  • sendMessageToUser: 向特定用户发送文本消息。

  • sendMessageToAllUsers: 向所有连接的用户发送文本消息。

  • getSessions: 返回当前所有的 WebSocket 会话。

WebSocketHandler

为了完整地实现一个 WebSocket 工具类,你还需要创建一个 WebSocketHandler 来处理 WebSocket 事件,如下所示:

import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
/**
* @Author derek_smart
* @Date 202/5/11 10:05
* @Description WebSocketHandler
*/

public class MyWebSocketHandler implements WebSocketHandler {

private final WebSocketUtils webSocketUtils;

public MyWebSocketHandler(WebSocketUtils webSocketUtils) {
this.webSocketUtils = webSocketUtils;
}

@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String userIdentifier = retrieveUserIdentifier(session);
webSocketUtils.registerSession(userIdentifier, session);
}

@Override
public void handleMessage(WebSocketSession session, WebSocketMessage> message) throws Exception {
// Handle incoming messages
}

@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
// Handle transport error
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
String userIdentifier = retrieveUserIdentifier(session);
webSocketUtils.removeSession(userIdentifier);
}

@Override
public boolean supportsPartialMessages() {
return false;
}

private String retrieveUserIdentifier(WebSocketSession session) {
// Implement logic to retrieve the user identifier from the session
return session.getId(); // For example, use the WebSocket session ID
}
}

在 MyWebSocketHandler 中,我们处理了 WebSocket 连接的建立和关闭事件,并且在这些事件发生时调用 WebSocketUtils 的方法注册或移除会话。

WebSocketConfig

要在 Spring 中配置 WebSocket,你需要在配置类中添加 WebSocketHandler 和 WebSocket 的映射。以下是一个简单的配置示例:

import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket






请到「今天看啥」查看全文