HOOOS

Java 并发编程:CyclicBarrier 实战详解,多线程计算场景下的协作利器

0 47 并发小能手 Java并发CyclicBarrier多线程
Apple

Java 并发编程:CyclicBarrier 实战详解,多线程计算场景下的协作利器

你好,我是你的并发编程助手“并发小能手”。在 Java 并发编程的世界里,协调多个线程的执行顺序和同步操作是一项常见的挑战。今天,咱们就来聊聊 CyclicBarrier 这个强大的工具,看看它是如何在多线程计算场景下大显身手的。

为什么需要 CyclicBarrier?

在很多实际应用中,我们需要将一个大任务拆分成多个小任务,然后交给多个线程并行处理,最后再将各个线程的结果汇总起来。比如,我们要计算一个大型数组中所有元素的总和。我们可以将数组分成多个部分,每个线程负责计算一部分数据的和,最后再将这些部分和加起来得到最终结果。

在这个过程中,我们需要确保所有线程都完成了自己的计算任务后,才能进行最终的结果汇总。如果某个线程还没算完,我们就提前进行了汇总,那么得到的结果肯定是错误的。这时候,CyclicBarrier 就能派上用场了。

CyclicBarrier 是什么?

CyclicBarrier,顾名思义,就是一个“循环的屏障”。它允许一组线程相互等待,直到所有线程都到达一个共同的屏障点(barrier point),然后再一起继续执行。这个“循环”的含义在于,当所有线程都到达屏障点后,CyclicBarrier 可以被重置并重复使用。

CyclicBarrier 的核心思想是:

  1. 设置参与线程数: 在创建 CyclicBarrier 对象时,我们需要指定参与同步的线程数量。
  2. 等待所有线程到达: 每个线程在完成自己的任务后,会调用 CyclicBarrierawait() 方法,表示自己已经到达了屏障点。await() 方法会阻塞当前线程,直到所有参与线程都调用了 await() 方法。
  3. 执行共同操作(可选): 当所有线程都到达屏障点后,CyclicBarrier 可以选择执行一个预先定义的 Runnable 任务(可选)。这个任务只会被其中一个线程执行一次。
  4. 继续执行后续任务: 共同操作执行完毕后(或者没有定义共同操作),所有线程会从 await() 方法返回,继续执行各自的后续任务。

CyclicBarrier 怎么用?

下面,我们就通过一个具体的例子来演示 CyclicBarrier 的用法。假设我们要计算一个大型数组中所有元素的总和,我们可以将数组分成 4 个部分,每个线程负责计算其中一部分的和,最后再将这些部分和加起来。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class CyclicBarrierExample {

    // 假设我们要计算的数组
    private static final int[] data = new int[1000];

    // 参与计算的线程数
    private static final int THREAD_COUNT = 4;

    // 用于保存每个线程计算的部分和
    private static final int[] partialSums = new int[THREAD_COUNT];

    // 用于保存最终结果
    private static final AtomicInteger finalSum = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {

        // 初始化数组
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }

        // 创建 CyclicBarrier 对象,指定参与线程数和所有线程到达屏障点后要执行的任务
        CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> {
            // 在所有线程都到达屏障点后,汇总各个线程的部分和
            for (int partialSum : partialSums) {
                finalSum.addAndGet(partialSum);
            }
            System.out.println("所有线程计算完毕,最终结果为:" + finalSum.get());
        });

        // 创建并启动多个线程
        Thread[] threads = new Thread[THREAD_COUNT];
        for (int i = 0; i < THREAD_COUNT; i++) {
            final int threadIndex = i;
            threads[i] = new Thread(() -> {
                // 计算当前线程负责的部分数据的和
                int startIndex = threadIndex * (data.length / THREAD_COUNT);
                int endIndex = (threadIndex + 1) * (data.length / THREAD_COUNT);
                int sum = 0;
                for (int j = startIndex; j < endIndex; j++) {
                    sum += data[j];
                }
                partialSums[threadIndex] = sum;
                System.out.println("线程 " + Thread.currentThread().getName() + " 计算完毕,部分和为:" + sum);

                try {
                    // 等待其他线程到达屏障点
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }

                // 线程可以继续执行其他任务(如果有的话)
                System.out.println("线程 " + Thread.currentThread().getName() + " 继续执行其他任务...");
            });
            threads[i].start();
        }
          // 等待所有线程结束
        for(Thread t : threads){
             t.join();
        }
        System.out.println("所有线程任务全部完成");
    }
}

代码解读:

  1. 初始化数据: 我们创建了一个包含 1000 个元素的数组 data,并初始化了每个元素的值。
  2. 创建 CyclicBarrier 我们创建了一个 CyclicBarrier 对象,并指定参与同步的线程数为 THREAD_COUNT(这里是 4)。同时,我们还传入了一个 Runnable 任务,这个任务会在所有线程都到达屏障点后执行,负责汇总各个线程的部分和。
  3. 创建并启动线程: 我们创建了 4 个线程,每个线程负责计算数组中一部分数据的和。每个线程计算完自己的部分和后,会将结果保存到 partialSums 数组中,然后调用 barrier.await() 方法等待其他线程。
  4. await() 方法: await() 方法会阻塞当前线程,直到所有参与线程(这里是 4 个)都调用了 await() 方法。当所有线程都到达屏障点后,CyclicBarrier 会自动执行我们在第 2 步中定义的 Runnable 任务(汇总部分和)。
  5. 汇总结果: Runnable 任务会遍历 partialSums 数组,将各个线程的部分和累加到 finalSum 变量中,并打印最终结果。
  6. 继续后续的任务: Runnable任务执行完后,所有之前在await()方法阻塞的线程都会返回,每个线程可以继续执行自己的其他任务。
  7. 主线程等待:通过join()方法,主线程会等待所有计算线程完成后再继续执行,确保所有任务的完成。

运行结果示例:

线程 Thread-0 计算完毕,部分和为:31375
线程 Thread-2 计算完毕,部分和为:93875
线程 Thread-1 计算完毕,部分和为:62625
线程 Thread-3 计算完毕,部分和为:125125
所有线程计算完毕,最终结果为:313000
线程 Thread-2 继续执行其他任务...
线程 Thread-0 继续执行其他任务...
线程 Thread-3 继续执行其他任务...
线程 Thread-1 继续执行其他任务...
所有线程任务全部完成

从运行结果可以看出,4 个线程分别计算出了自己的部分和,然后 CyclicBarrier 将这些部分和汇总起来,得到了最终结果。所有线程到达CyclicBarrierawait()之后,继续执行了各自其他的任务,最终所有线程的任务都完成。

CyclicBarrier 的优势

通过上面的例子,我们可以看到 CyclicBarrier 在简化并发编程方面的优势:

  • 简化同步逻辑: 我们不需要手动编写复杂的线程同步代码,只需要创建一个 CyclicBarrier 对象,然后在每个线程完成任务后调用 await() 方法即可。
  • 可重用性: CyclicBarrier 是可以重用的。当所有线程都到达屏障点后,CyclicBarrier 会自动重置,可以再次使用。
  • 支持共同操作: CyclicBarrier 可以在所有线程都到达屏障点后执行一个预定义的 Runnable 任务,这在某些场景下非常有用,比如汇总各个线程的计算结果。

CyclicBarrier 和 CountDownLatch 的区别

在 Java 并发包中,还有一个和 CyclicBarrier 类似的工具,叫做 CountDownLatch。它们都可以用来协调多个线程的执行,但也有一些区别:

  • 计数方式不同: CountDownLatch 使用计数器来控制线程的同步,计数器的值在创建时指定,并且只能减小,不能重置。而 CyclicBarrier 使用“到达屏障点的线程数”来控制同步,当到达屏障点的线程数达到指定值时,所有线程会被释放,并且 CyclicBarrier 可以被重置。
  • 可重用性不同: CountDownLatch 的计数器一旦减到 0,就不能再使用了。而 CyclicBarrier 可以被重置并重复使用。
  • 应用场景不同: CountDownLatch 适用于一个线程等待多个线程完成任务的场景,比如主线程等待多个工作线程完成初始化操作。而 CyclicBarrier 适用于多个线程相互等待,直到所有线程都到达一个共同点的场景,比如多线程计算任务。

总结

CyclicBarrier 是 Java 并发编程中一个非常有用的工具,它可以帮助我们简化多线程同步逻辑,实现多个线程之间的协作。尤其是在多线程计算场景下,CyclicBarrier 可以让我们更轻松地将大任务拆分成小任务,并最终汇总结果。希望通过今天的讲解,你能对CyclicBarrier有一个更深入的理解,并能在实际开发中灵活运用。

如果你在并发编程方面还有其他问题,或者想了解更多关于 CyclicBarrier 的高级用法,欢迎随时向我提问。我是“并发小能手”,咱们下次再见!

点评评价

captcha
健康