阅读完需:约 39 分钟
前置概念
虽然现在看看Spring的异步模式看起来有点老,毕竟现在都Spring5的时代了,甚至将来肯定是webflux
的时代,但是不可否认的是现在Servlet
仍然是主流,目前都是Servlet3.0
看起来是很老的技术,但是很多人还是没了解过,毕竟做业务也不是非它不可。
- Spring MVC足够优秀,封装得我们现在处理业务请求只需要面向JavaBean去编程即可,没必要再去了解Servlet底层的细节
- Servlet源生的API在Spring MVC的环境下,使用场景已经非常的少了。甚至给我们一种错觉:servlet技术已经淡化了大众的视野
- Spring MVC的异步模式多多少少都会增加使用的复杂度,从而增加犯错的概率。而它的同步模式可以说是能够满足现在绝大部分的使用场景(大不了觉得性能不够了,就加机器嘛,很少会从代码的本身去考虑和优化性能),所以确实没使用过也是在情理之中的
说到Spring的异步模式,还有一个点比较关键就是响应式编程(Reactive Programming)。
自从Spring5开始,出来了一个webflux
,基于Netty
开发的框架,补上了Spring响应式编程的遗憾,像SpringCloudGateway
就是基于webflux
开发。
Spring5.0
以后,它对servlet
不再强依赖,而是变为了可选依赖。另外一个选择还可以是:Reactive
编程,至此SpringBoot
可以分为Servlet
体系与Reactive
体系。
可以扩展一下
初始化选择 web 容器
Spring Boot 中初始化哪种 web 容器的选择原理,首先第一步是根据类是否存在确定是哪种 WebApplicationType
:
public enum WebApplicationType {
/**
* 没有 web 服务,不需要 web 容器
*/
NONE,
/**
* 使用基于 servlet 的 web 容器
*/
SERVLET,
/**
* 使用响应式的 web 容器
*/
REACTIVE;
private static final String[] SERVLET_INDICATOR_CLASSES = { "javax.servlet.Servlet",
"org.springframework.web.context.ConfigurableWebApplicationContext" };
private static final String WEBMVC_INDICATOR_CLASS = "org.springframework.web.servlet.DispatcherServlet";
private static final String WEBFLUX_INDICATOR_CLASS = "org.springframework.web.reactive.DispatcherHandler";
private static final String JERSEY_INDICATOR_CLASS = "org.glassfish.jersey.servlet.ServletContainer";
private static final String SERVLET_APPLICATION_CONTEXT_CLASS = "org.springframework.web.context.WebApplicationContext";
private static final String REACTIVE_APPLICATION_CONTEXT_CLASS = "org.springframework.boot.web.reactive.context.ReactiveWebApplicationContext";
static WebApplicationType deduceFromClasspath() {
if (ClassUtils.isPresent(WEBFLUX_INDICATOR_CLASS, null) && !ClassUtils.isPresent(WEBMVC_INDICATOR_CLASS, null)
&& !ClassUtils.isPresent(JERSEY_INDICATOR_CLASS, null)) {
return WebApplicationType.REACTIVE;
}
for (String className : SERVLET_INDICATOR_CLASSES) {
if (!ClassUtils.isPresent(className, null)) {
return WebApplicationType.NONE;
}
}
return WebApplicationType.SERVLET;
}
从源码中可以看出,当有 WEBFLUX_INDICATOR_CLASS
并且没有 WEBMVC_INDICATOR_CLASS
以及 JERSEY_INDICATOR_CLASS
的时候,判断为 REACTIVE
环境。如果所有 SERVLET_INDICATOR_CLASSES
就认为是 SERVLET
环境。其实这样也可以看出,如果又引入 spring-web 又引入 spring-webflux 的依赖,其实还是 SERVLET 环境。如果以上都没有,那么就是无 web 容器的环境。在 Spring-Cloud-Gateway 中,是 REACTIVE
环境。
如果是 REACTIVE 环境,就会使用 org.springframework.boot.web.reactive.server.ReactiveWebServerFactory
的实现 Bean 创建 web 容器。那么究竟是哪个实现呢?目前有四个实现(Spring-boot 2.7.x)
-
TomcatReactiveWebServerFactory
:基于 Tomcat 的响应式 web 容器 Factory -
JettyReactiveWebServerFactory
:基于 Jetty 的响应式 web 容器 Factory -
UndertowReactiveWebServerFactory
:基于 Undertow 的响应式 web 容器Factory -
NettyReactiveWebServerFactory
:基于 Netty 的响应式 web 容器 Factory
实际会用哪个,看到底哪个 Bean 会注册到 ApplicationContext
中:
ReactiveWebServerFactoryConfiguration
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(ReactiveWebServerFactory.class)
@ConditionalOnClass({ HttpServer.class })
static class EmbeddedNetty {
@Bean
@ConditionalOnMissingBean
ReactorResourceFactory reactorServerResourceFactory() {
return new ReactorResourceFactory();
}
@Bean
NettyReactiveWebServerFactory nettyReactiveWebServerFactory(ReactorResourceFactory resourceFactory,
ObjectProvider<NettyRouteProvider> routes, ObjectProvider<NettyServerCustomizer> serverCustomizers) {
NettyReactiveWebServerFactory serverFactory = new NettyReactiveWebServerFactory();
serverFactory.setResourceFactory(resourceFactory);
routes.orderedStream().forEach(serverFactory::addRouteProviders);
serverFactory.getServerCustomizers().addAll(serverCustomizers.orderedStream().collect(Collectors.toList()));
return serverFactory;
}
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(ReactiveWebServerFactory.class)
@ConditionalOnClass({ org.apache.catalina.startup.Tomcat.class })
static class EmbeddedTomcat {
@Bean
TomcatReactiveWebServerFactory tomcatReactiveWebServerFactory(
ObjectProvider<TomcatConnectorCustomizer> connectorCustomizers,
ObjectProvider<TomcatContextCustomizer> contextCustomizers,
ObjectProvider<TomcatProtocolHandlerCustomizer<?>> protocolHandlerCustomizers) {
TomcatReactiveWebServerFactory factory = new TomcatReactiveWebServerFactory();
factory.getTomcatConnectorCustomizers()
.addAll(connectorCustomizers.orderedStream().collect(Collectors.toList()));
factory.getTomcatContextCustomizers()
.addAll(contextCustomizers.orderedStream().collect(Collectors.toList()));
factory.getTomcatProtocolHandlerCustomizers()
.addAll(protocolHandlerCustomizers.orderedStream().collect(Collectors.toList()));
return factory;
}
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(ReactiveWebServerFactory.class)
@ConditionalOnClass({ org.eclipse.jetty.server.Server.class, ServletHolder.class })
static class EmbeddedJetty {
@Bean
@ConditionalOnMissingBean
JettyResourceFactory jettyServerResourceFactory() {
return new JettyResourceFactory();
}
@Bean
JettyReactiveWebServerFactory jettyReactiveWebServerFactory(JettyResourceFactory resourceFactory,
ObjectProvider<JettyServerCustomizer> serverCustomizers) {
JettyReactiveWebServerFactory serverFactory = new JettyReactiveWebServerFactory();
serverFactory.getServerCustomizers().addAll(serverCustomizers.orderedStream().collect(Collectors.toList()));
serverFactory.setResourceFactory(resourceFactory);
return serverFactory;
}
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(ReactiveWebServerFactory.class)
@ConditionalOnClass({ Undertow.class })
static class EmbeddedUndertow {
@Bean
UndertowReactiveWebServerFactory undertowReactiveWebServerFactory(
ObjectProvider<UndertowBuilderCustomizer> builderCustomizers) {
UndertowReactiveWebServerFactory factory = new UndertowReactiveWebServerFactory();
factory.getBuilderCustomizers().addAll(builderCustomizers.orderedStream().collect(Collectors.toList()));
return factory;
}
}
从原码可以看出,每种配置上都有 @ConditionalOnMissingBean(ReactiveWebServerFactory.class)
以及判断是否有对应容器的 class 的条件,例如:@ConditionalOnClass({ Undertow.class })
,@Configuration(proxyBeanMethods = false)
是关闭这个配置中 Bean 之间的代理加快加载速度。
由于每个配置都有 @ConditionalOnMissingBean(ReactiveWebServerFactory.class)
,那么其实能保证就算满足多个配置的条件,最后也只有一个 ReactiveWebServerFactory,那么当满足多个条件时,哪个优先加载呢?这就要看这里的源码:
ReactiveWebServerFactoryAutoConfiguration
@Import({ ReactiveWebServerFactoryAutoConfiguration.BeanPostProcessorsRegistrar.class,
ReactiveWebServerFactoryConfiguration.EmbeddedTomcat.class,
ReactiveWebServerFactoryConfiguration.EmbeddedJetty.class,
ReactiveWebServerFactoryConfiguration.EmbeddedUndertow.class,
ReactiveWebServerFactoryConfiguration.EmbeddedNetty.class })
从这里可以看出,是按照 EmbeddedTomcat
,EmbeddedJetty
,EmbeddedUndertow
,EmbeddedNetty
的顺序 Import 的,也就是:只要你的依赖中加入了任何 Web 容器(例如 Undertow),那么最后创建的就是基于那个 web 容器的异步容器,而不是基于 netty 的。
回到原点,讲讲原本的MVC模式
Spring MVC的同步模式
要知道什么是异步模式,就先要知道什么是同步模式。
建议可以看一下补充内容
浏览器发起请求,Web服务器开一个线程处理(请求处理线程
),处理完把处理结果返回浏览器。这就是同步模式。绝大多数Web服务器都如此般处理。

此处需要明晰一个概念:比如tomcat,它既是一个web服务器,同时它也是个servlet后端容器(调Java后端服务),所以要区分清楚这两个概念。请求处理线程是有限的,宝贵的资源~(注意它和处理线程的区别)
tomcat默认的线程池容量是200,这就是请求处理线程,而这程序中我们可以自定义线程池去执行任务这就是处理线程。
- 请求发起者发起一个request,然后会一直等待一个response,这期间它是阻塞的
- 请求处理线程会在Call了之后等待Return,
自身处于阻塞状态
(这个很关键) - 然后都等待return,知道处理线程全部完事后返回了,然后把response反给调用者就算全部结束了
问题在哪里?
绝大部分情况下,这样是没有问题的。因为
第一:高并发、高流量的场景放眼中国的公司,占比也是非常少的。
第二:长时间处理服务这种情况也是少之又少的
所以两者结合起来,场景就更加稀少了。相信这就是为什么好多做开发N年了的,却还不知道Servlet和Spring MVC的异步模式的原因吧。
Tomcat等应用服务器的连接线程池实际上是有限制的;每一个连接请求都会耗掉线程池的一个连接数;如果某些耗时很长的操作,如对大量数据的查询操作、调用外部系统提供的服务以及一些IO密集型操作等,会占用连接很长时间,这个时候这个连接就无法被释放而被其它请求重用。如果连接占用过多,服务器就很可能无法及时响应每个请求;极端情况下如果将线程池中的所有连接耗尽,服务器将长时间无法向外提供服务!
Spring MVC异步模式
Spring MVC3.2之后支持异步请求,能够在controller中返回一个Callable
或者DeferredResult
。由于Spring MVC的良好封装,异步功能使用起来出奇的简单。
Callable
案例
@Controller
@RequestMapping("/async/controller")
public class AsyncHelloController {
@ResponseBody
@GetMapping("/hello")
public Callable<String> helloGet() throws Exception {
System.out.println(Thread.currentThread().getName() + " 主线程start");
Callable<String> callable = () -> {
System.out.println(Thread.currentThread().getName() + " 子子子线程start");
TimeUnit.SECONDS.sleep(5); //模拟处理业务逻辑,话费了5秒钟
System.out.println(Thread.currentThread().getName() + " 子子子线程end");
// 这里稍微小细节一下:最终返回的不是Callable对象,而是它里面的内容
return "hello world";
};
System.out.println(Thread.currentThread().getName() + " 主线程end");
return callable;
}
}
输出结果
http-apr-8080-exec-3 主线程start
http-apr-8080-exec-3 主线程end
MvcAsync1 子子子线程start
MvcAsync1 子子子线程end
先明细两个概念:
- 请求处理线程:处理线程 属于 web 服务器线程,负责 处理用户请求,采用 线程池 管理。
- 异步线程:异步线程 属于 用户自定义的线程,可采用 线程池管理。
前端页面等待5秒出现结果
注意:异步模式对前端来说,是无感知的,这是后端的一种技术。所以这个和我们自己开启一个线程处理,立马返回给前端是有非常大的不同的,需要注意~
由此我们可以看出,主线程早早就结束了(需要注意,此时还并没有把response返回的,此处一定要注意),真正干事的是子线程(交给TaskExecutor
去处理的,后续分析过程中可以看到),它的大致的一个处理流程图可以如下:

这里能够很直接的看出:我们很大程度上提高了我们请求处理线程
的利用率,从而肯定就提高了我们系统的吞吐量。
异步模式处理步骤概述如下:
- 当Controller返回值是Callable的时候
- Spring就会将Callable交给TaskExecutor去处理(一个隔离的线程池)
- 与此同时将DispatcherServlet里的拦截器、Filter等等都马上退出主线程,但是response仍然保持打开的状态
- Callable线程处理完成后,Spring MVC讲请求重新派发给容器(注意这里的重新派发,和后面讲的拦截器密切相关)
- 根据Callabel返回结果,继续处理(比如参数绑定、视图解析等等就和之前一样了)~~~~
WebAsyncTask
案例:
官方说如果我们需要超时处理的回调或者错误处理的回调,我们可以使用WebAsyncTask
代替Callable
实际使用中,我并不建议直接使用Callable ,而是使用Spring提供的
WebAsyncTask
代替,它包装了Callable,功能更强大些
@Controller
@RequestMapping("/async/controller")
public class AsyncHelloController {
@ResponseBody
@GetMapping("/hello")
public WebAsyncTask<String> helloGet() throws Exception {
System.out.println(Thread.currentThread().getName() + " 主线程start");
Callable<String> callable = () -> {
System.out.println(Thread.currentThread().getName() + " 子子子线程start");
TimeUnit.SECONDS.sleep(5); //模拟处理业务逻辑,话费了5秒钟
System.out.println(Thread.currentThread().getName() + " 子子子线程end");
return "hello world";
};
// 采用WebAsyncTask 返回 这样可以处理超时和错误 同时也可以指定使用的Excutor名称
WebAsyncTask<String> webAsyncTask = new WebAsyncTask<>(3000, callable);
// 注意:onCompletion表示完成,不管你是否超时、是否抛出异常,这个函数都会执行的
webAsyncTask.onCompletion(() -> System.out.println("程序[正常执行]完成的回调"));
// 这两个返回的内容,最终都会放进response里面去===========
webAsyncTask.onTimeout(() -> "程序[超时]的回调");
// 备注:这个是Spring5新增的
webAsyncTask.onError(() -> "程序[出现异常]的回调");
System.out.println(Thread.currentThread().getName() + " 主线程end");
return webAsyncTask;
}
}
由于我们设置了超时时间为3000ms,而业务处理是5s,所以会执行onTimeout
这个回调函数。因此页面是会显示“程序[超时]的回调”这几个字。其执行的过程同Callback。
输出结果
http-nio-8080-exec-1 主线程start
http-nio-8080-exec-1 主线程end
task-1 子子子线程start
程序[正常执行]完成的回调
下面我们简单看看WebAsyncTask
的源码,非常简单,就是个包装:
public class WebAsyncTask<V> implements BeanFactoryAware {
// 正常执行的函数(通过WebAsyncTask的构造函数可以传进来)
private final Callable<V> callable;
// 处理超时时间(ms),可通过构造函数指定,也可以不指定(不会有超时处理)
private Long timeout;
// 执行任务的执行器。可以构造函数设置进来,手动指定。
private AsyncTaskExecutor executor;
// 若设置了,会根据此名称去IoC容器里找这个Bean (和上面二选一)
// 若传了executorName,请务必调用set方法设置beanFactory
private String executorName;
private BeanFactory beanFactory;
// 超时的回调
private Callable<V> timeoutCallback;
// 发生错误的回调
private Callable<V> errorCallback;
// 完成的回调(不管超时还是错误都会执行)
private Runnable completionCallback;
...
// 这是获取执行器的逻辑
@Nullable
public AsyncTaskExecutor getExecutor() {
if (this.executor != null) {
return this.executor;
} else if (this.executorName != null) {
Assert.state(this.beanFactory != null, "BeanFactory is required to look up an executor bean by name");
return this.beanFactory.getBean(this.executorName, AsyncTaskExecutor.class);
} else {
return null;
}
}
public void onTimeout(Callable<V> callback) {
this.timeoutCallback = callback;
}
public void onError(Callable<V> callback) {
this.errorCallback = callback;
}
public void onCompletion(Runnable callback) {
this.completionCallback = callback;
}
// 最终执行超时回调、错误回调、完成回调都是通过这个拦截器实现的
CallableProcessingInterceptor getInterceptor() {
return new CallableProcessingInterceptor() {
@Override
public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws Exception {
return (timeoutCallback != null ? timeoutCallback.call() : CallableProcessingInterceptor.RESULT_NONE);
}
@Override
public <T> Object handleError(NativeWebRequest request, Callable<T> task, Throwable t) throws Exception {
return (errorCallback != null ? errorCallback.call() : CallableProcessingInterceptor.RESULT_NONE);
}
@Override
public <T> void afterCompletion(NativeWebRequest request, Callable<T> task) throws Exception {
if (completionCallback != null) {
completionCallback.run();
}
}
};
}
}
WebAsyncTask
的异步编程 API。相比于 @Async
注解,WebAsyncTask
提供更加健全的 超时处理 和 异常处理 支持。但是@Async
也有更优秀的地方,就是他不仅仅能用于controller中(任意地方)
在SpringBoot中使用WebAsyncTask作为接口的返回值,在调用接口的时候回提示信息。
An Executor is required to handle java.util.concurrent.Callable return values.
Please, configure a TaskExecutor in the MVC config under "async support".
The SimpleAsyncTaskExecutor currently in use is not suitable under load.
Controller层使用了Callable返回结果,上下文没有配置异步线程池引起的。
Spring 配置在这方面有点令人困惑,因为它需要单独配置 MVC 异步支持,即使用返回 Callable 的控制器处理程序方法,以及使用 @Async 注释的任何 Spring bean 方法。要正确配置它,您可以应用类似下面的配置,记住 AsyncTaskExecutor 配置可能需要修改。
附带说明一下,可能会想简单地使用 @Async 注释您的控制器处理程序方法。这只会产生预期的效果 – 释放 Web 服务器线程 – 触发和忘记操作(此观察基于 Spring Boot 2.1.2,他们可能会在未来解决这个问题)。如果您想利用 Servlet 3.0 异步处理的强大功能,您真的必须使用 Callables 并使用 WebMvcConfigurer 配置它们。
异步配置
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
final Integer AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
@Bean
protected WebMvcConfigurer webMvcConfigurer() {
return new WebMvcConfigurerAdapter() {
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setTaskExecutor(getTaskExecutor());
}
};
}
@Bean
protected ConcurrentTaskExecutor getTaskExecutor() {
return new ConcurrentTaskExecutor(new ThreadPoolExecutor(AVAILABLE_PROCESSORS, 3 * AVAILABLE_PROCESSORS,
5, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(200),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()));
}
}
DeferredResult
案例:
DeferredResult
使用方式与Callable
类似,但在返回结果上不一样,它返回的时候实际结果可能没有生成,实际的结果可能会在另外的线程里面设置到DeferredResult
中去。
这个特性非常非常的重要,对后面实现复杂的功能(比如服务端推技术、订单过期时间处理、长轮询、模拟MQ的功能等等高级应用)
官方给的Demo如下:

关键在于这里的其他线程是可以通过MQ,定时任务,事件驱动等方式来触发的。
测试案例
@Controller
@RequestMapping("/async/controller")
public class AsyncHelloController {
private List<DeferredResult<String>> deferredResultList = new ArrayList<>();
@ResponseBody
@GetMapping("/hello")
public DeferredResult<String> helloGet() throws Exception {
DeferredResult<String> deferredResult = new DeferredResult<>();
//先存起来,等待触发
deferredResultList.add(deferredResult);
return deferredResult;
}
@ResponseBody
@GetMapping("/setHelloToAll")
public void helloSet() throws Exception {
// 让所有hold住的请求给与响应
deferredResultList.forEach(d -> d.setResult("say hello to all"));
}
}
我们第一个请求/hello
,会先deferredResult
存起来,然后前端页面是一直等待(转圈状态)的。直到我发第二个请求:setHelloToAll
,所有的相关页面才会有响应
执行过程
-
controller
返回一个DeferredResult
,我们把它保存到内存里或者List里面(供后续访问) - Spring MVC调用
request.startAsync()
,开启异步处理 - 与此同时将
DispatcherServlet
里的拦截器、Filter
等等都马上退出主线程,但是response
仍然保持打开的状态 - 应用通过另外一个线程(可能是MQ消息、定时任务等)给
DeferredResult
set
值。然后Spring MVC会把这个请求再次派发给servlet容器 -
DispatcherServlet
再次被调用,然后处理后续的标准流程
简单看看源码:
public class DeferredResult<T> {
private static final Object RESULT_NONE = new Object()
// 超时时间(ms) 可以不配置
@Nullable
private final Long timeout;
// 相当于超时的话的,传给回调函数的值
private final Object timeoutResult;
// 这三种回调也都是支持的
private Runnable timeoutCallback;
private Consumer<Throwable> errorCallback;
private Runnable completionCallback;
// 这个比较强大,就是能把我们结果再交给这个自定义的函数处理了 他是个@FunctionalInterface
private DeferredResultHandler resultHandler;
private volatile Object result = RESULT_NONE;
private volatile boolean expired = false;
// 判断这个DeferredResult是否已经被set过了(被set过的对象,就可以移除了嘛)
// 如果expired表示已经过期了你还没set,也是返回false的
// Spring4.0之后提供的
public final boolean isSetOrExpired() {
return (this.result != RESULT_NONE || this.expired);
}
// 没有isSetOrExpired 强大,建议使用上面那个
public boolean hasResult() {
return (this.result != RESULT_NONE);
}
// 还可以获得set进去的结果
@Nullable
public Object getResult() {
Object resultToCheck = this.result;
return (resultToCheck != RESULT_NONE ? resultToCheck : null);
}
public void onTimeout(Runnable callback) {
this.timeoutCallback = callback;
}
public void onError(Consumer<Throwable> callback) {
this.errorCallback = callback;
}
public void onCompletion(Runnable callback) {
this.completionCallback = callback;
}
// 如果你的result还需要处理,可以这是一个resultHandler,会对你设置进去的结果进行处理
public final void setResultHandler(DeferredResultHandler resultHandler) {
Assert.notNull(resultHandler, "DeferredResultHandler is required");
// Immediate expiration check outside of the result lock
if (this.expired) {
return;
}
Object resultToHandle;
synchronized (this) {
// Got the lock in the meantime: double-check expiration status
if (this.expired) {
return;
}
resultToHandle = this.result;
if (resultToHandle == RESULT_NONE) {
// No result yet: store handler for processing once it comes in
this.resultHandler = resultHandler;
return;
}
}
try {
resultHandler.handleResult(resultToHandle);
} catch (Throwable ex) {
logger.debug("Failed to handle existing result", ex);
}
}
// 我们发现,这里调用是private方法setResultInternal,我们设置进来的结果result,会经过它的处理
// 而它的处理逻辑也很简单,如果我们提供了resultHandler,它会把这个值进一步的交给我们的resultHandler处理
// 若我们没有提供此resultHandler,那就保存下这个result即可
public boolean setResult(T result) {
return setResultInternal(result);
}
private boolean setResultInternal(Object result) {
// Immediate expiration check outside of the result lock
if (isSetOrExpired()) {
return false;
}
DeferredResultHandler resultHandlerToUse;
synchronized (this) {
// Got the lock in the meantime: double-check expiration status
if (isSetOrExpired()) {
return false;
}
// At this point, we got a new result to process
this.result = result;
resultHandlerToUse = this.resultHandler;
if (resultHandlerToUse == null) {
this.resultHandler = null;
}
}
resultHandlerToUse.handleResult(result);
return true;
}
// 发生错误了,也可以设置一个值。这个result会被记下来,当作result
// 注意这个和setResult的唯一区别,这里入参是Object类型,而setResult只能set规定的指定类型
// 定义成Obj是有原因的:因为我们一般会把Exception等异常对象放进来。。。
public boolean setErrorResult(Object result) {
return setResultInternal(result);
}
// 拦截器 注意最终finally里面,都可能会调用我们的自己的处理器resultHandler(若存在的话)
// afterCompletion不会调用resultHandler~~~~~~~~~~~~~
final DeferredResultProcessingInterceptor getInterceptor() {
return new DeferredResultProcessingInterceptor() {
@Override
public <S> boolean handleTimeout(NativeWebRequest request, DeferredResult<S> deferredResult) {
boolean continueProcessing = true;
try {
if (timeoutCallback != null) {
timeoutCallback.run();
}
} finally {
if (timeoutResult != RESULT_NONE) {
continueProcessing = false;
try {
setResultInternal(timeoutResult);
} catch (Throwable ex) {
logger.debug("Failed to handle timeout result", ex);
}
}
}
return continueProcessing;
}
@Override
public <S> boolean handleError(NativeWebRequest request, DeferredResult<S> deferredResult, Throwable t) {
try {
if (errorCallback != null) {
errorCallback.accept(t);
}
} finally {
try {
setResultInternal(t);
} catch (Throwable ex) {
logger.debug("Failed to handle error result", ex);
}
}
return false;
}
@Override
public <S> void afterCompletion(NativeWebRequest request, DeferredResult<S> deferredResult) {
expired = true;
if (completionCallback != null) {
completionCallback.run();
}
}
};
}
// 内部函数式接口 DeferredResultHandler
@FunctionalInterface
public interface DeferredResultHandler {
void handleResult(Object result);
}
}
DeferredResult
的超时处理,采用委托机制,也就是在实例DeferredResult
时给予一个超时时长(毫秒),同时在onTimeout
中委托(传入)一个新的处理线程(我们可以认为是超时线程);当超时时间到来,DeferredResult
启动超时线程,超时线程处理业务,封装返回数据,给DeferredResult
赋值(正确返回的或错误返回的)
Spring MVC异步模式中使用Filter
和HandlerInterceptor
看到上面的异步访问,不免我们会新生怀疑,若是普通的拦截器HandlerInterceptor
,还生效吗?若生效,效果是怎么样的,现在我们直接看一下吧:(以上面Callable
的Demo为示例)
Filter
// 注意,这里必须开启异步支持asyncSupported = true,否则报错:Async support must be enabled on a servlet and for all filters involved in async request processing
@WebFilter(urlPatterns = "/*", asyncSupported = true)
@Configuration
public class HelloFilter extends OncePerRequestFilter {
@Override
protected void initFilterBean() throws ServletException {
System.out.println("Filter初始化...");
}
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
System.out.println(Thread.currentThread().getName() + "--->" + request.getRequestURI());
filterChain.doFilter(request, response);
}
}
输出结果
http-apr-8080-exec-3--->/hello
http-apr-8080-exec-3 主线程start
http-apr-8080-exec-3 主线程end
MvcAsync1 子子子线程start
MvcAsync1 子子子线程end
由此可以看出,异步上下文,Filter还是只会被执行一次拦截的,符合我们的预期,所以没什么毛病。
HandlerInterceptor
public class HelloInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
System.out.println(Thread.currentThread().getName() + "---preHandle-->" + request.getRequestURI());
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
System.out.println(Thread.currentThread().getName() + "---postHandle-->" + request.getRequestURI());
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
System.out.println(Thread.currentThread().getName() + "---afterCompletion-->" + request.getRequestURI());
}
}
// 注册拦截器
@Configuration
// @EnableWebMvc //可以不需要配置注解,详情看上面的链接
public class AppConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
// /**拦截所有请求
registry.addInterceptor(new HelloInterceptor()).addPathPatterns("/**");
}
}
输出结果
http-apr-8080-exec-3--->/hello
http-apr-8080-exec-3---preHandle-->/hello
http-apr-8080-exec-3 主线程start
http-apr-8080-exec-3 主线程end
MvcAsync1 子子子线程start
MvcAsync1 子子子线程end
// 注意 子子子线程处理结束后,再一次触发了preHandle=====
// 此处还要一个细节:这里面的线程既不是子线程,也不是上面的线程 而是新开了一个线程~~~
http-apr-8080-exec-5---preHandle-->/hello
http-apr-8080-exec-5---postHandle-->/hello
http-apr-8080-exec-5---afterCompletion-->/hello
从上面可以看出,如果我们就是普通的Spring MVC的拦截器,preHandler会执行两次,这也符合我们上面分析的处理步骤。所以我们在书写preHandler的时候,一定要特别的注意,要让preHandler即使执行多次,也不要受到影响(幂等)
这里还是要特别注意的是@EnableWebMvc
注解会产生一个警告
!!!
An Executor is required to handle java.util.concurrent.Callable return values.
Please, configure a TaskExecutor in the MVC config under "async support".
The SimpleAsyncTaskExecutor currently in use is not suitable under load.
-------------------------------
Request URI: '/hello'
!!!
这个警告的是表面我们不应在生产中使用默认使用的SimpleAsyncTaskExecutor
,原本SpringBoot的MVC配置是不会使用它的,但是这个注解使得SpringBoot的配置失效,所以我们要手动配置MVC配置。
下面是我们手动配置线程池
/**
* @EnableWebMvc 导致Springboot自带的MVC配置失效,所以要重新配置
*/
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {
@Bean
public ThreadPoolTaskExecutor mvcTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(10);
return taskExecutor;
}
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setTaskExecutor(mvcTaskExecutor());
}
}
回到正题,异步拦截器有 AsyncHandlerInterceptor、CallableProcessingInterceptor、DeferredResultProcessingInterceptor
Spring MVC给提供了异步拦截器,能让我们更深入的参与进去异步request的生命周期里面去。其中最为常用的为:AsyncHandlerInterceptor
:
public class AsyncHelloInterceptor implements AsyncHandlerInterceptor {
// 这是Spring3.2提供的方法,专门拦截异步请求的方式
@Override
public void afterConcurrentHandlingStarted(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
System.out.println(Thread.currentThread().getName() + "---afterConcurrentHandlingStarted-->" + request.getRequestURI());
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
System.out.println(Thread.currentThread().getName() + "---preHandle-->" + request.getRequestURI());
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
System.out.println(Thread.currentThread().getName() + "---postHandle-->" + request.getRequestURI());
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
System.out.println(Thread.currentThread().getName() + "---afterCompletion-->" + request.getRequestURI());
}
}
输出结果
http-apr-8080-exec-3---preHandle-->/hello
http-apr-8080-exec-3 主线程start
http-apr-8080-exec-3 主线程end
// 这里发现,它在主线程结束后,子线程开始之前执行的(线程号还是同一个哦~)
http-apr-8080-exec-3---afterConcurrentHandlingStarted-->/hello
MvcAsync1 子子子线程start
MvcAsync1 子子子线程end
http-apr-8080-exec-6---preHandle-->/hello
http-apr-8080-exec-6---postHandle-->/hello
http-apr-8080-exec-6---afterCompletion-->/hello
AsyncHandlerInterceptor
提供了一个afterConcurrentHandlingStarted()
方法, 这个方法会在Controller
方法异步执行时开始执行, 而Interceptor
的postHandle
方法则是需要等到Controller
的异步执行完才能执行
(比如我们用DeferredResult
的话,afterConcurrentHandlingStarted
是在return的之后执行,而postHandle()
是执行.setResult()
之后执行)
需要说明的是:如果我们不是异步请求,afterConcurrentHandlingStarted
是不会执行的。所以我们可以把它当做加强版的HandlerInterceptor
来用。平时我们若要使用拦截器,建议使用它。(Spring5,JDK8以后,很多的xxxAdapter都没啥用了,直接implements接口就成~)
同样可以注册CallableProcessingInterceptor
或者一个DeferredResultProcessingInterceptor
用于更深度的集成异步request的生命周期
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
// 注册异步的拦截器、默认的超时时间、任务处理器TaskExecutor等等
//configurer.registerCallableInterceptors();
//configurer.registerDeferredResultInterceptors();
//configurer.setDefaultTimeout();
//configurer.setTaskExecutor();
}
只是一般来说,我们并不需要注册这种精细的拦截器,绝大多数情况下,使用AsyncHandlerInterceptor
是够了的。
(Spring MVC的很多默认设置,请参考WebMvcConfigurationSupport
)
区别使用
我觉得最主要的区别是:DeferredResult
需要自己用线程来处理结果setResult
,而Callable
的话不需要我们来维护一个结果处理线程。
总体来说,Callable的话更为简单,同样的也是因为简单,灵活性不够;
相对地,DeferredResult更为复杂一些,但是又极大的灵活性,所以能实现非常多个性化的、复杂的功能,可以设计高级应用。
有些较常见的场景, Callable
也并不能解决,比如说:我们访问A接口,A接口调用三方的服务,服务回调(注意此处指的回调,不是返回值)
B接口,这种情况就没办法使用Callable了,这个时候可以使用DeferredResult
使用原则:基本上在可以用Callable
的时候,直接用Callable
;而遇到Callable
没法解决的场景的时候,可以尝试使用DeferredResult
。(这里所指的Callable
包括WebAsyncTask
)
进阶用法
DeferredResult
应用

它的优点也是非常明显的,能够实现两个完全不相干的线程间的通信。处理的时候请注意图中标记的线程安全问题
实现长轮询服务端推送消息(long polling)
双向通信的方式
在WebSocket
协议之前(它是2011年发布的),有三种实现双向通信的方式:轮询(polling)、长轮询(long-polling)和iframe流(streaming)。
- 轮询(polling):这个不解释了。优点是实现简单粗暴,后台处理简单。缺点也是大大的,耗流量、耗CPU。。。
- 长轮询(long-polling):长轮询是对轮询的改进版。客户端发送HTTP给服务器之后,看有没有新消息,如果没有新消息,就一直等待(而不是一直去请求了)。当有新消息的时候,才会返回给客户端。 优点是对轮询做了优化,时效性也较好。缺点是:保持连接会消耗资源; 服务器没有返回有效数据,程序超时~~~
-
iframe流(streaming):是在页面中插入一个
隐藏的iframe
,利用其src属性在服务器和客户端之间创建一条长连接,服务器向iframe传输数据(通常是HTML,内有负责插入信息的javascript),来实时更新页面。(个人觉得还不如长轮询) - WebSocket:WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。它将TCP的Socket(套接字)应用在了webpage上。 它的有点一大把:支持双向通信,实时性更强;可发送二进制文件;非常节省流量。 但也是有缺点的:浏览器支持程度不一致,不支持断开重连 (最推荐的~~~)
演示案例
了解apollo配置中心的实现原理,apollo的发布配置推送变更消息就是用DeferredResult
实现的。它的大概实现步骤如下:
- apollo客户端会像服务端发送长轮询http请求,超时时间60秒
- 当超时后返回客户端一个304 httpstatus,表明配置没有变更,客户端继续这个步骤重复发起请求
- 当有发布配置的时候,服务端会调用
DeferredResult.setResult
返回200状态码。客户端收到响应结果后,会发起请求获取变更后的配置信息(注意这里是另外一个请求)。
注册拦截器设置过期时间
@Configuration
// @EnableWebMvc // 可以不需要这个注解
public class AppConfig implements WebMvcConfigurer {
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
// 超时时间设置为15s
configurer.setDefaultTimeout(TimeUnit.SECONDS.toMillis(15));
}
}
特定的异常捕获
@Slf4j
@ControllerAdvice
class GlobalControllerExceptionHandler {
@ResponseStatus(HttpStatus.NOT_MODIFIED)//返回304状态码 效果同HttpServletResponse#sendError(int) 但这样更优雅
@ResponseBody
@ExceptionHandler(AsyncRequestTimeoutException.class) //捕获特定异常
public void handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e) {
System.out.println("handleAsyncRequestTimeoutException");
}
}
服务端接口代码
@Slf4j
@RestController
public class ApolloController {
// 值为List,因为监视同一个名称空间的长轮询可能有N个(毕竟可能有多个客户端用同一份配置嘛)
private static Map<String, List<DeferredResult<String>>> watchRequests = new ConcurrentHashMap<>();
@GetMapping(value = "/all/watchrequests")
public Object getWatchRequests() {
return watchRequests;
}
// 模拟长轮询:apollo客户端来监听配置文件的变更~ 可以指定namespace 监视指定的NameSpace
@GetMapping(value = "/watch/{namespace}")
public DeferredResult<String> watch(@PathVariable("namespace") String namespace) {
log.info("Request received,namespace is" + namespace + ",当前时间:" + System.currentTimeMillis());
DeferredResult<String> deferredResult = new DeferredResult<>();
//当deferredResult完成时(不论是超时还是异常还是正常完成),都应该移除watchRequests中相应的watch key
deferredResult.onCompletion(() -> {
log.info("onCompletion,移除对namespace:" + namespace + "的监视~");
List<DeferredResult<String>> list = watchRequests.get(namespace);
list.remove(deferredResult);
if (list.isEmpty()) {
watchRequests.remove(namespace);
}
});
// 超时
deferredResult.onTimeout(() -> {
log.info("调用超时");
});
// 发送异常
deferredResult.onError((Throwable t) -> {
deferredResult.setErrorResult(
ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("An error occurred."));
});
List<DeferredResult<String>> list = watchRequests.computeIfAbsent(namespace, (k) -> new ArrayList<>());
list.add(deferredResult);
return deferredResult;
}
//模拟发布namespace配置:修改配置
@GetMapping(value = "/publish/{namespace}")
public void publishConfig(@PathVariable("namespace") String namespace) {
//do Something for update config
if (watchRequests.containsKey(namespace)) {
List<DeferredResult<String>> deferredResults = watchRequests.get(namespace);
//通知所有watch这个namespace变更的长轮训配置变更结果
for (DeferredResult<String> deferredResult : deferredResults) {
deferredResult.setResult(namespace + " changed,时间为" + System.currentTimeMillis());
}
}
}
}
这样子我们就基本模拟了一个长轮询
的案例~
测试代码首先先调用http://127.0.0.1:8080/watch/ceshi
监听文件内容,前端会一直在等待,http的连接不会断开,而调用http://127.0.0.1:8080/publish/ceshi
修改文件内容,上面的接口就会返回数据。
长轮询的应用场景也是很多的,比如我们现在要实现这样一个功能:浏览器要实时展示服务端计算出来的数据。(这个用普通轮询就会有延迟且浪费资源,但是用这种类似长连接
的方案就很合适)
ResponseBodyEmitter和SseEmitter
Callback
和DeferredResult
用于设置单个结果,如果有多个结果需要set返回给客户端时,可以使用SseEmitter
以及ResponseBodyEmitter,each object is written with a compatible HttpMessageConverter
。返回值可以直接写他们本身,也可以放在ResponseEntity
里面
它俩都是Spring4.2之后提供的类。由ResponseBodyEmitterReturnValueHandler
负责处理。 这个和Spring5提供的webFlux技术已经很像了,后续讲到的时候还会提到他们
Emitter:发射器
它们的使用方式几乎同:DeferredResult
,这里我只把官方的例子拿出来你就懂了

案例
@RequestMapping("/async/responseBodyEmitter")
public ResponseBodyEmitter responseBodyEmitter(){
ResponseBodyEmitter responseBodyEmitter=new ResponseBodyEmitter();
Executors.newSingleThreadExecutor().submit(() -> {
try {
responseBodyEmitter.send("demo");
responseBodyEmitter.send("test");
responseBodyEmitter.complete();
} catch (Exception ignore) {}
});
return responseBodyEmitter;
}
SseEmitter
是ResponseBodyEmitter
的子类,它提供Server-Sent Events(Sse)
.服务器事件发送是”HTTP Streaming
”的另一个变种技术.只是从服务器发送的事件按照W3C Server-Sent Events
规范来的(推荐使用) 它的使用方式上,完全同上
Server-Sent Events
这个规范能够来用于它们的预期使用目的:就是从server发送events到clients(服务器推).在Spring MVC中可以很容易的实现.仅仅需要返回一个SseEmitter
类型的值.
向这种场景在在线游戏、在线协作、金融领域等等都有很好的应用。当然,如果你对稳定性什么的要求都非常高,官方也推荐最好是使用WebSocket
来实现~
ResponseBodyEmitter
允许通过HttpMessageConverter
把发送的events写到对象到response中.这可能是最常见的情况。例如写JSON数据
可是有时候它被用来绕开message
转换直接写入到response的OutputStream。例如文件下载.这样可以通过返回StreamingResponseBody
类型的值做到.
sse 规范
sse | websocket |
---|---|
http 协议 | 独立的 websocket 协议 |
轻量,使用简单 | 相对复杂 |
默认支持断线重连 | 需要自己实现断线重连 |
文本传输 | 二进制传输 |
支持自定义发送的消息类型 | – |
开启长连接 + 流方式传递
Content-Type: text/event-stream;charset=UTF-8
Cache-Control: no-cache
Connection: keep-alive
案例
SpringBoot 利用 SseEmitter 来支持 sse,可以说非常简单了,直接返回SseEmitter
对象即可
@RestController
@RequestMapping(path = "sse2")
public class SseRest {
private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
@GetMapping(path = "subscribe")
public SseEmitter push(String id) {
// 超时时间设置为1小时
SseEmitter sseEmitter = new SseEmitter(3600_000L);
sseCache.put(id, sseEmitter);
sseEmitter.onTimeout(() -> sseCache.remove(id));
sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
return sseEmitter;
}
@GetMapping(path = "push")
public String push(String id, String content) throws IOException {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
sseEmitter.send(content);
}
return "over";
}
@GetMapping(path = "over")
public String over(String id) {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
sseEmitter.complete();
sseCache.remove(id);
}
return "over";
}
}
第一个接口是订阅数据,第二个接口是数据推送
可以直接通过接口来调用。
上面的实现,用到了 SseEmitter 的几个方法,解释如下
-
send()
: 发送数据,如果传入的是一个非SseEventBuilder
对象,那么传递参数会被封装到 data 中 -
complete()
: 表示执行完毕,会断开连接 -
onTimeout()
: 超时回调触发 -
onCompletion()
: 结束之后的回调触发
写一个简单的 html 消费端,用来演示一下完整的 sse 的更多特性
sse机制不同于传统的“请求-响应”模型,在前端必须使用新建的 EventSource
对象请求一个sse
,然后监听此对象的message
事件以接收后端推送的值
<!doctype html>
<html lang="en">
<head>
<title>Sse测试文档</title>
</head>
<body>
<div>sse测试</div>
<div id="result"></div>
</body>
</html>
<script>
var source = new EventSource('http://localhost:8080/sse/subscribe?id=yihuihui');
source.onmessage = function (event) {
text = document.getElementById('result').innerText;
text += '\n' + event.data;
document.getElementById('result').innerText = text;
};
<!-- 添加一个开启回调 -->
source.onopen = function (event) {
text = document.getElementById('result').innerText;
text += '\n 开启: ';
console.log(event);
document.getElementById('result').innerText = text;
};
</script>
将上面的 html 文件放在项目的resources/static
目录下;然后修改一下前面的SseRest
@Controller
@RequestMapping(path = "sse")
public class SseRest {
@GetMapping(path = "")
public String index() {
return "index.html";
}
@ResponseBody
@GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter push(String id) {
// 超时时间设置为3s,用于演示客户端自动重连
SseEmitter sseEmitter = new SseEmitter(1_000L);
// 设置前端的重试时间为1s
sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("连接成功"));
sseCache.put(id, sseEmitter);
System.out.println("add " + id);
sseEmitter.onTimeout(() -> {
System.out.println(id + "超时");
sseCache.remove(id);
});
sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
return sseEmitter;
}
}
我们上面超时时间设置的比较短,用来测试下客户端的自动重连,如下,开启的日志不断增加

其次将 SseEmitter 的超时时间设长一点,可以再试一下数据推送功能。
StreamingResponseBody (很方便的文件下载)
它用于直接将结果写出到Response的OutputStream
中; 如文件下载等

接口源码非常简单:
@FunctionalInterface
public interface StreamingResponseBody {
void writeTo(OutputStream outputStream) throws IOException;
}
案例
@RequestMapping("/async/streamingResponseBody")
public StreamingResponseBody streamingResponseBody(){
StreamingResponseBody streamingResponseBody = outputStream -> {
Executors.newSingleThreadExecutor().submit(() -> {
try {
outputStream.write("<html>streamingResponseBody</html>".getBytes());
} catch (IOException ignore) {}
});
};
return streamingResponseBody;
}
异步优化
Spring内部默认不使用线程池处理的(通过源码分析后面我们是能看到的),为了提高处理的效率,我们可以自己优化,建议自己在配置里注入一个线程池供给使用,参考如下:
// 提供一个mvc里专用的线程池。。。 这是全局的方式~~~~
@Bean
public ThreadPoolTaskExecutor mvcTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setQueueCapacity(100);
executor.setMaxPoolSize(25);
return executor;
}
// 最优解决方案不是像上面一样配置通用的,而是配置一个单独的专用的,如下~~~~
@Configuration
@EnableWebMvc
public class WebMvcConfig extends WebMvcConfigurerAdapter {
// 配置异步支持~~~~
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
// 设置一个用于异步执行的执行器~~~AsyncTaskExecutor
configurer.setTaskExecutor(mvcTaskExecutor());
configurer.setDefaultTimeout(60000L);
}
}
总的来说,Spring MVC提供的便捷的异步支持,能够大大的提高Tomcat容器等的性能。同时也给我们的应用提供了更多的便利。这也为Spring5以后的Reactive编程模型提供了有利的支持和保障。