User-Profile-Image
hankin
  • 5
  • Java
  • Kotlin
  • Spring
  • Web
  • SQL
  • MegaData
  • More
  • Experience
  • Enamiĝu al vi
  • 分类
    • Zuul
    • Zookeeper
    • XML
    • WebSocket
    • Web Notes
    • Web
    • Vue
    • Thymeleaf
    • SQL Server
    • SQL Notes
    • SQL
    • SpringSecurity
    • SpringMVC
    • SpringJPA
    • SpringCloud
    • SpringBoot
    • Spring Notes
    • Spring
    • Servlet
    • Ribbon
    • Redis
    • RabbitMQ
    • Python
    • PostgreSQL
    • OAuth2
    • NOSQL
    • Netty
    • MySQL
    • MyBatis
    • More
    • MinIO
    • MegaData
    • Maven
    • LoadBalancer
    • Kotlin Notes
    • Kotlin
    • Kafka
    • jQuery
    • JavaScript
    • Java Notes
    • Java
    • Hystrix
    • Git
    • Gateway
    • Freemarker
    • Feign
    • Eureka
    • ElasticSearch
    • Docker
    • Consul
    • Ajax
    • ActiveMQ
  • 页面
    • 归档
    • 摘要
    • 杂图
    • 问题随笔
  • 友链
    • Spring Cloud Alibaba
    • Spring Cloud Alibaba - 指南
    • Spring Cloud
    • Nacos
    • Docker
    • ElasticSearch
    • Kotlin中文版
    • Kotlin易百
    • KotlinWeb3
    • KotlinNhooo
    • 前端开源搜索
    • Ktorm ORM
    • Ktorm-KSP
    • Ebean ORM
    • Maven
    • 江南一点雨
    • 江南国际站
    • 设计模式
    • 熊猫大佬
    • java学习
    • kotlin函数查询
    • Istio 服务网格
    • istio
    • Ktor 异步 Web 框架
    • PostGis
    • kuangstudy
    • 源码地图
    • it教程吧
    • Arthas-JVM调优
    • Electron
    • bugstack虫洞栈
    • github大佬宝典
    • Sa-Token
    • 前端技术胖
    • bennyhuo-Kt大佬
    • Rickiyang博客
    • 李大辉大佬博客
    • KOIN
    • SQLDelight
    • Exposed-Kt-ORM
    • Javalin—Web 框架
    • http4k—HTTP包
    • 爱威尔大佬
    • 小土豆
    • 小胖哥安全框架
    • 负雪明烛刷题
    • Kotlin-FP-Arrow
    • Lua参考手册
    • 美团文章
    • Java 全栈知识体系
    • 尼恩架构师学习
    • 现代 JavaScript 教程
    • GO相关文档
    • Go学习导航
    • GoCN社区
    • GO极客兔兔-案例
    • 讯飞星火GPT
    • Hollis博客
    • PostgreSQL德哥
    • 优质博客推荐
    • 半兽人大佬
    • 系列教程
    • PostgreSQL文章
    • 云原生资料库
    • 并发博客大佬
Help?

Please contact us on our email for need any support

Support
    首页   ›   Web   ›   WebSocket   ›   正文
WebSocket

Websocket通讯(底层的协议)

2020-12-22 00:20:15
1154  0 0
参考目录 隐藏
1) spring 中需要引入的包
2) spring boot中引入的包
3) 配置websocket的方式
4) 使用Java原生+Spring混合
5) 配置websocket
6) javax.websocket 扩展协议配置
7) 转换HttpSession
8) websocket业务接口
9) 方法的实现类
10) websocket-controller层
11) html触发
12) 使用SpringBoot实现websocket
13) @EnableWebSocket配置
14) WebSocketHandler 拦截器
15) HttpSessionHandshakeInterceptor源码
16) WebSocketHandler 消息处理中心
17) WebSocketConfig 配置

阅读完需:约 19 分钟

在spring和spring boot中配置websocket的代码基本一样的,只是pom引入的包不一样,需要注意的是不同的tomcat版本对websocket的支持可能有区别,造成了代码的区别。

spring 中需要引入的包

        <!-- 使用spring websocket依赖的jar包 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-websocket</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-messaging</artifactId>
            <version>${spring.version}</version>
        </dependency>   

spring boot中引入的包

        <!-- websocket 支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

配置websocket的方式

  • 配置websocket首先是需要运行的容器支持,这个是前提,我们常用的容器,tomcat,jetty,undertow都支持websocket,spring boot 对内嵌的tomcat(7,8),jetty9,undertow提供了支持,源码在spring-websocket-4.3.6.RELEASE.jar包中。
  • websocket是通过一个socket来实现双向异步通讯的,websocket属于(sockJs:websocket协议的模拟,用作浏览器使用,增加了当浏览器不支持websocket的时候的兼容性支持,也属于底层协议)。
  • 底层协议配置有两种方式,使用javax.websocket包中的配置,属于JavaEE 7中出了JSR-356:Java API for WebSocket规范。还有一种是使用spring websocket api中提供的底层协议使用@EnableWebSocket注解,实现org.springframework.web.socket.config.annotation.WebSocketConfigurer;。
  • 使用底层协议比较繁琐,需要自己写大量的代码进行支持,不过更加灵活,当然我们也可以websocket的子协议STOMP来,它是一个更高级的协议,STOMP是基于帧(frame)格式来定义消息,与http的request和response类似(具有类似@RequestMapping的@MessageMapping),使用@EnableWebSocketMessageBroker 源码也在org.springframework.web.socket下。

使用Java原生+Spring混合

配置websocket

一个公用的websocket类,存一些连接用到的基本信息,可以根据业务添加删除属性

package com.wzh.demo.domain;

import javax.websocket.Session;

/**
 * <websocket信息对象>
 * <用于存储secket连接信息>
 * @author wzh
 * @version 2018-07-08 18:49
 * @see [相关类/方法] (可选)
 **/
public class WebSocketBean {

    /**
     * 连接session对象
     */
    private Session session;

    /**
     * 连接错误次数
     */
    private AtomicInteger erroerLinkCount = new AtomicInteger(0);

    public int getErroerLinkCount() {
        // 线程安全,以原子方式将当前值加1,注意:这里返回的是自增前的值
        return erroerLinkCount.getAndIncrement();
    }

    public void cleanErrorNum()
    {
        // 清空计数
        erroerLinkCount = new AtomicInteger(0);
    }

    //...... 省略get set toSting方法
}

javax.websocket 扩展协议配置

基于Spring搭建,一个公用的websocket配置,使用@ServerEndpoint创立websocket endpoint

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
 
}

转换HttpSession

因为websocket的session和我们常用的httpsession不一样,所以我们要转换一下,部分场景会用到httpsession

package com.wzh.config.utils;

import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
import javax.websocket.server.ServerEndpointConfig.Configurator;

/**
 * <websocket获取HttpSession>
 * <功能详细描述>
 * @author wzh
 * @version 2018-07-10 01:02
 * @see [相关类/方法] (可选)
 **/
public class GetHttpSessionConfigurator extends Configurator{

    @Override
    public void modifyHandshake(ServerEndpointConfig sec,HandshakeRequest request, HandshakeResponse response) {

        HttpSession httpSession=(HttpSession) request.getHttpSession();
        sec.getUserProperties().put(HttpSession.class.getName(),httpSession);
    }
}

websocket业务接口

websocket业务接口,抽一些共用的方法出来

package com.wzh.demo.websocket.service;

import javax.websocket.EndpointConfig;
import javax.websocket.Session;

/**
 * <基于javax websocket通讯>
 * <功能详细描述>
 * @author wzh
 * @version 2018-07-08 17:11
 * @see [相关类/方法] (可选)
 **/
public interface WebSocketServer {

    /**
     * 连接建立成功调用的方法
     * @param session session 对象
     */
    public void onOpen(Session session,EndpointConfig config);

    /**
     * 断开连接方法
     */
    public void onClose(Session session);

    /**
     * 收到客户端消息后调用的方法
     * @param session session 对象
     * @param message 返回客户端的消息
     */
    public void onMessage(Session session, String message);

    /**
     * 发生异常时触发的方法
     * @param session session 对象
     * @param throwable 抛出的异常
     */
    public void onError(Session session,Throwable throwable);

    /**
     * 向单个客户端发送消息
     * @param session session 对象
     * @param message 发送给客户端的消息
     */
    public void sendMessage(Session session, String message);

    /**
     * 向所有在线用户群发消息
     * @param message 发送给客户端的消息
     */
    public void batchSendMessage(String message);
}

方法的实现类

package com.wzh.demo.websocket.service.impl;

import com.wzh.config.utils.GetHttpSessionConfigurator;
import com.wzh.demo.domain.WebSocketBean;
import com.wzh.demo.websocket.service.WebSocketServer;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpSession;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/**
 * <基于javax websocket通讯>
 * <各个方法的参数都是可以根据项目的实际情况改的>
 * @author wzh
 * @version 2018-07-08 17:11
 * @see [相关类/方法] (可选)
 **/
@ServerEndpoint(value = "/javax/websocket",configurator=GetHttpSessionConfigurator.class)
@Component("webSocketService")
public class WebSocketServiceImpl implements WebSocketServer{

    private Logger log = Logger.getLogger(WebSocketServiceImpl.class);

    /**
     * 错误最大重试次数
     */
    private static final int MAX_ERROR_NUM = 10;

    /**
     * 用来存放每个客户端对应的webSocket对象。
     */
    private static Map<String,WebSocketBean> webSocketInfo;

    static
    {
        // concurrent包的线程安全map
        webSocketInfo = new ConcurrentHashMap<String, WebSocketBean>();
    }

    @OnOpen
    @Override
    public void onOpen(Session session,EndpointConfig config) {

        // 如果是session没有激活的情况,就是没有请求获取或session,这里可能会取出空,需要实际业务处理
        HttpSession httpSession= (HttpSession) config.getUserProperties().get(HttpSession.class.getName());
        if(httpSession != null)
        {
            log.info("获取到httpsession" + httpSession.getId());
        }else {
            log.error("未获取到httpsession");
        }

        // 连接成功当前对象放入websocket对象集合
        WebSocketBean bean = new WebSocketBean();
        bean.setSession(session);
        webSocketInfo.put(session.getId(),bean);

        log.info("客户端连接服务器session id :"+session.getId()+",当前连接数:" + webSocketInfo.size());
    }

    @OnClose
    @Override
    public void onClose(Session session) {

        // 客户端断开连接移除websocket对象
        webSocketInfo.remove(session.getId());
        log.info("客户端断开连接,当前连接数:" + webSocketInfo.size());

    }

    @OnMessage
    @Override
    public void onMessage(Session session, String message) {

        log.info("客户端 session id: "+session.getId()+",消息:" + message);

        // 此方法为客户端给服务器发送消息后进行的处理,可以根据业务自己处理,这里返回页面
        sendMessage(session, "服务端返回" + message);

    }

    @OnError
    @Override
    public void onError(Session session, Throwable throwable) {

        log.error("发生错误"+ throwable.getMessage(),throwable);
    }

    @Override
    public void sendMessage(Session session, String message) {

        try
        {
            // 发送消息
            session.getBasicRemote().sendText(message);

            // 清空错误计数
            webSocketInfo.get(session.getId()).cleanErrorNum();
        }
        catch (Exception e)
        {
            log.error("发送消息失败"+ e.getMessage(),e);
            int errorNum = webSocketInfo.get(session.getId()).getErroerLinkCount();

            // 小于最大重试次数重发
            if(errorNum <= MAX_ERROR_NUM)
            {
                sendMessage(session, message);
            }
            else{
                log.error("发送消息失败超过最大次数");
                // 清空错误计数
                webSocketInfo.get(session.getId()).cleanErrorNum();
            }
        }
    }

    @Override
    public void batchSendMessage(String message) {
        Set<Map.Entry<String, WebSocketBean>> set = webSocketInfo.entrySet();
        for (Map.Entry<String, WebSocketBean> map : set)
        {
            sendMessage(map.getValue().getSession(),message);
        }
    }
}

websocket-controller层

触发websocket通讯推送的方式很多,这里做个最简单的按钮,写个简单的controller和一个html

package com.wzh.demo.controller;



import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

import javax.servlet.http.HttpSession;


/**
 * <websocket测试用MVC控制器>
 * <功能详细描述>
 * @author wzh
 * @version 2018-07-09 22:53
 * @see [相关类/方法] (可选)
 **/
@Controller
@RequestMapping("/websocket")
public class WebSocketController {

    @RequestMapping(value = "socket.do",method = RequestMethod.GET)
    public String toWebSocket(HttpSession session, Model model)
    {
        model.addAttribute("address","/javax/websocket");
        return "/test/webSocket";
    }
}

html触发

html,主要是socketjs触发

<#import "spring.ftl" as spring />
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
        "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
    <title>Title</title>
    <script src="${request.contextPath}/js/jquery-3.3.1.min.js"></script>
    <script type="text/javascript">
        $(document).ready(function () {
            // 页面加websocket
            websocketClient();

        });
        var sock;
        function websocketClient() {

            var url = 'ws://' + window.location.host +"${request.contextPath + address!}";
            sock = new WebSocket(url);

            // 打开连接,打开连接后的回调
            sock.onopen = function () {
                console.log("连接打开");

            };

            // 客户端发送消息给服务器,回调
            sock.onmessage = function(data)
            {
                console.info(data.data);
            };

            sock.onclose = function() {
                console.info("close");
            };

        }
        // 测试客户端与服务器通讯
        function sendMessage() {
            sock.send("test");
        };



    </script>
</head>
<body>

<input type="button" value="按钮" onclick="sendMessage();">
</body>
</html>

例子是通过JAVA的扩展JAR实现的,既然是SpringBoot项目,可定也能用框架提供的方法进行websocket通讯。

使用SpringBoot实现websocket

@EnableWebSocket配置

在 spring 中 使用较低层级的 API 来处理消息,可以通过以下几个步骤

  • 一个HandshakeInterceptor拦截器,实现org.springframework.web.socket.server.HandshakeInterceptor,在次拦截器中可以做以下握手前后的处理,此步骤可以省略,此拦截器可以在springMVC中的websocket配置类中注册使用,做一下前置或者后置操作
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new WebSocketHander(),"/xxxx").addInterceptors(new HandshakeInterceptor());
    }
}
  • WebSocketHandler 一个消息处理中心,用于处理websocket的通讯具体服务,可以继承AbstractWebSocketHandler,也可以实现WebSocketHandler
public interface WebSocketHandler {
void afterConnectionEstablished(WebSocketSession session) throws Exception;
void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception;
void handleTransportError(WebSocketSession session,
 Throwable exception) throws Exception; 
void afterConnectionClosed(WebSocketSession session,
 CloseStatus closeStatus) throws Exception; 
boolean supportsPartialMessages();
}
public class ChatTextHandler extends AbstractWebSocketHandler {
 
 @Override
 protected void handleTextMessage(WebSocketSession session,
   TextMessage message) throws Exception {
  session.sendMessage(new TextMessage("xxxx"));
 }
}
  • 一个SpringMVC的配置,其中registerWebSocketHandlers注册消息处理器,此方法可以完成websocket路径的注册,消息处理器的注册,拦截器的注册
@Configuration
@EnableWebSocket//开启websocket
public class WebSocketConfig implements WebSocketConfigurer {
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // WebSocketHander 为消息处理器,HandshakeInterceptor为拦截器
        registry.addHandler(new WebSocketHander(),"/xxx").addInterceptors(new HandshakeInterceptor());
    }
}

下面我们来开始正式配置基于Spring底层API的websocket通讯

WebSocketHandler 拦截器

在这一步可以做一些初始化操作,例如获取httpSession,此步骤不是开启websocket的必要步骤,根据自身的业务逻辑决定是否添加拦截器。拦截器我们可以直接使用HttpSessionHandshakeInterceptor这个Spring提供的拦截器,也可以实现HandshakeInterceptor 这个接口进行自定义。
拦截器HttpSessionHandshakeInterceptor将HttpSession中的值保存到了一个Map里面,在后期的WebSocketHandler消息处理类中可以获取存入httpsession中的信息,通过WebSocketSession的getAttributes()下提供get方法获取。

HttpSessionHandshakeInterceptor源码

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.springframework.web.socket.server.support;

import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Map;
import javax.servlet.http.HttpSession;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

public class HttpSessionHandshakeInterceptor implements HandshakeInterceptor {
    public static final String HTTP_SESSION_ID_ATTR_NAME = "HTTP.SESSION.ID";
    private final Collection<String> attributeNames;
    private boolean copyAllAttributes;
    private boolean copyHttpSessionId = true;
    private boolean createSession;

    public HttpSessionHandshakeInterceptor() {
        this.attributeNames = Collections.emptyList();
        this.copyAllAttributes = true;
    }

    public HttpSessionHandshakeInterceptor(Collection<String> attributeNames) {
        this.attributeNames = Collections.unmodifiableCollection(attributeNames);
        this.copyAllAttributes = false;
    }

    public Collection<String> getAttributeNames() {
        return this.attributeNames;
    }

    public void setCopyAllAttributes(boolean copyAllAttributes) {
        this.copyAllAttributes = copyAllAttributes;
    }

    public boolean isCopyAllAttributes() {
        return this.copyAllAttributes;
    }

    public void setCopyHttpSessionId(boolean copyHttpSessionId) {
        this.copyHttpSessionId = copyHttpSessionId;
    }

    public boolean isCopyHttpSessionId() {
        return this.copyHttpSessionId;
    }

    public void setCreateSession(boolean createSession) {
        this.createSession = createSession;
    }

    public boolean isCreateSession() {
        return this.createSession;
    }

    // 在握手完成前(连接建立阶段)
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        HttpSession session = this.getSession(request);
        if (session != null) {
            if (this.isCopyHttpSessionId()) {
                // 保存 sessionid
                attributes.put("HTTP.SESSION.ID", session.getId());
            }

            Enumeration names = session.getAttributeNames();

            while(true) {
                String name;
                do {
                    if (!names.hasMoreElements()) {
                        return true;
                    }

                    name = (String)names.nextElement();
                } while(!this.isCopyAllAttributes() && !this.getAttributeNames().contains(name));
                // 保存HttpSession中的信息
                attributes.put(name, session.getAttribute(name));
            }
        } else {
            return true;
        }
    }

    // 获取HttpSession
    private HttpSession getSession(ServerHttpRequest request) {
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest serverRequest = (ServletServerHttpRequest)request;
            return serverRequest.getServletRequest().getSession(this.isCreateSession());
        } else {
            return null;
        }
    }

    // 完成握手后业务
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
    }
}

spring 框架提供的拦截器在org.springframework.web.socket.server.support下,如果不能满足业务需求,我们也可以直接去实现接口

实现HandshakeInterceptor接口,这里因为是操作httpsession,就演示继承 HttpSessionHandshakeInterceptor 并重写beforeHandshake 方法

package com.wzh.demo.websocket.interceptor;

import org.apache.log4j.Logger;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

import javax.servlet.http.HttpSession;
import java.util.Map;

/**
 * <websocket通讯拦截器>
 * <建立websocket连接前后的业务处理>
 * @author wzh
 * @version 2018-07-21 20:05
 * @see [相关类/方法] (可选)
 */
public class WebSocketHandshakeInterceptor extends HttpSessionHandshakeInterceptor
{
    
    private Logger log = Logger.getLogger(WebSocketHandshakeInterceptor.class);

    @Override
    public boolean beforeHandshake(ServerHttpRequest request,
        ServerHttpResponse response,
        WebSocketHandler webSocketHandler, Map<String, Object> map)
        throws Exception
    {
        // websocket握手建立前调用,获取httpsession
        if(request instanceof ServletServerHttpRequest)
        {
            ServletServerHttpRequest servletRequset = (ServletServerHttpRequest) request;

            // 这里从request中获取session,获取不到不创建,可以根据业务处理此段
            HttpSession httpSession = servletRequset.getServletRequest().getSession(false);
            if (httpSession != null)
            {
                // 这里打印一下session id 方便等下对比和springMVC获取到httpsession是不是同一个
                log.info("httpSession key:" + httpSession.getId());

                // 获取到httpsession后,可以根据自身业务,操作其中的信息,这里只是单纯的和websocket进行关联
                map.put("HTTP_SESSION",httpSession);

            }
            else
            {
                log.warn("httpSession is null");
            }
        }

        // 调用父类方法
        return super.beforeHandshake(request,response,webSocketHandler,map);
    }
    
    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest,
        ServerHttpResponse serverHttpResponse,
        WebSocketHandler webSocketHandler, Exception e)
    {
        // websocket握手建立后调用
        log.info("websocket连接握手成功");
    }
}

WebSocketHandler 消息处理中心

建立一个websocket消息处理中心,我们可以编写一个类实现WebSocketHandler接口,此接口提供5个方法,用于处理websocket的消息。

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.springframework.web.socket;

public interface WebSocketHandler {
    // 在WebSocket协商成功并且WebSocket连接打开并准备好使用后调用。
    void afterConnectionEstablished(WebSocketSession var1) throws Exception;
    
    // 在新的WebSocket消息到达时调用,也就是接受客户端信息并发发送
    void handleMessage(WebSocketSession var1, WebSocketMessage<?> var2) throws Exception;
    
    // 处理底层WebSocket消息传输中的错误,连接出现异常时触发
    void handleTransportError(WebSocketSession var1, Throwable var2) throws Exception;

    // 在任何一方关闭WebSocket连接之后或在发生传输错误之后调用。
    void afterConnectionClosed(WebSocketSession var1, CloseStatus var2) throws Exception;
    
    // WebSocketHandler是否处理部分消息,API文档描述说是拆分消息,多次处理,没有实际使用过
    boolean supportsPartialMessages();
}

官网:

https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/socket/WebSocketHandler.html

除了实现接口外,我们也可以继承Spring已经给我提供的实现类,简化操作,因为有的时候我们只需要此接口中的一个或几个方法,并不需要全部关注,spring提供的handler都在org.springframework.web.socket.handler这个路径下

这里我们继承一个抽象类AbstractWebSocketHandler,重写我们关注的方法,并扩展我们自己的业务方法

一个公用的websocket类,存一些连接用到的基本信息,可以根据业务添加删除属性

package com.wzh.demo.domain;

import org.springframework.web.socket.WebSocketSession;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * <websocket信息对象>
 * <用于存储secket连接信息>
 * @author wzh
 * @version 2018-07-29 18:24
 * @see [相关类/方法] (可选)
 **/
public class WebSocketBeanSpring
{
    
    private WebSocketSession session;
    
    /**
     * 连接错误次数
     */
    private AtomicInteger erroerLinkCount = new AtomicInteger(0);
    
    public int getErroerLinkCount()
    {
        // 线程安全,以原子方式将当前值加1,注意:这里返回的是自增前的值
        return erroerLinkCount.getAndIncrement();
    }
    
    public void cleanErrorNum()
    {
        // 清空计数
        erroerLinkCount = new AtomicInteger(0);
    }
    
   // 省略get set 方法
}

一个简单的消息处理中心,继承AbstractWebSocketHandler

package com.wzh.demo.websocket.handler;

import com.wzh.demo.domain.WebSocketBeanSpring;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * <消息处理中心>
 * <功能详细描述>
 * @author wzh
 * @version 2018-07-24 23:11
 * @see [相关类/方法] (可选)
 **/
@Component("webSocketHander")
public class WebSocketHander extends AbstractWebSocketHandler{

    private Logger log = Logger.getLogger(WebSocketHander.class);

    /**
     * 用来存放每个客户端对应的webSocket对象。
     */
    private static Map<String,WebSocketBeanSpring> webSocketInfo;

    static
    {
        // concurrent包的线程安全map
        webSocketInfo = new ConcurrentHashMap<String, WebSocketBeanSpring>();
    }

    // 服务器与客户端初次websocket连接成功执行
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {

        log.debug("websocket 连接成功......");

        // 连接成功当前对象放入websocket对象集合
        WebSocketBeanSpring bean = new WebSocketBeanSpring();
        bean.setSession(session);

        webSocketInfo.put(session.getId(),bean);

        log.info("客户端连接服务器session id :"+session.getId()+",当前连接数:" + webSocketInfo.size());

    }

    // 接受消息处理消息
    @Override
    public void handleMessage(WebSocketSession webSocketSession,
        WebSocketMessage<?> webSocketMessage)
        throws Exception
    {
        /*
        获取客户端发送的消息,这里使用文件消息,也就是字符串进行接收
        消息可以通过字符串,或者字节流进行接收
        TextMessage String/byte[]接收均可以
        BinaryMessage byte[]接收
        */
        log.info("客户端发送消息" + webSocketMessage.getPayload().toString());
        TextMessage message = new TextMessage(webSocketMessage.getPayload().toString());
        /*
        这里直接是字符串,做群发,如果要指定发送,可以在前台平均ID,后台拆分后获取需要发送的人。
        也可以做一个单独的controller,前台把ID传递过来,调用方法发送,在登录的时候把所有好友的标识符传递到前台,
        然后通过标识符发送私信消息
        */
        this.batchSendMessage(message);

    }

    // 连接错误时触发
    @Override
    public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) throws Exception {
        if(webSocketSession.isOpen()){
            webSocketSession.close();
        }

        log.debug("链接出错,关闭链接......");
        webSocketInfo.remove(webSocketSession.getId());
    }

    // 关闭websocket时触发
    @Override
    public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception {

        log.debug("链接关闭......" + closeStatus.toString());
        webSocketInfo.remove(webSocketSession.getId());
    }

    /**
     * 给所有在线用户发送消息(这里用的文本消息)
     * @param message
     */
    public void batchSendMessage(TextMessage message)
    {
        
        Set<Map.Entry<String, WebSocketBeanSpring>> setInfo =
            webSocketInfo.entrySet();
        for (Map.Entry<String, WebSocketBeanSpring> entry : setInfo)
        {
            WebSocketBeanSpring bean = entry.getValue();
            try
            {
                bean.getSession().sendMessage(message);
            }
            catch (IOException e)
            {
                log.error(e.getMessage(),e);
            }
        }
    }

    /**
     * 给指定用户发送消息
     * @param userId
     * @param message
     */
    public void sendMessage(String userId, TextMessage message)
    {
        WebSocketBeanSpring bean = webSocketInfo.get(userId);
        try
        {
            bean.getSession().sendMessage(message);
        }
        catch (IOException e)
        {
            log.error(e.getMessage(), e);
        }
    }

}

WebSocketConfig 配置

此步骤是在SpringMVC中注册消息处理中心,因为基于SpringBoot搭建,这里使用@Configuration注解配置,当然也可以xml配置,这个根据自身项目风格进行配置,这里我们实现WebSocketConfigurer接口

package com.wzh.demo.websocket.config;

import com.wzh.demo.websocket.handler.WebSocketHander;
import com.wzh.demo.websocket.interceptor.WebSocketHandshakeInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

/**
 * <websocket配置类>
 * <功能详细描述>
 * @author wzh
 * @version 2018-08-05 22:59
 * @see [相关类/方法] (可选)
 **/
@Configuration //标记为spring 配置类
@EnableWebSocket //开启websocket支持
public class WebSocketConfig implements WebSocketConfigurer{

    // 注册消息处理器,并映射连接地址
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry)
    {
        // 注册消息处理器,并添加自定义拦截器,支持websocket的连接访问
        registry.addHandler(new WebSocketHander(), "/spring/websocket")
            .addInterceptors(new WebSocketHandshakeInterceptor());

        /*
        注册消息处理器,并添加自定义拦截器,添加不支持websocket的连接访问
        SockJs是一个WebSocket的通信js库,Spring对这个js库进行了后台的自动支持,
        也就是说,如果使用SockJs,那么我们就不需要对后台进行更多的配置,只需要加上withSockJS()这一句就可以了
         */
        registry.addHandler(new WebSocketHander(), "/spring/sockjs/websocket")
                .addInterceptors(new WebSocketHandshakeInterceptor()).withSockJS();
    }
}

代码都准备好了,下面进行测试,这里测试就做两个简单的方法,一个通过消息处理中心公告,一个通过controller进行私信

一个简单的controller 用于跳转到html页面

// 跳转websocket界面
    @RequestMapping(value = "/spring/socket.do",method = RequestMethod.GET)
    public String toSpringWebSocket(HttpSession session, Model model)
    {
        model.addAttribute("address","/spring/websocket");
        System.out.println("进入websocket");
        return "/test/springWebSocket";
    }

html页面的调用可以参考最上面的html调用。

写个测试私信的controller,这里为了简单就不写页面了,直接get请求访问控制器,携带websession id

package com.wzh.demo.controller;

import com.wzh.demo.websocket.handler.WebSocketHander;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.socket.TextMessage;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;


/**
 * <websocket测试用MVC控制器>
 * <功能详细描述>
 * @author wzh
 * @version 2018-07-09 22:53
 * @see [相关类/方法] (可选)
 **/
@Controller
@RequestMapping("/websocket")
public class WebSocketController {

    @Resource
    private WebSocketHander webSocketHander;

    // 测试私信发送
    @RequestMapping(value = "/spring/socketById.do",method = RequestMethod.GET)
    public void toSpringWebSocketByid(HttpSession session, HttpServletRequest request, Model model)
    {
        String id = request.getParameter("id");
        webSocketHander.sendMessage(id,new TextMessage("测试指定人员发送"));

    }
}

浏览器直接访问http://localhost:8080/SpringBootDemo/websocket/spring/socketById.do?id=1

相关博客

https://www.cnblogs.com/guoapeng/p/17020317.html#3-%E4%BD%BF%E7%94%A8java%E5%8E%9F%E7%94%9Fspringboot%E6%B7%B7%E5%90%88

Netty版本

https://github.com/YeautyYE/netty-websocket-spring-boot-starter/blob/master/README_zh.md

如本文“对您有用”,欢迎随意打赏作者,让我们坚持创作!

0 打赏
Enamiĝu al vi
不要为明天忧虑.因为明天自有明天的忧虑.一天的难处一天当就够了。
543文章 68评论 297点赞 612186浏览

随机文章
SpringMVC—PathPattern路径模式
3年前
SpringBoot—想用xml文件
5年前
SpringBoot—自动配置原理
5年前
Java—IO读写文件的方式
5年前
Java—ASM字节码编程
3年前
博客统计
  • 日志总数:543 篇
  • 评论数目:68 条
  • 建站日期:2020-03-06
  • 运行天数:1954 天
  • 标签总数:23 个
  • 最后更新:2024-12-20
Copyright © 2025 网站备案号: 浙ICP备20017730号 身体没有灵魂是死的,信心没有行为也是死的。
主页
页面
  • 归档
  • 摘要
  • 杂图
  • 问题随笔
博主
Enamiĝu al vi
Enamiĝu al vi 管理员
To be, or not to be
543 文章 68 评论 612186 浏览
测试
测试
看板娘
赞赏作者

请通过微信、支付宝 APP 扫一扫

感谢您对作者的支持!

 支付宝 微信支付