User-Profile-Image
hankin
  • 5
  • Java
  • Kotlin
  • Spring
  • Web
  • SQL
  • MegaData
  • More
  • Experience
  • Enamiĝu al vi
  • 分类
    • Zuul
    • Zookeeper
    • XML
    • WebSocket
    • Web Notes
    • Web
    • Vue
    • Thymeleaf
    • SQL Server
    • SQL Notes
    • SQL
    • SpringSecurity
    • SpringMVC
    • SpringJPA
    • SpringCloud
    • SpringBoot
    • Spring Notes
    • Spring
    • Servlet
    • Ribbon
    • Redis
    • RabbitMQ
    • Python
    • PostgreSQL
    • OAuth2
    • NOSQL
    • Netty
    • MySQL
    • MyBatis
    • More
    • MinIO
    • MegaData
    • Maven
    • LoadBalancer
    • Kotlin Notes
    • Kotlin
    • Kafka
    • jQuery
    • JavaScript
    • Java Notes
    • Java
    • Hystrix
    • Git
    • Gateway
    • Freemarker
    • Feign
    • Eureka
    • ElasticSearch
    • Docker
    • Consul
    • Ajax
    • ActiveMQ
  • 页面
    • 归档
    • 摘要
    • 杂图
    • 问题随笔
  • 友链
    • Spring Cloud Alibaba
    • Spring Cloud Alibaba - 指南
    • Spring Cloud
    • Nacos
    • Docker
    • ElasticSearch
    • Kotlin中文版
    • Kotlin易百
    • KotlinWeb3
    • KotlinNhooo
    • 前端开源搜索
    • Ktorm ORM
    • Ktorm-KSP
    • Ebean ORM
    • Maven
    • 江南一点雨
    • 江南国际站
    • 设计模式
    • 熊猫大佬
    • java学习
    • kotlin函数查询
    • Istio 服务网格
    • istio
    • Ktor 异步 Web 框架
    • PostGis
    • kuangstudy
    • 源码地图
    • it教程吧
    • Arthas-JVM调优
    • Electron
    • bugstack虫洞栈
    • github大佬宝典
    • Sa-Token
    • 前端技术胖
    • bennyhuo-Kt大佬
    • Rickiyang博客
    • 李大辉大佬博客
    • KOIN
    • SQLDelight
    • Exposed-Kt-ORM
    • Javalin—Web 框架
    • http4k—HTTP包
    • 爱威尔大佬
    • 小土豆
    • 小胖哥安全框架
    • 负雪明烛刷题
    • Kotlin-FP-Arrow
    • Lua参考手册
    • 美团文章
    • Java 全栈知识体系
    • 尼恩架构师学习
    • 现代 JavaScript 教程
    • GO相关文档
    • Go学习导航
    • GoCN社区
    • GO极客兔兔-案例
    • 讯飞星火GPT
    • Hollis博客
    • PostgreSQL德哥
    • 优质博客推荐
    • 半兽人大佬
    • 系列教程
    • PostgreSQL文章
    • 云原生资料库
    • 并发博客大佬
Help?

Please contact us on our email for need any support

Support
    首页   ›   Spring   ›   正文
Spring

Spring—异步Http

2022-08-31 12:15:12
1624  0 1
参考目录 隐藏
1) 前置概念
2) 初始化选择 web 容器
3) Spring MVC的同步模式
4) Spring MVC异步模式
5) Callable案例
6) WebAsyncTask案例:
7) DeferredResult案例:
8) Spring MVC异步模式中使用Filter和HandlerInterceptor
9) 区别使用
10) 进阶用法
11) DeferredResult应用
12) 实现长轮询服务端推送消息(long polling)
13) ResponseBodyEmitter和SseEmitter
14) sse 规范
15) StreamingResponseBody (很方便的文件下载)

阅读完需:约 39 分钟

前置概念

虽然现在看看Spring的异步模式看起来有点老,毕竟现在都Spring5的时代了,甚至将来肯定是webflux的时代,但是不可否认的是现在Servlet 仍然是主流,目前都是Servlet3.0

看起来是很老的技术,但是很多人还是没了解过,毕竟做业务也不是非它不可。

  • Spring MVC足够优秀,封装得我们现在处理业务请求只需要面向JavaBean去编程即可,没必要再去了解Servlet底层的细节
  • Servlet源生的API在Spring MVC的环境下,使用场景已经非常的少了。甚至给我们一种错觉:servlet技术已经淡化了大众的视野
  • Spring MVC的异步模式多多少少都会增加使用的复杂度,从而增加犯错的概率。而它的同步模式可以说是能够满足现在绝大部分的使用场景(大不了觉得性能不够了,就加机器嘛,很少会从代码的本身去考虑和优化性能),所以确实没使用过也是在情理之中的

说到Spring的异步模式,还有一个点比较关键就是响应式编程(Reactive Programming)。

自从Spring5开始,出来了一个webflux,基于Netty开发的框架,补上了Spring响应式编程的遗憾,像SpringCloudGateway就是基于webflux开发。

Spring5.0以后,它对servlet不再强依赖,而是变为了可选依赖。另外一个选择还可以是:Reactive编程,至此SpringBoot可以分为Servlet体系与Reactive体系。


可以扩展一下

初始化选择 web 容器

Spring Boot 中初始化哪种 web 容器的选择原理,首先第一步是根据类是否存在确定是哪种 WebApplicationType:

public enum WebApplicationType {

	/**
	 * 没有 web 服务,不需要 web 容器
	 */
	NONE,

	/**
	 * 使用基于 servlet 的 web 容器
	 */
	SERVLET,

	/**
	 * 使用响应式的 web 容器
	 */
	REACTIVE;
	
	private static final String[] SERVLET_INDICATOR_CLASSES = { "javax.servlet.Servlet",
			"org.springframework.web.context.ConfigurableWebApplicationContext" };

	private static final String WEBMVC_INDICATOR_CLASS = "org.springframework.web.servlet.DispatcherServlet";

	private static final String WEBFLUX_INDICATOR_CLASS = "org.springframework.web.reactive.DispatcherHandler";

	private static final String JERSEY_INDICATOR_CLASS = "org.glassfish.jersey.servlet.ServletContainer";

	private static final String SERVLET_APPLICATION_CONTEXT_CLASS = "org.springframework.web.context.WebApplicationContext";

	private static final String REACTIVE_APPLICATION_CONTEXT_CLASS = "org.springframework.boot.web.reactive.context.ReactiveWebApplicationContext";

	static WebApplicationType deduceFromClasspath() {
		if (ClassUtils.isPresent(WEBFLUX_INDICATOR_CLASS, null) && !ClassUtils.isPresent(WEBMVC_INDICATOR_CLASS, null)
				&& !ClassUtils.isPresent(JERSEY_INDICATOR_CLASS, null)) {
			return WebApplicationType.REACTIVE;
		}
		for (String className : SERVLET_INDICATOR_CLASSES) {
			if (!ClassUtils.isPresent(className, null)) {
				return WebApplicationType.NONE;
			}
		}
		return WebApplicationType.SERVLET;
	}

从源码中可以看出,当有 WEBFLUX_INDICATOR_CLASS 并且没有 WEBMVC_INDICATOR_CLASS 以及 JERSEY_INDICATOR_CLASS 的时候,判断为 REACTIVE 环境。如果所有 SERVLET_INDICATOR_CLASSES 就认为是 SERVLET 环境。其实这样也可以看出,如果又引入 spring-web 又引入 spring-webflux 的依赖,其实还是 SERVLET 环境。如果以上都没有,那么就是无 web 容器的环境。在 Spring-Cloud-Gateway 中,是 REACTIVE 环境。

如果是 REACTIVE 环境,就会使用 org.springframework.boot.web.reactive.server.ReactiveWebServerFactory 的实现 Bean 创建 web 容器。那么究竟是哪个实现呢?目前有四个实现(Spring-boot 2.7.x)

  • TomcatReactiveWebServerFactory:基于 Tomcat 的响应式 web 容器 Factory
  • JettyReactiveWebServerFactory:基于 Jetty 的响应式 web 容器 Factory
  • UndertowReactiveWebServerFactory:基于 Undertow 的响应式 web 容器Factory
  • NettyReactiveWebServerFactory:基于 Netty 的响应式 web 容器 Factory

实际会用哪个,看到底哪个 Bean 会注册到 ApplicationContext 中:

ReactiveWebServerFactoryConfiguration

@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(ReactiveWebServerFactory.class)
@ConditionalOnClass({ HttpServer.class })
static class EmbeddedNetty {

	@Bean
	@ConditionalOnMissingBean
	ReactorResourceFactory reactorServerResourceFactory() {
		return new ReactorResourceFactory();
	}

	@Bean
	NettyReactiveWebServerFactory nettyReactiveWebServerFactory(ReactorResourceFactory resourceFactory,
			ObjectProvider<NettyRouteProvider> routes, ObjectProvider<NettyServerCustomizer> serverCustomizers) {
		NettyReactiveWebServerFactory serverFactory = new NettyReactiveWebServerFactory();
		serverFactory.setResourceFactory(resourceFactory);
		routes.orderedStream().forEach(serverFactory::addRouteProviders);
		serverFactory.getServerCustomizers().addAll(serverCustomizers.orderedStream().collect(Collectors.toList()));
		return serverFactory;
	}

}

@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(ReactiveWebServerFactory.class)
@ConditionalOnClass({ org.apache.catalina.startup.Tomcat.class })
static class EmbeddedTomcat {

	@Bean
	TomcatReactiveWebServerFactory tomcatReactiveWebServerFactory(
			ObjectProvider<TomcatConnectorCustomizer> connectorCustomizers,
			ObjectProvider<TomcatContextCustomizer> contextCustomizers,
			ObjectProvider<TomcatProtocolHandlerCustomizer<?>> protocolHandlerCustomizers) {
		TomcatReactiveWebServerFactory factory = new TomcatReactiveWebServerFactory();
		factory.getTomcatConnectorCustomizers()
				.addAll(connectorCustomizers.orderedStream().collect(Collectors.toList()));
		factory.getTomcatContextCustomizers()
				.addAll(contextCustomizers.orderedStream().collect(Collectors.toList()));
		factory.getTomcatProtocolHandlerCustomizers()
				.addAll(protocolHandlerCustomizers.orderedStream().collect(Collectors.toList()));
		return factory;
	}

}

@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(ReactiveWebServerFactory.class)
@ConditionalOnClass({ org.eclipse.jetty.server.Server.class, ServletHolder.class })
static class EmbeddedJetty {

	@Bean
	@ConditionalOnMissingBean
	JettyResourceFactory jettyServerResourceFactory() {
		return new JettyResourceFactory();
	}

	@Bean
	JettyReactiveWebServerFactory jettyReactiveWebServerFactory(JettyResourceFactory resourceFactory,
			ObjectProvider<JettyServerCustomizer> serverCustomizers) {
		JettyReactiveWebServerFactory serverFactory = new JettyReactiveWebServerFactory();
		serverFactory.getServerCustomizers().addAll(serverCustomizers.orderedStream().collect(Collectors.toList()));
		serverFactory.setResourceFactory(resourceFactory);
		return serverFactory;
	}

}

@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(ReactiveWebServerFactory.class)
@ConditionalOnClass({ Undertow.class })
static class EmbeddedUndertow {

	@Bean
	UndertowReactiveWebServerFactory undertowReactiveWebServerFactory(
			ObjectProvider<UndertowBuilderCustomizer> builderCustomizers) {
		UndertowReactiveWebServerFactory factory = new UndertowReactiveWebServerFactory();
		factory.getBuilderCustomizers().addAll(builderCustomizers.orderedStream().collect(Collectors.toList()));
		return factory;
	}

}

从原码可以看出,每种配置上都有 @ConditionalOnMissingBean(ReactiveWebServerFactory.class) 以及判断是否有对应容器的 class 的条件,例如:@ConditionalOnClass({ Undertow.class }),@Configuration(proxyBeanMethods = false)是关闭这个配置中 Bean 之间的代理加快加载速度。

由于每个配置都有 @ConditionalOnMissingBean(ReactiveWebServerFactory.class),那么其实能保证就算满足多个配置的条件,最后也只有一个 ReactiveWebServerFactory,那么当满足多个条件时,哪个优先加载呢?这就要看这里的源码:

ReactiveWebServerFactoryAutoConfiguration

@Import({ ReactiveWebServerFactoryAutoConfiguration.BeanPostProcessorsRegistrar.class,
		ReactiveWebServerFactoryConfiguration.EmbeddedTomcat.class,
		ReactiveWebServerFactoryConfiguration.EmbeddedJetty.class,
		ReactiveWebServerFactoryConfiguration.EmbeddedUndertow.class,
		ReactiveWebServerFactoryConfiguration.EmbeddedNetty.class })

从这里可以看出,是按照 EmbeddedTomcat,EmbeddedJetty,EmbeddedUndertow,EmbeddedNetty 的顺序 Import 的,也就是:只要你的依赖中加入了任何 Web 容器(例如 Undertow),那么最后创建的就是基于那个 web 容器的异步容器,而不是基于 netty 的。


回到原点,讲讲原本的MVC模式

Spring MVC的同步模式

要知道什么是异步模式,就先要知道什么是同步模式。

建议可以看一下补充内容

Java—I/O模型与同步、异步、阻塞、非阻塞

浏览器发起请求,Web服务器开一个线程处理(请求处理线程),处理完把处理结果返回浏览器。这就是同步模式。绝大多数Web服务器都如此般处理。

此处需要明晰一个概念:比如tomcat,它既是一个web服务器,同时它也是个servlet后端容器(调Java后端服务),所以要区分清楚这两个概念。请求处理线程是有限的,宝贵的资源~(注意它和处理线程的区别)

tomcat默认的线程池容量是200,这就是请求处理线程,而这程序中我们可以自定义线程池去执行任务这就是处理线程。

  1. 请求发起者发起一个request,然后会一直等待一个response,这期间它是阻塞的
  2. 请求处理线程会在Call了之后等待Return,自身处于阻塞状态(这个很关键)
  3. 然后都等待return,知道处理线程全部完事后返回了,然后把response反给调用者就算全部结束了

问题在哪里?

绝大部分情况下,这样是没有问题的。因为
第一:高并发、高流量的场景放眼中国的公司,占比也是非常少的。
第二:长时间处理服务这种情况也是少之又少的

所以两者结合起来,场景就更加稀少了。相信这就是为什么好多做开发N年了的,却还不知道Servlet和Spring MVC的异步模式的原因吧。

Tomcat等应用服务器的连接线程池实际上是有限制的;每一个连接请求都会耗掉线程池的一个连接数;如果某些耗时很长的操作,如对大量数据的查询操作、调用外部系统提供的服务以及一些IO密集型操作等,会占用连接很长时间,这个时候这个连接就无法被释放而被其它请求重用。如果连接占用过多,服务器就很可能无法及时响应每个请求;极端情况下如果将线程池中的所有连接耗尽,服务器将长时间无法向外提供服务!

Spring MVC异步模式

Spring MVC3.2之后支持异步请求,能够在controller中返回一个Callable或者DeferredResult。由于Spring MVC的良好封装,异步功能使用起来出奇的简单。

Callable案例

	@Controller
	@RequestMapping("/async/controller")
	public class AsyncHelloController {
	
	    @ResponseBody
	    @GetMapping("/hello")
	    public Callable<String> helloGet() throws Exception {
	        System.out.println(Thread.currentThread().getName() + " 主线程start");
	
	        Callable<String> callable = () -> {
	            System.out.println(Thread.currentThread().getName() + " 子子子线程start");
	            TimeUnit.SECONDS.sleep(5); //模拟处理业务逻辑,话费了5秒钟
	            System.out.println(Thread.currentThread().getName() + " 子子子线程end");
			
				// 这里稍微小细节一下:最终返回的不是Callable对象,而是它里面的内容
	            return "hello world";
	        };
	
	        System.out.println(Thread.currentThread().getName() + " 主线程end");
	        return callable;
	    }
	}

输出结果

http-apr-8080-exec-3 主线程start
http-apr-8080-exec-3 主线程end
MvcAsync1 子子子线程start
MvcAsync1 子子子线程end

先明细两个概念:

  1. 请求处理线程:处理线程 属于 web 服务器线程,负责 处理用户请求,采用 线程池 管理。
  2. 异步线程:异步线程 属于 用户自定义的线程,可采用 线程池管理。

前端页面等待5秒出现结果

注意:异步模式对前端来说,是无感知的,这是后端的一种技术。所以这个和我们自己开启一个线程处理,立马返回给前端是有非常大的不同的,需要注意~

由此我们可以看出,主线程早早就结束了(需要注意,此时还并没有把response返回的,此处一定要注意),真正干事的是子线程(交给TaskExecutor去处理的,后续分析过程中可以看到),它的大致的一个处理流程图可以如下:

这里能够很直接的看出:我们很大程度上提高了我们请求处理线程的利用率,从而肯定就提高了我们系统的吞吐量。

异步模式处理步骤概述如下:

  1. 当Controller返回值是Callable的时候
  2. Spring就会将Callable交给TaskExecutor去处理(一个隔离的线程池)
  3. 与此同时将DispatcherServlet里的拦截器、Filter等等都马上退出主线程,但是response仍然保持打开的状态
  4. Callable线程处理完成后,Spring MVC讲请求重新派发给容器(注意这里的重新派发,和后面讲的拦截器密切相关)
  5. 根据Callabel返回结果,继续处理(比如参数绑定、视图解析等等就和之前一样了)~~~~

WebAsyncTask案例:

官方说如果我们需要超时处理的回调或者错误处理的回调,我们可以使用WebAsyncTask代替Callable

实际使用中,我并不建议直接使用Callable ,而是使用Spring提供的WebAsyncTask 代替,它包装了Callable,功能更强大些

@Controller
@RequestMapping("/async/controller")
public class AsyncHelloController {

    @ResponseBody
    @GetMapping("/hello")
    public WebAsyncTask<String> helloGet() throws Exception {
        System.out.println(Thread.currentThread().getName() + " 主线程start");

        Callable<String> callable = () -> {
            System.out.println(Thread.currentThread().getName() + " 子子子线程start");
            TimeUnit.SECONDS.sleep(5); //模拟处理业务逻辑,话费了5秒钟
            System.out.println(Thread.currentThread().getName() + " 子子子线程end");

            return "hello world";
        };

        // 采用WebAsyncTask 返回 这样可以处理超时和错误 同时也可以指定使用的Excutor名称
        WebAsyncTask<String> webAsyncTask = new WebAsyncTask<>(3000, callable);
        // 注意:onCompletion表示完成,不管你是否超时、是否抛出异常,这个函数都会执行的
        webAsyncTask.onCompletion(() -> System.out.println("程序[正常执行]完成的回调"));

        // 这两个返回的内容,最终都会放进response里面去===========
        webAsyncTask.onTimeout(() -> "程序[超时]的回调");
        // 备注:这个是Spring5新增的
        webAsyncTask.onError(() -> "程序[出现异常]的回调");


        System.out.println(Thread.currentThread().getName() + " 主线程end");
        return webAsyncTask;
    }
}

由于我们设置了超时时间为3000ms,而业务处理是5s,所以会执行onTimeout这个回调函数。因此页面是会显示“程序[超时]的回调”这几个字。其执行的过程同Callback。

输出结果

http-nio-8080-exec-1 主线程start
http-nio-8080-exec-1 主线程end
task-1 子子子线程start
程序[正常执行]完成的回调

下面我们简单看看WebAsyncTask的源码,非常简单,就是个包装:

public class WebAsyncTask<V> implements BeanFactoryAware {
	
	// 正常执行的函数(通过WebAsyncTask的构造函数可以传进来)
	private final Callable<V> callable;
	// 处理超时时间(ms),可通过构造函数指定,也可以不指定(不会有超时处理)
	private Long timeout;
	// 执行任务的执行器。可以构造函数设置进来,手动指定。
	private AsyncTaskExecutor executor;
	// 若设置了,会根据此名称去IoC容器里找这个Bean (和上面二选一)  
	// 若传了executorName,请务必调用set方法设置beanFactory
	private String executorName;
	private BeanFactory beanFactory;

	// 超时的回调
	private Callable<V> timeoutCallback;
	// 发生错误的回调
	private Callable<V> errorCallback;
	// 完成的回调(不管超时还是错误都会执行)
	private Runnable completionCallback;

	...
	
	// 这是获取执行器的逻辑
	@Nullable
	public AsyncTaskExecutor getExecutor() {
		if (this.executor != null) {
			return this.executor;
		} else if (this.executorName != null) {
			Assert.state(this.beanFactory != null, "BeanFactory is required to look up an executor bean by name");
			return this.beanFactory.getBean(this.executorName, AsyncTaskExecutor.class);
		} else {
			return null;
		}
	}


	public void onTimeout(Callable<V> callback) {
		this.timeoutCallback = callback;
	}
	public void onError(Callable<V> callback) {
		this.errorCallback = callback;
	}
	public void onCompletion(Runnable callback) {
		this.completionCallback = callback;
	}

	// 最终执行超时回调、错误回调、完成回调都是通过这个拦截器实现的
	CallableProcessingInterceptor getInterceptor() {
		return new CallableProcessingInterceptor() {
			@Override
			public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws Exception {
				return (timeoutCallback != null ? timeoutCallback.call() : CallableProcessingInterceptor.RESULT_NONE);
			}
			@Override
			public <T> Object handleError(NativeWebRequest request, Callable<T> task, Throwable t) throws Exception {
				return (errorCallback != null ? errorCallback.call() : CallableProcessingInterceptor.RESULT_NONE);
			}
			@Override
			public <T> void afterCompletion(NativeWebRequest request, Callable<T> task) throws Exception {
				if (completionCallback != null) {
					completionCallback.run();
				}
			}
		};
	}

}

WebAsyncTask 的异步编程 API。相比于 @Async 注解,WebAsyncTask 提供更加健全的 超时处理 和 异常处理 支持。但是@Async也有更优秀的地方,就是他不仅仅能用于controller中(任意地方)

在SpringBoot中使用WebAsyncTask作为接口的返回值,在调用接口的时候回提示信息。

An Executor is required to handle java.util.concurrent.Callable return values.
Please, configure a TaskExecutor in the MVC config under "async support".
The SimpleAsyncTaskExecutor currently in use is not suitable under load.

Controller层使用了Callable返回结果,上下文没有配置异步线程池引起的。

Spring 配置在这方面有点令人困惑,因为它需要单独配置 MVC 异步支持,即使用返回 Callable 的控制器处理程序方法,以及使用 @Async 注释的任何 Spring bean 方法。要正确配置它,您可以应用类似下面的配置,记住 AsyncTaskExecutor 配置可能需要修改。

附带说明一下,可能会想简单地使用 @Async 注释您的控制器处理程序方法。这只会产生预期的效果 – 释放 Web 服务器线程 – 触发和忘记操作(此观察基于 Spring Boot 2.1.2,他们可能会在未来解决这个问题)。如果您想利用 Servlet 3.0 异步处理的强大功能,您真的必须使用 Callables 并使用 WebMvcConfigurer 配置它们。

异步配置

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

    final Integer AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();

    @Bean
    protected WebMvcConfigurer webMvcConfigurer() {
        return new WebMvcConfigurerAdapter() {
            @Override
            public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
                configurer.setTaskExecutor(getTaskExecutor());
            }
        };
    }

    @Bean
    protected ConcurrentTaskExecutor getTaskExecutor() {

        return new ConcurrentTaskExecutor(new ThreadPoolExecutor(AVAILABLE_PROCESSORS, 3 * AVAILABLE_PROCESSORS,
                5, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(200),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()));
    }

}

DeferredResult案例:

DeferredResult使用方式与Callable类似,但在返回结果上不一样,它返回的时候实际结果可能没有生成,实际的结果可能会在另外的线程里面设置到DeferredResult中去。

这个特性非常非常的重要,对后面实现复杂的功能(比如服务端推技术、订单过期时间处理、长轮询、模拟MQ的功能等等高级应用)

官方给的Demo如下:

关键在于这里的其他线程是可以通过MQ,定时任务,事件驱动等方式来触发的。

测试案例

@Controller
@RequestMapping("/async/controller")
public class AsyncHelloController {

    private List<DeferredResult<String>> deferredResultList = new ArrayList<>();

    @ResponseBody
    @GetMapping("/hello")
    public DeferredResult<String> helloGet() throws Exception {
        DeferredResult<String> deferredResult = new DeferredResult<>();

        //先存起来,等待触发
        deferredResultList.add(deferredResult);
        return deferredResult;
    }

    @ResponseBody
    @GetMapping("/setHelloToAll")
    public void helloSet() throws Exception {
        // 让所有hold住的请求给与响应
        deferredResultList.forEach(d -> d.setResult("say hello to all"));
    }
}

我们第一个请求/hello,会先deferredResult存起来,然后前端页面是一直等待(转圈状态)的。直到我发第二个请求:setHelloToAll,所有的相关页面才会有响应

执行过程

  1. controller 返回一个DeferredResult,我们把它保存到内存里或者List里面(供后续访问)
  2. Spring MVC调用request.startAsync(),开启异步处理
  3. 与此同时将DispatcherServlet里的拦截器、Filter等等都马上退出主线程,但是response仍然保持打开的状态
  4. 应用通过另外一个线程(可能是MQ消息、定时任务等)给DeferredResult set值。然后Spring MVC会把这个请求再次派发给servlet容器
  5. DispatcherServlet再次被调用,然后处理后续的标准流程

简单看看源码:

public class DeferredResult<T> {

	private static final Object RESULT_NONE = new Object()

	
	// 超时时间(ms) 可以不配置
	@Nullable
	private final Long timeout;
	// 相当于超时的话的,传给回调函数的值
	private final Object timeoutResult;

	// 这三种回调也都是支持的
	private Runnable timeoutCallback;
	private Consumer<Throwable> errorCallback;
	private Runnable completionCallback;


	// 这个比较强大,就是能把我们结果再交给这个自定义的函数处理了 他是个@FunctionalInterface
	private DeferredResultHandler resultHandler;

	private volatile Object result = RESULT_NONE;
	private volatile boolean expired = false;


	// 判断这个DeferredResult是否已经被set过了(被set过的对象,就可以移除了嘛)
	// 如果expired表示已经过期了你还没set,也是返回false的
	// Spring4.0之后提供的
	public final boolean isSetOrExpired() {
		return (this.result != RESULT_NONE || this.expired);
	}

	// 没有isSetOrExpired 强大,建议使用上面那个
	public boolean hasResult() {
		return (this.result != RESULT_NONE);
	}

	// 还可以获得set进去的结果
	@Nullable
	public Object getResult() {
		Object resultToCheck = this.result;
		return (resultToCheck != RESULT_NONE ? resultToCheck : null);
	}


	public void onTimeout(Runnable callback) {
		this.timeoutCallback = callback;
	}
	public void onError(Consumer<Throwable> callback) {
		this.errorCallback = callback;
	}
	public void onCompletion(Runnable callback) {
		this.completionCallback = callback;
	}

	
	// 如果你的result还需要处理,可以这是一个resultHandler,会对你设置进去的结果进行处理
	public final void setResultHandler(DeferredResultHandler resultHandler) {
		Assert.notNull(resultHandler, "DeferredResultHandler is required");
		// Immediate expiration check outside of the result lock
		if (this.expired) {
			return;
		}
		Object resultToHandle;
		synchronized (this) {
			// Got the lock in the meantime: double-check expiration status
			if (this.expired) {
				return;
			}
			resultToHandle = this.result;
			if (resultToHandle == RESULT_NONE) {
				// No result yet: store handler for processing once it comes in
				this.resultHandler = resultHandler;
				return;
			}
		}
		try {
			resultHandler.handleResult(resultToHandle);
		} catch (Throwable ex) {
			logger.debug("Failed to handle existing result", ex);
		}
	}

	// 我们发现,这里调用是private方法setResultInternal,我们设置进来的结果result,会经过它的处理
	// 而它的处理逻辑也很简单,如果我们提供了resultHandler,它会把这个值进一步的交给我们的resultHandler处理
	// 若我们没有提供此resultHandler,那就保存下这个result即可
	public boolean setResult(T result) {
		return setResultInternal(result);
	}

	private boolean setResultInternal(Object result) {
		// Immediate expiration check outside of the result lock
		if (isSetOrExpired()) {
			return false;
		}
		DeferredResultHandler resultHandlerToUse;
		synchronized (this) {
			// Got the lock in the meantime: double-check expiration status
			if (isSetOrExpired()) {
				return false;
			}
			// At this point, we got a new result to process
			this.result = result;
			resultHandlerToUse = this.resultHandler;
			if (resultHandlerToUse == null) {
				this.resultHandler = null;
			}
		}
		resultHandlerToUse.handleResult(result);
		return true;
	}

	// 发生错误了,也可以设置一个值。这个result会被记下来,当作result
	// 注意这个和setResult的唯一区别,这里入参是Object类型,而setResult只能set规定的指定类型
	// 定义成Obj是有原因的:因为我们一般会把Exception等异常对象放进来。。。
	public boolean setErrorResult(Object result) {
		return setResultInternal(result);
	}

	// 拦截器 注意最终finally里面,都可能会调用我们的自己的处理器resultHandler(若存在的话)
	// afterCompletion不会调用resultHandler~~~~~~~~~~~~~
	final DeferredResultProcessingInterceptor getInterceptor() {
		return new DeferredResultProcessingInterceptor() {
			@Override
			public <S> boolean handleTimeout(NativeWebRequest request, DeferredResult<S> deferredResult) {
				boolean continueProcessing = true;
				try {
					if (timeoutCallback != null) {
						timeoutCallback.run();
					}
				} finally {
					if (timeoutResult != RESULT_NONE) {
						continueProcessing = false;
						try {
							setResultInternal(timeoutResult);
						} catch (Throwable ex) {
							logger.debug("Failed to handle timeout result", ex);
						}
					}
				}
				return continueProcessing;
			}
			@Override
			public <S> boolean handleError(NativeWebRequest request, DeferredResult<S> deferredResult, Throwable t) {
				try {
					if (errorCallback != null) {
						errorCallback.accept(t);
					}
				} finally {
					try {
						setResultInternal(t);
					} catch (Throwable ex) {
						logger.debug("Failed to handle error result", ex);
					}
				}
				return false;
			}
			@Override
			public <S> void afterCompletion(NativeWebRequest request, DeferredResult<S> deferredResult) {
				expired = true;
				if (completionCallback != null) {
					completionCallback.run();
				}
			}
		};
	}

	// 内部函数式接口 DeferredResultHandler
	@FunctionalInterface
	public interface DeferredResultHandler {
		void handleResult(Object result);
	}

}

DeferredResult的超时处理,采用委托机制,也就是在实例DeferredResult时给予一个超时时长(毫秒),同时在onTimeout中委托(传入)一个新的处理线程(我们可以认为是超时线程);当超时时间到来,DeferredResult启动超时线程,超时线程处理业务,封装返回数据,给DeferredResult赋值(正确返回的或错误返回的)

Spring MVC异步模式中使用Filter和HandlerInterceptor

看到上面的异步访问,不免我们会新生怀疑,若是普通的拦截器HandlerInterceptor,还生效吗?若生效,效果是怎么样的,现在我们直接看一下吧:(以上面Callable的Demo为示例)

Filter

// 注意,这里必须开启异步支持asyncSupported = true,否则报错:Async support must be enabled on a servlet and for all filters involved in async request processing
@WebFilter(urlPatterns = "/*", asyncSupported = true)
@Configuration
public class HelloFilter extends OncePerRequestFilter {

    @Override
    protected void initFilterBean() throws ServletException {
        System.out.println("Filter初始化...");
    }

    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
        System.out.println(Thread.currentThread().getName() + "--->" + request.getRequestURI());
        filterChain.doFilter(request, response);
    }

}

输出结果

http-apr-8080-exec-3--->/hello
http-apr-8080-exec-3 主线程start
http-apr-8080-exec-3 主线程end
MvcAsync1 子子子线程start
MvcAsync1 子子子线程end

由此可以看出,异步上下文,Filter还是只会被执行一次拦截的,符合我们的预期,所以没什么毛病。

HandlerInterceptor

public class HelloInterceptor implements HandlerInterceptor {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        System.out.println(Thread.currentThread().getName() + "---preHandle-->" + request.getRequestURI());
        return true;
    }

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
        System.out.println(Thread.currentThread().getName() + "---postHandle-->" + request.getRequestURI());
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        System.out.println(Thread.currentThread().getName() + "---afterCompletion-->" + request.getRequestURI());
    }
}

// 注册拦截器
@Configuration
// @EnableWebMvc //可以不需要配置注解,详情看上面的链接
public class AppConfig implements WebMvcConfigurer {

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
    	// /**拦截所有请求
        registry.addInterceptor(new HelloInterceptor()).addPathPatterns("/**");
    }
}

输出结果

http-apr-8080-exec-3--->/hello
http-apr-8080-exec-3---preHandle-->/hello

http-apr-8080-exec-3 主线程start
http-apr-8080-exec-3 主线程end
MvcAsync1 子子子线程start
MvcAsync1 子子子线程end

// 注意  子子子线程处理结束后,再一次触发了preHandle=====
// 此处还要一个细节:这里面的线程既不是子线程,也不是上面的线程  而是新开了一个线程~~~
http-apr-8080-exec-5---preHandle-->/hello
http-apr-8080-exec-5---postHandle-->/hello
http-apr-8080-exec-5---afterCompletion-->/hello

从上面可以看出,如果我们就是普通的Spring MVC的拦截器,preHandler会执行两次,这也符合我们上面分析的处理步骤。所以我们在书写preHandler的时候,一定要特别的注意,要让preHandler即使执行多次,也不要受到影响(幂等)

这里还是要特别注意的是@EnableWebMvc注解会产生一个警告

!!!
An Executor is required to handle java.util.concurrent.Callable return values.
Please, configure a TaskExecutor in the MVC config under "async support".
The SimpleAsyncTaskExecutor currently in use is not suitable under load.
-------------------------------
Request URI: '/hello'
!!!

这个警告的是表面我们不应在生产中使用默认使用的SimpleAsyncTaskExecutor,原本SpringBoot的MVC配置是不会使用它的,但是这个注解使得SpringBoot的配置失效,所以我们要手动配置MVC配置。

Spring Boot 中自定义 SpringMVC 配置,到底继承谁?

SpringBoot—WebMvcConfigurer详解

下面是我们手动配置线程池

/**
 * @EnableWebMvc 导致Springboot自带的MVC配置失效,所以要重新配置
 */
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

    @Bean
    public ThreadPoolTaskExecutor mvcTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(10);
        return taskExecutor;
    }

    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        configurer.setTaskExecutor(mvcTaskExecutor());
    }

}

回到正题,异步拦截器有 AsyncHandlerInterceptor、CallableProcessingInterceptor、DeferredResultProcessingInterceptor

Spring MVC给提供了异步拦截器,能让我们更深入的参与进去异步request的生命周期里面去。其中最为常用的为:AsyncHandlerInterceptor:

public class AsyncHelloInterceptor implements AsyncHandlerInterceptor {

    // 这是Spring3.2提供的方法,专门拦截异步请求的方式
    @Override
    public void afterConcurrentHandlingStarted(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        System.out.println(Thread.currentThread().getName() + "---afterConcurrentHandlingStarted-->" + request.getRequestURI());
    }

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        System.out.println(Thread.currentThread().getName() + "---preHandle-->" + request.getRequestURI());
        return true;
    }

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
        System.out.println(Thread.currentThread().getName() + "---postHandle-->" + request.getRequestURI());
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        System.out.println(Thread.currentThread().getName() + "---afterCompletion-->" + request.getRequestURI());
    }
}

输出结果

http-apr-8080-exec-3---preHandle-->/hello
http-apr-8080-exec-3 主线程start
http-apr-8080-exec-3 主线程end

// 这里发现,它在主线程结束后,子线程开始之前执行的(线程号还是同一个哦~)
http-apr-8080-exec-3---afterConcurrentHandlingStarted-->/hello

MvcAsync1 子子子线程start
MvcAsync1 子子子线程end
http-apr-8080-exec-6---preHandle-->/hello
http-apr-8080-exec-6---postHandle-->/hello
http-apr-8080-exec-6---afterCompletion-->/hello

AsyncHandlerInterceptor提供了一个afterConcurrentHandlingStarted()方法, 这个方法会在Controller方法异步执行时开始执行, 而Interceptor的postHandle方法则是需要等到Controller的异步执行完才能执行

(比如我们用DeferredResult的话,afterConcurrentHandlingStarted是在return的之后执行,而postHandle()是执行.setResult()之后执行)

需要说明的是:如果我们不是异步请求,afterConcurrentHandlingStarted是不会执行的。所以我们可以把它当做加强版的HandlerInterceptor来用。平时我们若要使用拦截器,建议使用它。(Spring5,JDK8以后,很多的xxxAdapter都没啥用了,直接implements接口就成~)

同样可以注册CallableProcessingInterceptor或者一个DeferredResultProcessingInterceptor用于更深度的集成异步request的生命周期

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        // 注册异步的拦截器、默认的超时时间、任务处理器TaskExecutor等等
        //configurer.registerCallableInterceptors();
        //configurer.registerDeferredResultInterceptors();
        //configurer.setDefaultTimeout();
        //configurer.setTaskExecutor();
    }

只是一般来说,我们并不需要注册这种精细的拦截器,绝大多数情况下,使用AsyncHandlerInterceptor是够了的。
(Spring MVC的很多默认设置,请参考WebMvcConfigurationSupport)


区别使用

我觉得最主要的区别是:DeferredResult需要自己用线程来处理结果setResult,而Callable的话不需要我们来维护一个结果处理线程。

总体来说,Callable的话更为简单,同样的也是因为简单,灵活性不够;

相对地,DeferredResult更为复杂一些,但是又极大的灵活性,所以能实现非常多个性化的、复杂的功能,可以设计高级应用。

有些较常见的场景, Callable也并不能解决,比如说:我们访问A接口,A接口调用三方的服务,服务回调(注意此处指的回调,不是返回值)B接口,这种情况就没办法使用Callable了,这个时候可以使用DeferredResult

使用原则:基本上在可以用Callable的时候,直接用Callable;而遇到Callable没法解决的场景的时候,可以尝试使用DeferredResult。(这里所指的Callable包括WebAsyncTask)


进阶用法

DeferredResult应用

它的优点也是非常明显的,能够实现两个完全不相干的线程间的通信。处理的时候请注意图中标记的线程安全问题

实现长轮询服务端推送消息(long polling)

双向通信的方式

在WebSocket协议之前(它是2011年发布的),有三种实现双向通信的方式:轮询(polling)、长轮询(long-polling)和iframe流(streaming)。

  • 轮询(polling):这个不解释了。优点是实现简单粗暴,后台处理简单。缺点也是大大的,耗流量、耗CPU。。。
  • 长轮询(long-polling):长轮询是对轮询的改进版。客户端发送HTTP给服务器之后,看有没有新消息,如果没有新消息,就一直等待(而不是一直去请求了)。当有新消息的时候,才会返回给客户端。 优点是对轮询做了优化,时效性也较好。缺点是:保持连接会消耗资源; 服务器没有返回有效数据,程序超时~~~
  • iframe流(streaming):是在页面中插入一个隐藏的iframe,利用其src属性在服务器和客户端之间创建一条长连接,服务器向iframe传输数据(通常是HTML,内有负责插入信息的javascript),来实时更新页面。(个人觉得还不如长轮询)
  • WebSocket:WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。它将TCP的Socket(套接字)应用在了webpage上。 它的有点一大把:支持双向通信,实时性更强;可发送二进制文件;非常节省流量。 但也是有缺点的:浏览器支持程度不一致,不支持断开重连 (最推荐的~~~)

演示案例

了解apollo配置中心的实现原理,apollo的发布配置推送变更消息就是用DeferredResult实现的。它的大概实现步骤如下:

  1. apollo客户端会像服务端发送长轮询http请求,超时时间60秒
  2. 当超时后返回客户端一个304 httpstatus,表明配置没有变更,客户端继续这个步骤重复发起请求
  3. 当有发布配置的时候,服务端会调用DeferredResult.setResult返回200状态码。客户端收到响应结果后,会发起请求获取变更后的配置信息(注意这里是另外一个请求)。

注册拦截器设置过期时间

@Configuration
// @EnableWebMvc // 可以不需要这个注解
public class AppConfig implements WebMvcConfigurer {

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        // 超时时间设置为15s
        configurer.setDefaultTimeout(TimeUnit.SECONDS.toMillis(15));
    }

}

特定的异常捕获

@Slf4j
@ControllerAdvice
class GlobalControllerExceptionHandler {

    @ResponseStatus(HttpStatus.NOT_MODIFIED)//返回304状态码  效果同HttpServletResponse#sendError(int) 但这样更优雅
    @ResponseBody
    @ExceptionHandler(AsyncRequestTimeoutException.class) //捕获特定异常
    public void handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e) {
        System.out.println("handleAsyncRequestTimeoutException");
    }
}

服务端接口代码

@Slf4j
@RestController
public class ApolloController {

    // 值为List,因为监视同一个名称空间的长轮询可能有N个(毕竟可能有多个客户端用同一份配置嘛)
    private static Map<String, List<DeferredResult<String>>> watchRequests = new ConcurrentHashMap<>();

    @GetMapping(value = "/all/watchrequests")
    public Object getWatchRequests() {
        return watchRequests;
    }

    // 模拟长轮询:apollo客户端来监听配置文件的变更~  可以指定namespace 监视指定的NameSpace
    @GetMapping(value = "/watch/{namespace}")
    public DeferredResult<String> watch(@PathVariable("namespace") String namespace) {
        log.info("Request received,namespace is" + namespace + ",当前时间:" + System.currentTimeMillis());

        DeferredResult<String> deferredResult = new DeferredResult<>();

        //当deferredResult完成时(不论是超时还是异常还是正常完成),都应该移除watchRequests中相应的watch key
        deferredResult.onCompletion(() -> {
            log.info("onCompletion,移除对namespace:" + namespace + "的监视~");
            List<DeferredResult<String>> list = watchRequests.get(namespace);
            list.remove(deferredResult);
            if (list.isEmpty()) {
                watchRequests.remove(namespace);
            }
        });
        
        // 超时
        deferredResult.onTimeout(() -> {
            log.info("调用超时");
        });

       // 发送异常
       deferredResult.onError((Throwable t) -> {
            deferredResult.setErrorResult(
                    ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                            .body("An error occurred."));
        });

        List<DeferredResult<String>> list = watchRequests.computeIfAbsent(namespace, (k) -> new ArrayList<>());
        list.add(deferredResult);
        return deferredResult;
    }

    //模拟发布namespace配置:修改配置
    @GetMapping(value = "/publish/{namespace}")
    public void publishConfig(@PathVariable("namespace") String namespace) {
        //do Something for update config

        if (watchRequests.containsKey(namespace)) {
            List<DeferredResult<String>> deferredResults = watchRequests.get(namespace);

            //通知所有watch这个namespace变更的长轮训配置变更结果
            for (DeferredResult<String> deferredResult : deferredResults) {
                deferredResult.setResult(namespace + " changed,时间为" + System.currentTimeMillis());
            }
        }

    }
}

这样子我们就基本模拟了一个长轮询的案例~

测试代码首先先调用http://127.0.0.1:8080/watch/ceshi监听文件内容,前端会一直在等待,http的连接不会断开,而调用http://127.0.0.1:8080/publish/ceshi修改文件内容,上面的接口就会返回数据。

长轮询的应用场景也是很多的,比如我们现在要实现这样一个功能:浏览器要实时展示服务端计算出来的数据。(这个用普通轮询就会有延迟且浪费资源,但是用这种类似长连接的方案就很合适)


ResponseBodyEmitter和SseEmitter

Callback和DeferredResult用于设置单个结果,如果有多个结果需要set返回给客户端时,可以使用SseEmitter以及ResponseBodyEmitter,each object is written with a compatible HttpMessageConverter。返回值可以直接写他们本身,也可以放在ResponseEntity里面

它俩都是Spring4.2之后提供的类。由ResponseBodyEmitterReturnValueHandler负责处理。 这个和Spring5提供的webFlux技术已经很像了,后续讲到的时候还会提到他们
Emitter:发射器

它们的使用方式几乎同:DeferredResult,这里我只把官方的例子拿出来你就懂了

案例

@RequestMapping("/async/responseBodyEmitter")
public ResponseBodyEmitter responseBodyEmitter(){
    ResponseBodyEmitter responseBodyEmitter=new ResponseBodyEmitter();
 
    Executors.newSingleThreadExecutor().submit(() -> {
        try {
            responseBodyEmitter.send("demo");
            responseBodyEmitter.send("test");
            responseBodyEmitter.complete();
        } catch (Exception ignore) {}
    });
 
    return responseBodyEmitter;
}

SseEmitter是ResponseBodyEmitter的子类,它提供Server-Sent Events(Sse).服务器事件发送是”HTTP Streaming”的另一个变种技术.只是从服务器发送的事件按照W3C Server-Sent Events规范来的(推荐使用) 它的使用方式上,完全同上

Server-Sent Events这个规范能够来用于它们的预期使用目的:就是从server发送events到clients(服务器推).在Spring MVC中可以很容易的实现.仅仅需要返回一个SseEmitter类型的值.

向这种场景在在线游戏、在线协作、金融领域等等都有很好的应用。当然,如果你对稳定性什么的要求都非常高,官方也推荐最好是使用WebSocket来实现~

ResponseBodyEmitter允许通过HttpMessageConverter把发送的events写到对象到response中.这可能是最常见的情况。例如写JSON数据

可是有时候它被用来绕开message转换直接写入到response的OutputStream。例如文件下载.这样可以通过返回StreamingResponseBody类型的值做到.

sse 规范

sse websocket
http 协议 独立的 websocket 协议
轻量,使用简单 相对复杂
默认支持断线重连 需要自己实现断线重连
文本传输 二进制传输
支持自定义发送的消息类型 –

开启长连接 + 流方式传递

Content-Type: text/event-stream;charset=UTF-8
Cache-Control: no-cache
Connection: keep-alive

案例

SpringBoot 利用 SseEmitter 来支持 sse,可以说非常简单了,直接返回SseEmitter对象即可

@RestController
@RequestMapping(path = "sse2")
public class SseRest {
    private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
    @GetMapping(path = "subscribe")
    public SseEmitter push(String id) {
        // 超时时间设置为1小时
        SseEmitter sseEmitter = new SseEmitter(3600_000L);
        sseCache.put(id, sseEmitter);
        sseEmitter.onTimeout(() -> sseCache.remove(id));
        sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
        return sseEmitter;
    }

    @GetMapping(path = "push")
    public String push(String id, String content) throws IOException {
        SseEmitter sseEmitter = sseCache.get(id);
        if (sseEmitter != null) {
            sseEmitter.send(content);
        }
        return "over";
    }

    @GetMapping(path = "over")
    public String over(String id) {
        SseEmitter sseEmitter = sseCache.get(id);
        if (sseEmitter != null) {
            sseEmitter.complete();
            sseCache.remove(id);
        }
        return "over";
    }
}

第一个接口是订阅数据,第二个接口是数据推送

可以直接通过接口来调用。

上面的实现,用到了 SseEmitter 的几个方法,解释如下

  • send(): 发送数据,如果传入的是一个非SseEventBuilder对象,那么传递参数会被封装到 data 中
  • complete(): 表示执行完毕,会断开连接
  • onTimeout(): 超时回调触发
  • onCompletion(): 结束之后的回调触发

写一个简单的 html 消费端,用来演示一下完整的 sse 的更多特性

sse机制不同于传统的“请求-响应”模型,在前端必须使用新建的 EventSource 对象请求一个sse,然后监听此对象的message事件以接收后端推送的值

<!doctype html>
<html lang="en">
<head>
  <title>Sse测试文档</title>
</head>
<body>
<div>sse测试</div>
<div id="result"></div>
</body>
</html>
<script>
  var source = new EventSource('http://localhost:8080/sse/subscribe?id=yihuihui');
  source.onmessage = function (event) {
    text = document.getElementById('result').innerText;
    text += '\n' + event.data;
    document.getElementById('result').innerText = text;
  };
  <!-- 添加一个开启回调 -->
  source.onopen = function (event) {
    text = document.getElementById('result').innerText;
    text += '\n 开启: ';
    console.log(event);
    document.getElementById('result').innerText = text;
  };
</script>

将上面的 html 文件放在项目的resources/static目录下;然后修改一下前面的SseRest

@Controller
@RequestMapping(path = "sse")
public class SseRest {
    @GetMapping(path = "")
    public String index() {
        return "index.html";
    }

    @ResponseBody
    @GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter push(String id) {
        // 超时时间设置为3s,用于演示客户端自动重连
        SseEmitter sseEmitter = new SseEmitter(1_000L);
        // 设置前端的重试时间为1s
        sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("连接成功"));
        sseCache.put(id, sseEmitter);
        System.out.println("add " + id);
        sseEmitter.onTimeout(() -> {
            System.out.println(id + "超时");
            sseCache.remove(id);
        });
        sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
        return sseEmitter;
    }
}

我们上面超时时间设置的比较短,用来测试下客户端的自动重连,如下,开启的日志不断增加

其次将 SseEmitter 的超时时间设长一点,可以再试一下数据推送功能。

StreamingResponseBody (很方便的文件下载)

它用于直接将结果写出到Response的OutputStream中; 如文件下载等

接口源码非常简单:

@FunctionalInterface
public interface StreamingResponseBody {
	void writeTo(OutputStream outputStream) throws IOException;
}

案例

@RequestMapping("/async/streamingResponseBody")
public StreamingResponseBody streamingResponseBody(){
    StreamingResponseBody streamingResponseBody = outputStream -> {
        Executors.newSingleThreadExecutor().submit(() -> {
            try {
                outputStream.write("<html>streamingResponseBody</html>".getBytes());
            } catch (IOException ignore) {}
        });
    };
    return streamingResponseBody;
}

异步优化

Spring内部默认不使用线程池处理的(通过源码分析后面我们是能看到的),为了提高处理的效率,我们可以自己优化,建议自己在配置里注入一个线程池供给使用,参考如下:

	// 提供一个mvc里专用的线程池。。。  这是全局的方式~~~~
    @Bean
    public ThreadPoolTaskExecutor mvcTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setQueueCapacity(100);
        executor.setMaxPoolSize(25);
        return executor;
    }

// 最优解决方案不是像上面一样配置通用的,而是配置一个单独的专用的,如下~~~~
@Configuration
@EnableWebMvc
public class WebMvcConfig extends WebMvcConfigurerAdapter {

	// 配置异步支持~~~~
    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
    	// 设置一个用于异步执行的执行器~~~AsyncTaskExecutor
        configurer.setTaskExecutor(mvcTaskExecutor());
        configurer.setDefaultTimeout(60000L);
    }
}

总的来说,Spring MVC提供的便捷的异步支持,能够大大的提高Tomcat容器等的性能。同时也给我们的应用提供了更多的便利。这也为Spring5以后的Reactive编程模型提供了有利的支持和保障。

如本文“对您有用”,欢迎随意打赏作者,让我们坚持创作!

1 打赏
Enamiĝu al vi
不要为明天忧虑.因为明天自有明天的忧虑.一天的难处一天当就够了。
543文章 68评论 294点赞 593513浏览

随机文章
SpringBoot—SpringSecurity(基于数据库的动态权限配置)
5年前
整合SSM框架
5年前
SpringBoot—路径映射
5年前
Java—并发编程(六)JUC锁 – (7)共享锁和ReentrantReadWriteLock
4年前
SpringBoot—通过CORS解决跨域问题
5年前
博客统计
  • 日志总数:543 篇
  • 评论数目:68 条
  • 建站日期:2020-03-06
  • 运行天数:1927 天
  • 标签总数:23 个
  • 最后更新:2024-12-20
Copyright © 2025 网站备案号: 浙ICP备20017730号 身体没有灵魂是死的,信心没有行为也是死的。
主页
页面
  • 归档
  • 摘要
  • 杂图
  • 问题随笔
博主
Enamiĝu al vi
Enamiĝu al vi 管理员
To be, or not to be
543 文章 68 评论 593513 浏览
测试
测试
看板娘
赞赏作者

请通过微信、支付宝 APP 扫一扫

感谢您对作者的支持!

 支付宝 微信支付