Skip to content
0

知识 - WebSocket

前言

WebSocket 有两种实现:

  • 实现 JDK 原生的方式,Spring Boot 支持
  • 使用 Spring 自己封装的方式实现

实现 WebSocket 实现步骤:

  1. 实现 HandshakeInterceptor 接口,实现 websocket 握手拦截,该接口提供两个方法,一个是握手成功的前置方法,一个是握手成功的后置方法。在前置方法,获取了用户信息,存入 session 域
  2. 继承 AbstractWebSocketHandler 类,重写 websocket 建立连接,接收消息,关闭连接等方法。其中在建立连接方法中,将 session 存入 WebSocketSessionManager 缓存里,key 是用户信息,value 为 session,方便后续使用
  3. 提供 WebSocketHelper 工具类,调用方法即可发生消息。发送消息时,会从 WebSocketSessionManager 缓存里通过用户信息获取 session,返回利用 session 发送消息

除了实现 WebSocket 之外,模块还内置 Redis 发布订阅模式。发送 Websocket 消息时,同时往 Redis 发布,其他服务可以订阅来获取消息。

实现

依赖

xml
<dependencies>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.32</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
  </dependency>
</dependencies>

常量

java
public interface WebSocketConstant {
    /**
     * websocketSession中的参数的key
     */
    String LOGIN_USER_KEY = "loginUser";

    /**
     * 订阅的频道
     */
    String WEB_SOCKET_TOPIC = "work:websocket";

}

配置项

适配 application 的配置项

java
@Data
@ConfigurationProperties("websocket")
public class WebSocketProperties {

    /**
     * 是否启用
     */
    private Boolean enabled;

    /**
     * 路径
     */
    private String path = "/websocket";

    /**
     *  设置访问源地址
     */
    private String allowedOrigins = "*";
}

消息上下文

Websocket 消息上下文

java
@Data
public class WebSocketMessageContext implements Serializable {
    @Serial
    private static final long serialVersionUID = 1L;

    /**
     * 需要推送到的 session key 列表
     */
    private List<String> sessionKeys;

    /**
     * 需要发送的消息
     */
    private String message;
}

缓存

WebSocketSession 用于保存当前所有在线的会话信息

java
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketSessionManager {

    private static final Map<String, WebSocketSession> USER_SESSION_MAP = new ConcurrentHashMap<>();

    /**
     * 将 WebSocket 会话添加到用户会话 Map 中
     *
     * @param sessionKey 会话键,用于检索会话
     * @param session    要添加的 WebSocket 会话
     */
    public static void addSession(String sessionKey, WebSocketSession session) {
        USER_SESSION_MAP.put(sessionKey, session);
    }

    /**
     * 根据会话键从用户会话 Map 中获取 WebSocket 会话
     *
     * @param sessionKey 要获取的会话键
     * @return 与给定会话键对应的 WebSocket 会话,如果不存在则返回 null
     */
    public static WebSocketSession getSessions(String sessionKey) {
        return USER_SESSION_MAP.get(sessionKey);
    }

    /**
     * 获取存储在用户会话 Map 中所有 WebSocket 会话的会话键集合
     *
     * @return 所有 WebSocket 会话的会话键集合
     */
    public static Set<String> getSessionsAll() {
        return USER_SESSION_MAP.keySet();
    }

    /**
     * 从用户会话 Map 中移除指定会话键对应的 WebSocket 会话
     *
     * @param sessionKey 要移除的会话键
     */
    public static void removeSession(String sessionKey) {
        if (existSession(sessionKey)) {
            USER_SESSION_MAP.remove(sessionKey);
        }
    }

    /**
     * 检查给定的会话键是否存在于用户会话 Map 中
     *
     * @param sessionKey 要检查的会话键
     * @return 如果存在对应的会话键,则返回 true;否则返回 false
     */
    public static boolean existSession(String sessionKey) {
        return USER_SESSION_MAP.containsKey(sessionKey);
    }
}

拦截器

java
public class WebSocketInterceptor implements HandshakeInterceptor {

    /**
     * WebSocket 握手之前执行的前置处理方法
     *
     * @param request    WebSocket握手请求
     * @param response   WebSocket握手响应
     * @param wsHandler  WebSocket处理程序
     * @param attributes 与WebSocket会话关联的属性
     * @return 如果允许握手继续进行,则返回true;否则返回false
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        // 用户信息
        LoginUser loginUser = LoginHelper.getLoginUser();
        attributes.put(WebSocketConstant.LOGIN_USER_KEY, loginUser);
        return true;
    }

    /**
     * WebSocket 握手成功后执行的后置处理方法
     *
     * @param request   WebSocket握手请求
     * @param response  WebSocket握手响应
     * @param wsHandler WebSocket处理程序
     * @param exception 握手过程中可能出现的异常
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
        // 在这个方法中可以执行一些握手成功后的后续处理逻辑,比如记录日志或者其他操作
    }
}

生命周期实现

WebSocketHandler 实现类,也就是实现 WebSocket 的生命周期

java
@Slf4j
public class WebSocketHandler extends AbstractWebSocketHandler {

    /**
     * 连接成功后事件
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstant.LOGIN_USER_KEY);
        WebSocketSessionManager.addSession(loginUser.getUserId(), session);
        log.info("[connect] sessionId: {}, userId:{}, username:{}", session.getId(), loginUser.getUserId(), loginUser.getUsername());
    }

    /**
     * 处理接收到的文本消息事件
     *
     * @param session WebSocket 会话
     * @param message 接收到的文本消息
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) {
        // 从 WebSocket 会话中获取登录用户信息
        LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstant.LOGIN_USER_KEY);

        // 创建 WebSocket 消息上下文对象
        WebSocketMessageContext webSocketMessageContext = new WebSocketMessageContext();
        webSocketMessageContext.setSessionKeys(List.of(loginUser.getUserId()));
        webSocketMessageContext.setMessage(message.getPayload());
        WebSocketHelper.publishMessage(webSocketMessageContext);
    }

    /**
     * 处理接收到的二进制消息事件
     *
     * @param session WebSocket 会话
     * @param message 接收到的二进制消息
     */
    @Override
    protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
        super.handleBinaryMessage(session, message);
    }

    /**
     * 处理接收到的 Pong 消息(心跳监测)
     *
     * @param session WebSocket 会话
     * @param message 接收到的 Pong 消息
     */
    @Override
    protected void handlePongMessage(WebSocketSession session, PongMessage message) {
        WebSocketHelper.sendPongMessage(session);
    }

    /**
     * 处理 WebSocket 传输错误
     *
     * @param session   WebSocket会话
     * @param exception 发生的异常
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) {
        log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage());
    }

    /**
     * 在 WebSocket 连接关闭后执行清理操作
     *
     * @param session WebSocket会话
     * @param status  关闭状态信息
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstant.LOGIN_USER_KEY);
        if (Objects.isNull(loginUser)) {
            return;
        }
        WebSocketSessionManager.removeSession(loginUser.getUserId());
        log.info("[disconnect] sessionId: {},userId:{},username:{}", session.getId(), loginUser.getUserId(), loginUser.getUsername());
    }

    /**
     * 指示处理程序是否支持接收部分消息
     *
     * @return 如果支持接收部分消息,则返回 true;否则返回 false
     */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
}

工具类

WebSocket 工具类

java
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketHelper {

    /**
     * 向指定的 WebSocket 会话发送消息
     *
     * @param sessionKey 要发送消息的用户 id
     * @param message    要发送的消息内容
     */
    public static void sendMessage(String sessionKey, String message) {
        WebSocketSession session = WebSocketSessionManager.getSessions(sessionKey);
        sendMessage(session, message);
    }

    /**
     * 订阅 WebSocket 消息主题,并提供一个消费者函数来处理接收到的消息
     *
     * @param listener 处理 WebSocket 消息的消费者
     */
    public static void subscribeMessage(MessageListener listener) {
        RedisUtil.subscribe(WebSocketConstant.WEB_SOCKET_TOPIC, listener);
    }

    /**
     * 发布 WebSocket 订阅消息
     *
     * @param webSocketMessage 要发布的 WebSocket 消息对象
     */
    public static void publishMessage(WebSocketMessageContext webSocketMessage) {
        List<String> unsentSessionKeys = new ArrayList<>();
        // 当前服务内 session,直接发送消息
        if (Objects.isNull(webSocketMessage.getSessionKeys())) {
            return;
        }
        for (String sessionKey : webSocketMessage.getSessionKeys()) {
            if (WebSocketSessionManager.existSession(sessionKey)) {
                WebSocketHelper.sendMessage(sessionKey, webSocketMessage.getMessage());
                // 发生后则移除,防止添加到 unsentSessionKeys,因为下面 unsentSessionKeys 会通过 Redis 发布,监听器订阅到消息后,获取消息发送给 unsentSessionKeys 的 sessionKey
                continue;
            }
            unsentSessionKeys.add(sessionKey);
        }
        // 不在当前服务内 session,发布订阅消息
        if (ListUtil.isNotEmpty(unsentSessionKeys)) {
            WebSocketMessageContext broadcastMessage = new WebSocketMessageContext();
            broadcastMessage.setMessage(webSocketMessage.getMessage());
            broadcastMessage.setSessionKeys(unsentSessionKeys);
            RedisUtil.publish(WebSocketConstant.WEB_SOCKET_TOPIC, broadcastMessage);
            log.info(" WebSocket 发送主题订阅消息 topic:{},session keys:{},message:{}", WebSocketConstant.WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage());
        }
    }

    /**
     * 向所有的 WebSocket 会话发布订阅的消息(群发)
     *
     * @param message 要发布的消息内容
     */
    public static void publishAll(String message) {
        WebSocketMessageContext broadcastMessage = new WebSocketMessageContext();
        broadcastMessage.setMessage(message);
        RedisUtil.publish(WebSocketConstant.WEB_SOCKET_TOPIC, broadcastMessage);
        log.info(" WebSocket 发送主题订阅消息 topic:{},message:{}", WebSocketConstant.WEB_SOCKET_TOPIC, message);
    }

    /**
     * 向指定的 WebSocket 会话发送 Pong 消息
     *
     * @param session 要发送 Pong 消息的 WebSocket 会话
     */
    public static void sendPongMessage(WebSocketSession session) {
        sendMessage(session, new PongMessage());
    }

    /**
     * 向指定的 WebSocket 会话发送文本消息
     *
     * @param session WebSocket 会话
     * @param message 要发送的文本消息内容
     */
    public static void sendMessage(WebSocketSession session, String message) {
        sendMessage(session, new TextMessage(message));
    }

    /**
     * 向指定的 WebSocket 会话发送 WebSocket 消息对象
     *
     * @param session WebSocket 会话
     * @param message 要发送的 WebSocket 消息对象
     */
    private static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
        if (Objects.isNull(session) || !session.isOpen()) {
            log.warn("[send] session 会话已经关闭");
        } else {
            try {
                session.sendMessage(message);
            } catch (IOException e) {
                log.error("[send] session:{},发送消息:{} 异常", session, message, e);
            }
        }
    }
}

主题订阅监听器

WebSocket Redis 主题订阅监听器

java
@Slf4j
@RequiredArgsConstructor
public class WebSocketTopicListener implements MessageListener {

    private final RedisTemplate<String, Object> redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel());
        if (Objects.equals(channel, WebSocketConstant.WEB_SOCKET_TOPIC)) {
            // 反序列化消息体为 WebSocketMessageContext 对象
            WebSocketMessageContext context = (WebSocketMessageContext) redisTemplate.getValueSerializer().deserialize(message.getBody());

            if (Objects.isNull(context)) {
                log.info("WebSocket 主题订阅收到消息为空");
                return;
            }

            log.info("WebSocket 主题订阅收到消息 channel:{},Session Keys:{},message:{}", channel, context.getSessionKeys(), context.getMessage());

            // 如果 key 不为空就按照 key 发消息,如果为空就群发
            if (ListUtil.isNotEmpty(context.getSessionKeys())) {
                context.getSessionKeys().forEach(key -> {
                    if (WebSocketSessionManager.existSession(key)) {
                        WebSocketHelper.sendMessage(key, context.getMessage());
                    }
                });
            } else {
                WebSocketSessionManager.getSessionsAll().forEach(key -> {
                    WebSocketHelper.sendMessage(key, context.getMessage());
                });
            }
        }
    }
}

容器装配

java
@AutoConfiguration
@ConditionalOnProperty(value = "websocket.enabled", havingValue = "true")
@EnableConfigurationProperties(WebSocketProperties.class)
@EnableWebSocket
public class WebSocketConfiguration {

    @Bean
    public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor, WebSocketHandler webSocketHandler, WebSocketProperties webSocketProperties) {
        // 返回一个 WebSocketConfigurer 对象,用于配置 WebSocket
        return registry -> registry
                // 添加 WebSocket 处理程序和拦截器到指定路径,设置允许的跨域来源
                .addHandler(webSocketHandler, webSocketProperties.getPath())
                .addInterceptors(handshakeInterceptor)
                .setAllowedOrigins(webSocketProperties.getAllowedOrigins());
    }

    @Bean
    public HandshakeInterceptor handshakeInterceptor() {
        return new WebSocketInterceptor();
    }

    @Bean
    public WebSocketHandler webSocketHandler() {
        return new WebSocketHandler();
    }

    @Bean
    public WebSocketTopicListener webSocketTopicListener(RedisTemplate<String, Object> redisTemplate) {
        return new WebSocketTopicListener(redisTemplate);
    }
}

Spring Boot 3.x 需要在 resource 下建立 META-INF/spring 路径,然后创建 org.springframework.boot.autoconfigure.AutoConfiguration.imports 文件,内容为

java
cn.youngkbt.websocket.config.WebSocketConfiguration

这样 Spring 会自动扫描该文件的容器装配类,将里面涉及的类注入到 Spring 容器。

示例

java
@RestController
@RequestMapping("/demo/websocket")
public class DemoWebSocketController {

    /**
     * 发布消息
     *
     */
    @GetMapping("/send")
    public Response<String> send(WebSocketMessageContext webSocketMessageContext) {
        WebSocketHelper.publishMessage(webSocketMessageContext);
        return HttpResult.ok("操作成功");
    }
}
最近更新