阅读完需:约 6 分钟
我们日常使用的 SpringMVC,基本上都不是异步 Servlet,而学习 WebFlux,异步 Servlet 是基础。
什么是异步 Servlet
先来说说什么是非异步 Servlet。
在 Servlet3.0 之前,Servlet 采用 Thread-Per-Request 的方式处理 Http 请求,即每一次请求都是由某一个线程从头到尾负责处理。
如果一个请求需要进行 IO 操作,比如访问数据库、调用第三方服务接口等,那么其所对应的线程将同步地等待 IO 操作完成, 而 IO 操作是非常慢的,所以此时的线程并不能及时地释放回线程池以供后续使用,如果并发量很大的话,那肯定会造性能问题。
传统的 MVC 框架如 SpringMVC 也无法摆脱 Servlet 的桎梏,原因很简单,他们都是基于 Servlet 来实现的。如 SpringMVC 中大家所熟知的 DispatcherServlet
为了解决这一问题,Servlet3.0 中引入了异步 Servlet,然后在 Servlet3.1 中又引入了非阻塞 IO 来进一步增强异步处理的性能。
如果我们要使用异步 Servlet,Tomcat 至少要 7.0 以上的版本;如果你还想体验一把非阻塞 IO,那么 Tomcat 至少要 8.0 以上。
同步 Servlet
@WebServlet(urlPatterns = "/sync")
public class SyncServlet extends HttpServlet {
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
doGet(request, response);
}
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
long start = System.currentTimeMillis();
printLog(request, response);
System.out.println("总耗时:" + (System.currentTimeMillis() - start));
}
private void printLog(HttpServletRequest request, HttpServletResponse response) throws IOException {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
response.getWriter().write("ok");
}
}
这个 Servlet 大家再熟悉不过了。
前端请求到达后,我们调用 printLog 方法做一些处理,同时把 doGet 方法执行耗时打印出来。
在 printLog 中,我们先休息 3s,然后给前端返回一个字符串给前端。
前端发送请求,最终 doGet 方法中耗时 3001 毫秒。
这是我们大家熟知的同步 Servlet。在整个请求处理过程中,请求会一直占用 Servlet 线程,直到一个请求处理完毕这个线程才会被释放。
接下来我们对其稍微进行改造,使之变为一个异步 Servlet。
有人可能会说,异步有何难?直接把 printLog 方法扔到子线程里边去执行不就行了?但是这样会有另外一个问题,子线程里边没有办法通过 HttpServletResponse
直接返回数据,所以我们一定需要 Servlet 的异步支持,有了异步支持,才可以在子线程中返回数据。
异步 Servlet
@WebServlet(urlPatterns = "/async",asyncSupported = true)
public class AsyncServlet extends HttpServlet {
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
doGet(request, response);
}
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
long start = System.currentTimeMillis();
AsyncContext asyncContext = request.startAsync();
CompletableFuture.runAsync(() -> printLog(asyncContext,asyncContext.getRequest(),asyncContext.getResponse()));
System.out.println("总耗时:" + (System.currentTimeMillis() - start));
}
private void printLog(AsyncContext asyncContext, ServletRequest request, ServletResponse response){
try {
Thread.sleep(3000);
response.getWriter().write("ok");
asyncContext.complete();
} catch (InterruptedException | IOException e) {
e.printStackTrace();
}
}
}
这里的改造主要有如下几方面:
-
@WebServlet
注解上添加asyncSupported
属性,开启异步支持。 - 调用
request.startAsync();
方法开启异步上下文。 - 通过 JDK8 中的
CompletableFuture.runAsync
方法来启动一个子线程(当然也可以自己 new 一个子线程)。 - 调用 printLog 方法时的 request 和 response 重新构造,直接从
asyncContext
中获取,注意,这点是【关键】。 - 在 printLog 方法中,方法执行完成后,调用
asyncContext.complete()
方法通知异步上下文请求处理完毕。
经过上面的改造之后,现在的控制台打印出来的总耗时几乎可以忽略不计了。
也就是说,有了异步 Servlet 之后,后台 Servlet 的线程会被及时释放,释放之后又可以去接收新的请求,进而提高应用的并发能力。
Servlet3的异步使用示例
AsyncLongRunningServlet.java
处理Servlet请求,并开启异步
package com.test.servlet3;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.ThreadPoolExecutor;
@WebServlet(urlPatterns = "/AsyncLongRunningServlet", asyncSupported = true)
public class AsyncLongRunningServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
long startTime = System.currentTimeMillis();
System.out.println("AsyncLongRunningServlet Start::Name="
+ Thread.currentThread().getName() + "::ID="
+ Thread.currentThread().getId());
request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
String time = request.getParameter("time");
int secs = Integer.valueOf(time);
// max 10 seconds
if (secs > 10000)
secs = 10000;
AsyncContext asyncCtx = request.startAsync();
asyncCtx.addListener(new AppAsyncListener());
asyncCtx.setTimeout(9000);//异步servlet的超时时间,异步Servlet有对应的超时时间,如果在指定的时间内没有执行完操作,response依然会走原来Servlet的结束逻辑,后续的异步操作执行完再写回的时候,可能会遇到异常。
ThreadPoolExecutor executor = (ThreadPoolExecutor) request
.getServletContext().getAttribute("executor");
executor.execute(new AsyncRequestProcessor(asyncCtx, secs));
long endTime = System.currentTimeMillis();
System.out.println("AsyncLongRunningServlet End::Name="
+ Thread.currentThread().getName() + "::ID="
+ Thread.currentThread().getId() + "::Time Taken="
+ (endTime - startTime) + " ms.");
}
}
AppAsyncListener.java
异步监听器
package com.test.servlet3;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebListener;
import java.io.IOException;
import java.io.PrintWriter;
@WebListener
public class AppAsyncListener implements AsyncListener {
@Override
public void onComplete(AsyncEvent asyncEvent) throws IOException {
System.out.println("AppAsyncListener onComplete");
// we can do resource cleanup activity here
}
@Override
public void onError(AsyncEvent asyncEvent) throws IOException {
System.out.println("AppAsyncListener onError");
//we can return error response to client
}
@Override
public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
System.out.println("AppAsyncListener onStartAsync");
//we can log the event here
}
@Override
public void onTimeout(AsyncEvent asyncEvent) throws IOException {
System.out.println("AppAsyncListener onTimeout");
//we can send appropriate response to client
ServletResponse response = asyncEvent.getAsyncContext().getResponse();
PrintWriter out = response.getWriter();
out.write("TimeOut Error in Processing");
}
}
AppContextListener.java
Servlet上下文监听器,可以在里面初始化业务线程池
package com.test.servlet3;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 在监听中初始化线程池
*/
@WebListener
public class AppContextListener implements ServletContextListener {
public void contextInitialized(ServletContextEvent servletContextEvent) {
// create the thread pool
ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 200, 50000L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100));
servletContextEvent.getServletContext().setAttribute("executor",
executor);
}
public void contextDestroyed(ServletContextEvent servletContextEvent) {
ThreadPoolExecutor executor = (ThreadPoolExecutor) servletContextEvent
.getServletContext().getAttribute("executor");
executor.shutdown();
}
}
AsyncRequestProcessor.java
业务工作线程
package com.test.servlet3;
import javax.servlet.AsyncContext;
import java.io.IOException;
import java.io.PrintWriter;
/**
* 业务工作线程
*/
public class AsyncRequestProcessor implements Runnable {
private AsyncContext asyncContext;
private int secs;
public AsyncRequestProcessor() {
}
public AsyncRequestProcessor(AsyncContext asyncCtx, int secs) {
this.asyncContext = asyncCtx;
this.secs = secs;
}
@Override
public void run() {
System.out.println("Async Supported? "
+ asyncContext.getRequest().isAsyncSupported());
longProcessing(secs);
try {
PrintWriter out = asyncContext.getResponse().getWriter();
out.write("Processing done for " + secs + " milliseconds!!");
} catch (IOException e) {
e.printStackTrace();
}
//complete the processing
asyncContext.complete();
}
private void longProcessing(int secs) {
// wait for given time before finishing
try {
Thread.sleep(secs);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}