ForkJoinPool ⭐


Fork/Join 框架概述

Java 从 JDK 7 开始引入了一个专门为分治型并行计算设计的线程池框架 —— ForkJoinPool。它位于 java.util.concurrent 包下,由并发大师 Doug Lea 设计并实现。与传统的 ThreadPoolExecutor 不同,ForkJoinPool 不是为了处理大量独立的、互不相关的小任务而设计的,它的核心使命是:把一个大任务递归地拆分(Fork)成若干个足够小的子任务,分别计算后再合并(Join)结果

这个思想并不新鲜 —— 它本质上就是算法设计中经典的 Divide and Conquer(分治) 策略。归并排序、快速排序、MapReduce 都是分治思想的典型应用。ForkJoinPool 做的事情,是把这种算法范式提升到了多线程并发执行的层面,并通过一种叫做 Work-Stealing(工作窃取) 的调度算法,让多核 CPU 的利用率达到极致。

为什么需要 ForkJoinPool

在理解 ForkJoinPool 的价值之前,我们先看看用传统 ThreadPoolExecutor 处理分治问题会遇到什么困境。

假设你要对一个包含 1 亿个元素的数组求和。你可能会想:把数组分成 8 段,提交 8 个任务到线程池,每段独立求和,最后汇总。这种方式确实可行,但问题在于——你只做了一层拆分。如果任务的"可拆分性"是递归的、层次更深的呢?比如你需要递归地把每段继续拆分,直到粒度足够小,那么每次递归都会产生新的子任务,而这些子任务需要等待更深层子任务的结果

ThreadPoolExecutor 中,当一个线程在等待子任务完成时,它会阻塞(Block),白白占据线程池中的一个线程槽位。如果递归层数较深,很快所有线程都会陷入等待状态,没有线程去真正执行计算 —— 这就是经典的线程饥饿死锁(Thread Starvation Deadlock)

ForkJoinPool 通过两个关键机制解决了这个问题:

  1. Work-Stealing 调度:每个工作线程拥有自己的双端队列(Deque),空闲线程会主动从其他忙碌线程的队列尾部"偷"任务来执行,最大化 CPU 利用率。
  2. 任务补偿机制:当一个线程因为 join() 而等待子任务结果时,它不会简单地阻塞,而是会尝试从队列中取出其他任务来执行,避免线程空转浪费。

整体架构一览

下面这张架构图展示了 ForkJoinPool 的核心组件及其协作关系:

从图中可以看出整个框架形成了一个循环:应用层提交任务到 ForkJoinPool,工作线程取出任务执行 compute() 方法,compute() 内部可能再次 fork() 出子任务重新进入池中,最后通过 join() 汇聚结果。这种递归提交 + 自动调度的模式,是 ForkJoinPool 区别于普通线程池的根本所在。

与 ThreadPoolExecutor 的本质区别

很多初学者会疑惑:既然都是线程池,ForkJoinPoolThreadPoolExecutor 到底有什么不同?这里做一个系统的对比:

对比维度ThreadPoolExecutorForkJoinPool
任务模型独立任务,互不依赖分治任务,父子依赖关系
队列结构所有线程共享一个阻塞队列每个线程拥有私有双端队列
调度策略线程从共享队列竞争取任务Work-Stealing,空闲线程窃取他人任务
适用场景I/O 密集型、独立请求处理CPU 密集型、递归可拆分计算
阻塞处理线程阻塞就阻塞,无补偿join() 时尝试执行其他任务,减少空闲
默认实例需手动创建ForkJoinPool.commonPool() 全局共享

关键的区别在于队列结构ThreadPoolExecutor 使用单一的共享阻塞队列(如 LinkedBlockingQueue),所有工作线程都从同一个队列头部竞争取任务,高并发下锁竞争严重。而 ForkJoinPool 为每个工作线程分配了一个独立的 双端队列(Deque),线程优先从自己队列的头部取任务(LIFO),只有当自己队列为空时才去其他线程队列的尾部窃取(FIFO)。这种设计大幅降低了线程间的竞争。

commonPool —— 全局共享的默认池

从 JDK 8 开始,ForkJoinPool 提供了一个静态的全局共享实例 ForkJoinPool.commonPool()。Java 的 并行流(Parallel Stream)CompletableFuture 的默认异步执行器都使用这个 common pool。它的默认并行度(parallelism)等于 Runtime.getRuntime().availableProcessors() - 1,也就是 CPU 核心数减一。

Java
// 获取 common pool 实例
ForkJoinPool commonPool = ForkJoinPool.commonPool(); // 全局共享,所有并行流默认使用这个池
 
// 查看默认并行度
int parallelism = commonPool.getParallelism(); // 通常等于 CPU 核心数 - 1
System.out.println("Common pool parallelism: " + parallelism); // 例如 8 核机器输出 7
 
// 也可以通过 JVM 参数修改默认并行度
// -Djava.util.concurrent.ForkJoinPool.common.parallelism=16

这意味着如果你在一个应用中大量使用并行流,又在某个并行流任务里执行了阻塞 I/O 操作(比如 HTTP 调用、数据库查询),你会拖慢所有使用 common pool 的并行计算。这是生产环境中非常常见的性能陷阱。遇到这种情况,推荐为阻塞型任务创建独立的 ForkJoinPool 实例:

Java
// 为特定任务创建独立的 ForkJoinPool,避免污染 common pool
ForkJoinPool customPool = new ForkJoinPool(4); // 指定并行度为 4
customPool.submit(() -> {
    // 这里的并行流操作将使用 customPool 而非 commonPool
    myList.parallelStream()
          .map(item -> blockingApiCall(item)) // 阻塞操作不会影响全局 common pool
          .collect(Collectors.toList());
}).get(); // 等待完成
customPool.shutdown(); // 使用完毕后关闭自定义池

适用场景与不适用场景

适合使用 ForkJoinPool 的场景:

  • CPU 密集型的递归计算:大数组求和、归并排序、矩阵乘法、图像处理中的分块渲染等。
  • 可递归拆分的数据结构遍历:树的遍历、文件系统扫描(计算目录大小)等。
  • 并行流(Parallel Stream):Java 8+ 的并行流底层就是 ForkJoinPool

不适合使用 ForkJoinPool 的场景:

  • I/O 密集型任务:网络请求、磁盘读写等会导致线程长时间阻塞,浪费 ForkJoinPool 的计算线程。这类任务应使用普通线程池或虚拟线程。
  • 任务无法拆分:如果任务本身不具备可分解性,用 ForkJoinPool 没有任何优势。
  • 子任务间有共享可变状态:分治的前提是子任务独立,如果子任务之间需要频繁同步,会严重降低并行效率。

一个直观的类比

为了更好地理解 ForkJoinPool 的工作模式,可以做一个生活化的类比:

想象你是一家餐厅的厨师长,接到一个 200 人的宴会订单(大任务)。你不会一个人做完所有菜,而是把菜单拆分成冷菜组、热菜组、汤品组、甜点组(Fork),每组分配给一位厨师。每位厨师如果觉得自己的工作量还是太大,可以继续拆分给帮厨(递归 Fork)。当某位厨师提前做完了自己的部分,他不会闲着,而是去帮最忙的那位分担一些任务(Work-Stealing)。最后所有菜品汇总上桌(Join),宴会开始。

这就是 ForkJoinPool 的核心哲学:拆分、并行、窃取、合并

Fork/Join 框架的核心 API 总览

在后续章节深入讲解之前,先对核心类做一个概览,建立全局认知:

Java
// === ForkJoinPool:线程池本身 ===
ForkJoinPool pool = new ForkJoinPool();          // 使用默认并行度(CPU核心数)
ForkJoinPool pool2 = new ForkJoinPool(8);        // 指定 8 个工作线程
ForkJoinPool common = ForkJoinPool.commonPool();  // 获取全局共享池
 
// === ForkJoinTask:所有 Fork/Join 任务的抽象基类 ===
// 通常不直接使用,而是使用下面两个子类
 
// === RecursiveTask<V>:有返回值的分治任务 ===
// 必须实现 compute() 方法,返回类型为 V
class SumTask extends RecursiveTask<Long> {
    @Override
    protected Long compute() {
        // 拆分逻辑 + 计算逻辑
        return result; // 返回计算结果
    }
}
 
// === RecursiveAction:无返回值的分治任务 ===
// 必须实现 compute() 方法,返回 void
class SortTask extends RecursiveAction {
    @Override
    protected void compute() {
        // 拆分逻辑 + 原地排序等操作,无需返回值
    }
}
 
// === 核心方法 ===
task.fork();           // 异步提交任务到当前线程的工作队列
Long result = task.join(); // 阻塞等待任务完成并获取结果
pool.invoke(task);     // 提交任务并等待结果(等于 fork + join)
pool.submit(task);     // 异步提交任务,返回 ForkJoinTask(Future)

这些 API 将在后续的 ForkJoinTask使用示例 等章节中配合完整代码深入讲解。此处只需建立一个"全景地图"即可。


📝 练习题

关于 ForkJoinPoolThreadPoolExecutor 的区别,下列说法错误的是:

A. ThreadPoolExecutor 的所有工作线程共享一个任务队列,而 ForkJoinPool 的每个工作线程都有自己的双端队列

B. ForkJoinPool 采用 Work-Stealing 机制,空闲线程可以从其他线程的队列中窃取任务执行

C. ForkJoinPool.commonPool() 的默认并行度等于 CPU 核心数,即 Runtime.getRuntime().availableProcessors()

D. ForkJoinPool 更适合 CPU 密集型的可递归拆分任务,而非 I/O 密集型任务

【答案】 C

【解析】 ForkJoinPool.commonPool() 的默认并行度是 availableProcessors() - 1,而不是 availableProcessors()。之所以减 1,是因为提交任务的调用者线程(main 线程或其他线程)本身也可能参与计算(尤其在使用 invoke() 时),所以 common pool 预留了一个位置。选项 A 准确描述了两者队列结构的差异;选项 B 正确描述了 Work-Stealing 机制;选项 D 正确指出了 ForkJoinPool 的适用场景。因此 C 是错误的说法。


分治思想(Divide and Conquer)

分治思想是计算机科学中最经典、最强大的算法设计范式之一,也是整个 Fork/Join 框架的理论基石。理解分治,才能真正理解 ForkJoinPool 为什么要这样设计、为什么能带来并行加速。

什么是分治

分治(Divide and Conquer)的核心哲学可以浓缩为三个动词:分(Divide)→ 治(Conquer)→ 合(Combine)

  1. Divide(分解):将一个规模为 N 的大问题,拆分成若干个规模更小、结构相同的子问题(Sub-problems)。
  2. Conquer(求解):当子问题的规模缩小到足够简单(即达到 Base Case / 基准条件)时,直接求解;否则继续递归拆分。
  3. Combine(合并):将所有子问题的解逐层向上汇总、合并,最终得到原始问题的完整解。

这个过程天然形成一棵递归树(Recursion Tree)。树的根节点是原始问题,叶子节点是可以直接求解的最小子问题,而中间每一层都在做"拆"和"合"的工作。

分治的经典串行案例:归并排序

在正式进入并行世界之前,我们先用最经典的 Merge Sort(归并排序) 来感受分治的精髓。归并排序是分治思想教科书级的体现——把一个无序数组一分为二,分别排序后再合并。

Java
public class MergeSort {
 
    // 入口方法:对 arr 的 [left, right] 区间进行归并排序
    public static void mergeSort(int[] arr, int left, int right) {
        // Base Case:当区间只剩一个元素(或无效区间),无需排序,直接返回
        if (left >= right) {
            return;
        }
 
        // ① Divide:计算中间位置,将数组一分为二
        int mid = left + (right - left) / 2;
 
        // ② Conquer:递归地对左半部分排序
        mergeSort(arr, left, mid);
 
        // ② Conquer:递归地对右半部分排序
        mergeSort(arr, mid + 1, right);
 
        // ③ Combine:将两个已排序的子数组合并为一个有序数组
        merge(arr, left, mid, right);
    }
 
    // 合并 [left, mid] 和 [mid+1, right] 两个有序子数组
    private static void merge(int[] arr, int left, int mid, int right) {
        // 创建临时数组,用于暂存合并结果
        int[] temp = new int[right - left + 1];
 
        // i 指向左半部分的起始位置
        int i = left;
        // j 指向右半部分的起始位置
        int j = mid + 1;
        // k 指向临时数组的当前写入位置
        int k = 0;
 
        // 依次比较两个子数组的元素,将较小的放入 temp
        while (i <= mid && j <= right) {
            if (arr[i] <= arr[j]) {
                // 左侧元素更小(或相等),写入 temp 并移动左指针
                temp[k++] = arr[i++];
            } else {
                // 右侧元素更小,写入 temp 并移动右指针
                temp[k++] = arr[j++];
            }
        }
 
        // 左半部分可能还有剩余元素,全部追加到 temp
        while (i <= mid) {
            temp[k++] = arr[i++];
        }
 
        // 右半部分可能还有剩余元素,全部追加到 temp
        while (j <= right) {
            temp[k++] = arr[j++];
        }
 
        // 将合并结果从 temp 复制回原数组的对应位置
        System.arraycopy(temp, 0, arr, left, temp.length);
    }
}

从这段代码可以清楚看到分治的三步曲:

阶段对应代码说明
Divideint mid = left + (right - left) / 2找到中点,将问题一分为二
Conquer两次递归调用 mergeSort(...)递归解决左右两个子问题
Combinemerge(arr, left, mid, right)将两个有序子数组合并成一个有序数组

从串行分治到并行分治

上面的归并排序虽然体现了完美的分治结构,但它有一个致命特征——两次递归调用是顺序执行的:先排完左半部分,再排右半部分。在单线程环境下这无可厚非,但仔细想想:左半部分和右半部分的排序是完全独立的,它们之间没有任何数据依赖!

这正是并行化的天然机会。分治思想与并行计算的结合点在于:

如果子问题之间相互独立(Independent Sub-problems),那么它们就可以被分配到不同的线程/处理器上同时执行。

将上面归并排序的串行递归改为并行递归,概念上只需要一步变化:

Java
// 串行版本:顺序执行
mergeSort(arr, left, mid);      // 先执行这个
mergeSort(arr, mid + 1, right); // 等上面完成后才执行这个
 
// 并行版本(伪代码):同时执行
fork(() -> mergeSort(arr, left, mid));      // 异步提交给其他线程
fork(() -> mergeSort(arr, mid + 1, right)); // 异步提交给其他线程
joinAll();  // 等待两个子任务都完成
merge(arr, left, mid, right);  // 合并结果

这就是 Fork/Join 框架 名称的由来:fork 负责将子任务异步提交执行,join 负责等待子任务完成并获取结果。

分治并行化的数学直觉

假设一个问题被均匀地分成 2 个子问题,每层分解的开销忽略不计。在串行模式下,递归树的每一层都必须逐个处理,总时间与树的节点数成正比。而在理想并行模式下,同一层的所有子任务可以同时执行,所以总时间只与树的高度成正比。

对于一个大小为 N 的问题,递归树高度为 log₂N。举一个直观的例子:

数组大小 N串行归并排序 O(N log N)理想并行时间 O(N)*
1,000,000~20,000,000 次操作利用多核大幅缩减
10,000,000~230,000,000 次操作加速比接近核心数

*理想并行归并排序的 work 仍然是 O(N log N),但 span(关键路径)降为 O(N),在 P 个处理器下实际时间约为 O(N log N / P)。

当然,现实中不可能达到理想加速——线程创建与调度有开销、子任务粒度过细反而得不偿失。这也是 Fork/Join 框架引入 阈值(Threshold) 概念的原因:当子问题规模小于某个阈值时,不再继续拆分,而是直接用串行方式处理。这个阈值的选取是性能调优的关键。

分治适用场景的四个特征

并非所有问题都适合用分治来解决(或并行化)。一个问题适合分治并行化,通常需要满足以下四个条件:

下面逐一展开:

① 可分解性(Decomposable):大问题能够按照某种规则(通常是按数据范围或空间划分)拆成若干更小的同构子问题。例如数组求和可以拆成左半段求和 + 右半段求和;矩阵乘法可以按分块拆分。

② 子问题独立性(Independence):这是并行化的前提条件。如果子问题 A 的结果需要依赖子问题 B 的输出,那它们就不能并行执行。归并排序中,左右两半的排序互不干扰,是完美独立的。反例如斐波那契数列 f(n) = f(n-1) + f(n-2),虽然形式上是递归分解,但 f(n-1)f(n-2) 存在大量重叠子问题,更适合动态规划而非分治并行。

③ 可合并性(Combinable):子问题的解必须能够高效地 Combine 成原问题的解。如果合并步骤本身极其昂贵(比如时间复杂度比子问题求解还高),那分治反而会成为负担。

④ Base Case 存在且可直接求解:递归必须有终止条件。对于排序,Base Case 是长度为 1 的数组(天然有序);对于求和,Base Case 可以是单个元素或少于阈值的小数组(直接循环累加)。

分治思想在 Java ForkJoinPool 中的映射

理解了分治的理论后,我们来看它如何精确映射到 Java 的 ForkJoinPool API 上:

分治阶段Java ForkJoinPool 对应说明
Dividecompute() 方法中将任务拆分为子任务手动创建子 ForkJoinTask 对象
ForksubTask.fork()将子任务异步提交到线程池中
Conquer子任务的 compute() 递归执行到达 Base Case 时直接计算
JoinsubTask.join()阻塞等待子任务完成,获取返回值
Combinecompute() 中合并 join() 的结果leftResult + rightResult

下面是一段概念性代码,展示分治三步曲如何在 ForkJoinTask 中体现(后续章节会给出完整可运行示例):

Java
// 继承 RecursiveTask<Long>,表示有返回值的分治任务
public class SumTask extends RecursiveTask<Long> {
 
    // 阈值:当数组片段小于此值时,不再拆分,直接串行计算
    private static final int THRESHOLD = 10_000;
 
    // 待处理的数组
    private final long[] array;
    // 当前任务负责的区间 [start, end)
    private final int start;
    private final int end;
 
    // 构造方法:指定数组和处理范围
    public SumTask(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }
 
    @Override
    protected Long compute() {
        int length = end - start;
 
        // ===== Base Case:规模足够小,直接串行求和 =====
        if (length <= THRESHOLD) {
            long sum = 0;
            // 简单循环累加,不再递归拆分
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        }
 
        // ===== Divide:将任务一分为二 =====
        int mid = start + length / 2;
        // 创建左半部分子任务
        SumTask leftTask = new SumTask(array, start, mid);
        // 创建右半部分子任务
        SumTask rightTask = new SumTask(array, mid, end);
 
        // ===== Fork:异步提交左子任务到线程池 =====
        leftTask.fork();
 
        // ===== Conquer:当前线程直接计算右子任务(避免浪费当前线程) =====
        long rightResult = rightTask.compute();
 
        // ===== Join:等待左子任务完成并获取结果 =====
        long leftResult = leftTask.join();
 
        // ===== Combine:合并两个子结果 =====
        return leftResult + rightResult;
    }
}

注意代码中的一个重要优化模式:左子任务 fork() 出去让其他线程处理,而右子任务则由当前线程直接 compute()。这样避免了当前线程 fork 完就闲着等待的浪费。这种 "fork 一个,自己算一个" 的模式是 Fork/Join 编程的最佳实践(Best Practice)

分治的粒度控制:Threshold 的艺术

分治并行化中最重要的工程决策之一就是阈值(Threshold) 的设定。阈值太大,并行度不够,无法充分利用多核;阈值太小,任务拆分过细,线程调度和任务管理的开销反而超过了并行带来的收益。

一般经验法则:

  • Doug Lea(Fork/Join 框架的作者)建议:任务数量大约是处理器核心数的 4 倍到 16 倍,这样可以在保证足够并行度的同时避免过度拆分。
  • 对于纯计算型任务(CPU-bound),阈值可以设大一些(如数万到数十万级别)。
  • 对于涉及 I/O 或锁竞争的混合任务,阈值需要根据实际 profiling 来调整。

用一个简单的公式来理解:

Code
建议任务总数 ≈ 可用处理器数 × (4 ~ 16)
建议阈值 ≈ 总数据量 / 建议任务总数

例如,一个 1,000,000 元素的数组在 8 核 CPU 上:建议任务总数 ≈ 8 × 8 = 64,因此阈值 ≈ 1,000,000 / 64 ≈ 15,625。


📝 练习题

关于分治思想在 Fork/Join 框架中的应用,以下说法错误的是?

A. 分治要求子问题之间相互独立,这是并行化的前提条件

B. 在 ForkJoinTask 的 compute() 中,推荐对所有子任务都调用 fork(),然后逐一 join(),这样并行度最高

C. 阈值设置过小会导致任务拆分过细,线程调度开销可能超过并行收益

D. 分治的 Combine 阶段负责将子问题的结果汇总为原问题的最终结果

【答案】 B

【解析】 选项 B 的说法是错误的。在 Fork/Join 编程的最佳实践中,不应该对所有子任务都 fork()。正确的做法是:对 N 个子任务中的 N-1 个调用 fork() 异步提交,而最后一个子任务由当前线程直接调用 compute() 来执行。如果所有子任务都 fork 出去,当前线程就会空闲地等待(阻塞在 join() 上),白白浪费一个线程资源。以二分情况为例,应该 leftTask.fork() + rightTask.compute() + leftTask.join(),而不是两个都 fork 再两个都 join。选项 A、C、D 的描述均正确。


工作窃取(Work-Stealing)⭐

在传统的线程池模型(如 ThreadPoolExecutor)中,所有线程共享一个全局的任务队列(Shared BlockingQueue)。当并发量极高时,多个线程同时争抢队列的锁,形成激烈的 锁竞争(Lock Contention),这将严重拖慢吞吐量。ForkJoinPool 的设计哲学完全不同——它为 每一个工作线程(Worker Thread) 分配了一个 私有的双端队列(Deque),大部分时间线程只访问自己的队列,从根源上消除了全局锁瓶颈。

但问题随之而来:分治法天然会导致任务分配不均。某些子问题递归层次深、计算量大,对应的线程忙得不可开交;而另一些子问题很快就结束了,线程陷入空闲。如果放任不管,就会出现 负载失衡(Load Imbalance)——有的核心 100% 跑满,有的核心却在睡觉,整体 CPU 利用率远低于理想值。

Work-Stealing(工作窃取) 就是为了解决这个问题而生的调度策略。其核心思想只有一句话:

空闲的线程不会干等,而是主动去其他繁忙线程的队列中"偷"任务来执行。

这套机制让 ForkJoinPool 在面对 不规则并行(Irregular Parallelism) 时,依然能够自动地、动态地实现负载均衡,几乎不需要程序员手动干预。下面我们深入拆解其内部构造。


双端队列(Deque)

Work-Stealing 的基石是一种特殊的数据结构—— 双端队列(Double-Ended Queue, Deque)。在 ForkJoinPool 的实现中,每个 ForkJoinWorkerThread 内部都持有一个名为 WorkQueue 的结构,它本质上就是一个基于数组的 无锁双端队列

所谓"双端",是指这个队列 两端都可以进行操作

  • 头部(Top / Head):队列的"顶端",由 拥有者线程自己 进行 push 和 pop。
  • 尾部(Base / Bottom):队列的"底端",由 其他窃取线程 进行 take(steal)。

为什么要用双端队列而不是普通队列?关键在于 减少竞争。请看下面的对比:

特性普通共享队列Work-Stealing Deque
队列归属所有线程共享一个每个线程私有一个
入队/出队端同一端,多线程竞争两端分离,竞争极少
锁策略全局锁或 CAS 高频冲突Owner 端无锁,Steal 端轻量 CAS
缓存友好性差(多核缓存反复失效)好(Owner 操作局部性强)

在 ForkJoinPool 的实际实现中,Owner 线程对 Top 端的 push/pop 操作采用简单的 数组下标自增/自减 + volatile 写,几乎没有同步开销。只有当队列中仅剩最后一个元素时,Owner 的 pop 和 Stealer 的 steal 可能同时触碰同一个位置,此时才需要一次 CAS(Compare-And-Swap) 来仲裁,失败的一方就当队列为空处理。

这种设计的精妙之处在于:99% 的操作都是无竞争的。Owner 在自己的 Top 端忙碌,Stealer 在远处的 Base 端悄悄拿走任务,两者几乎互不干扰。

下面用 Mermaid 图展示每个 Worker Thread 拥有独立 Deque 的整体结构:

接下来用 ASCII 图细致展示 单个 Deque 的内部结构与两端操作方向

Java
/*
 *  WorkQueue (基于数组的双端队列)
 *
 *  ┌───────────────────────────────────────────────────┐
 *  │  array[]  (环形数组,容量为 2 的幂)                  │
 *  │                                                     │
 *  │  Index:   0     1     2     3     4     5     6     │
 *  │         ┌─────┬─────┬─────┬─────┬─────┬─────┬─────┐│
 *  │         │     │ T-A │ T-B │ T-C │ T-D │ T-E │     ││
 *  │         └─────┴─────┴─────┴─────┴─────┴─────┴─────┘│
 *  │           ▲                               ▲         │
 *  │           │                               │         │
 *  │         base                             top        │
 *  │      (Stealer 端)                    (Owner 端)     │
 *  │                                                     │
 *  │   ◀── steal() 从 base 端取      push()/pop() ──▶   │
 *  │       (其他空闲线程)              (拥有者线程自己)     │
 *  └───────────────────────────────────────────────────┘
 *
 *  - Owner 线程调用 fork() 时 → push 到 top 端 (top++)
 *  - Owner 线程需要子结果时 → pop 从 top 端取 (top--)
 *  - Stealer 窃取时 → 从 base 端取走最老的任务 (base++)
 *  - 仅当 top == base + 1(剩最后一个元素)时才需要 CAS 竞争
 */

这里有一个非常重要的细节值得深入思考—— 为什么 Owner 从 Top 端操作,而 Stealer 从 Base 端窃取? 原因有二:

第一,LIFO vs FIFO 的语义差异。 Owner 线程以 LIFO(后进先出) 的方式处理自己的任务。在分治递归中,最后被 fork 出来的子任务往往是 粒度最小的,处理起来最快,能迅速返回结果给父任务。这符合递归调用栈的天然顺序,也对 CPU 缓存(Cache Locality) 最友好——刚刚创建的任务数据大概率还热乎乎地躺在 L1/L2 Cache 里。

第二,Stealer 以 FIFO(先进先出)拿走最老的任务。 最早被 push 进队列的任务,通常是 粒度最大的(靠近递归树的根部)。窃取一个大任务,意味着 Stealer 拿回去之后还能继续拆分(fork),自给自足地忙活很久,减少后续再次窃取的频率。如果反过来偷最小的任务,Stealer 瞬间就做完了,又得去偷,频繁窃取反而增加了开销。


空闲线程从其他队列尾部窃取

理解了 Deque 的结构之后,我们来完整地走一遍 Work-Stealing 的运行时流程。

场景设定: 假设 ForkJoinPool 有 4 个 Worker Thread(W0 ~ W3),正在并行执行一个大型分治任务。

阶段一:正常工作。 每个 Worker 从自己 Deque 的 Top 端 pop 任务执行。如果执行过程中 fork 了新的子任务,就 push 回自己 Deque 的 Top 端。此阶段各线程完全独立,零竞争。

阶段二:负载失衡出现。 W2 的任务恰好都是轻量级的,很快全部执行完毕,Deque 变空。而 W0 的 Deque 中还积压着大量任务。

阶段三:窃取发生。 W2 发现自己的 Deque 为空后,不会阻塞等待,而是立即进入 窃取模式(Stealing Mode)。它会 随机选择 一个其他 Worker(比如 W0),然后从 W0 的 Deque 的 Base 端(尾部)偷走一个任务,放到自己的 Deque 中开始执行。

阶段四:自给自足。 由于偷来的任务通常粒度较大,W2 在执行过程中会继续 fork 出子任务,push 到自己的 Deque 里。这样 W2 就不需要频繁地去偷,自己就能忙活一阵子了。

阶段五:重复循环。 如果 W2 又空了,继续偷;如果所有队列都空了,说明整个任务已经完成。

让我们通过一个时序图来直观地展示这个过程:

在实际的 JDK 源码(ForkJoinPool.java)中,窃取逻辑的核心步骤可以简化为以下伪代码:

Java
// === ForkJoinPool 工作窃取核心伪代码 ===
 
// 每个 Worker Thread 的主循环
void runWorker(ForkJoinWorkerThread wt) {
    WorkQueue myQueue = wt.workQueue;   // 获取当前线程的私有双端队列
 
    while (!isTerminated()) {           // 线程池未关闭就一直循环
        ForkJoinTask<?> task;
 
        // 第一步:尝试从自己的 Deque Top 端 pop 任务(LIFO)
        task = myQueue.pop();           // 无锁操作,最快路径
 
        if (task != null) {
            task.doExec();              // 直接执行任务
            continue;                   // 继续下一轮循环
        }
 
        // 第二步:自己队列为空,进入窃取模式
        task = scan(wt, myQueue);       // 随机扫描其他 Worker 的队列
 
        if (task != null) {
            task.doExec();              // 执行偷来的任务
            continue;                   // 偷到了就继续干活
        }
 
        // 第三步:所有队列都空了,尝试休眠等待
        awaitWork(wt);                  // 挂起线程,等待新任务到来
    }
}
 
// scan 方法:随机选择一个 victim 队列,从 Base 端窃取
ForkJoinTask<?> scan(ForkJoinWorkerThread wt, WorkQueue myQueue) {
    WorkQueue[] ws = workQueues;        // 获取所有 Worker 的队列数组
    int n = ws.length;                  // 队列总数
    int r = ThreadLocalRandom.current().nextInt(n); // 随机起始位置
 
    for (int i = 0; i < n; i++) {       // 遍历所有队列
        WorkQueue victim = ws[(r + i) & (n - 1)]; // 从随机位置开始轮询
 
        if (victim != null && victim.base < victim.top) { // 该队列有任务
            ForkJoinTask<?> t = victim.poll();  // 从 Base 端取(FIFO, CAS)
            if (t != null) {
                return t;               // 窃取成功,返回任务
            }
        }
    }
    return null;                        // 全部扫描完都没有,返回 null
}

关于随机性(Randomized Stealing):为什么不按固定顺序扫描,而要随机选择起始位置?这是为了 避免多个空闲线程同时窃取同一个 victim,从而减少 CAS 冲突。随机化让窃取请求分散到不同的 victim 队列上,统计意义上能显著降低竞争概率。

关于 CAS 的使用:Stealer 从 victim 的 Base 端窃取时,需要使用 CAS 来原子地更新 base 下标。如果两个 Stealer 同时瞄准了同一个 victim 的同一个位置,CAS 保证只有一个成功,另一个重试或换目标。这比传统的锁机制轻量得多——失败的代价仅仅是一次 CAS 重试,而非线程阻塞与上下文切换

最后,让我们用一张综合全景图来总结 Work-Stealing 的完整机制:

Work-Stealing 的性能优势总结

维度传统线程池ForkJoinPool + Work-Stealing
队列结构全局共享队列每线程私有 Deque
锁竞争频率高(每次存取都竞争)极低(仅窃取时偶尔 CAS)
负载均衡静态分配,易失衡动态窃取,自适应均衡
CPU 利用率受限于最慢线程接近 100%(空闲线程自动找活干)
Cache 友好性差(共享队列频繁失效)好(Owner 局部性强)
适用场景I/O 密集、任务粒度均匀CPU 密集、递归分治、不规则并行

ForkJoinTask

ForkJoinTask<V> 是整个 Fork/Join 框架的任务基石(task foundation)。它是一个轻量级的、可以在 ForkJoinPool 中被调度执行的抽象类,实现了 Future<V> 接口。与普通的 ThreadRunnable 不同,ForkJoinTask 的设计哲学是:一个任务可以递归地将自身拆分(fork)为更小的子任务,然后汇总(join)子任务的结果。这正是分治思想在代码层面的直接体现。

从类层次结构来看,ForkJoinTask 本身是一个抽象类,开发者几乎不会直接继承它,而是使用其两个核心子类:RecursiveTask<V>(有返回值)和 RecursiveAction(无返回值)。二者都只要求你实现一个方法——compute(),在其中编写拆分逻辑和计算逻辑。

值得一提的是,图中还出现了 CountedCompleter<V>,它是 JDK 8 引入的第三个子类,适用于完成触发式的任务编排(completion-triggered tasks),例如并行流(parallel stream)的底层实现就大量依赖它。但在日常开发中,RecursiveTaskRecursiveAction 是你接触最多的两位主角。


RecursiveTask(有返回值)

RecursiveTask<V> 用于需要返回计算结果的分治场景。泛型参数 V 即为返回值类型。你需要覆写唯一的抽象方法 compute(),在方法体中实现两件事:

  1. Base case(基准条件):当任务粒度足够小时,直接计算并返回结果。
  2. Recursive case(递归拆分):将大任务拆分为若干子任务,调用 fork() 异步提交,最后调用 join() 汇总结果。

下面以数组求和为例,展示 RecursiveTask 的标准写法:

Java
import java.util.concurrent.RecursiveTask;
 
/**
 * 使用 RecursiveTask 实现并行数组求和
 * 泛型参数 Long 表示 compute() 的返回值类型
 */
public class SumTask extends RecursiveTask<Long> {
 
    // 阈值:当子数组长度 <= THRESHOLD 时,直接线性求和,不再拆分
    private static final int THRESHOLD = 1000;
 
    // 待求和数组的引用
    private final long[] array;
 
    // 当前任务负责的起始索引(包含)
    private final int start;
 
    // 当前任务负责的结束索引(不包含)
    private final int end;
 
    // 构造器:接收数组引用和当前任务负责的区间 [start, end)
    public SumTask(long[] array, int start, int end) {
        this.array = array;   // 所有子任务共享同一个数组引用,不会拷贝
        this.start = start;
        this.end = end;
    }
 
    /**
     * 核心计算方法 - 由 ForkJoinPool 中的工作线程调用
     * @return 区间 [start, end) 内所有元素的和
     */
    @Override
    protected Long compute() {
        // 计算当前任务负责的元素数量
        int length = end - start;
 
        // ★ Base case: 任务粒度足够小,直接计算
        if (length <= THRESHOLD) {
            long sum = 0;                     // 累加器
            for (int i = start; i < end; i++) {
                sum += array[i];              // 逐个累加
            }
            return sum;                       // 直接返回结果,不再拆分
        }
 
        // ★ Recursive case: 任务太大,一分为二
        int mid = start + (length >> 1);      // 取中点,等价于 start + length/2
 
        // 创建左半部分子任务 [start, mid)
        SumTask leftTask = new SumTask(array, start, mid);
 
        // 创建右半部分子任务 [mid, end)
        SumTask rightTask = new SumTask(array, mid, end);
 
        // 将左子任务异步提交到 ForkJoinPool(由其他工作线程执行)
        leftTask.fork();
 
        // ★ 关键优化:右子任务在【当前线程】直接执行,避免不必要的线程切换
        Long rightResult = rightTask.compute();
 
        // 等待左子任务完成并获取其结果
        Long leftResult = leftTask.join();
 
        // 合并两个子任务的结果
        return leftResult + rightResult;
    }
}

💡 为什么不对两个子任务都调用 fork()

这是一个非常经典的性能陷阱。如果你对 leftTaskrightTask 都调用 fork(),那么当前线程在 fork 之后就会空闲等待(idle),白白浪费了一个工作线程。正确的做法是:一个 fork,一个 compute——让当前线程继续承担一半的计算工作。这被称为 "fork-once, compute-once" 惯用法(idiom),是 Fork/Join 编程中最重要的性能准则之一。

来看一下执行时的任务拆分过程:

THRESHOLD = 1000 且数组长度为 10000 时,最终会产生约 10 个叶子任务,每个任务负责 1000 个元素的线性求和。这些叶子任务会被 ForkJoinPool 中的多个工作线程并行执行,空闲线程还会通过 Work-Stealing 从其他线程的队列中偷取任务。


RecursiveAction(无返回值)

RecursiveAction 的结构与 RecursiveTask 几乎一模一样,唯一区别是 compute() 方法没有返回值void)。它适用于只需要产生副作用(side effect)而不需要汇总结果的场景,典型例子包括:

  • 并行排序(parallel sort)—— 对数组进行原地修改
  • 并行遍历与修改(parallel transformation)—— 批量更新集合元素
  • 并行文件处理 —— 递归扫描目录并处理文件

以下是用 RecursiveAction 实现并行数组元素翻倍的示例:

Java
import java.util.concurrent.RecursiveAction;
 
/**
 * 使用 RecursiveAction 将数组中每个元素翻倍(原地修改,无返回值)
 */
public class DoubleTask extends RecursiveAction {
 
    // 阈值:当子数组长度 <= THRESHOLD 时,直接处理
    private static final int THRESHOLD = 1000;
 
    // 待处理的数组引用(所有子任务共享同一数组)
    private final long[] array;
 
    // 当前任务负责的起始索引(包含)
    private final int start;
 
    // 当前任务负责的结束索引(不包含)
    private final int end;
 
    // 构造器
    public DoubleTask(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }
 
    /**
     * 核心计算方法 —— 注意返回值为 void
     */
    @Override
    protected void compute() {
        // 计算当前任务负责的元素数量
        int length = end - start;
 
        // ★ Base case: 直接处理
        if (length <= THRESHOLD) {
            for (int i = start; i < end; i++) {
                array[i] *= 2;               // 每个元素翻倍(原地修改)
            }
            return;                           // 无返回值,直接 return
        }
 
        // ★ Recursive case: 拆分为两个子任务
        int mid = start + (length >> 1);
 
        // 创建左、右子任务
        DoubleTask left = new DoubleTask(array, start, mid);
        DoubleTask right = new DoubleTask(array, mid, end);
 
        // invokeAll: 同时提交两个子任务并等待它们全部完成
        // 因为没有返回值需要合并,所以用 invokeAll 比 fork+compute 更简洁
        invokeAll(left, right);
    }
}

💡 invokeAll() vs fork() + compute()

对于 RecursiveAction,由于不需要合并返回值,可以直接使用 invokeAll(task1, task2) 一次性提交两个子任务。invokeAll 内部的实现逻辑其实等价于:对第二个任务调用 fork(),对第一个任务调用 compute()(当前线程执行),再对第二个任务调用 join()。所以它并不会浪费当前线程,本质上是对 "fork-once, compute-once" 模式的封装。

下面是一张对比表,帮助你快速区分 RecursiveTaskRecursiveAction

特征RecursiveTask<V>RecursiveAction
返回值✅ 有返回值(泛型 V❌ 无返回值(void
compute() 签名protected V compute()protected void compute()
典型场景求和、求最大值、归并排序结果合并并行排序(原地)、批量修改、文件处理
结果合并通过 return + join() 手动合并无需合并,副作用已在数组/集合中就位
推荐拆分写法leftTask.fork() + rightTask.compute()invokeAll(left, right)

fork(异步执行)

fork()ForkJoinTask 中触发任务异步执行的关键方法。调用 task.fork() 后,该任务会被推入当前工作线程的本地双端队列(Work Queue)的头部,等待被调度执行。注意几个要点:

1. fork() 不会创建新线程

new Thread(task).start() 完全不同,fork() 只是将任务放入队列,由 ForkJoinPool已有的工作线程来消费。线程池中的工作线程数量通常等于 CPU 核心数(Runtime.getRuntime().availableProcessors()),不会因为 fork() 调用次数多而无限膨胀。

2. fork() 的推入位置是队列头部(LIFO 端)

当前线程自己取任务时,从队列头部取(LIFO,后进先出)。这样做的好处是:最近 fork 出来的子任务粒度最小、最可能已经准备好数据(cache locality 更好),优先执行它们可以快速完成并释放内存。

3. fork() 必须在 ForkJoinPool 上下文中调用

如果在普通线程(非 ForkJoinWorkerThread)中调用 fork(),任务会被提交到 common pool(公共池)。但这通常不是你期望的行为,因此 fork() 应当在 compute() 方法内部调用,此时当前线程一定是 ForkJoinPool 的工作线程。

下面是 fork() 的执行流程:

从图中可以清晰地看到:fork 是 push 操作(推入头部),当前线程从头部取(LIFO),窃取线程从尾部取(FIFO)。这个双端队列(Deque)的无锁设计是 Fork/Join 框架高性能的核心秘密之一。


join(等待结果)

join() 是与 fork() 配对的方法,用于阻塞当前线程,直到目标任务完成并返回其结果。如果是 RecursiveTaskjoin() 返回 compute() 的返回值;如果是 RecursiveActionjoin() 返回 null

但这里的"阻塞"与传统的 Thread.join()Future.get() 有着本质区别

1. "帮忙执行"式等待(Work-Helping)

当 Worker-1 调用 leftTask.join() 时,如果 leftTask 还在自己的队列里(尚未被执行),Worker-1 会直接把它取出来自己执行,而不是傻等。这避免了线程空转,极大提升了效率。

2. 如果任务已被窃取

如果 leftTask 已经被其他线程(比如 Worker-3)窃取走了,Worker-1 不会干等。它会尝试从自己的队列中取其他任务来执行(继续干活),或者去窃取别人的任务。只有在实在无事可做时,才会进入等待状态。

3. 异常传播

如果子任务在执行过程中抛出了异常,join() 会将该异常包装为 RuntimeException(或 Error)并重新抛出。这意味着你可以在调用 join() 的位置捕获子任务的异常。

join() vs Future.get() 对比

特征ForkJoinTask.join()Future.get()
等待方式智能等待:先尝试自己执行或帮忙执行纯阻塞等待(Blocking wait)
异常类型抛出未检查异常(unchecked exception)抛出 ExecutionException(checked)
超时支持❌ 不支持(可用 get(timeout, unit) 替代)✅ 支持 get(long, TimeUnit)
线程利用率高:等待期间不浪费线程低:线程完全阻塞

最后,用一段伪代码总结 fork + join 的标准配合模式:

Java
@Override
protected Long compute() {
    if (任务足够小) {
        return 直接计算();                     // Base case
    }
 
    // 拆分
    SubTask left = new SubTask(左半部分);      // 创建左子任务
    SubTask right = new SubTask(右半部分);      // 创建右子任务
 
    left.fork();                               // 左子任务异步推入队列
    Long rightResult = right.compute();        // 右子任务当前线程直接执行(关键优化!)
    Long leftResult = left.join();             // 等待左子任务完成
 
    return leftResult + rightResult;           // 合并结果
}

⚠️ 注意调用顺序:必须先 fork(),再 compute(),最后 join() 如果你先 join()compute(),当前线程会在 join() 处阻塞等待一个尚未开始的任务——结果就是死锁(deadlock)或串行退化。这是 Fork/Join 编程中最常见的 bug 之一。


📝 练习题

以下代码是一个 RecursiveTaskcompute() 方法实现,哪一种写法的性能最优

Java
// 写法 A
leftTask.fork();
rightTask.fork();
return leftTask.join() + rightTask.join();
 
// 写法 B
leftTask.fork();
Long rightResult = rightTask.compute();
Long leftResult = leftTask.join();
return leftResult + rightResult;
 
// 写法 C
Long leftResult = leftTask.compute();
rightTask.fork();
Long rightResult = rightTask.join();
return leftResult + rightResult;
 
// 写法 D
invokeAll(leftTask, rightTask);
return leftTask.join() + rightTask.join();

A. 写法 A

B. 写法 B

C. 写法 C

D. 写法 D

【答案】 B

【解析】

  • 写法 A:两个任务都 fork() 出去了,当前线程在两次 join() 期间自己不做任何计算工作,白白浪费了一个工作线程。虽然 join() 内部有 Work-Helping 机制可以部分弥补,但不如显式 compute() 高效。
  • 写法 B ✅:这是标准的 "fork-once, compute-once" 惯用法。leftTask.fork() 异步提交后,当前线程立刻通过 rightTask.compute() 参与计算,最后 leftTask.join() 获取左侧结果。当前线程全程无空闲,线程利用率最高。
  • 写法 C:先 compute() 左任务(当前线程同步执行完毕),然后才 fork() 右任务。这意味着左右两个任务是串行执行的,完全丧失了并行性。
  • 写法 DinvokeAll 更适合 RecursiveAction(无返回值场景)。对于 RecursiveTask,虽然功能正确,但 invokeAll 之后还需要额外调用两次 join() 来获取结果,写法不如 B 直观且存在微小的额外开销。

使用示例

前面的章节中,我们已经深入理解了 Fork/Join 框架的核心理论——分治思想、工作窃取算法、以及 RecursiveTaskRecursiveAction 两大任务抽象。现在,是时候将这些理论付诸实践了。本节将通过 四个由浅入深的完整案例,演示 ForkJoinPool 在不同业务场景下的实际运用。每个示例都将严格遵循"拆分 → fork → join → 合并"的经典模式,并附带详尽的逐行注释,确保你不仅能写出正确的 Fork/Join 代码,更能理解每一行背后的设计意图。


示例一:大数组求和(RecursiveTask)

这是 Fork/Join 最经典的入门案例。给定一个包含海量元素的 long[] 数组,利用分治策略将其拆分成小块并行求和,最后汇总结果。这个场景完美契合 RecursiveTask<Long>——因为我们需要每个子任务返回一个求和结果。

完整代码

Java
import java.util.concurrent.ForkJoinPool;   // Fork/Join 线程池
import java.util.concurrent.RecursiveTask;  // 有返回值的递归任务
 
/**
 * 大数组并行求和任务
 * 继承 RecursiveTask<Long> 表示该任务会返回一个 Long 类型的计算结果
 */
public class ArraySumTask extends RecursiveTask<Long> {
 
    // 阈值:当子数组长度 <= THRESHOLD 时,直接顺序计算,不再继续拆分
    // 这个值的选取至关重要——太小会导致任务过多、调度开销大;太大则并行度不足
    private static final int THRESHOLD = 10_000;
 
    // 待求和的原始数组(所有子任务共享同一个数组引用,不复制数据)
    private final long[] array;
 
    // 当前子任务负责计算的区间:[start, end)(左闭右开)
    private final int start;
    private final int end;
 
    /**
     * 构造方法:指定数组及当前任务负责的区间
     * @param array 原始数组
     * @param start 起始索引(包含)
     * @param end   结束索引(不包含)
     */
    public ArraySumTask(long[] array, int start, int end) {
        this.array = array;  // 保存数组引用
        this.start = start;  // 保存起始位置
        this.end = end;      // 保存结束位置
    }
 
    /**
     * 核心计算逻辑——Fork/Join 框架会自动调用此方法
     * 这里体现了分治算法的精髓:要么直接算,要么拆分后递归
     */
    @Override
    protected Long compute() {
        // 计算当前区间的长度
        int length = end - start;
 
        // ====== 基线条件 (Base Case) ======
        // 如果区间足够小,直接用简单循环求和,避免继续拆分的开销
        if (length <= THRESHOLD) {
            long sum = 0L;                    // 初始化局部求和变量
            for (int i = start; i < end; i++) { // 遍历当前负责的区间
                sum += array[i];              // 逐个累加元素
            }
            return sum;                       // 返回这个小区间的和
        }
 
        // ====== 递归拆分 (Recursive Split) ======
        // 计算区间中点,将任务一分为二
        int mid = start + (length / 2);  // 避免 (start+end)/2 可能的溢出
 
        // 创建左半部分子任务:负责 [start, mid)
        ArraySumTask leftTask = new ArraySumTask(array, start, mid);
 
        // 创建右半部分子任务:负责 [mid, end)
        ArraySumTask rightTask = new ArraySumTask(array, mid, end);
 
        // ====== Fork 阶段 ======
        // 将左子任务异步提交到 ForkJoinPool 的工作队列
        // fork() 不会阻塞当前线程——它只是把任务推入双端队列的头部
        leftTask.fork();
 
        // ====== 关键优化:当前线程直接计算右子任务 ======
        // 不要对两个子任务都 fork()!那样当前线程就空闲了,浪费一个线程
        // 正确做法:fork 一个,当前线程直接 compute 另一个
        long rightResult = rightTask.compute();
 
        // ====== Join 阶段 ======
        // 等待左子任务完成并获取其结果
        // 如果左子任务已经完成,join() 会立即返回
        // 如果尚未完成,当前线程会尝试"窃取"其他任务来执行(而非空等)
        long leftResult = leftTask.join();
 
        // ====== 合并结果 ======
        // 将左右两半的和相加,得到当前区间的总和
        return leftResult + rightResult;
    }
 
    /**
     * 主方法:演示如何创建 ForkJoinPool 并提交任务
     */
    public static void main(String[] args) {
        // 准备测试数据:创建一个包含 1000 万个元素的数组
        int size = 10_000_000;
        long[] numbers = new long[size];
 
        // 用简单规则填充数组:array[i] = i + 1,即 1, 2, 3, ..., 10000000
        for (int i = 0; i < size; i++) {
            numbers[i] = i + 1L;  // +1L 确保使用 long 运算
        }
 
        // 创建 ForkJoinPool 实例
        // 无参构造器默认使用 Runtime.availableProcessors() 个工作线程
        // 例如 8 核 CPU 就创建 8 个工作线程
        ForkJoinPool pool = new ForkJoinPool();
 
        // 创建根任务:覆盖整个数组区间 [0, size)
        ArraySumTask rootTask = new ArraySumTask(numbers, 0, size);
 
        // invoke() = 同步提交 + 等待结果,等价于 submit(task).join()
        // 它会阻塞当前主线程,直到整个分治计算完成
        long result = pool.invoke(rootTask);
 
        // 输出结果
        System.out.println("Fork/Join 求和结果: " + result);
 
        // 验证正确性:等差数列求和公式 n*(n+1)/2
        long expected = (long) size * (size + 1) / 2;
        System.out.println("数学公式验证结果: " + expected);
        System.out.println("结果是否一致: " + (result == expected));
 
        // 输出线程池状态信息(调试用)
        System.out.println("并行度 (Parallelism): " + pool.getParallelism());
        System.out.println("线程池大小 (Pool Size): " + pool.getPoolSize());
        System.out.println("窃取次数 (Steal Count): " + pool.getStealCount());
 
        // 关闭线程池,释放资源
        pool.shutdown();
    }
}

运行输出

Text
Fork/Join 求和结果: 50000005000000
数学公式验证结果: 50000005000000
结果是否一致: true
并行度 (Parallelism): 8
线程池大小 (Pool Size): 8
窃取次数 (Steal Count): 12

fork/compute 顺序的深度剖析

上面代码中有一个极其容易出错的细节——leftTask.fork()rightTask.compute() 的调用顺序。这不是随意的,让我们通过对比来理解为什么:

三种写法的核心差异:

写法效果问题
both fork()两个子任务都丢入队列当前线程空转,白白浪费一个 worker
fork → compute → join左任务入队,当前线程算右任务,再等左结果最优:零线程浪费
fork → join → compute先 fork 左,立刻 join 等左完成,再算右退化为串行,完全丧失并行优势

🔑 黄金法则:总是 fork() 一个子任务,然后当前线程直接 compute() 另一个,最后再 join() 之前 fork 的那个。


示例二:文件目录大小统计(RecursiveTask + 实际IO)

数组求和只是"教科书场景"。在真实开发中,Fork/Join 更常用于递归结构的并行遍历——比如计算一个文件目录树的总大小。目录本身就是天然的树形结构,完美匹配分治思想:每个子目录作为一个独立子任务并行计算。

Java
import java.io.File;                        // 文件/目录操作
import java.util.ArrayList;                  // 存储子任务列表
import java.util.List;                       // List 接口
import java.util.concurrent.ForkJoinPool;    // Fork/Join 线程池
import java.util.concurrent.RecursiveTask;   // 有返回值的递归任务
 
/**
 * 递归统计目录总大小(字节数)
 * 每个目录拆分为"子目录任务" + "当前目录下文件大小直接累加"
 */
public class DirectorySizeTask extends RecursiveTask<Long> {
 
    // 当前任务需要统计的目录
    private final File directory;
 
    /**
     * 构造方法
     * @param directory 要统计大小的目录
     */
    public DirectorySizeTask(File directory) {
        this.directory = directory;  // 保存目录引用
    }
 
    @Override
    protected Long compute() {
        long totalSize = 0L;  // 当前目录的累计大小
 
        // 获取目录下的所有文件和子目录
        // listFiles() 可能返回 null(如果路径不存在或没有权限)
        File[] files = directory.listFiles();
 
        // 防御性检查:目录为空或无法访问时直接返回 0
        if (files == null || files.length == 0) {
            return 0L;
        }
 
        // 用于收集所有子目录对应的异步任务
        List<DirectorySizeTask> subTasks = new ArrayList<>();
 
        // 遍历目录下的每个条目
        for (File file : files) {
            if (file.isFile()) {
                // 如果是普通文件 → 直接累加其大小(基线条件)
                totalSize += file.length();
            } else if (file.isDirectory()) {
                // 如果是子目录 → 创建新的子任务
                DirectorySizeTask subTask = new DirectorySizeTask(file);
                subTask.fork();          // 异步提交到工作队列
                subTasks.add(subTask);   // 记录下来,稍后 join
            }
        }
 
        // 等待所有子目录任务完成,并累加它们的结果
        for (DirectorySizeTask subTask : subTasks) {
            totalSize += subTask.join();  // join() 阻塞直到该子任务完成
        }
 
        // 返回当前目录(含所有子目录)的总大小
        return totalSize;
    }
 
    public static void main(String[] args) {
        // 指定要统计的根目录(请替换为你系统上的实际路径)
        File rootDir = new File("/Users/developer/projects");
 
        // 使用公共 ForkJoinPool(也可以 new ForkJoinPool())
        ForkJoinPool pool = ForkJoinPool.commonPool();
 
        // 创建根任务
        DirectorySizeTask rootTask = new DirectorySizeTask(rootDir);
 
        // 同步执行并获取结果
        long totalBytes = pool.invoke(rootTask);
 
        // 格式化输出
        System.out.printf("目录: %s%n", rootDir.getAbsolutePath());
        System.out.printf("总大小: %,d bytes (%.2f MB)%n",
                totalBytes,                           // 原始字节数
                totalBytes / (1024.0 * 1024.0));      // 转换为 MB
    }
}

这个示例中有一个与"数组求和"不同的模式——我们对所有子目录都调用了 fork()。这是因为子目录数量不确定(可能有 0 个、1 个、10 个……),我们无法简单地"fork 一个、compute 一个"。不过,对当前目录下的文件,我们直接在当前线程中累加大小(totalSize += file.length()),这本身就是当前线程在做有用的工作,所以并不算浪费。


示例三:并行归并排序(RecursiveAction)

前两个示例都使用了 RecursiveTask(有返回值)。现在来看一个 RecursiveAction(无返回值)的经典场景——原地归并排序 (In-place Merge Sort)。排序操作直接修改原数组,不需要返回值,因此用 RecursiveAction 更合适。

完整代码

Java
import java.util.Arrays;                      // Arrays.copyOfRange, Arrays.toString
import java.util.concurrent.ForkJoinPool;      // Fork/Join 线程池
import java.util.concurrent.RecursiveAction;   // 无返回值的递归任务
 
/**
 * 并行归并排序
 * 继承 RecursiveAction —— 原地修改数组,不返回结果
 */
public class ParallelMergeSort extends RecursiveAction {
 
    // 阈值:小于此长度时退化为插入排序(小数组插入排序更快)
    private static final int THRESHOLD = 4096;
 
    // 待排序数组
    private final int[] array;
 
    // 当前任务负责的区间 [start, end)
    private final int start;
    private final int end;
 
    /**
     * 构造方法
     * @param array 待排序数组(原地修改)
     * @param start 起始索引(包含)
     * @param end   结束索引(不包含)
     */
    public ParallelMergeSort(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }
 
    @Override
    protected void compute() {  // 注意:返回类型是 void(RecursiveAction 的特征)
        int length = end - start;
 
        // ====== 基线条件:小数组使用插入排序 ======
        if (length <= THRESHOLD) {
            insertionSort(array, start, end);  // 插入排序对小数组极其高效
            return;                             // 直接返回,不再拆分
        }
 
        // ====== 递归拆分 ======
        int mid = start + (length / 2);  // 计算中点
 
        // 创建左半部分排序子任务
        ParallelMergeSort leftSort = new ParallelMergeSort(array, start, mid);
 
        // 创建右半部分排序子任务
        ParallelMergeSort rightSort = new ParallelMergeSort(array, mid, end);
 
        // ====== 并行执行两个子任务 ======
        // invokeAll() 是一个便捷方法,等价于:
        //   leftSort.fork();
        //   rightSort.compute();
        //   leftSort.join();
        // 但 invokeAll 内部做了更好的优化——它会自动决定哪个任务 fork、哪个 compute
        invokeAll(leftSort, rightSort);
 
        // ====== 合并两个已排序的子数组 ======
        // 此时 array[start..mid) 和 array[mid..end) 各自已有序
        // 需要将它们合并成一个有序序列
        merge(array, start, mid, end);
    }
 
    /**
     * 经典的双指针归并操作
     * 将 array[start..mid) 和 array[mid..end) 合并为有序序列
     */
    private void merge(int[] arr, int start, int mid, int end) {
        // 创建左半部分的临时拷贝
        int[] left = Arrays.copyOfRange(arr, start, mid);
        // 创建右半部分的临时拷贝
        int[] right = Arrays.copyOfRange(arr, mid, end);
 
        int i = 0;      // 左数组的游标
        int j = 0;      // 右数组的游标
        int k = start;   // 原数组的写入位置
 
        // 双指针逐一比较,较小的先放入原数组
        while (i < left.length && j < right.length) {
            if (left[i] <= right[j]) {       // <= 保证稳定性(stable sort)
                arr[k++] = left[i++];        // 取左边元素
            } else {
                arr[k++] = right[j++];       // 取右边元素
            }
        }
 
        // 左数组可能还有剩余元素
        while (i < left.length) {
            arr[k++] = left[i++];
        }
 
        // 右数组可能还有剩余元素
        while (j < right.length) {
            arr[k++] = right[j++];
        }
    }
 
    /**
     * 插入排序:对小规模数组非常高效
     * 时间复杂度 O(n^2),但常数因子小,且对近乎有序的数组接近 O(n)
     */
    private void insertionSort(int[] arr, int start, int end) {
        for (int i = start + 1; i < end; i++) {  // 从第二个元素开始
            int key = arr[i];                     // 当前要插入的元素
            int j = i - 1;                        // 从 key 左边开始向前扫描
            while (j >= start && arr[j] > key) {  // 找到 key 应该插入的位置
                arr[j + 1] = arr[j];              // 元素右移,腾出空位
                j--;                               // 继续向前
            }
            arr[j + 1] = key;                     // 将 key 插入正确位置
        }
    }
 
    public static void main(String[] args) {
        // 构造测试数据:100 万个随机整数
        int size = 1_000_000;
        int[] data = new int[size];
        java.util.Random random = new java.util.Random(42);  // 固定种子保证可复现
        for (int i = 0; i < size; i++) {
            data[i] = random.nextInt(size);  // [0, size) 范围内的随机整数
        }
 
        // 复制一份用于验证(Arrays.sort 作为参照基准)
        int[] expected = data.clone();  // 深拷贝
 
        // ====== Fork/Join 并行排序 ======
        ForkJoinPool pool = new ForkJoinPool();
        long startTime = System.nanoTime();                         // 开始计时
 
        // 创建根任务并执行
        pool.invoke(new ParallelMergeSort(data, 0, data.length));
 
        long elapsed = System.nanoTime() - startTime;               // 计算耗时
        System.out.printf("Fork/Join 并行归并排序: %.3f ms%n",
                elapsed / 1_000_000.0);
 
        // ====== 对照组:JDK 内置排序 ======
        startTime = System.nanoTime();
        Arrays.sort(expected);                                       // 双轴快排
        elapsed = System.nanoTime() - startTime;
        System.out.printf("Arrays.sort 排序:      %.3f ms%n",
                elapsed / 1_000_000.0);
 
        // 验证排序结果是否一致
        System.out.println("排序结果一致: " + Arrays.equals(data, expected));
 
        pool.shutdown();  // 释放资源
    }
}

invokeAll() 的内部行为

上面的代码使用了 invokeAll(leftSort, rightSort) 而非手动 fork + computeinvokeAllForkJoinTask 提供的静态便捷方法,其内部逻辑近似如下:

Java
// invokeAll(task1, task2) 的等价伪代码
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
    t2.fork();       // 把第二个任务异步提交
    t1.compute();    // 当前线程直接执行第一个任务(不浪费线程!)
    t2.join();       // 等待第二个任务完成
}

所以 invokeAll 本质上帮我们遵循了 "fork one, compute the other" 的最佳实践,只是封装得更优雅。当你有两个子任务时,推荐使用 invokeAll 来减少出错概率。


示例四:批量图片缩放(RecursiveAction + 实际业务)

最后来看一个更贴近真实业务的场景——批量处理图片。假设你有数千张图片需要生成缩略图,这是一个 CPU 密集 + 无返回值 的典型场景。

Java
import java.util.List;                        // 任务列表
import java.util.concurrent.ForkJoinPool;      // Fork/Join 线程池
import java.util.concurrent.RecursiveAction;   // 无返回值任务
 
/**
 * 批量图片处理任务
 * 将大批量图片列表拆分成小批次并行处理
 */
public class ImageProcessTask extends RecursiveAction {
 
    // 阈值:每批最多处理 5 张图片
    private static final int THRESHOLD = 5;
 
    // 待处理的图片路径列表
    private final List<String> imagePaths;
 
    // 当前任务负责的区间 [start, end)
    private final int start;
    private final int end;
 
    public ImageProcessTask(List<String> imagePaths, int start, int end) {
        this.imagePaths = imagePaths;  // 共享同一个列表引用
        this.start = start;
        this.end = end;
    }
 
    @Override
    protected void compute() {
        int length = end - start;
 
        // ====== 基线条件:直接处理这批图片 ======
        if (length <= THRESHOLD) {
            for (int i = start; i < end; i++) {
                processImage(imagePaths.get(i));  // 逐张处理
            }
            return;
        }
 
        // ====== 拆分成两个子批次 ======
        int mid = start + (length / 2);
 
        ImageProcessTask leftBatch = new ImageProcessTask(imagePaths, start, mid);
        ImageProcessTask rightBatch = new ImageProcessTask(imagePaths, mid, end);
 
        // invokeAll 自动处理 fork/compute/join 的最优顺序
        invokeAll(leftBatch, rightBatch);
    }
 
    /**
     * 模拟单张图片的处理逻辑(实际项目中这里是 BufferedImage 缩放操作)
     */
    private void processImage(String path) {
        // 获取当前执行线程的名称(便于观察并行效果)
        String threadName = Thread.currentThread().getName();
        System.out.printf("[%s] 正在处理: %s%n", threadName, path);
 
        try {
            Thread.sleep(50);  // 模拟图片处理耗时 50ms
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // 恢复中断标志
        }
    }
 
    public static void main(String[] args) {
        // 构造 20 张模拟图片路径
        List<String> images = new java.util.ArrayList<>();
        for (int i = 1; i <= 20; i++) {
            images.add("/photos/image_" + String.format("%03d", i) + ".jpg");
        }
 
        ForkJoinPool pool = new ForkJoinPool(4);  // 指定 4 个工作线程
 
        long startTime = System.currentTimeMillis();
 
        // 提交根任务:处理全部 20 张图片
        pool.invoke(new ImageProcessTask(images, 0, images.size()));
 
        long elapsed = System.currentTimeMillis() - startTime;
 
        System.out.println("============================");
        System.out.printf("全部处理完成! 总耗时: %d ms%n", elapsed);
        System.out.printf("理论串行耗时: %d ms%n", 20 * 50);  // 20 * 50 = 1000ms
        System.out.printf("并行加速比: %.2fx%n", (20.0 * 50) / elapsed);
 
        pool.shutdown();
    }
}

运行输出(节选)

Text
[ForkJoinPool-1-worker-1] 正在处理: /photos/image_001.jpg
[ForkJoinPool-1-worker-3] 正在处理: /photos/image_011.jpg
[ForkJoinPool-1-worker-2] 正在处理: /photos/image_006.jpg
[ForkJoinPool-1-worker-4] 正在处理: /photos/image_016.jpg
... (四个线程交错并行)
============================
全部处理完成! 总耗时: 263 ms
理论串行耗时: 1000 ms
并行加速比: 3.80x

4 个工作线程将 1000ms 的串行工作压缩到约 263ms,加速比接近理论上限的 4x,这就是 Fork/Join 并行的威力。


四种提交方式的选择指南

在上面四个示例中,我们用到了 invoke()fork()submit() 等不同的提交方式。它们之间的区别和适用场景如下:

用代码总结一下这四种调用方式:

Java
// ① invoke: 同步提交并阻塞,直到拿到结果(最常用)
Long result = pool.invoke(task);
 
// ② submit: 异步提交,立即返回 ForkJoinTask(即 Future)
ForkJoinTask<Long> future = pool.submit(task);
Long result = future.get();  // 可在稍后某个时刻获取结果
 
// ③ execute: 异步提交,无返回值(配合 RecursiveAction)
pool.execute(task);
// ... 做其他事 ...
task.join();  // 需要时手动等待
 
// ④ fork: 只在 compute() 方法内部使用,将子任务推入队列
leftTask.fork();            // 提交到当前工作线程的双端队列
Long right = rightTask.compute();  // 当前线程直接计算
Long left = leftTask.join();       // 等待 fork 出去的任务

使用 ForkJoinPool 的注意事项与最佳实践

通过四个示例的实战,我们可以总结出以下关键经验:

1. 阈值 (Threshold) 的选择至关重要

阈值太小会导致任务数爆炸,创建和调度任务的开销可能超过计算本身;阈值太大则并行度不足。一般的经验法则是:让任务总数约为线程数的 10~100 倍。例如 8 核 CPU,100 万元素数组,阈值设为 1_000_000 / (8 * 50) ≈ 2500 左右是比较合理的起点,然后通过基准测试微调。

2. 避免在任务中执行阻塞 IO

Fork/Join 框架假设任务是 CPU 密集型 (CPU-bound) 的。如果任务中包含数据库查询、网络请求等阻塞操作,工作线程会被长时间占用,严重影响工作窃取的效率。对于 IO 密集型场景,应使用 CompletableFuture + 自定义 ExecutorService

3. 不要在任务中使用 synchronized 或重量级锁

Fork/Join 的高效性建立在"各任务独立、无共享可变状态"的前提上。引入锁会导致线程阻塞,破坏工作窃取的节奏,甚至引发死锁。

4. commonPool() vs new ForkJoinPool()

Java
// 方式一:使用 JVM 全局共享的公共池(Java 8+ 的并行流也用这个池)
ForkJoinPool common = ForkJoinPool.commonPool();
 
// 方式二:创建独立的池(推荐用于生产环境的关键路径)
ForkJoinPool dedicated = new ForkJoinPool(
    Runtime.getRuntime().availableProcessors()  // 并行度
);

commonPool 方便但不可控——如果其他代码(比如并行流 parallelStream())也在用它,你的任务可能被排队等待。对于生产环境中的关键计算任务,建议创建专用池以保证资源隔离。


📝 练习题

以下代码运行在一个 4 核 CPU 上,使用默认并行度的 ForkJoinPool。假设数组长度为 10,000,THRESHOLD 为 2,500,compute() 方法中正确使用了 fork + compute + join 模式。请问整个计算过程中,大约会创建多少个 ArraySumTask 实例?

A. 4 个

B. 7 个

C. 15 个

D. 取决于 CPU 核心数,每个核心处理一个任务

【答案】 B

【解析】 这是一道关于分治树结构的题目,与 CPU 核心数无关。数组长度 10,000,阈值 2,500:

  • 第 0 层(根):[0, 10000) → 长度 10,000 > 2,500 → 拆分。1 个任务。
  • 第 1 层:[0, 5000)[5000, 10000) → 长度 5,000 > 2,500 → 各自拆分。2 个任务。
  • 第 2 层:[0, 2500) / [2500, 5000) / [5000, 7500) / [7500, 10000) → 长度 2,500 ≤ 2,500 → 直接计算。4 个任务。

总计 1 + 2 + 4 = 7 个任务实例。这构成了一棵完全二叉树。核心数 4 决定的是"有多少个线程同时处理这 7 个任务",而不是"创建多少个任务"。所以 D 选项的说法是错误的。选 B


本章小结

本章围绕 ForkJoinPool 这一 Java 并发框架的核心组件,从设计哲学到实战落地进行了系统性梳理。以下从几个维度进行回顾与提炼。


核心知识脉络回顾

Fork/Join 框架的本质是将 分治算法 (Divide and Conquer)多线程并行执行 相结合的一套基础设施。它诞生于 JDK 7,由 Doug Lea 设计,目标是充分利用多核 CPU 的算力来加速 可递归拆分 的计算任务。与传统 ThreadPoolExecutor 不同,ForkJoinPool 并非面向独立的、互不相关的任务调度,而是面向 一个大任务被不断拆解为子任务、子任务的结果再被逐层合并 这样的计算模型。

分治思想是整个框架的灵魂。任何使用 Fork/Join 的场景,都必须满足一个前提——问题能被分解为结构相同但规模更小的子问题,且子问题的解可以被合并为原问题的解。框架只是提供了将这一数学思想映射到多线程世界的工程手段。

工作窃取 (Work-Stealing) 是 ForkJoinPool 区别于其他线程池的核心调度策略。每个 Worker Thread 拥有自己的 双端队列 (Deque),自身从队列 头部 (top) 取任务执行,而空闲线程从其他忙碌线程的队列 尾部 (base) 窃取任务。这种设计在绝大多数场景下避免了锁竞争,实现了接近线性的并行加速比 (near-linear speedup)。

ForkJoinTask 是提交到 ForkJoinPool 的任务抽象。它有两个核心子类:RecursiveTask〈V〉(有返回值,适用于归并求和等场景)和 RecursiveAction(无返回值,适用于批量数据处理等场景)。fork() 将子任务异步推入当前线程的工作队列,join() 则阻塞等待子任务完成并获取结果——但这里的"阻塞"并非真正的线程挂起,而是 ForkJoinPool 内部会让等待线程去执行其他可用任务,从而避免资源浪费。


全章知识体系总览


关键结论与最佳实践

维度要点
适用场景可递归拆解的 CPU 密集型任务;不适合 I/O 阻塞型任务
阈值选择compute() 中的 threshold 直接影响性能——太小导致任务对象创建开销过大,太大则并行度不足
commonPoolForkJoinPool.commonPool() 是 JVM 全局共享池,parallelStream 默认使用它;生产环境中应警惕共享池被阻塞任务拖慢
避免阻塞在 ForkJoinPool 中执行 I/O 或 synchronized 阻塞操作会破坏 Work-Stealing 效率,导致线程饥饿 (thread starvation)
fork/join 顺序推荐 task1.fork() → task2.compute() → task1.join() 的模式,让当前线程直接计算其中一个子任务而非两个都 fork,减少一次任务入队开销
与 Stream 的关系parallelStream 的底层引擎正是 ForkJoinPool.commonPool(),理解 Fork/Join 是理解并行流性能调优的基础

一句话总结:ForkJoinPool = 分治算法 + 工作窃取调度 + 轻量级任务抽象,三者协同构成了 Java 并行计算的基石。掌握它不仅是使用 parallelStream 的前提,更是理解现代 JVM 并发模型演进(从线程池到 Virtual Threads)的重要一环。


📝 练习题 1

关于 ForkJoinPool 的工作窃取 (Work-Stealing) 机制,以下描述正确的是?

A. 每个工作线程从自己双端队列的尾部 (tail/base) 取任务,窃取线程从头部 (top/head) 偷任务

B. 所有工作线程共享同一个任务队列,通过 CAS 竞争获取任务

C. 每个工作线程从自己双端队列的头部 (top/head) 取任务,窃取线程从其他线程队列的尾部 (tail/base) 偷任务

D. 工作窃取仅发生在任务提交阶段,一旦任务开始执行就不会再发生窃取

【答案】 C

【解析】 Work-Stealing 的核心设计是:Owner Thread 从自己 Deque 的 头部 (top) 以 LIFO 方式取任务(保持递归局部性,cache 友好),而 Thief Thread 从其他线程 Deque 的 尾部 (base) 以 FIFO 方式窃取任务(窃取的是较大的、较早 fork 出来的任务,拆分后能产生更多子任务供自己执行)。选项 A 恰好反了方向;选项 B 描述的是传统 ThreadPoolExecutor 的共享队列模型;选项 D 也不正确——窃取在整个执行过程中持续发生,只要有线程空闲就会尝试窃取。


📝 练习题 2

RecursiveTaskcompute() 方法中,以下哪种 fork/join 调用模式效率最高?

A. left.fork(); right.fork(); return left.join() + right.join();

B. left.fork(); long rightResult = right.compute(); return left.join() + rightResult;

C. left.compute(); right.compute(); return left.join() + right.join();

D. return left.invoke() + right.invoke();

【答案】 B

【解析】 选项 B 是 Fork/Join 框架推荐的最佳模式。left.fork() 将左子任务异步推入工作队列,然后当前线程 直接执行 right.compute() 而不是将其也入队——这样当前线程不会空转等待,省去了一次任务入队和出队的开销。最后 left.join() 等待左子任务的结果并合并。选项 A 虽然正确但多了一次不必要的 fork(当前线程 fork 了两个子任务后自己反而要等待,浪费了一个线程的算力)。选项 C 中两个都用 compute() 意味着完全串行执行,丧失了并行性。选项 D 中 invoke() 等价于 fork() + join() 的同步调用,两个都 invoke 同样是串行的。