庖丁解牛 ForkJoinPool:从源码深处剖析其精妙的并行之道
你好,我是你的老朋友,码农阿泽。
你是否也曾被 Java 并发编程的复杂性所困扰?多线程、锁、同步……这些概念是否让你感到头疼?别担心,今天我们就来一起深入探索 Java 并发框架中的一位“高手”—— ForkJoinPool,从源码层面彻底搞懂它的工作原理,让你在并发编程的世界里游刃有余。
ForkJoinPool 是什么?
ForkJoinPool 是 Java 7 引入的一个并行计算框架,它的核心思想是“分而治之”。想象一下,你有一大堆的任务需要处理,如果只有一个工人(线程)来做,那效率肯定很低。但如果你把这堆任务分成若干个小任务,然后分配给多个工人同时处理,最后再把结果合并起来,那效率就会大大提高。ForkJoinPool 就是这样做的。
它特别适合那些可以递归拆分的任务,比如快速排序、归并排序、矩阵乘法等等。在这些场景下,ForkJoinPool 可以充分利用多核 CPU 的优势,显著提高程序的执行效率。
为什么选择 ForkJoinPool?
你可能会问,Java 不是已经有线程池了吗?为什么还要搞一个 ForkJoinPool?
传统的线程池,比如 ThreadPoolExecutor,在处理相互独立的任务时表现出色。但当任务之间存在依赖关系,或者需要递归拆分时,ThreadPoolExecutor 就显得力不从心了。因为 ThreadPoolExecutor 中的线程是“平等”的,它们从同一个任务队列中获取任务,无法感知任务之间的依赖关系。
而 ForkJoinPool 则不同,它采用了“工作窃取”(Work-Stealing)算法,每个线程都有自己的任务队列。当一个线程完成了自己的任务后,它可以“偷”其他线程的任务来执行,这样就避免了线程空闲,提高了 CPU 利用率。此外,ForkJoinPool 还针对递归任务做了优化,可以更好地处理任务之间的依赖关系。
ForkJoinPool 的核心组件
要深入理解 ForkJoinPool,我们需要先了解它的几个核心组件:
- ForkJoinPool: 这是 ForkJoin 框架的核心类,它负责管理工作线程、任务队列,以及任务的提交和执行。
- ForkJoinWorkerThread: 这是 ForkJoinPool 中的工作线程,它负责执行 ForkJoinTask。
- ForkJoinTask: 这是 ForkJoin 框架中的任务抽象,它有两个重要的子类:- RecursiveAction: 用于没有返回值的递归任务。
- RecursiveTask: 用于有返回值的递归任务。
 
- WorkQueue: 这是 ForkJoinPool 中的工作队列,每个工作线程都有一个自己的 WorkQueue,用于存储待执行的任务。WorkQueue 采用了双端队列(Deque)的数据结构,支持从队列头部和尾部添加和移除任务。
ForkJoinPool 的工作原理:源码级剖析
现在,让我们一起深入 ForkJoinPool 的源码,看看它是如何实现“分而治之”和“工作窃取”的。
1. 任务提交
当我们向 ForkJoinPool 提交一个 ForkJoinTask 时,会发生什么呢?
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task;
}
final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws;
    WorkQueue q;
    int m;
    int r = ThreadLocalRandom.getProbe();
    int rs = runState;
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
        U.compareAndSwapInt(q, QLOCK, 0, 1)) {
        ForkJoinTask<?>[] a;
        int am, n, s;
        if ((a = q.array) != null &&
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            int j = ((am & s) << ASHIFT) + ABASE;
            U.putOrderedObject(a, j, task);
            U.putOrderedInt(q, QTOP, s + 1);
            U.set শরতInt(q, QLOCK, 0);
            if (n <= 1)
                signalWork(ws, q);
            return;
        }
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }
    externalSubmit(task);
}
submit() 方法首先会检查任务是否为空,然后调用 externalPush() 方法。externalPush() 方法会尝试将任务添加到当前线程的 WorkQueue 中。如果添加成功,就返回;否则,调用 externalSubmit() 方法。
externalSubmit() 方法会做一些更复杂的操作,比如创建新的 WorkQueue、启动工作线程等等。这里我们就不展开讲了,感兴趣的同学可以自己去看看源码。
2. 任务执行
当一个 ForkJoinWorkerThread 开始执行任务时,它会首先从自己的 WorkQueue 中获取任务。如果自己的 WorkQueue 为空,它会尝试从其他线程的 WorkQueue 中“偷”任务。
final void runWorker(WorkQueue w) {
    w.growArray();                   // allocate queue
    int seed = w.hint;               // initially holds probeprobe
    int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorshift
    for (ForkJoinTask<?> t;;) {
        if ((t = scan(w, r)) != null)
            w.runTask(t);
        else if (!tryAwaitWork(w, r))
            break;
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
    }
}
final void runTask(ForkJoinTask<?> task) {
    if (task != null) {
        scanState &= ~SCANNING; // mark as busy
        (currentTask = task).doExec();
        U.putOrderedObject(this, QCURRENTTASK, null); // release for GC
        execLocalTasks();
        ForkJoinWorkerThread thread = owner;
        if (++nsteals < 0)      // collect on overflow
            transferStealCount(pool);
        scanState |= SCANNING;
        if (thread != null)
            thread.afterTopLevelExec();
    }
}
runWorker() 方法是 ForkJoinWorkerThread 的主循环。它会不断地调用 scan() 方法来获取任务,然后调用 runTask() 方法来执行任务。
scan() 方法会首先尝试从自己的 WorkQueue 中获取任务,如果失败,则会尝试从其他线程的 WorkQueue 中“偷”任务。
runTask() 方法会执行任务的 doExec() 方法。doExec() 方法是 ForkJoinTask 的核心方法,它定义了任务的具体执行逻辑。
对于 RecursiveAction 和 RecursiveTask,doExec() 方法通常会做以下几件事:
- 判断任务是否足够小,如果足够小,则直接执行任务。
- 如果任务太大,则将任务拆分成若干个子任务。
- 调用 fork()方法将子任务提交到 ForkJoinPool。
- 调用 join()方法等待子任务执行完成。
- 合并子任务的结果(对于 RecursiveTask)。
3. 工作窃取
“工作窃取”是 ForkJoinPool 的核心特性之一。当一个线程完成了自己的任务后,它可以“偷”其他线程的任务来执行,这样就避免了线程空闲,提高了 CPU 利用率。
private ForkJoinTask<?> scan(WorkQueue w, int r) {
    WorkQueue[] ws;
    int m;
    if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) {
        int ss = w.scanState;
        for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
            WorkQueue q;
            ForkJoinTask<?>[] a;
            if ((q = ws[k]) != null) {
                int b, n;
                long i;
                if ((n = (b = q.base) - q.top) < 0 &&
                    (a = q.array) != null) {      // non-empty
                    i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i));
                    if (q.base == b) {
                        if (ss >= 0) {
                            if (U.compareAndSwapObject(a, i, t, null)) {
                                q.base = b + 1;
                                if (n < -1)
                                    signalWork(ws, q);
                                return t;
                            }
                        }
                        else if (oldSum == 0 &&   // try to activate
                                 w.scanState < 0)
                            tryRelease(c, ws[m & (int)c], AC_UNIT);
                    }
                }
                if (ss < 0)                   // refresh
                    ss = w.scanState;
                r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                origin = k = r & m;           // move and rescan
                oldSum = checkSum = 0;
                continue;
            }
            checkSum += q.config;
            if ((k = (k + 1) & m) == origin) {
                if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                    oldSum == (oldSum = checkSum)) {
                    if (ss < 0 || w.qlock < 0) // already inactive
                        break;
                    int ns = ss | INACTIVE;       // try to inactivate
                    long nc = ((SP_MASK & ns) |
                               (UC_MASK & ((c - AC_UNIT) << AC_SHIFT)));
                    w.stackPred = (int)c;
                    U.putInt(w, QSCANSTATE, ns);
                    if (U.compareAndSwapLong(this, CTL, c, nc))
                        ss = ns;
                    else
                        w.scanState = ss;         // back out
                }
                checkSum = 0;
            }
        }
    }
    return null;
}
scan()方法首先会尝试从自己的WorkQueue中获取任务, 如果获取不到, 会遍历其他线程的WorkQueue, 尝试从其他线程的队列中窃取任务.
ForkJoinPool 的最佳实践
了解了 ForkJoinPool 的工作原理后,我们来看看如何在实际开发中更好地使用它。
- 选择合适的任务粒度: 任务粒度太小,会导致频繁的任务拆分和合并,增加开销;任务粒度太大,又无法充分利用多核 CPU 的优势。因此,我们需要根据实际情况,选择合适的任务粒度。 
- 避免阻塞操作: 在 ForkJoinTask 中,应尽量避免阻塞操作,比如 I/O 操作、锁等待等等。因为阻塞操作会导致工作线程被挂起,无法执行其他任务,从而降低 ForkJoinPool 的效率。如果必须进行阻塞操作,可以考虑使用 ManagedBlocker 接口。 
- 合理设置并行度: ForkJoinPool 的并行度(parallelism)决定了工作线程的数量。并行度太小,无法充分利用 CPU 资源;并行度太大,又会导致线程切换开销增加。通常,我们可以将并行度设置为 CPU 核心数,或者略大于 CPU 核心数。 
- 注意异常处理: ForkJoinTask的执行过程中可能抛出异常, 需要妥善处理, 避免影响其他任务的执行. 
- 监控和调优: 可以通过 - getPoolSize(),- getActiveThreadCount(),- getRunningThreadCount(),- getQueuedSubmissionCount(),- getQueuedTaskCount()等方法来监控ForkJoinPool的状态, 并根据实际情况进行调优.
总结
ForkJoinPool 是 Java 并发框架中的一个强大工具,它可以帮助我们更轻松地编写高效的并行程序。通过深入理解其工作原理和最佳实践,我们可以更好地利用 ForkJoinPool 的优势,提升程序的性能。
希望这篇文章能帮助你更深入地理解 ForkJoinPool。如果你有任何问题或者想法,欢迎在评论区留言,我们一起交流学习。
记住, 并发编程并非易事, 需要不断学习和实践, 才能掌握其精髓. 加油, 未来的并发编程大师!

