HOOOS

庖丁解牛 ForkJoinPool:从源码深处剖析其精妙的并行之道

0 45 码农阿泽 Java并发编程ForkJoinPool
Apple

庖丁解牛 ForkJoinPool:从源码深处剖析其精妙的并行之道

你好,我是你的老朋友,码农阿泽。

你是否也曾被 Java 并发编程的复杂性所困扰?多线程、锁、同步……这些概念是否让你感到头疼?别担心,今天我们就来一起深入探索 Java 并发框架中的一位“高手”—— ForkJoinPool,从源码层面彻底搞懂它的工作原理,让你在并发编程的世界里游刃有余。

ForkJoinPool 是什么?

ForkJoinPool 是 Java 7 引入的一个并行计算框架,它的核心思想是“分而治之”。想象一下,你有一大堆的任务需要处理,如果只有一个工人(线程)来做,那效率肯定很低。但如果你把这堆任务分成若干个小任务,然后分配给多个工人同时处理,最后再把结果合并起来,那效率就会大大提高。ForkJoinPool 就是这样做的。

它特别适合那些可以递归拆分的任务,比如快速排序、归并排序、矩阵乘法等等。在这些场景下,ForkJoinPool 可以充分利用多核 CPU 的优势,显著提高程序的执行效率。

为什么选择 ForkJoinPool?

你可能会问,Java 不是已经有线程池了吗?为什么还要搞一个 ForkJoinPool?

传统的线程池,比如 ThreadPoolExecutor,在处理相互独立的任务时表现出色。但当任务之间存在依赖关系,或者需要递归拆分时,ThreadPoolExecutor 就显得力不从心了。因为 ThreadPoolExecutor 中的线程是“平等”的,它们从同一个任务队列中获取任务,无法感知任务之间的依赖关系。

而 ForkJoinPool 则不同,它采用了“工作窃取”(Work-Stealing)算法,每个线程都有自己的任务队列。当一个线程完成了自己的任务后,它可以“偷”其他线程的任务来执行,这样就避免了线程空闲,提高了 CPU 利用率。此外,ForkJoinPool 还针对递归任务做了优化,可以更好地处理任务之间的依赖关系。

ForkJoinPool 的核心组件

要深入理解 ForkJoinPool,我们需要先了解它的几个核心组件:

  • ForkJoinPool: 这是 ForkJoin 框架的核心类,它负责管理工作线程、任务队列,以及任务的提交和执行。
  • ForkJoinWorkerThread: 这是 ForkJoinPool 中的工作线程,它负责执行 ForkJoinTask。
  • ForkJoinTask: 这是 ForkJoin 框架中的任务抽象,它有两个重要的子类:
    • RecursiveAction: 用于没有返回值的递归任务。
    • RecursiveTask: 用于有返回值的递归任务。
  • WorkQueue: 这是 ForkJoinPool 中的工作队列,每个工作线程都有一个自己的 WorkQueue,用于存储待执行的任务。WorkQueue 采用了双端队列(Deque)的数据结构,支持从队列头部和尾部添加和移除任务。

ForkJoinPool 的工作原理:源码级剖析

现在,让我们一起深入 ForkJoinPool 的源码,看看它是如何实现“分而治之”和“工作窃取”的。

1. 任务提交

当我们向 ForkJoinPool 提交一个 ForkJoinTask 时,会发生什么呢?

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task;
}

final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws;
    WorkQueue q;
    int m;
    int r = ThreadLocalRandom.getProbe();
    int rs = runState;
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
        U.compareAndSwapInt(q, QLOCK, 0, 1)) {
        ForkJoinTask<?>[] a;
        int am, n, s;
        if ((a = q.array) != null &&
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            int j = ((am & s) << ASHIFT) + ABASE;
            U.putOrderedObject(a, j, task);
            U.putOrderedInt(q, QTOP, s + 1);
            U.set শরতInt(q, QLOCK, 0);
            if (n <= 1)
                signalWork(ws, q);
            return;
        }
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }
    externalSubmit(task);
}

submit() 方法首先会检查任务是否为空,然后调用 externalPush() 方法。externalPush() 方法会尝试将任务添加到当前线程的 WorkQueue 中。如果添加成功,就返回;否则,调用 externalSubmit() 方法。

externalSubmit() 方法会做一些更复杂的操作,比如创建新的 WorkQueue、启动工作线程等等。这里我们就不展开讲了,感兴趣的同学可以自己去看看源码。

2. 任务执行

当一个 ForkJoinWorkerThread 开始执行任务时,它会首先从自己的 WorkQueue 中获取任务。如果自己的 WorkQueue 为空,它会尝试从其他线程的 WorkQueue 中“偷”任务。

final void runWorker(WorkQueue w) {
    w.growArray();                   // allocate queue
    int seed = w.hint;               // initially holds probeprobe
    int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorshift
    for (ForkJoinTask<?> t;;) {
        if ((t = scan(w, r)) != null)
            w.runTask(t);
        else if (!tryAwaitWork(w, r))
            break;
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
    }
}

final void runTask(ForkJoinTask<?> task) {
    if (task != null) {
        scanState &= ~SCANNING; // mark as busy
        (currentTask = task).doExec();
        U.putOrderedObject(this, QCURRENTTASK, null); // release for GC
        execLocalTasks();
        ForkJoinWorkerThread thread = owner;
        if (++nsteals < 0)      // collect on overflow
            transferStealCount(pool);
        scanState |= SCANNING;
        if (thread != null)
            thread.afterTopLevelExec();
    }
}

runWorker() 方法是 ForkJoinWorkerThread 的主循环。它会不断地调用 scan() 方法来获取任务,然后调用 runTask() 方法来执行任务。

scan() 方法会首先尝试从自己的 WorkQueue 中获取任务,如果失败,则会尝试从其他线程的 WorkQueue 中“偷”任务。

runTask() 方法会执行任务的 doExec() 方法。doExec() 方法是 ForkJoinTask 的核心方法,它定义了任务的具体执行逻辑。

对于 RecursiveAction 和 RecursiveTask,doExec() 方法通常会做以下几件事:

  1. 判断任务是否足够小,如果足够小,则直接执行任务。
  2. 如果任务太大,则将任务拆分成若干个子任务。
  3. 调用 fork() 方法将子任务提交到 ForkJoinPool。
  4. 调用 join() 方法等待子任务执行完成。
  5. 合并子任务的结果(对于 RecursiveTask)。

3. 工作窃取

“工作窃取”是 ForkJoinPool 的核心特性之一。当一个线程完成了自己的任务后,它可以“偷”其他线程的任务来执行,这样就避免了线程空闲,提高了 CPU 利用率。

private ForkJoinTask<?> scan(WorkQueue w, int r) {
    WorkQueue[] ws;
    int m;
    if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) {
        int ss = w.scanState;
        for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
            WorkQueue q;
            ForkJoinTask<?>[] a;
            if ((q = ws[k]) != null) {
                int b, n;
                long i;
                if ((n = (b = q.base) - q.top) < 0 &&
                    (a = q.array) != null) {      // non-empty
                    i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i));
                    if (q.base == b) {
                        if (ss >= 0) {
                            if (U.compareAndSwapObject(a, i, t, null)) {
                                q.base = b + 1;
                                if (n < -1)
                                    signalWork(ws, q);
                                return t;
                            }
                        }
                        else if (oldSum == 0 &&   // try to activate
                                 w.scanState < 0)
                            tryRelease(c, ws[m & (int)c], AC_UNIT);
                    }
                }
                if (ss < 0)                   // refresh
                    ss = w.scanState;
                r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                origin = k = r & m;           // move and rescan
                oldSum = checkSum = 0;
                continue;
            }
            checkSum += q.config;
            if ((k = (k + 1) & m) == origin) {
                if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                    oldSum == (oldSum = checkSum)) {
                    if (ss < 0 || w.qlock < 0) // already inactive
                        break;
                    int ns = ss | INACTIVE;       // try to inactivate
                    long nc = ((SP_MASK & ns) |
                               (UC_MASK & ((c - AC_UNIT) << AC_SHIFT)));
                    w.stackPred = (int)c;
                    U.putInt(w, QSCANSTATE, ns);
                    if (U.compareAndSwapLong(this, CTL, c, nc))
                        ss = ns;
                    else
                        w.scanState = ss;         // back out
                }
                checkSum = 0;
            }
        }
    }
    return null;
}

scan()方法首先会尝试从自己的WorkQueue中获取任务, 如果获取不到, 会遍历其他线程的WorkQueue, 尝试从其他线程的队列中窃取任务.

ForkJoinPool 的最佳实践

了解了 ForkJoinPool 的工作原理后,我们来看看如何在实际开发中更好地使用它。

  1. 选择合适的任务粒度: 任务粒度太小,会导致频繁的任务拆分和合并,增加开销;任务粒度太大,又无法充分利用多核 CPU 的优势。因此,我们需要根据实际情况,选择合适的任务粒度。

  2. 避免阻塞操作: 在 ForkJoinTask 中,应尽量避免阻塞操作,比如 I/O 操作、锁等待等等。因为阻塞操作会导致工作线程被挂起,无法执行其他任务,从而降低 ForkJoinPool 的效率。如果必须进行阻塞操作,可以考虑使用 ManagedBlocker 接口。

  3. 合理设置并行度: ForkJoinPool 的并行度(parallelism)决定了工作线程的数量。并行度太小,无法充分利用 CPU 资源;并行度太大,又会导致线程切换开销增加。通常,我们可以将并行度设置为 CPU 核心数,或者略大于 CPU 核心数。

  4. 注意异常处理: ForkJoinTask的执行过程中可能抛出异常, 需要妥善处理, 避免影响其他任务的执行.

  5. 监控和调优: 可以通过getPoolSize(), getActiveThreadCount(), getRunningThreadCount(), getQueuedSubmissionCount(), getQueuedTaskCount()等方法来监控ForkJoinPool的状态, 并根据实际情况进行调优.

总结

ForkJoinPool 是 Java 并发框架中的一个强大工具,它可以帮助我们更轻松地编写高效的并行程序。通过深入理解其工作原理和最佳实践,我们可以更好地利用 ForkJoinPool 的优势,提升程序的性能。

希望这篇文章能帮助你更深入地理解 ForkJoinPool。如果你有任何问题或者想法,欢迎在评论区留言,我们一起交流学习。

记住, 并发编程并非易事, 需要不断学习和实践, 才能掌握其精髓. 加油, 未来的并发编程大师!

点评评价

captcha
健康