Java 并发编程:CyclicBarrier 实战详解,多线程计算场景下的协作利器
你好,我是你的并发编程助手“并发小能手”。在 Java 并发编程的世界里,协调多个线程的执行顺序和同步操作是一项常见的挑战。今天,咱们就来聊聊 CyclicBarrier
这个强大的工具,看看它是如何在多线程计算场景下大显身手的。
为什么需要 CyclicBarrier?
在很多实际应用中,我们需要将一个大任务拆分成多个小任务,然后交给多个线程并行处理,最后再将各个线程的结果汇总起来。比如,我们要计算一个大型数组中所有元素的总和。我们可以将数组分成多个部分,每个线程负责计算一部分数据的和,最后再将这些部分和加起来得到最终结果。
在这个过程中,我们需要确保所有线程都完成了自己的计算任务后,才能进行最终的结果汇总。如果某个线程还没算完,我们就提前进行了汇总,那么得到的结果肯定是错误的。这时候,CyclicBarrier
就能派上用场了。
CyclicBarrier 是什么?
CyclicBarrier
,顾名思义,就是一个“循环的屏障”。它允许一组线程相互等待,直到所有线程都到达一个共同的屏障点(barrier point),然后再一起继续执行。这个“循环”的含义在于,当所有线程都到达屏障点后,CyclicBarrier
可以被重置并重复使用。
CyclicBarrier
的核心思想是:
- 设置参与线程数: 在创建
CyclicBarrier
对象时,我们需要指定参与同步的线程数量。 - 等待所有线程到达: 每个线程在完成自己的任务后,会调用
CyclicBarrier
的await()
方法,表示自己已经到达了屏障点。await()
方法会阻塞当前线程,直到所有参与线程都调用了await()
方法。 - 执行共同操作(可选): 当所有线程都到达屏障点后,
CyclicBarrier
可以选择执行一个预先定义的 Runnable 任务(可选)。这个任务只会被其中一个线程执行一次。 - 继续执行后续任务: 共同操作执行完毕后(或者没有定义共同操作),所有线程会从
await()
方法返回,继续执行各自的后续任务。
CyclicBarrier 怎么用?
下面,我们就通过一个具体的例子来演示 CyclicBarrier
的用法。假设我们要计算一个大型数组中所有元素的总和,我们可以将数组分成 4 个部分,每个线程负责计算其中一部分的和,最后再将这些部分和加起来。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
public class CyclicBarrierExample {
// 假设我们要计算的数组
private static final int[] data = new int[1000];
// 参与计算的线程数
private static final int THREAD_COUNT = 4;
// 用于保存每个线程计算的部分和
private static final int[] partialSums = new int[THREAD_COUNT];
// 用于保存最终结果
private static final AtomicInteger finalSum = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
// 初始化数组
for (int i = 0; i < data.length; i++) {
data[i] = i + 1;
}
// 创建 CyclicBarrier 对象,指定参与线程数和所有线程到达屏障点后要执行的任务
CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> {
// 在所有线程都到达屏障点后,汇总各个线程的部分和
for (int partialSum : partialSums) {
finalSum.addAndGet(partialSum);
}
System.out.println("所有线程计算完毕,最终结果为:" + finalSum.get());
});
// 创建并启动多个线程
Thread[] threads = new Thread[THREAD_COUNT];
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadIndex = i;
threads[i] = new Thread(() -> {
// 计算当前线程负责的部分数据的和
int startIndex = threadIndex * (data.length / THREAD_COUNT);
int endIndex = (threadIndex + 1) * (data.length / THREAD_COUNT);
int sum = 0;
for (int j = startIndex; j < endIndex; j++) {
sum += data[j];
}
partialSums[threadIndex] = sum;
System.out.println("线程 " + Thread.currentThread().getName() + " 计算完毕,部分和为:" + sum);
try {
// 等待其他线程到达屏障点
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
// 线程可以继续执行其他任务(如果有的话)
System.out.println("线程 " + Thread.currentThread().getName() + " 继续执行其他任务...");
});
threads[i].start();
}
// 等待所有线程结束
for(Thread t : threads){
t.join();
}
System.out.println("所有线程任务全部完成");
}
}
代码解读:
- 初始化数据: 我们创建了一个包含 1000 个元素的数组
data
,并初始化了每个元素的值。 - 创建
CyclicBarrier
: 我们创建了一个CyclicBarrier
对象,并指定参与同步的线程数为THREAD_COUNT
(这里是 4)。同时,我们还传入了一个Runnable
任务,这个任务会在所有线程都到达屏障点后执行,负责汇总各个线程的部分和。 - 创建并启动线程: 我们创建了 4 个线程,每个线程负责计算数组中一部分数据的和。每个线程计算完自己的部分和后,会将结果保存到
partialSums
数组中,然后调用barrier.await()
方法等待其他线程。 await()
方法:await()
方法会阻塞当前线程,直到所有参与线程(这里是 4 个)都调用了await()
方法。当所有线程都到达屏障点后,CyclicBarrier
会自动执行我们在第 2 步中定义的Runnable
任务(汇总部分和)。- 汇总结果:
Runnable
任务会遍历partialSums
数组,将各个线程的部分和累加到finalSum
变量中,并打印最终结果。 - 继续后续的任务:
Runnable
任务执行完后,所有之前在await()
方法阻塞的线程都会返回,每个线程可以继续执行自己的其他任务。 - 主线程等待:通过
join()
方法,主线程会等待所有计算线程完成后再继续执行,确保所有任务的完成。
运行结果示例:
线程 Thread-0 计算完毕,部分和为:31375
线程 Thread-2 计算完毕,部分和为:93875
线程 Thread-1 计算完毕,部分和为:62625
线程 Thread-3 计算完毕,部分和为:125125
所有线程计算完毕,最终结果为:313000
线程 Thread-2 继续执行其他任务...
线程 Thread-0 继续执行其他任务...
线程 Thread-3 继续执行其他任务...
线程 Thread-1 继续执行其他任务...
所有线程任务全部完成
从运行结果可以看出,4 个线程分别计算出了自己的部分和,然后 CyclicBarrier
将这些部分和汇总起来,得到了最终结果。所有线程到达CyclicBarrier
的await()
之后,继续执行了各自其他的任务,最终所有线程的任务都完成。
CyclicBarrier 的优势
通过上面的例子,我们可以看到 CyclicBarrier
在简化并发编程方面的优势:
- 简化同步逻辑: 我们不需要手动编写复杂的线程同步代码,只需要创建一个
CyclicBarrier
对象,然后在每个线程完成任务后调用await()
方法即可。 - 可重用性:
CyclicBarrier
是可以重用的。当所有线程都到达屏障点后,CyclicBarrier
会自动重置,可以再次使用。 - 支持共同操作:
CyclicBarrier
可以在所有线程都到达屏障点后执行一个预定义的Runnable
任务,这在某些场景下非常有用,比如汇总各个线程的计算结果。
CyclicBarrier 和 CountDownLatch 的区别
在 Java 并发包中,还有一个和 CyclicBarrier
类似的工具,叫做 CountDownLatch
。它们都可以用来协调多个线程的执行,但也有一些区别:
- 计数方式不同:
CountDownLatch
使用计数器来控制线程的同步,计数器的值在创建时指定,并且只能减小,不能重置。而CyclicBarrier
使用“到达屏障点的线程数”来控制同步,当到达屏障点的线程数达到指定值时,所有线程会被释放,并且CyclicBarrier
可以被重置。 - 可重用性不同:
CountDownLatch
的计数器一旦减到 0,就不能再使用了。而CyclicBarrier
可以被重置并重复使用。 - 应用场景不同:
CountDownLatch
适用于一个线程等待多个线程完成任务的场景,比如主线程等待多个工作线程完成初始化操作。而CyclicBarrier
适用于多个线程相互等待,直到所有线程都到达一个共同点的场景,比如多线程计算任务。
总结
CyclicBarrier
是 Java 并发编程中一个非常有用的工具,它可以帮助我们简化多线程同步逻辑,实现多个线程之间的协作。尤其是在多线程计算场景下,CyclicBarrier
可以让我们更轻松地将大任务拆分成小任务,并最终汇总结果。希望通过今天的讲解,你能对CyclicBarrier
有一个更深入的理解,并能在实际开发中灵活运用。
如果你在并发编程方面还有其他问题,或者想了解更多关于 CyclicBarrier
的高级用法,欢迎随时向我提问。我是“并发小能手”,咱们下次再见!