阅读完需:约 25 分钟
先聊聊 ForkJoinPool 与 ThreadPoolExecutor的区别。 我们为啥要用 ForkJoinPool ? 相比于我们更常用的 ThreadPoolExecutor ,ForkJoinPool 又能给我们带来什么呢? 带着这样的问题我们来好好聊聊。
异同
继承
首先他们都继承自 AbstractExecutorService
但 ForkJoinPool
并不是为了替代 ThreadPoolExecutor
而产生的,相对来说 ForkJoinPool
是对线程池使用场景和功能上进行了一个补充
public class ForkJoinPool extends AbstractExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService

构造函数不同
ThreadPoolExecutor
不是本篇重点,构造函数就不细讲了,相信大家也比较熟悉了。 我们重点说下 ForkJoinPool
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix)
-
parallelism
:并行度(the parallelism level),默认情况下跟我们机器的 CPU 核心数保持一致,使用 Runtime.getRuntime().availableProcessors() 可以得到我们机器运行时可用的 CPU 核心数。 -
factory
:创建新线程的工厂( the factory for creating new threads)。默认情况下使用ForkJoinWorkerThreadFactory
defaultForkJoinWorkerThreadFactory
。 -
handler
:线程异常情况下的处理器(Thread.UncaughtExceptionHandler handler),在线程执行任务时对由于某些无法预料的错误而导致任务线程中断时,该处理器会进行一些处理,默认情况为 null。 -
asyncMode
:在ForkJoinPool
中,每一个工作线程都有一个独立的任务队列,asyncMode
表示工作线程内的任务队列是采用何种方式进行调度,可以是先进先出FIFO,也可以是后进先出 LIFO。如果为 true,则线程池中的工作线程则使用 先进先出方式 进行任务调度,默认情况下是false 也就是默认为 LIFO 后进先出。 -
workerNamePrefix
:顾名思义,工作线程名称前缀 默认为"ForkJoinPool-" + nextPoolId() + "-worker-"
工作模式不同
ForkJoinPool
采用了 一个线程对应专属的一个工作队列,而非 ThreadPoolExecutor
的多个线程对应一个工作队列。
即 线程与工作队列关系 由 多对一 变为 一对一
分治算法与Fork/Join模式
在并发计算中,Fork/Join模式往往用于对大任务的并行计算,它通过递归的方式对任务不断地拆解,再将结果进行合并。如果从其思想上看,Fork/Join并不复杂,其本质是分治算法(Divide-and-Conquer) 的应用。
分治算法的**基本思想是将一个规模为N的问题分解为K个规模较小的子问题,这些子问题相互独立且与原问题性质相同。求出子问题的解,就可得到原问题的解。**分治算法的步骤如下:
- (1)分解:将要解决的问题划分成若干规模较小的同类问题;
- (2)求解:当子问题划分得足够小时,用较简单的方法解决;
- (3)合并:按原问题的要求,将子问题的解逐层合并构成原问题的解。

Fork/Join对任务的拆分和对结果合并过程也是如此,可以用下面伪代码来表示:
solve(problem):
if problem is small enough:
// 如果任务足够小,执行任务
solve problem directly (sequential algorithm)
else:
// 拆分任务
for part in subdivide(problem)
fork subtask to solve(part)
// 合并结果
join all subtasks spawned in previous loop
return combined results
所以,理解Fork/Join模型和ForkJoinPool线程池,首先要理解其背后的算法的目的和思想,因为后文所要详述的ForkJoinPool不过只是这种算法的一种的实现和应用。
Fork/Join应用场景与体验
场景:给定两个自然数,计算两个两个数之间的总和。比如1~n之间的和:1+2+3+…+n
为了解决这个问题,我们创建了TheKingRecursiveSumTask
这个核心类,它继承于RecursiveTask. RecursiveTask
是ForkJoinPool
中的一种任务类型,你暂且不必深入了解它,后文会有详细描述。
TheKingRecursiveSumTask
中定义了任务计算的起止范围(sumBegin
和sumEnd
)和拆分阈值(threshold
),以及核心计算逻辑compute()
.
public class TheKingRecursiveSumTask extends RecursiveTask<Long> {
private static final AtomicInteger taskCount = new AtomicInteger();
private final int sumBegin;
private final int sumEnd;
/**
* 任务拆分阈值,当任务尺寸大于该值时,进行拆分
*/
private final int threshold;
public TheKingRecursiveSumTask(int sumBegin, int sumEnd, int threshold) {
this.sumBegin = sumBegin;
this.sumEnd = sumEnd;
this.threshold = threshold;
}
@Override
protected Long compute() {
if ((sumEnd - sumBegin) > threshold) {
// 两个数之间的差值大于阈值,拆分任务
TheKingRecursiveSumTask subTask1 = new TheKingRecursiveSumTask(sumBegin, (sumBegin + sumEnd) / 2, threshold);
TheKingRecursiveSumTask subTask2 = new TheKingRecursiveSumTask((sumBegin + sumEnd) / 2, sumEnd, threshold);
subTask1.fork();
subTask2.fork();
taskCount.incrementAndGet();
return subTask1.join() + subTask2.join();
}
// 直接执行结果
long result = 0L;
for (int i = sumBegin; i < sumEnd; i++) {
result += i;
}
return result;
}
public static AtomicInteger getTaskCount() {
return taskCount;
}
}
在下面的代码中,我们设置的计算区间值0~10000000,当计算的个数超过100时,将对任务进行拆分,最大并发数设置为16.
public static void main(String[] args) {
int sumBegin = 0, sumEnd = 10000000;
computeByForkJoin(sumBegin, sumEnd);
computeBySingleThread(sumBegin, sumEnd);
}
private static void computeByForkJoin(int sumBegin, int sumEnd) {
ForkJoinPool forkJoinPool = new ForkJoinPool(16);
long forkJoinStartTime = System.nanoTime();
TheKingRecursiveSumTask theKingRecursiveSumTask = new TheKingRecursiveSumTask(sumBegin, sumEnd, 100);
long forkJoinResult = forkJoinPool.invoke(theKingRecursiveSumTask);
System.out.println("======");
System.out.println("ForkJoin任务拆分:" + TheKingRecursiveSumTask.getTaskCount());
System.out.println("ForkJoin计算结果:" + forkJoinResult);
System.out.println("ForkJoin计算耗时:" + (System.nanoTime() - forkJoinStartTime) / 1000000);
}
private static void computeBySingleThread(int sumBegin, int sumEnd) {
long computeResult = 0 L;
long startTime = System.nanoTime();
for (int i = sumBegin; i < sumEnd; i++) {
computeResult += i;
}
System.out.println("======");
System.out.println("单线程计算结果:" + computeResult);
System.out.println("单线程计算耗时:" + (System.nanoTime() - startTime) / 1000000);
}
运行结果如下:
======
ForkJoin任务拆分:131071
ForkJoin计算结果:49999995000000
ForkJoin计算耗时:207
======
单线程计算结果:49999995000000
单线程计算耗时:40
Process finished with exit code 0
从计算结果中可以看到,ForkJoinPool总共进行了131071次的任务拆分,最终的计算结果是49999995000000,耗时207毫秒。
不过,细心的你可能已经发现了,ForkJoin的并行计算的耗时竟然比单程程还慢?并且足足慢了近5倍!先别慌,关于ForkJoin的性能问题,我们会在后文有讲解。
ForkJoinPool设计与源码分析
在Java中,ForkJoinPool是Fork/Join模型的实现,于Java7引入并在Java8中广泛应用。ForkJoinPool允许其他线程向它提交任务,并根据设定将这些任务拆分为粒度更细的子任务,这些子任务将由ForkJoinPool内部的工作线程来并行执行,并且工作线程之间可以窃取彼此之间的任务。
在接口实现和继承关系上,ForkJoinPool和ThreadPoolExecutor类似,都实现了Executor和ExecutorService接口,并继承了AbstractExecutorService抽类。而在任务类型上,ForkJoinPool主要有两种任务类型:RecursiveAction和RecursiveTask
,它们继承于ForkJoinTask
. 相关关系如下图所示:

解读ForkJoinPool的源码并不容易,虽然它的思想较为简单,但在实现上要考虑的显然更多,加上部分代码可读性一般,所以讲解它的全部源码是不现实的,当然也是没必要的。在下文中,我们将主要介绍其核心的任务提交和执行相关的部分源码,其他源码有兴趣的可以自行阅读。
构造ForkJoinPool的几种不同方式
ForkJoinPool中有四个核心参数,用于控制线程池的并行数、工作线程的创建、异常处理和模式指定等。各参数解释如下:
-
int parallelism
:指定并行级别(parallelism level)。ForkJoinPool将根据这个设定,决定工作线程的数量。如果未设置的话,将使用Runtime.getRuntime().availableProcessors()
来设置并行级别; -
ForkJoinWorkerThreadFactory factory
:ForkJoinPool在创建线程时,会通过factory来创建。注意,这里需要实现的是ForkJoinWorkerThreadFactory,而不是ThreadFactory. 如果你不指定factory,那么将由默认的DefaultForkJoinWorkerThreadFactory负责线程的创建工作; -
UncaughtExceptionHandler handler
:指定异常处理器,当任务在运行中出错时,将由设定的处理器处理; -
boolean asyncMode
:从名字上看,你可能会觉得它是异步模式设置,但其实是设置队列的工作模式:asyncMode ? FIFO_QUEUE : LIFO_QUEUE
. 当asyncMode为true时,将使用先进先出队列,而为false时则使用后进先出的模式。
围绕上面的四个核心参数,ForkJoinPool提供了三种构造方式,使用时你可以根据需要选择其中的一种。
方式一:默认无参构造
在该构造方式中,你无需设定任何参数。ForkJoinPool将根据当前处理器数量来设置并行数量,并使用默认的线程构造工厂。不推荐。
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
方式二:通过并行数构造
在该构造方式中,你可以指定并行数量,以更有效地平衡处理器数量和负载。建议在设置时,并行级别应低于当前处理器的数量。
public ForkJoinPool(int parallelism) {
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
方式三:自定义全部参数构造
以上两种构造方式都是基于这种构造,它允许你配置所有的核心参数。为了更有效地管理ForkJoinPool,建议你使用这种构造方式。
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
按类型提交不同任务
任务提交是ForkJoinPool的核心能力之一,在提交任务时你有三种选择,如下面表格所示:
从非fork/join线程调用 | 从fork/join调用 | |
---|---|---|
提交异步执行 | execute(ForkJoinTask) | ForkJoinTask.fork() |
等待并获取结果 | invoke(ForkJoinTask) | ForkJoinTask.invoke() |
提交执行获取Future结果 | submit(ForkJoinTask) | ForkJoinTask.fork() (ForkJoinTasks are Futures) |
第一类核心方法:invoke
invoke类型的方法接受ForkJoinTask类型的任务,并在任务执行结束后,返回泛型结果。如果提交的任务是null,将抛出空指针异常。
public <T> T invoke(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task.join();
}
第二类核心方法:execute
execute类型的方法在提交任务后,不会返回结果。另外要注意的是,ForkJoinPool不仅允许提交ForkJoinTask类型任务,还允许提交Callable或Runnable任务,因此你可以像使用现有Executors一样使用ForkJoinPool。
当然,Callable或Runnable类型任务时,将会转换为ForkJoinTask类型,具体可以查看任务提交的相关源码。那么,这类任务和直接提交ForkJoinTask任务有什么区别呢?还是有的。区别在于,由于任务是不可切分的,所以这类任务无法获得任务拆分这方面的效益,不过仍然可以获得任务窃取带来的好处和性能提升。
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
}
public void execute(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = new ForkJoinTask.RunnableExecuteAction(task);
externalPush(job);
}
第三类核心方法:submit
submit类型的方法支持三种类型的任务提交:ForkJoinTask类型、Callable类型和Runnable类型。在提交任务后,将返回ForkJoinTask类型的结果。如果提交的任务是null,将抛出空指针异常,并且当任务不能按计划执行的话,将抛出任务拒绝异常。
public < T > ForkJoinTask < T > submit(ForkJoinTask < T > task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task;
}
public < T > ForkJoinTask < T > submit(Callable < T > task) {
ForkJoinTask < T > job = new ForkJoinTask.AdaptedCallable < T > (task);
externalPush(job);
return job;
}
public < T > ForkJoinTask < T > submit(Runnable task, T result) {
ForkJoinTask < T > job = new ForkJoinTask.AdaptedRunnable < T > (task, result);
externalPush(job);
return job;
}
public ForkJoinTask < ? > submit(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask < ? > job;
if (task instanceof ForkJoinTask < ? > ) // avoid re-wrap
job = (ForkJoinTask < ? > ) task;
else
job = new ForkJoinTask.AdaptedRunnableAction(task);
externalPush(job);
return job;
}
ForkJoinTask
ForkJoinTask是ForkJoinPool的核心之一,它是任务的实际载体,定义了任务执行时的具体逻辑和拆分逻辑,本文前面的示例代码就是通过继承它实现。作为一个抽象类,ForkJoinTask的行为有点类似于线程,但它更为轻量,因为它不维护自己的运行时堆栈或程序计数器等。
在类的设计上,ForkJoinTask继承了Future接口,所以也可以将其看作是轻量级的Future,它们之间的关系如下图所示。

fork与join
fork()
/join()
是ForkJoinTask
甚至是ForkJoinPool
的核心方法,承载着主要的任务协调作用,一个用于任务提交,一个用于结果获取。
fork-提交任务
fork()
方法用于向当前任务所运行的线程池中提交任务,比如上文示例代码中的subTask1.fork()
. 注意,不同于其他线程池的写法,任务提交由任务自己通过调用fork()
完成,对此不要感觉诧异,fork()
内部会将任务与当前线程进行关联。
从源码中看,如果当前线程是ForkJoinWorkerThread
类型,将会放入该线程的任务队列,否则放入common
线程池的任务队列中。关于common
线程池,后续会有介绍。
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
join-获取任务执行结果
前面,你已经知道可以通过fork()
提交任务。那么现在,你则可以通过join()
方法获取任务的执行结果。
调用join()
时,将阻塞当前线程直到对应的子任务完成运行并返回结果。从源码看,join()
的核心逻辑由doJoin()
负责。doJoin()
虽然很短,但可读性较差,阅读时稍微忍一下。
public final V join() {
int s;
// 如果调用doJoin返回的非NORMAL状态,将报告异常
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
// 正常执行结束,返回原始结果
return getRawResult();
}
private int doJoin() {
int s;
Thread t;
ForkJoinWorkerThread wt;
ForkJoinPool.WorkQueue w;
//如果已完成,返回状态
return (s = status) < 0 ? s :
//如果未完成且当前线程是ForkJoinWorkerThread,则从该线程中取出workQueue,并尝试将当前task取出执行。如果执行的结果是完成,则返回状态;否则,使用当前线程池awaitJoin方法进行等待
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread) t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0 L):
//当前线程非ForkJoinWorkerThread,调用externalAwaitDone方法等待
externalAwaitDone();
}
final int doExec() {
int s;
boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
// 执行完成后,将状态设置为NORMAL
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
RecursiveAction与RecursiveTask

在ForkJoinPool中,常用的有两种任务类型:返回结果的和不返回结果的,这方面和ThreadPoolExecutor等线程池是一致的,对应的两个类分别是:RecursiveAction
和RecursiveTask
. 从类图中可以看到,它们均继承于ForkJoinTask.
RecursiveAction
:无结果返回
RecursiveAction
用于递归执行但不需要返回结果的任务,比如下面的排序就是它的典型应用场景。在使用RecursiveAction
时,你需要继承并实现它的核心方法compute()
.
static class SortTask extends RecursiveAction {
final long[] array;
final int lo, hi;
SortTask(long[] array, int lo, int hi) {
this.array = array;
this.lo = lo;
this.hi = hi;
}
SortTask(long[] array) {
this(array, 0, array.length);
}
// 核心计算方法
protected void compute() {
if (hi - lo < THRESHOLD)
// 直接执行
sortSequentially(lo, hi);
else {
// 拆分任务
int mid = (lo + hi) >>> 1;
invokeAll(new SortTask(array, lo, mid),
new SortTask(array, mid, hi));
merge(lo, mid, hi);
}
}
// implementation details follow:
static final int THRESHOLD = 1000;
void sortSequentially(int lo, int hi) {
Arrays.sort(array, lo, hi);
}
void merge(int lo, int mid, int hi) {
long[] buf = Arrays.copyOfRange(array, lo, mid);
for (int i = 0, j = lo, k = mid; i < buf.length; j++)
array[j] = (k == hi || buf[i] < array[k]) ?
buf[i++] : array[k++];
}
}
RecursiveTask
:返回结果
RecursiveTask
用于递归执行需要返回结果的任务,比如前面示例代码中的求和或下面这段求斐波拉契数列求和都是它的典型应用场景。在使用RecursiveTask
时,你也需要继承并实现它的核心方法compute()
.
class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) { this.n = n; }
Integer compute() {
if (n <= 1)
return n;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}
ForkJoinTask使用限制
虽然在某些场景下,ForkJoinTask
可以通过任务拆解的方式提高执行效率,但是需要注意的是它并非适合所有的场景。ForkJoinTask在使用时需要谨记一些限制,违背这些限制可能会适得其反甚至引来灾难。
为什么这么说呢?
这是因为,ForkJoinTask最适合用于纯粹的计算任务,也就是纯函数计算,计算过程中的对象都是独立的,对外部没有依赖。你可以想象,如果大量的任务或被拆分的子任务之间彼此依赖或对外部存在严重阻塞依赖,那将是怎样的画面…用千丝万缕来形容也不为过,外部依赖会带来任务执行和问题排查方面的双重不确定性。
所以,在理想情况下,提交到ForkJoinPool中的任务应避免执行阻塞I/O,以免出现不可控的意外情况。当然,这也并非是绝对的,在必要时你也可以定义和使用可阻塞的ForkJoinTask,只不过你需要付出更多的代价和考虑,使用时应当慎之又慎。
工作队列与任务窃取
前面已经说到,ForkJoinPool
与ThreadPoolExecutor
有个很大的不同之处在于,ForkJoinPool
存在引入了任务窃取设计,它是其性能保证的关键之一。
关于任务窃取,简单来说,就是允许空闲线程从繁忙线程的双端队列中窃取任务。默认情况下,工作线程从它自己的双端队列的头部获取任务。但是,当自己的任务为空时,线程会从其他繁忙线程双端队列的尾部中获取任务。这种方法,最大限度地减少了线程竞争任务的可能性。
ForkJoinPool
的大部分操作都发生在工作窃取队列(work-stealing queues ) 中,该队列由内部类WorkQueue
实现。其实,这个队列也不是什么神奇之物,它是Deques
的特殊形式,但仅支持三种操作方式:push
、
pop
和
poll
(也称为窃取)。当然,在ForkJoinPool
中,队列的读取有着严格的约束,push
和pop
仅能从其所属线程调用,而poll
则可以从其他线程调用。换句话说,前两个方法是留给自己用的,而第三种方法则是为了方便别人来窃取任务用的。任务窃取的相关过程,可以用下面这幅图来表示,这幅图建议你收藏:

看到这里,不知你是否会有疑问:为什么工作线程总是从自己的头部获取任务?为什么要这样设计?首先处理队列中等待时间较长的任务难道不是更有意义吗?
答案当然不会是“更有意义”。这样做的主要原因是为了提高性能,通过始终选择最近提交的任务,可以增加资源仍分配在CPU缓存中的机会,这样CPU处理起来要快一些。而窃取者之所以从尾部获取任务,则是为了降低线程之间的竞争可能,毕竟大家都从一个部分拿任务,竞争的可能要大很多。
此外,这样的设计还有一种考虑。由于任务是可分割的,那队列中较旧的任务最有可能粒度较大,因为它们可能还没有被分割,而空闲的线程则相对更有“精力”来完成这些粒度较大的任务。
ForkJoinPool监控
对于一个复杂框架来说,实时地了解ForkJoinPool
的内部状态是十分必要的。因此,ForkJoinPool
提供了一些常用方法。通过这些方法,你可以了解当前的工作线程、任务处理等情况。
(1)获取运行状态的线程总数
public int getRunningThreadCount() {
int rc = 0;
WorkQueue[] ws;
WorkQueue w;
if ((ws = workQueues) != null) {
for (int i = 1; i < ws.length; i += 2) {
if ((w = ws[i]) != null && w.isApparentlyUnblocked())
++rc;
}
}
return rc;
}
(2)获取活跃线程数量
public int getActiveThreadCount() {
int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
return (r <= 0) ? 0 : r; // suppress momentarily negative values
}
(3)判断ForkJoinPool是否空闲
public boolean isQuiescent() {
return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
}
(4)获取任务窃取数量
public long getStealCount() {
AtomicLong sc = stealCounter;
long count = (sc == null) ? 0 L : sc.get();
WorkQueue[] ws;
WorkQueue w;
if ((ws = workQueues) != null) {
for (int i = 1; i < ws.length; i += 2) {
if ((w = ws[i]) != null)
count += w.nsteals;
}
}
return count;
}
(5)获取队列中的任务数量
public long getQueuedTaskCount() {
long count = 0;
WorkQueue[] ws;
WorkQueue w;
if ((ws = workQueues) != null) {
for (int i = 1; i < ws.length; i += 2) {
if ((w = ws[i]) != null)
count += w.queueSize();
}
}
return count;
}
(6)获取已提交的任务数量
public int getQueuedSubmissionCount() {
int count = 0;
WorkQueue[] ws;
WorkQueue w;
if ((ws = workQueues) != null) {
for (int i = 0; i < ws.length; i += 2) {
if ((w = ws[i]) != null)
count += w.queueSize();
}
}
return count;
}
警惕ForkJoinPool#commonPool
在上文中所示的源码中,你可能已经在多处注意到commonPool
的存在。在ForkJoinPool
中,commonPool
是一个共享的、静态的线程池,并且在实际使用时才会进行懒加载,Java8中的CompletableFuture
和并行流(Parallel Streams
)用的就是它。不过,使用CompletableFuture
时你可以指定自己的线程池,但是并行流在使用时却不可以,这也是我们要警惕的地方。为什么这么说呢?
ForkJoinPool
中的commonPool
设计初衷是为了降低线程池的重复创建,让一些任务共用同一个线程池,毕竟创建线程池和创建线程都是昂贵的。然而,凡事都有两面性,commonPool
在某些场景下确实可以达到线程池复用的目的,但是,如果你决定与别人分享自己空间,那么当你想使用它的时候,它可能不再完全属于你。也就是说,当你想用commonPool
时,它可能已经其他任务填满了。
提交到ForkJoinPool
中的任务一般有两类:计算类型和阻塞类型。考虑一个场景,应用中多处都在使用这个共享线程池,有人在某处做了个不当操作,比如往池子里丢入了阻塞型任务,那么结果会怎样?结果当然是,整个线程池都有可能被阻塞!如此,整个应用都面临着被拖垮的风险。看到这里,对于Java8中的并行流的使用,你就应该高度警惕了。
那怎么避免这种情况发生呢?答案是尽量避免使用commonPool
,并且在需要运行阻塞任务时,应当创建独立的线程池,和系统的其他部分保持隔离,以免风险扩散。
ForkJoinPool总结
Fork/Join是一种基于分治算法的模型,在并发处理计算型任务时有着显著的优势。其效率的提升主要得益于两个方面:
- 任务切分:将大的任务分割成更小粒度的小任务,让更多的线程参与执行;
- 任务窃取:通过任务窃取,充分地利用空闲线程,并减少竞争。
在使用ForkJoinPool时,需要特别注意任务的类型是否为纯函数计算类型,也就是这些任务不应该关心状态或者外界的变化,这样才是最安全的做法。如果是阻塞类型任务,那么你需要谨慎评估技术方案。虽然ForkJoinPool也能处理阻塞类型任务,但可能会带来复杂的管理成本。
而在性能方面,要认识到Fork/Join的性能并不是开箱即来,而是需要你去评估和验证一些重要指标,通过数据对比得出最佳结论。
此外,ForkJoinPool虽然提供了commonPool,但出于潜在的风险考虑,不推荐使用或谨慎使用。