User-Profile-Image
hankin
  • 5
  • Java
  • Kotlin
  • Spring
  • Web
  • SQL
  • MegaData
  • More
  • Experience
  • Enamiĝu al vi
  • 分类
    • Zuul
    • Zookeeper
    • XML
    • WebSocket
    • Web Notes
    • Web
    • Vue
    • Thymeleaf
    • SQL Server
    • SQL Notes
    • SQL
    • SpringSecurity
    • SpringMVC
    • SpringJPA
    • SpringCloud
    • SpringBoot
    • Spring Notes
    • Spring
    • Servlet
    • Ribbon
    • Redis
    • RabbitMQ
    • Python
    • PostgreSQL
    • OAuth2
    • NOSQL
    • Netty
    • MySQL
    • MyBatis
    • More
    • MinIO
    • MegaData
    • Maven
    • LoadBalancer
    • Kotlin Notes
    • Kotlin
    • Kafka
    • jQuery
    • JavaScript
    • Java Notes
    • Java
    • Hystrix
    • Git
    • Gateway
    • Freemarker
    • Feign
    • Eureka
    • ElasticSearch
    • Docker
    • Consul
    • Ajax
    • ActiveMQ
  • 页面
    • 归档
    • 摘要
    • 杂图
    • 问题随笔
  • 友链
    • Spring Cloud Alibaba
    • Spring Cloud Alibaba - 指南
    • Spring Cloud
    • Nacos
    • Docker
    • ElasticSearch
    • Kotlin中文版
    • Kotlin易百
    • KotlinWeb3
    • KotlinNhooo
    • 前端开源搜索
    • Ktorm ORM
    • Ktorm-KSP
    • Ebean ORM
    • Maven
    • 江南一点雨
    • 江南国际站
    • 设计模式
    • 熊猫大佬
    • java学习
    • kotlin函数查询
    • Istio 服务网格
    • istio
    • Ktor 异步 Web 框架
    • PostGis
    • kuangstudy
    • 源码地图
    • it教程吧
    • Arthas-JVM调优
    • Electron
    • bugstack虫洞栈
    • github大佬宝典
    • Sa-Token
    • 前端技术胖
    • bennyhuo-Kt大佬
    • Rickiyang博客
    • 李大辉大佬博客
    • KOIN
    • SQLDelight
    • Exposed-Kt-ORM
    • Javalin—Web 框架
    • http4k—HTTP包
    • 爱威尔大佬
    • 小土豆
    • 小胖哥安全框架
    • 负雪明烛刷题
    • Kotlin-FP-Arrow
    • Lua参考手册
    • 美团文章
    • Java 全栈知识体系
    • 尼恩架构师学习
    • 现代 JavaScript 教程
    • GO相关文档
    • Go学习导航
    • GoCN社区
    • GO极客兔兔-案例
    • 讯飞星火GPT
    • Hollis博客
    • PostgreSQL德哥
    • 优质博客推荐
    • 半兽人大佬
    • 系列教程
    • PostgreSQL文章
    • 云原生资料库
    • 并发博客大佬
Help?

Please contact us on our email for need any support

Support
    首页   ›   Web   ›   WebSocket   ›   正文
WebSocket

Websocket通讯(上层的STOMP)

2020-12-22 00:51:24
1009  0 0
参考目录 隐藏
1) 服务端准备工作
2) AbstractWebSocketMessageBrokerConfigurer 抽象类
3) WebSocketMessageBrokerConfigurer接口
4) WebSocketChannelInterceptor 的实现步骤
5) ChannelInterceptor 方法
6) WebSocketChannelInterceptor 拦截登录时消息头中的信息

阅读完需:约 33 分钟

前面两种建立websocket通讯,不管是用javax的包还是spring的包都是用的比较底层的协议,下面我们来看看用上层的STOMP来建立websocket通讯

Websocket通讯(底层的协议)

SockJs+Spring-WebSocket时,由于SockJs与Spring WebSocket之间采用JSON通讯,需要引入jackson 2的相关jar包

        <!-- jackson-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.6.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.6.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.6.3</version>
        </dependency>

前面已经提到了STOMP是一个上层协议,STOMP 在 WebSocket 之上提供了一个基于帧的线路格式层,用来定义消息语义。
STOMP 帧:该帧由命令,一个或多个 头信息 以及 负载所组成。如下就是发送 数据的一个 STOMP帧:

SEND
destination:/app/marco
content-length:20
 
{\"message\":\"hello word!\"}
  1. SEND:STOMP命令,表明会发送一些内容;
  2. destination:头信息,用来表示消息发送到哪里;
  3. content-length:头信息,用来表示 负载内容的 大小;
  4. 空行:
  5. 帧内容(负载)内容

要使用STOMP 通讯,服务端,和客户端都必须支持,服务端的准备步骤

服务端准备工作

  1. 我们已经配置了STOMP通讯的配置类 WebSocketStompConfig
  2. 配置了WebSocketChannelInterceptor 和 WebSocketHandshakeInterceptor 两个自定义拦截器
  3. 一个WebSocketStompController 用于接收客户端消息和响应客户端
  4. 一个简单的MVC controller 用于跳转websocket 页面

在Spring中启用STOMP通讯不用我们自己去写原生态的帧,spring的消息功能是基于代理模式构建,其实说得复杂,都是封装好了的,如果需要开启SOMP,只需要在websocket配置类上使用@EnableWebSocketMessageBroker (注解的作用为能够在 WebSocket 上启用 STOMP),并实现WebSocketMessageBrokerConfigurer接口,有些教程在这一步会继承AbstractWebSocketMessageBrokerConfigurer 类,我们看一下AbstractWebSocketMessageBrokerConfigurer类的源码,可以看到都是空方法,也是实现的接口,这里推荐自己实现接口,因为官方API上AbstractWebSocketMessageBrokerConfigurer已经标记为废弃

AbstractWebSocketMessageBrokerConfigurer 抽象类

public abstract class AbstractWebSocketMessageBrokerConfigurer implements WebSocketMessageBrokerConfigurer {
    public AbstractWebSocketMessageBrokerConfigurer() {
    }

    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
    }

    public void configureClientInboundChannel(ChannelRegistration registration) {
    }

    public void configureClientOutboundChannel(ChannelRegistration registration) {
    }

    public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
        return true;
    }

    public void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
    }

    public void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
    }

    public void configureMessageBroker(MessageBrokerRegistry registry) {
    }
}

WebSocketMessageBrokerConfigurer接口

这个类是一个重点要熟悉的类

public interface WebSocketMessageBrokerConfigurer {

    // 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs
    void registerStompEndpoints(StompEndpointRegistry var1);

    // 配置发送与接收的消息参数,可以指定消息字节大小,缓存大小,发送超时时间
    void configureWebSocketTransport(WebSocketTransportRegistration var1);

    // 设置输入消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
    void configureClientInboundChannel(ChannelRegistration var1);
    
    // 设置输出消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
    void configureClientOutboundChannel(ChannelRegistration var1);

    // 添加自定义的消息转换器,spring 提供多种默认的消息转换器,返回false,不会添加消息转换器,返回true,会添加默认的消息转换器,当然也可以把自己写的消息转换器添加到转换链中
    boolean configureMessageConverters(List<MessageConverter> var1);

    // 配置消息代理,哪种路径的消息会进行代理处理
    void configureMessageBroker(MessageBrokerRegistry var1);
    
    // 自定义控制器方法的参数类型,有兴趣可以百度google HandlerMethodArgumentResolver这个的用法
    void addArgumentResolvers(List<HandlerMethodArgumentResolver> var1);

    // 自定义控制器方法返回值类型,有兴趣可以百度google HandlerMethodReturnValueHandler这个的用法
    void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> var1);
}

在registerStompEndpoints 方法中,我们可以设置websocket服务的地址,同样,我们也可以根据自身业务需求,去添加拦截器,例如前文我们写的WebSocketHandshakeInterceptor拦截器,可以获取到httpsession,同样,当我们把信息存入map 后,都可以通过通过WebSocketSession的getAttributes()下提供get方法获取

    /**
     * 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,
     * 也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs
     * 
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry)
    {
        
        /*
         * 1. 将 /serviceName/stomp/websocketJs路径注册为STOMP的端点,
         *    用户连接了这个端点后就可以进行websocket通讯,支持socketJs
         * 2. setAllowedOrigins("*")表示可以跨域
         * 3. withSockJS()表示支持socktJS访问
         * 4. 添加自定义拦截器,这个拦截器是上一个demo自己定义的获取httpsession的拦截器
         */
        registry.addEndpoint("/stomp/websocketJS")
                .setAllowedOrigins("*")
                .withSockJS()
                .setInterceptors(new WebSocketHandshakeInterceptor())
        ;

        /*
         * 看了下源码,它的实现类是WebMvcStompEndpointRegistry ,
         * addEndpoint是添加到WebMvcStompWebSocketEndpointRegistration的集合中,
         * 所以可以添加多个端点
         */
        registry.addEndpoint("/stomp/websocket");
    }

如果我们业务关心,用户的数量,在线数量,连接状况等数据,我们也可以通过ChannelRegistration对象的setInterceptors方法添加监听,这里先展示一个完整的实现类,监听接口在后面会介绍,代码中的WebSocketHandshakeInterceptor 拦截器,是上一个例子已经实现的,用于存储httpsession,WebSocketChannelInterceptor 拦截器 ,在这个拦截器中可以做一些在线人数统计等操作,后面会介绍

package com.wzh.demo.websocket.config;

import com.wzh.demo.websocket.handler.MyPrincipalHandshakeHandler;
import com.wzh.demo.websocket.interceptor.WebSocketChannelInterceptor;
import com.wzh.demo.websocket.interceptor.WebSocketHandshakeInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.DefaultManagedTaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;

import java.util.List;

/**
 * <配置基于STOMP的websocket>
 * <功能详细描述>
 * @author wzh
 * @version 2018-08-12 18:38
 * @see [相关类/方法] (可选)
 **/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,
     * 也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs
     * 
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry)
    {
        
        /*
         * 1. 将 /serviceName/stomp/websocketJs路径注册为STOMP的端点,
         *    用户连接了这个端点后就可以进行websocket通讯,支持socketJs
         * 2. setAllowedOrigins("*")表示可以跨域
         * 3. withSockJS()表示支持socktJS访问
         * 4. addInterceptors 添加自定义拦截器,这个拦截器是上一个demo自己定义的获取httpsession的拦截器
         * 5. addInterceptors 添加拦截处理,这里MyPrincipalHandshakeHandler 封装的认证用户信息
         */
        registry.addEndpoint("/stomp/websocketJS")
                //.setAllowedOrigins("*")
                .addInterceptors(new WebSocketHandshakeInterceptor())
                .setHandshakeHandler(new MyPrincipalHandshakeHandler())
                .withSockJS()

        ;

        /*
         * 看了下源码,它的实现类是WebMvcStompEndpointRegistry ,
         * addEndpoint是添加到WebMvcStompWebSocketEndpointRegistration的集合中,
         * 所以可以添加多个端点
         */
        registry.addEndpoint("/stomp/websocket");
    }

    /**
     * 配置消息代理
     * @param registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry)
    {
        /*
         *  enableStompBrokerRelay 配置外部的STOMP服务,需要安装额外的支持 比如rabbitmq或activemq
         * 1. 配置代理域,可以配置多个,此段代码配置代理目的地的前缀为 /topicTest 或者 /userTest
         *    我们就可以在配置的域上向客户端推送消息
         * 3. 可以通过 setRelayHost 配置代理监听的host,默认为localhost
         * 4. 可以通过 setRelayPort 配置代理监听的端口,默认为61613
         * 5. 可以通过 setClientLogin 和 setClientPasscode 配置账号和密码
         * 6. setxxx这种设置方法是可选的,根据业务需要自行配置,也可以使用默认配置
         */
        //registry.enableStompBrokerRelay("/topicTest","/userTest")
                //.setRelayHost("rabbit.someotherserver")
                //.setRelayPort(62623);
                //.setClientLogin("userName")
                //.setClientPasscode("password")
                //;

        // 自定义调度器,用于控制心跳线程
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        // 线程池线程数,心跳连接开线程
        taskScheduler.setPoolSize(1);
        // 线程名前缀
        taskScheduler.setThreadNamePrefix("websocket-heartbeat-thread-");
        // 初始化
        taskScheduler.initialize();

        /*
         * spring 内置broker对象
         * 1. 配置代理域,可以配置多个,此段代码配置代理目的地的前缀为 /topicTest 或者 /userTest
         *    我们就可以在配置的域上向客户端推送消息
         * 2,进行心跳设置,第一值表示server最小能保证发的心跳间隔毫秒数, 第二个值代码server希望client发的心跳间隔毫秒数
         * 3. 可以配置心跳线程调度器 setHeartbeatValue这个不能单独设置,不然不起作用,要配合setTaskScheduler才可以生效
         *    调度器我们可以自己写一个,也可以自己使用默认的调度器 new DefaultManagedTaskScheduler()
         */
        registry.enableSimpleBroker("/topicTest","/userTest")
                .setHeartbeatValue(new long[]{10000,10000})
                .setTaskScheduler(taskScheduler);
        /*
         *  "/app" 为配置应用服务器的地址前缀,表示所有以/app 开头的客户端消息或请求
         *  都会路由到带有@MessageMapping 注解的方法中
         */
        registry.setApplicationDestinationPrefixes("/app");

        /*
         *  1. 配置一对一消息前缀, 客户端接收一对一消息需要配置的前缀 如“'/user/'+userid + '/message'”,
         *     是客户端订阅一对一消息的地址 stompClient.subscribe js方法调用的地址
         *  2. 使用@SendToUser发送私信的规则不是这个参数设定,在框架内部是用UserDestinationMessageHandler处理,
         *     而不是而不是 AnnotationMethodMessageHandler 或  SimpleBrokerMessageHandler
         *     or StompBrokerRelayMessageHandler,是在@SendToUser的URL前加“user+sessionId"组成
         */
        registry.setUserDestinationPrefix("/user");

        /*
         * 自定义路径分割符
         * 注释掉的这段代码添加的分割符为. 分割是类级别的@messageMapping和方法级别的@messageMapping的路径
         * 例如类注解路径为 “topic”,方法注解路径为“hello”,那么客户端JS stompClient.send 方法调用的路径为“/app/topic.hello”
         * 注释掉此段代码后,类注解路径“/topic”,方法注解路径“/hello”,JS调用的路径为“/app/topic/hello”
         */
        //registry.setPathMatcher(new AntPathMatcher("."));

    }

    /**
     * 配置发送与接收的消息参数,可以指定消息字节大小,缓存大小,发送超时时间
     * @param registration
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        /*
         * 1. setMessageSizeLimit 设置消息缓存的字节数大小 字节
         * 2. setSendBufferSizeLimit 设置websocket会话时,缓存的大小 字节
         * 3. setSendTimeLimit 设置消息发送会话超时时间,毫秒
         */
        registration.setMessageSizeLimit(10240)
                    .setSendBufferSizeLimit(10240)
                    .setSendTimeLimit(10000);
    }

    /**
     * 设置输入消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
     * @param registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {

        /*
         * 配置消息线程池
         * 1. corePoolSize 配置核心线程池,当线程数小于此配置时,不管线程中有无空闲的线程,都会产生新线程处理任务
         * 2. maxPoolSize 配置线程池最大数,当线程池数等于此配置时,不会产生新线程
         * 3. keepAliveSeconds 线程池维护线程所允许的空闲时间,单位秒
         */
        registration.taskExecutor().corePoolSize(10)
                    .maxPoolSize(20)
                    .keepAliveSeconds(60);
        /*
         * 添加stomp自定义拦截器,可以根据业务做一些处理
         * springframework 4.3.12 之后版本此方法废弃,代替方法 interceptors(ChannelInterceptor... interceptors)
         * 消息拦截器,实现ChannelInterceptor接口
         */
        registration.setInterceptors(webSocketChannelInterceptor());
    }

    /**
     *设置输出消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
     * @param registration
     */
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        registration.taskExecutor().corePoolSize(10)
                    .maxPoolSize(20)
                    .keepAliveSeconds(60);
        //registration.setInterceptors(new WebSocketChannelInterceptor());
    }

    /**
     * 添加自定义的消息转换器,spring 提供多种默认的消息转换器,
     * 返回false,不会添加消息转换器,返回true,会添加默认的消息转换器,当然也可以把自己写的消息转换器添加到转换链中
     * @param list
     * @return
     */
    @Override
    public boolean configureMessageConverters(List<MessageConverter> list) {
        return true;
    }

    /**
     * 自定义控制器方法的参数类型,有兴趣可以百度google HandlerMethodArgumentResolver这个的用法
     * @param list
     */
    @Override
    public void addArgumentResolvers(List<HandlerMethodArgumentResolver> list) {

    }

    /**
     * 自定义控制器方法返回值类型,有兴趣可以百度google HandlerMethodReturnValueHandler这个的用法
     * @param list
     */
    @Override
    public void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> list) {

    }

    /**
     * 拦截器加入 spring ioc容器
     * @return
     */
    @Bean
    public WebSocketChannelInterceptor webSocketChannelInterceptor()
    {
        return new WebSocketChannelInterceptor();
    }

}

WebSocketChannelInterceptor 的实现步骤

如果需要添加监听,我们的监听类需要实现ChannelInterceptor接口,在 springframework包5.0.7之前这一步我们一般是实现ChannelInterceptorAdapter 抽象类,不过这个类已经废弃了,文档也推荐直接实现接口。

ChannelInterceptor 方法

首先我们看一下,ChannelInterceptor 哪些方法

package org.springframework.messaging.support;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

public interface ChannelInterceptor {
    // 在消息发送之前调用,方法中可以对消息进行修改,如果此方法返回值为空,则不会发生实际的消息发送调用
    Message<?> preSend(Message<?> var1, MessageChannel var2);

    // 在消息发送后立刻调用,boolean值参数表示该调用的返回值
    void postSend(Message<?> var1, MessageChannel var2, boolean var3);

    /*
     * 1. 在消息发送完成后调用,而不管消息发送是否产生异常,在次方法中,我们可以做一些资源释放清理的工作
     * 2. 此方法的触发必须是preSend方法执行成功,且返回值不为null,发生了实际的消息推送,才会触发
     */
    void afterSendCompletion(Message<?> var1, MessageChannel var2, boolean var3, Exception var4);

    /* 1. 在消息被实际检索之前调用,如果返回false,则不会对检索任何消息,只适用于(PollableChannels),
     * 2. 在websocket的场景中用不到
     */
    boolean preReceive(MessageChannel var1);

    /*
     * 1. 在检索到消息之后,返回调用方之前调用,可以进行信息修改,如果返回null,就不会进行下一步操作
     * 2. 适用于PollableChannels,轮询场景
     */
    Message<?> postReceive(Message<?> var1, MessageChannel var2);

    /*
     * 1. 在消息接收完成之后调用,不管发生什么异常,可以用于消息发送后的资源清理
     * 2. 只有当preReceive 执行成功,并返回true才会调用此方法
     * 2. 适用于PollableChannels,轮询场景
     */
    void afterReceiveCompletion(Message<?> var1, MessageChannel var2, Exception var3);
}

上面有说道,在ChannelInterceptor接口中的preSend能在消息发送前做一些处理,例如可以获取到用户登录的唯一token令牌,这里的令牌是我们业务传递给客户端的,例如用户在登录成功后跳转到websocket建立连接的页面,我们后台生成的一个标识符,客户端在和服务端建立websocket连接的时候,我们可以从消息头中获取到这种业务参数,并做一系列后续处理。

如果要做这种业务操作,我们还需要一个Authentication对象,这个对象是我们自己写的,这个类必须实现java.security.Principal,这里只是做一个简单的token存储,可以根据实际的业务逻辑进行扩展。

import java.security.Principal;

/**
 * <websocket登录连接对象>
 * <用于保存websocket连接过程中需要存储的业务参数>
 * @author wzh
 * @version 2018-08-26 23:30
 * @see [相关类/方法] (可选)
 **/
public class WebSocketUserAuthentication implements Principal{

    /**
     * 用户身份标识符
     */
    private String token;

    public WebSocketUserAuthentication(String token) {
        this.token = token;
    }

    public WebSocketUserAuthentication() {
    }

    /**
     * 获取用户登录令牌
     * @return
     */
    @Override
    public String getName() {
        return token;
    }
}

一个消息头拦截器,用于获取用户的认证信息


package com.wzh.demo.websocket.handler;

import com.wzh.demo.domain.WebSocketUserAuthentication;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

import javax.servlet.http.HttpSession;
import java.security.Principal;
import java.util.Map;

/**
 * <设置认证用户信息>
 * <功能详细描述>
 * @author wzh
 * @version 2018-09-18 23:55
 * @see [相关类/方法] (可选)
 **/
public class MyPrincipalHandshakeHandler extends DefaultHandshakeHandler{

    private static final Logger log = Logger.getLogger(MyPrincipalHandshakeHandler.class);

    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {

        HttpSession httpSession = getSession(request);
        // 获取登录的信息,就是controller 跳转页面存的信息,可以根据业务修改
        String user = (String)httpSession.getAttribute("loginName");

        if(StringUtils.isEmpty(user)){
            log.error("未登录系统,禁止登录websocket!");
            return null;
        }
        log.info(" MyDefaultHandshakeHandler login = " + user);
        return new WebSocketUserAuthentication(user);
    }

    private HttpSession getSession(ServerHttpRequest request) {
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest serverRequest = (ServletServerHttpRequest) request;
            return serverRequest.getServletRequest().getSession(false);
        }
        return null;
    }

}

下面我们做个拦截器,在preSend方法中获取封装首次登陆后的令牌信息,在postSend方法中统计在线人数

WebSocketChannelInterceptor 拦截登录时消息头中的信息

package com.wzh.demo.websocket.interceptor;

import com.wzh.demo.domain.WebSocketUserAuthentication;
import org.apache.log4j.Logger;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;

import javax.servlet.http.HttpSession;

import static org.springframework.messaging.simp.stomp.StompCommand.CONNECT;

/**
 * <websocke消息监听,用于监听websocket用户连接情况>
 * <功能详细描述>
 * @author wzh
 * @version 2018-08-25 23:39
 * @see [相关类/方法] (可选)
 **/
public class WebSocketChannelInterceptor implements ChannelInterceptor {

    public WebSocketChannelInterceptor() {
    }

    Logger log = Logger.getLogger(WebSocketChannelInterceptor.class);

    // 在消息发送之前调用,方法中可以对消息进行修改,如果此方法返回值为空,则不会发生实际的消息发送调用
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {

        StompHeaderAccessor accessor =  MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        /**
         * 1. 判断是否为首次连接请求,如果已经连接过,直接返回message
         * 2. 网上有种写法是在这里封装认证用户的信息,本文是在http阶段,websockt 之前就做了认证的封装,所以这里直接取的信息
         */
        if(StompCommand.CONNECT.equals(accessor.getCommand()))
        {
            /*
             * 1. 这里获取就是JS stompClient.connect(headers, function (frame){.......}) 中header的信息
             * 2. JS中header可以封装多个参数,格式是{key1:value1,key2:value2}
             * 3. header参数的key可以一样,取出来就是list
             * 4. 样例代码header中只有一个token,所以直接取0位
             */
            String token = accessor.getNativeHeader("token").get(0);

            /*
             * 1. 这里直接封装到StompHeaderAccessor 中,可以根据自身业务进行改变
             * 2. 封装大搜StompHeaderAccessor中后,可以在@Controller / @MessageMapping注解的方法中直接带上StompHeaderAccessor
             *    就可以通过方法提供的 getUser()方法获取到这里封装user对象
             * 2. 例如可以在这里拿到前端的信息进行登录鉴权
             */
            WebSocketUserAuthentication user = (WebSocketUserAuthentication) accessor.getUser();

            System.out.println("认证用户:" + user.toString() + " 页面传递令牌" + token);

        }else if (StompCommand.DISCONNECT.equals(accessor.getCommand()))
        {

        }
        return message;
    }

    // 在消息发送后立刻调用,boolean值参数表示该调用的返回值
    @Override
    public void postSend(Message<?> message, MessageChannel messageChannel, boolean b) {

        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);

        /*
         * 拿到消息头对象后,我们可以做一系列业务操作
         * 1. 通过getSessionAttributes()方法获取到websocketSession,
         *    就可以取到我们在WebSocketHandshakeInterceptor拦截器中存在session中的信息
         * 2. 我们也可以获取到当前连接的状态,做一些统计,例如统计在线人数,或者缓存在线人数对应的令牌,方便后续业务调用
         */
        HttpSession httpSession = (HttpSession) accessor.getSessionAttributes().get("HTTP_SESSION");

        // 这里只是单纯的打印,可以根据项目的实际情况做业务处理
        log.info("postSend 中获取httpSession key:" + httpSession.getId());

        // 忽略心跳消息等非STOMP消息
        if(accessor.getCommand() == null)
        {
            return;
        }

        // 根据连接状态做处理,这里也只是打印了下,可以根据实际场景,对上线,下线,首次成功连接做处理
        System.out.println(accessor.getCommand());
        switch (accessor.getCommand())
        {
            // 首次连接
            case CONNECT:
                log.info("httpSession key:" + httpSession.getId() + " 首次连接");
                break;
            // 连接中
            case CONNECTED:
                break;
            // 下线
            case DISCONNECT:
                log.info("httpSession key:" + httpSession.getId() + " 下线");
                break;
             default:
                break;
        }


    }

    /*
     * 1. 在消息发送完成后调用,而不管消息发送是否产生异常,在次方法中,我们可以做一些资源释放清理的工作
     * 2. 此方法的触发必须是preSend方法执行成功,且返回值不为null,发生了实际的消息推送,才会触发
     */
    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel messageChannel, boolean b, Exception e) {

    }

    /* 1. 在消息被实际检索之前调用,如果返回false,则不会对检索任何消息,只适用于(PollableChannels),
     * 2. 在websocket的场景中用不到
     */
    @Override
    public boolean preReceive(MessageChannel messageChannel) {
        return true;
    }

    /*
     * 1. 在检索到消息之后,返回调用方之前调用,可以进行信息修改,如果返回null,就不会进行下一步操作
     * 2. 适用于PollableChannels,轮询场景
     */
    @Override
    public Message<?> postReceive(Message<?> message, MessageChannel messageChannel) {
        return message;
    }

    /*
     * 1. 在消息接收完成之后调用,不管发生什么异常,可以用于消息发送后的资源清理
     * 2. 只有当preReceive 执行成功,并返回true才会调用此方法
     * 2. 适用于PollableChannels,轮询场景
     */
    @Override
    public void afterReceiveCompletion(Message<?> message, MessageChannel messageChannel, Exception e) {

    }
}

服务端发送消息大体有两种场景,公告和私信(私聊和群聊),实现的方式蛮多的。

服务端处理消息的场景:

  • 公告就是只要订阅了此路径的的用户都能收到,我们使用@SendTo 注解实现,如果不使用注解指定,
会默认交给broker进行处理,例如@MessageMapping("/demo1/twoWays") 这种,就会拼接代理域+路径

相当于配置了@SendTo("/topicTest/twoWays"),也可以使用SimpMessagingTemplate.convertAndSend
  • 私信就是指定人员才能收到,可以用@SendToUser 注解或者SimpMessagingTemplate 模板类(框架提供)的convertAndSendToUser进行处理
    • @SendToUser 多用于资源的请求,如果我只是想简单的用websocket向服务器请求资源而已,然后服务器你就把资源给我就行了,别的用户就不用你广播推送了,简单点,就是我请求,你就推送给我
    • SimpMessagingTemplate.convertAndSendToUser 可以用户发送指定的人员
    • 使用指定人员发送的时候,前缀必须为配置的setUserDestinationPrefix 配置的“/user”,在spring 框架内部以”/user” 为前缀的消息将会通过 UserDestinationMessageHandler 进行处理,而不是 AnnotationMethodMessageHandler 或 SimpleBrokerMessageHandler or StompBrokerRelayMessageHandler。UserDestinationMessageHandler 的主要任务: 是 将用户消息重新路由到 某个用户独有的目的地上。 在处理订阅的时候,它会将目标地址中的 “/user” 前缀去掉,并基于用户 的会话添加一个后缀。如,对 “/user/userTest/notifications” 的订阅最后可能路由到 名为 “/userTest/notifacations-user65a4sdfa” 目的地上

服务端controller 用于接收客户端消息和响应客户端

package com.wzh.demo.controller;

import com.alibaba.fastjson.JSON;
import com.wzh.demo.domain.WebSocketUserAuthentication;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.stereotype.Controller;

import java.util.HashMap;
import java.util.Map;

/**
 * <STOMP websocket controller>
 * <功能详细描述>
 * @author wzh
 * @version 2018-08-29 22:58
 * @see [相关类/方法] (可选)
 **/
@Controller
public class WebSocketStompController {

    Logger log = Logger.getLogger(WebSocketStompController.class);

    private final SimpMessagingTemplate messagingTemplate;

    /**
     * 实例化Controller的时候,注入SimpMessagingTemplate
     * @param messagingTemplate
     */
    @Autowired
    public WebSocketStompController(SimpMessagingTemplate messagingTemplate)
    {
        this.messagingTemplate = messagingTemplate;
    }

    /**
     * 发送广播消息,所有订阅了此路径的用户都会收到此消息
     * 这里做个restful风格,其实无所谓,根据项目实际情况进行配置
     * restful风格的接口,在springMVC中,我们使用@PathVariable注解,
     * 在websocket stomp接口中,restful要使用@DestinationVariable
     * @param groupId
     * @param json
     * @param headerAccessor
     * @return
     */
    @MessageMapping("/sendChatMsg/{groupId}")
    @SendTo("/topicTest/hello")
    public Map<String, Object> sendChatMsg(@DestinationVariable(value = "groupId") String groupId, String json,
        StompHeaderAccessor headerAccessor)
    {
        // 这里拿到的user对象是在WebSocketChannelInterceptor拦截器中绑定上的对象
        WebSocketUserAuthentication user =(WebSocketUserAuthentication)headerAccessor.getUser();
        log.info("公告controller 中获取用户登录令牌:" + user.getName());
        log.info("公告拿到客户端传递分组参数:" + groupId);
        
        // 这里拿到的json 字符串,其实可以自动绑定到对象上
        System.out.println("公告获取客户端传递过来的JSON 字符串:" + json);
        Map msg = (Map) JSON.parse(json);
        Map<String, Object> data = new HashMap<String, Object>();
        data.put("msg", "公告服务器收到客户端请求,发送广播消息:"+ msg.get("msg"));
        return data;
        
    }

    /**
     * 发送私信消息,只是想简单的用websocket向服务器请求资源而已,
     * 然后服务器你就把资源给我就行了,别的用户就不用你广播推送了,简单点,就是我请求,你就推送给我
     * 如果一个帐号打开了多个浏览器窗口,也就是打开了多个websocket session通道,
     * 这时,spring webscoket默认会把消息推送到同一个帐号不同的session,
     * 可以利用broadcast = false把避免推送到所有的session中
     * @param json
     * @param headerAccessor
     * @return
     */
    @MessageMapping("/sendChatMsgByOwn")
    @SendToUser(value = "/userTest/own")
    public Map<String, Object> sendChatMsgByOwn(String json,
        StompHeaderAccessor headerAccessor)
    {
        // 这里拿到的user对象是在WebSocketChannelInterceptor拦截器中绑定上的对象
        WebSocketUserAuthentication user = (WebSocketUserAuthentication)headerAccessor.getUser();

        log.info("SendToUser controller 中获取用户登录令牌:" + user.getName()
                + " socketId:" + headerAccessor.getSessionId());

        // 这里拿到的json 字符串,其实可以自动绑定到对象上
        System.out.println("SendToUser获取客户端传递过来的JSON 字符串:" + json);

        Map msg = (Map)JSON.parse(json);
        Map<String, Object> data = new HashMap<String, Object>();
        data.put("msg", "SendToUser服务器收到客户端请求,发送私信消息:" + msg.get("msg"));
        
        return data;
    }

    /**
     * 根据ID 把消息推送给指定用户
     * 1. 这里用了 @SendToUser 和 返回值 其意义是可以在发送成功后回执给发送放其信息发送成功
     * 2. 非必须,如果实际业务不需要关心此,可以不用@SendToUser注解,方法返回值为void
     * 3. 这里接收人的参数是用restful风格带过来了,websocket把参数带到后台的方式很多,除了url路径,
     *    还可以在header中封装用@Header或者@Headers去取等多种方式
     * @param accountId 消息接收人ID
     * @param json 消息JSON字符串
     * @param headerAccessor
     * @return
     */
    @MessageMapping("/sendChatMsgById/{accountId}")
    @SendToUser(value = "/userTest/callBack")
    public Map<String, Object> sendChatMsgById(
        @DestinationVariable(value = "accountId") String accountId, String json,
        StompHeaderAccessor headerAccessor)
    {
        Map msg = (Map)JSON.parse(json);
        Map<String, Object> data = new HashMap<String, Object>();

        // 这里拿到的user对象是在WebSocketChannelInterceptor拦截器中绑定上的对象
        WebSocketUserAuthentication user = (WebSocketUserAuthentication)headerAccessor.getUser();

        log.info("SendToUser controller 中获取用户登录令牌:" + user.getName()
                + " socketId:" + headerAccessor.getSessionId());

        // 向用户发送消息,第一个参数是接收人、第二个参数是浏览器订阅的地址,第三个是消息本身
        // 如果服务端要将消息发送给特定的某一个用户,
        // 可以使用SimpleMessageTemplate的convertAndSendToUser方法(第一个参数是用户的登陆名username)
        String address = "/userTest/callBack";
        messagingTemplate.convertAndSendToUser(accountId, address, msg.get("msg"));

        data.put("msg", "callBack 消息已推送,消息内容:" + msg.get("msg"));
        return data;
    }

}

一个springMVC的controller 用户跳转websocket页面,并封装简单的认证信息

package com.wzh.demo.controller;

import com.wzh.demo.websocket.handler.WebSocketHander;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.socket.TextMessage;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.util.Date;


/**
 * <websocket测试用MVC控制器>
 * <功能详细描述>
 * @author wzh
 * @version 2018-07-09 22:53
 * @see [相关类/方法] (可选)
 **/
@Controller
@RequestMapping("/websocket")
public class WebSocketController {

    // 跳转stomp websocket 页面
    @RequestMapping(value = "/spring/stompSocket.do",method = RequestMethod.GET)
    public String toStompWebSocket(HttpSession session, HttpServletRequest request, Model model)
    {
        // 这里封装一个登录的用户组参数,模拟进入通讯后的简单初始化
        model.addAttribute("groupId","user_groupId");
        model.addAttribute("session_id",session.getId());
        System.out.println("跳转:" + session.getId());
        session.setAttribute("loginName",session.getId());
        return "/test/springWebSocketStomp";

    }
}

Html 客户端,客户端需要引入额外的两个js,sockjs.js和stomp.js


<#import "spring.ftl" as spring />
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
        "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
    <title>Title</title>
    <script src="${request.contextPath}/js/jquery-3.3.1.min.js"></script>
    <script src="${request.contextPath}/js/sockjs.js"></script>
    <script src="${request.contextPath}/js/stomp.js"></script>
    <script type="text/javascript">

        // 定义全局变量 stomp socket
        var stompClient,socket;

        $(document).ready(function () {
            if (window.WebSocket){
                websocketConfig();
            } else {
                alert("错误","浏览器不支持websocket技术通讯.");
            }
        });

        // websocket 配置
        function websocketConfig() {
            /*
             * 1. 连接url为endpointChat的endpoint,对应后台WebSoccketConfig的配置
             * 2. SockJS 所处理的URL 是 "http://" 或 "https://" 模式,而不是 "ws://" or  "wss://"
             */
            socket = new SockJS("${request.contextPath}/stomp/websocketJS");

            // 通过sock对象监听每个事件节点,非必须,这个必须放在stompClient的方法前面
            sockHandle();

            // 获取 STOMP 子协议的客户端对象
            stompClient = Stomp.over(socket);

            /*
             * 1. 获取到stomp 子协议后,可以设置心跳连接时间,认证连接,主动断开连接
             * 2,连接心跳有的版本的stomp.js 是默认开启的,这里我们不管版本,手工设置
             * 3. 心跳是双向的,客户端开启心跳,必须要服务端支持心跳才行
             * 4. heartbeat.outgoing 表示客户端给服务端发送心跳的间隔时间
             * 5. 客户端接收服务端心跳的间隔时间,如果为0 表示客户端不接收服务端心跳
             */
            stompClient.heartbeat.outgoing = 10000;
            stompClient.heartbeat.incoming = 0;

            /*
             * 1. stompClient.connect(headers, connectCallback, errorCallback);
             * 2. headers表示客户端的认证信息,多个参数 json格式存,这里简单用的httpsessionID,可以根据业务场景变更
             *    这里存的信息,在服务端StompHeaderAccessor 对象调用方法可以取到
             * 3. connectCallback 表示连接成功时(服务器响应 CONNECTED 帧)的回调方法;
             *    errorCallback 表示连接失败时(服务器响应 ERROR 帧)的回调方法,非必须;
             */
            var headers = {token:"${session_id}"};

            stompClient.connect(headers,function (frame) {

                console.log('Connected: ' + frame);

                /*
                 * 1. 订阅服务,订阅地址为服务器Controller 中的地址
                 * 2. 如果订阅为公告,地址为Controller 中@SendTo 注解地址
                 * 3. 如果订阅为私信,地址为setUserDestinationPrefix 前缀+@SendToUser注解地址
                 *    或者setUserDestinationPrefix 前缀 + controller的convertAndSendToUser地址一致
                 * 4. 这里演示为公告信息,所有订阅了的用户都能接受
                 */
                stompClient.subscribe("/topicTest/hello",function (message) {
                    var msg = JSON.parse(message.body).msg;
                    console.log("接收到公告信息:" + msg);
                    alert("接收到公告信息:" + msg);
                });

                /*
                 * 1. 因为推送为私信,必须带上或者setUserDestinationPrefix前缀 /user
                 * 2. 演示自己发送给自己,做websocket向服务器请求资源而已,然后服务器你就把资源给我就行了,
                 *    别的用户就不用你广播推送了,简单点,就是我请求,你就推送给我
                 */
                stompClient.subscribe('/user/userTest/own',function (message) {
                    var msg = JSON.parse(message.body).msg;
                    console.log("接收到私信信息SendToUser:" + msg);
                    alert("接收到私信信息SendToUser:" + msg);
                });

                /*
                 * 1. 订阅点对点消息
                 * 2. 很多博文这里的路径会写成"/user/{accountId}/userTest/callBack”这种,是因为
                 *    @SendToUser发送的代理地址是 /userTest/callBack, 地址将会被转化为 /user/{username}/userTest/callBack
                 *    username,为用户的登录名,也是就是Principal或者本文中的WebSocketUserAuthentication对象getName获取的参数
                 *    如果在拦截器中配置了认证路径,可以不带参数,不过推荐用带参数的写法
                 *
                 */
                stompClient.subscribe('/user/userTest/callBack',function (message) {

                    var msg  = message.body;
                    console.log("接收到点对点SendToUser:" + msg);
                    alert("接收到点对点SendToUser:" + msg);
                });

            }, function (error) {
                console.log('STOMP: ' + error);
                //setTimeout(websocketConfig, 10000);
                console.log('STOMP: Reconnecting in 10 seconds');
            });
        }
        
        // 发送公告消息
        function sendMsg() {
            var msg = $("#message").val();
            var data ={"msg":msg};

            /**
             *  1. 第一个参数 url 为服务器 controller中 @MessageMapping 中匹配的URL,字符串,必须参数;
             *  2. headers 为发送信息的header,json格式,JavaScript 对象,
             *     可选参数,可以携带消息头信息,也可以做事务,如果没有,传{}
             *  3. body 为发送信息的 body,字符串,可选参数
             */
            stompClient.send('${"/app/sendChatMsg/" + groupId}',{},JSON.stringify(data));
        }

        // 发送给自己
        function sendMsgOwn() {
            var msg = $("#message").val();
            var data ={"msg":msg};

            /**
             *  1. 第一个参数 url 为服务器 controller中 @MessageMapping 中匹配的URL,字符串,必须参数;
             *  2. headers 为发送信息的header,json格式,JavaScript 对象,
             *     可选参数,可以携带消息头信息,也可以做事务,如果没有,传{}
             *  3. body 为发送信息的 body,字符串,可选参数
             */
            stompClient.send("/app/sendChatMsgByOwn",{},JSON.stringify(data));
        }

        // 发送点对点消息
        function sendMsgById() {
            var msg = $("#message").val();
            var accountId = $("#accountId").val();
            var data ={"msg":msg};

            /**
             *  1. 第一个参数 url 为服务器 controller中 @MessageMapping 中匹配的URL,字符串,必须参数;
             *  2. headers 为发送信息的header,json格式,JavaScript 对象,
             *     可选参数,可以携带消息头信息,也可以做事务,如果没有,传{}
             *  3. body 为发送信息的 body,字符串,可选参数
             *  4. accountId这个参数其实可以通过header传过去,不过因为是restful风格,所以就跟在url上
             */
            stompClient.send("/app/sendChatMsgById/" + accountId,{},JSON.stringify(data));
        }

        // 通过sock对象监听每个事件节点,非必须,这里开启了stomp的websocket 也不会生效了
        function sockHandle() {

            // 连接成功后的回调函数
            socket.onopen = function () {
                console.log("------连接成功------");
            };

            // 监听接受到服务器的消息
            socket.onmessage = function (event) {
                console.log('-------收到的消息: ' + event.data);
            };

            // 关闭连接的回调函数
            socket.onclose = function (event) {
                console.log('--------关闭连接: connection closed.------');
            };

            // 连接发生错误
            socket.onerror = function () {
                alert("连接错误", "网络超时或通讯地址错误.");
                disconnect();
            } ;
        }

        // 关闭websocket
        function disconnect() {
            if (socket != null) {
                socket.close();
                socket = null;
            }
        }

    </script>
</head>
<body>
    <div>
        <span>消息</span>
        <input type="text" id="message" name="message">
        <input type="button" id="sendMsg" name="sendMsg" value="发送公告" onclick="sendMsg();">
        <input type="button" id="sendMsgOwn" name="sendMsgOwn" value="自己给自己推送" onclick="sendMsgOwn();">
        <br/>
        <span>接收人</span>
        <input type="text" id="accountId" name="accountId">
        <input type="button" id="sendMsgById" name="sendMsgById" value="点对点消息" onclick="sendMsgById();">
    </div>
</body>
</html>

这样就可以通过页面做简单的消息推送了

大佬的精华:

https://blog.csdn.net/pacosonswjtu/article/details/51914567

如本文“对您有用”,欢迎随意打赏作者,让我们坚持创作!

0 打赏
Enamiĝu al vi
不要为明天忧虑.因为明天自有明天的忧虑.一天的难处一天当就够了。
543文章 68评论 294点赞 594128浏览

随机文章
ElasticSearch—锁和版本控制(九)
5年前
Zookeeper—整理记录
10个月前
java中的双冒号操作符
5年前
SpringBoot DevTools—开发工具
5年前
Java—并发编程(八)线程池– (5) 线程池的原理
3年前
博客统计
  • 日志总数:543 篇
  • 评论数目:68 条
  • 建站日期:2020-03-06
  • 运行天数:1927 天
  • 标签总数:23 个
  • 最后更新:2024-12-20
Copyright © 2025 网站备案号: 浙ICP备20017730号 身体没有灵魂是死的,信心没有行为也是死的。
主页
页面
  • 归档
  • 摘要
  • 杂图
  • 问题随笔
博主
Enamiĝu al vi
Enamiĝu al vi 管理员
To be, or not to be
543 文章 68 评论 594128 浏览
测试
测试
看板娘
赞赏作者

请通过微信、支付宝 APP 扫一扫

感谢您对作者的支持!

 支付宝 微信支付