你好,我是老码农。今天我们来聊聊 Java 并发编程中一个非常实用的工具类:CyclicBarrier
。它就像一个“循环栅栏”,可以让你在多线程协作时,等待所有线程都到达某个屏障点后,再一起继续执行。对于CyclicBarrier
,你可能已经很熟悉它的基本用法了,但你是否真正了解它内部是如何实现线程同步的呢?特别是,它与我们熟知的CountDownLatch
有什么本质区别?今天,我们就一起深入挖掘CyclicBarrier
的内部机制,特别是它如何利用AbstractQueuedSynchronizer
(AQS)来实现线程同步,并对比CountDownLatch
,希望能帮助你更深入地理解 Java 并发编程的底层原理。
一、CyclicBarrier 的基本概念和应用场景
首先,我们来快速回顾一下CyclicBarrier
的基本概念和常见应用场景,方便你更好地理解后面的内容。
1.1 CyclicBarrier 的定义
CyclicBarrier
(循环栅栏)是 Java 并发包java.util.concurrent
中的一个同步辅助类。它允许一组线程在达到一个共同的屏障点(barrier point)时互相等待,直到所有线程都到达该屏障点,然后所有线程才能继续执行。CyclicBarrier
的名字中的“Cyclic”表明了它的可重用性,即在所有线程释放后,可以重复使用。
1.2 CyclicBarrier 的构造方法
CyclicBarrier
提供了两个构造方法:
// 创建一个 CyclicBarrier,指定需要等待的线程数量
public CyclicBarrier(int parties)
// 创建一个 CyclicBarrier,指定需要等待的线程数量,以及当所有线程到达屏障点时,要执行的 Runnable 任务
public CyclicBarrier(int parties, Runnable barrierAction)
parties
:表示需要等待的线程数量。当parties
个线程都调用了await()
方法后,屏障点才会被打开。barrierAction
:一个Runnable
对象,当所有线程到达屏障点时,CyclicBarrier
会执行这个Runnable
任务。这个任务通常用于汇总数据、处理结果等。
1.3 CyclicBarrier 的主要方法
await()
:调用此方法的线程将在此处等待,直到所有线程都到达屏障点。await(long timeout, TimeUnit unit)
:与await()
类似,但设置了超时时间。如果超时时间内,所有线程没有到达屏障点,则抛出TimeoutException
。getNumberWaiting()
:获取当前在屏障点等待的线程数量。getParties()
:获取CyclicBarrier
初始化时指定的线程数量。reset()
:将屏障重置为初始状态。如果正在等待的线程,会抛出BrokenBarrierException
。isBroken()
:判断屏障是否处于损坏状态。当其中一个线程中断或超时时,屏障会被损坏。
1.4 CyclicBarrier 的应用场景
CyclicBarrier
非常适合以下场景:
- 并发计算:例如,将一个大的计算任务拆分成多个子任务,分配给多个线程并发执行,每个线程负责计算一部分数据。当所有线程都完成计算后,需要将计算结果汇总,这时就可以使用
CyclicBarrier
。 - 游戏开发:在多人游戏中,
CyclicBarrier
可以用于等待所有玩家都准备好后,再开始游戏。 - 数据同步:例如,多个线程需要从不同的数据源读取数据,当所有线程都完成数据读取后,再进行数据处理或分析。
- 测试框架:在自动化测试中,
CyclicBarrier
可以用于等待所有测试用例都准备好后,再一起开始测试。
下面是一个简单的例子,演示了如何使用CyclicBarrier
:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
// 定义需要等待的线程数量
int parties = 3;
// 创建 CyclicBarrier,指定等待的线程数量和屏障动作
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
System.out.println("所有线程都到达屏障点,开始执行汇总操作");
});
// 创建并启动多个线程
for (int i = 0; i < parties; i++) {
int threadId = i + 1;
new Thread(() -> {
System.out.println("线程 " + threadId + " 开始工作...");
try {
// 模拟线程工作
Thread.sleep((long) (Math.random() * 2000));
System.out.println("线程 " + threadId + " 完成工作,等待其他线程...");
// 调用 await() 方法,等待其他线程
barrier.await();
System.out.println("线程 " + threadId + " 继续执行...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
在这个例子中,我们创建了3个线程,每个线程模拟完成一些工作后,调用barrier.await()
方法,在屏障点等待。当所有3个线程都到达屏障点后,CyclicBarrier
会执行barrierAction
,打印“所有线程都到达屏障点,开始执行汇总操作”,然后所有线程继续执行后面的代码。
二、CyclicBarrier 的内部实现:基于 AQS 的线程同步
现在,我们来深入CyclicBarrier
的内部,看看它是如何实现线程同步的。CyclicBarrier
的核心实现依赖于AbstractQueuedSynchronizer
(AQS)。
2.1 AQS 简介
AbstractQueuedSynchronizer
(AQS)是 Java 并发包java.util.concurrent
中的一个抽象类,它提供了一种构建锁和同步器的框架。许多并发工具类,如ReentrantLock
、Semaphore
、CountDownLatch
等,都是基于 AQS 实现的。AQS 内部维护了一个状态变量(state
),用于表示同步状态,以及一个FIFO 线程等待队列,用于管理等待获取同步状态的线程。AQS 提供了几个关键方法:
getState()
:获取当前同步状态。setState(int newState)
:设置同步状态。compareAndSetState(int expect, int update)
:CAS 操作,尝试原子性地更新同步状态。acquire(int arg)
:尝试以独占模式获取同步状态,如果获取失败,则将当前线程加入等待队列并阻塞。release(int arg)
:以独占模式释放同步状态,唤醒等待队列中的一个或多个线程。
2.2 CyclicBarrier 的核心成员变量
CyclicBarrier
内部主要维护了以下几个核心成员变量:
private final int parties;
:表示需要等待的线程数量,在构造方法中初始化,不可变。private final Runnable barrierCommand;
:当所有线程到达屏障点时,需要执行的Runnable
任务,在构造方法中初始化,可以为 null。private final ReentrantLock lock = new ReentrantLock();
:一个可重入锁,用于保护对CyclicBarrier
状态的访问。private final Condition trip = lock.newCondition();
:一个Condition
对象,用于线程的等待和唤醒。当线程调用await()
方法时,会将线程放入这个Condition
的等待队列中。private int count;
:当前已经到达屏障点的线程数量,初始化为parties
,每次有一个线程到达屏障点时,count
减1。当count
变为0时,所有线程都到达了屏障点。private Generation generation = new Generation();
:用于记录CyclicBarrier
的代数。CyclicBarrier
是可重复使用的,每次屏障被打破(either due to thread interruption or timeout)时,generation
会更新,防止线程在旧的屏障上等待。
2.3 CyclicBarrier 的 await() 方法
CyclicBarrier
的核心方法是await()
,它实现了线程的等待和同步。下面我们来详细分析await()
方法的实现:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(true, 0L);
} catch (TimeoutException toe) {
throw new AssertionError(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException, BrokenBarrierException, TimeoutException {
return dowait(false, unit.toNanos(timeout));
}
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken) // 检查屏障是否已损坏
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier(); // 中断线程,损坏屏障
throw new InterruptedException();
}
int index = --count; // 递减计数器
if (index == 0) { // 最后一个到达的线程
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); // 执行 barrierAction
ranAction = true;
nextGeneration(); // 创建新的一代
return 0;
} finally {
if (!ranAction) // 如果 barrierAction 抛出异常,则损坏屏障
breakBarrier();
}
} else {
// 还有线程未到达屏障点
for (;;) {
try {
if (timed)
nanos = trip.awaitNanos(nanos); // 等待,设置超时时间
else
trip.await(); // 等待
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier(); // 中断线程,损坏屏障
throw ie;
} else {
// We're broken, so must reinterrupt.
Thread.currentThread().interrupt();
}
}
// 检查屏障是否已损坏
if (g.broken)
throw new BrokenBarrierException();
// 检查是否到达新的一代
if (g != generation)
return index; // 返回当前线程在屏障中的索引
if (timed && nanos <= 0L)
{
breakBarrier();
throw new TimeoutException();
}
}
}
} finally {
lock.unlock();
}
}
我们来逐步分析dowait()
方法的逻辑:
- 加锁:首先,使用
ReentrantLock
获取锁,确保线程安全。 - 检查屏障是否损坏:如果
generation.broken
为 true,说明屏障已经被损坏,抛出BrokenBarrierException
。 - 检查中断:如果当前线程被中断,则调用
breakBarrier()
方法损坏屏障,并抛出InterruptedException
。 - 递减计数器:
count
递减,表示有一个线程到达了屏障点。 - 判断是否是最后一个到达的线程:
- 如果是最后一个到达的线程(
index == 0
):- 执行
barrierCommand
(如果存在)。 - 调用
nextGeneration()
方法,创建新的一代。 - 唤醒所有等待在
trip
上的线程。 - 返回0,表示当前线程是最后一个到达的线程。
- 执行
- 如果不是最后一个到达的线程:
- 调用
trip.awaitNanos()
(如果设置了超时时间)或trip.await()
方法,使当前线程进入等待状态,直到以下情况之一发生:- 所有线程都到达屏障点(
count
变为0)。 - 发生中断,调用
breakBarrier()
方法,损坏屏障,并抛出InterruptedException
。 - 超时,调用
breakBarrier()
方法,损坏屏障,并抛出TimeoutException
。
- 所有线程都到达屏障点(
- 在等待期间,不断检查屏障是否损坏,以及是否到达新的一代。
- 如果被唤醒,返回当前线程在屏障中的索引。
- 调用
- 如果是最后一个到达的线程(
- 解锁:在
finally
块中释放锁。
2.4 核心流程总结
- 线程调用 await() 方法:线程尝试获取锁。如果获取成功,递减
count
计数器。如果count
变为0,则表示最后一个线程到达屏障点,执行barrierCommand
,然后唤醒所有等待的线程。如果count
不为0,则当前线程在trip
上等待。 - 线程等待:等待期间,如果发生中断或超时,则损坏屏障,并抛出异常。如果所有线程都到达屏障点,则唤醒所有等待的线程,并返回每个线程在屏障中的索引。
- 屏障重置:
nextGeneration()
方法会创建新的一代,重置count
,并释放lock
锁,使得CyclicBarrier
可以被重复使用。
2.5 与 AQS 的关系
CyclicBarrier
虽然没有直接继承 AQS,但它使用了 AQS 提供的Condition
接口。trip
对象实际上是一个Condition
实例,它利用了 AQS 提供的线程等待队列和唤醒机制。await()
方法中的trip.await()
和trip.awaitNanos()
方法,就是通过 AQS 提供的Condition
来实现线程的阻塞和唤醒。ReentrantLock
用于保护对CyclicBarrier
状态的访问,确保线程安全。
三、CyclicBarrier 与 CountDownLatch 的区别
CyclicBarrier
和CountDownLatch
都是 Java 并发包中常用的同步工具类,但它们的功能和使用场景有所不同。理解它们之间的区别,有助于你更好地选择合适的工具类来解决并发问题。
3.1 功能差异
- CyclicBarrier:
- 可重复使用:
CyclicBarrier
是可以重复使用的,在所有线程释放后,可以再次使用。这使得它非常适合于需要多次同步的场景。 - 同步多个线程:
CyclicBarrier
用于同步多个线程,等待所有线程都到达屏障点后,再一起执行。 - 屏障动作:
CyclicBarrier
可以指定一个barrierAction
,当所有线程到达屏障点时,会执行这个Runnable
任务。
- 可重复使用:
- CountDownLatch:
- 不可重复使用:
CountDownLatch
是不可重复使用的,一旦计数器变为0,就无法再次使用。如果需要重复使用,需要重新创建一个新的CountDownLatch
。 - 等待一个或多个线程完成:
CountDownLatch
用于等待一个或多个线程完成某个操作后,主线程或其他线程才能继续执行。 - 计数器:
CountDownLatch
使用一个计数器,当计数器变为0时,所有等待的线程被唤醒。
- 不可重复使用:
3.2 实现原理差异
- CyclicBarrier:基于
ReentrantLock
和Condition
实现,await()
方法使用Condition
的await()
方法阻塞线程,当所有线程到达屏障点时,使用Condition
的signalAll()
方法唤醒所有线程。 - CountDownLatch:基于
AQS
实现,维护一个state
变量,表示计数器的值。countDown()
方法使用CAS
操作递减state
的值,await()
方法使用AQS
的acquireShared()
方法阻塞线程,当state
变为0时,唤醒所有等待的线程。
3.3 使用场景差异
- CyclicBarrier:适用于需要多个线程相互等待,直到所有线程都到达某个屏障点后,再一起执行的场景。例如,并发计算、游戏开发、数据同步等。
- CountDownLatch:适用于一个或多个线程等待其他线程完成某个操作后,才能继续执行的场景。例如,主线程等待多个子线程完成初始化工作,或者等待多个线程下载文件等。
3.4 关键区别总结
特性 | CyclicBarrier | CountDownLatch |
---|---|---|
可重复使用性 | 可重复使用 | 不可重复使用 |
同步线程数量 | 同步多个线程 | 等待一个或多个线程完成 |
屏障动作 | 有 barrierAction,当所有线程到达屏障点时执行 | 无 |
实现原理 | 基于 ReentrantLock 和 Condition | 基于 AQS |
计数器 | 隐含的计数器,由到达屏障点的线程数量决定 | 显式的计数器,可以初始化 |
使用场景 | 并发计算、游戏开发、数据同步等 | 主线程等待子线程完成初始化、等待线程下载文件等 |
四、CyclicBarrier 的注意事项
在使用CyclicBarrier
时,需要注意以下几点:
- 异常处理:
await()
方法会抛出InterruptedException
和BrokenBarrierException
。InterruptedException
表示线程在等待过程中被中断,BrokenBarrierException
表示屏障被损坏。在使用时,需要进行适当的异常处理。 - 屏障损坏:当一个线程在
await()
方法中被中断或超时,或者barrierAction
抛出异常时,屏障会被损坏。一旦屏障被损坏,所有调用await()
方法的线程都会抛出BrokenBarrierException
。 - 线程数量:
CyclicBarrier
的构造方法中指定的线程数量是固定的。如果实际参与同步的线程数量与parties
不一致,会导致线程一直阻塞,无法继续执行。 - reset() 方法:谨慎使用
reset()
方法。reset()
方法会重置屏障,但是所有正在等待的线程都会抛出BrokenBarrierException
。如果需要重置屏障,建议在屏障损坏的情况下使用,或者在确保所有线程都安全的情况下使用。 - 与线程池结合使用:当与线程池结合使用时,需要特别注意。因为线程池中的线程可能被复用,如果在
barrierAction
中使用了线程局部变量,需要确保在每次屏障被打开时,线程局部变量被正确地初始化或清理。
五、总结
今天,我们深入探讨了CyclicBarrier
的内部实现机制,以及它与CountDownLatch
的区别。我们了解到,CyclicBarrier
基于ReentrantLock
和Condition
实现,通过await()
方法实现线程的等待和同步。与CountDownLatch
相比,CyclicBarrier
更适合于需要多个线程相互等待,直到所有线程都到达某个屏障点后,再一起执行的场景。希望通过今天的分享,你对CyclicBarrier
的理解能够更上一层楼,在实际的并发编程中,能够更加灵活地运用它。
如果你喜欢这篇文章,请点个赞,也欢迎在评论区留下你的想法和问题。我们下次再见!