你好,我是老码农。今天我们来聊聊 Java 并发编程中一个非常实用的工具类:CyclicBarrier。它就像一个“循环栅栏”,可以让你在多线程协作时,等待所有线程都到达某个屏障点后,再一起继续执行。对于CyclicBarrier,你可能已经很熟悉它的基本用法了,但你是否真正了解它内部是如何实现线程同步的呢?特别是,它与我们熟知的CountDownLatch有什么本质区别?今天,我们就一起深入挖掘CyclicBarrier的内部机制,特别是它如何利用AbstractQueuedSynchronizer(AQS)来实现线程同步,并对比CountDownLatch,希望能帮助你更深入地理解 Java 并发编程的底层原理。
一、CyclicBarrier 的基本概念和应用场景
首先,我们来快速回顾一下CyclicBarrier的基本概念和常见应用场景,方便你更好地理解后面的内容。
1.1 CyclicBarrier 的定义
CyclicBarrier(循环栅栏)是 Java 并发包java.util.concurrent中的一个同步辅助类。它允许一组线程在达到一个共同的屏障点(barrier point)时互相等待,直到所有线程都到达该屏障点,然后所有线程才能继续执行。CyclicBarrier的名字中的“Cyclic”表明了它的可重用性,即在所有线程释放后,可以重复使用。
1.2 CyclicBarrier 的构造方法
CyclicBarrier提供了两个构造方法:
// 创建一个 CyclicBarrier,指定需要等待的线程数量
public CyclicBarrier(int parties)
// 创建一个 CyclicBarrier,指定需要等待的线程数量,以及当所有线程到达屏障点时,要执行的 Runnable 任务
public CyclicBarrier(int parties, Runnable barrierAction)
- parties:表示需要等待的线程数量。当- parties个线程都调用了- await()方法后,屏障点才会被打开。
- barrierAction:一个- Runnable对象,当所有线程到达屏障点时,- CyclicBarrier会执行这个- Runnable任务。这个任务通常用于汇总数据、处理结果等。
1.3 CyclicBarrier 的主要方法
- await():调用此方法的线程将在此处等待,直到所有线程都到达屏障点。
- await(long timeout, TimeUnit unit):与- await()类似,但设置了超时时间。如果超时时间内,所有线程没有到达屏障点,则抛出- TimeoutException。
- getNumberWaiting():获取当前在屏障点等待的线程数量。
- getParties():获取- CyclicBarrier初始化时指定的线程数量。
- reset():将屏障重置为初始状态。如果正在等待的线程,会抛出- BrokenBarrierException。
- isBroken():判断屏障是否处于损坏状态。当其中一个线程中断或超时时,屏障会被损坏。
1.4 CyclicBarrier 的应用场景
CyclicBarrier非常适合以下场景:
- 并发计算:例如,将一个大的计算任务拆分成多个子任务,分配给多个线程并发执行,每个线程负责计算一部分数据。当所有线程都完成计算后,需要将计算结果汇总,这时就可以使用CyclicBarrier。
- 游戏开发:在多人游戏中,CyclicBarrier可以用于等待所有玩家都准备好后,再开始游戏。
- 数据同步:例如,多个线程需要从不同的数据源读取数据,当所有线程都完成数据读取后,再进行数据处理或分析。
- 测试框架:在自动化测试中,CyclicBarrier可以用于等待所有测试用例都准备好后,再一起开始测试。
下面是一个简单的例子,演示了如何使用CyclicBarrier:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
    public static void main(String[] args) {
        // 定义需要等待的线程数量
        int parties = 3;
        // 创建 CyclicBarrier,指定等待的线程数量和屏障动作
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            System.out.println("所有线程都到达屏障点,开始执行汇总操作");
        });
        // 创建并启动多个线程
        for (int i = 0; i < parties; i++) {
            int threadId = i + 1;
            new Thread(() -> {
                System.out.println("线程 " + threadId + " 开始工作...");
                try {
                    // 模拟线程工作
                    Thread.sleep((long) (Math.random() * 2000));
                    System.out.println("线程 " + threadId + " 完成工作,等待其他线程...");
                    // 调用 await() 方法,等待其他线程
                    barrier.await();
                    System.out.println("线程 " + threadId + " 继续执行...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
在这个例子中,我们创建了3个线程,每个线程模拟完成一些工作后,调用barrier.await()方法,在屏障点等待。当所有3个线程都到达屏障点后,CyclicBarrier会执行barrierAction,打印“所有线程都到达屏障点,开始执行汇总操作”,然后所有线程继续执行后面的代码。
二、CyclicBarrier 的内部实现:基于 AQS 的线程同步
现在,我们来深入CyclicBarrier的内部,看看它是如何实现线程同步的。CyclicBarrier的核心实现依赖于AbstractQueuedSynchronizer(AQS)。
2.1 AQS 简介
AbstractQueuedSynchronizer(AQS)是 Java 并发包java.util.concurrent中的一个抽象类,它提供了一种构建锁和同步器的框架。许多并发工具类,如ReentrantLock、Semaphore、CountDownLatch等,都是基于 AQS 实现的。AQS 内部维护了一个状态变量(state),用于表示同步状态,以及一个FIFO 线程等待队列,用于管理等待获取同步状态的线程。AQS 提供了几个关键方法:
- getState():获取当前同步状态。
- setState(int newState):设置同步状态。
- compareAndSetState(int expect, int update):CAS 操作,尝试原子性地更新同步状态。
- acquire(int arg):尝试以独占模式获取同步状态,如果获取失败,则将当前线程加入等待队列并阻塞。
- release(int arg):以独占模式释放同步状态,唤醒等待队列中的一个或多个线程。
2.2 CyclicBarrier 的核心成员变量
CyclicBarrier内部主要维护了以下几个核心成员变量:
- private final int parties;:表示需要等待的线程数量,在构造方法中初始化,不可变。
- private final Runnable barrierCommand;:当所有线程到达屏障点时,需要执行的- Runnable任务,在构造方法中初始化,可以为 null。
- private final ReentrantLock lock = new ReentrantLock();:一个可重入锁,用于保护对- CyclicBarrier状态的访问。
- private final Condition trip = lock.newCondition();:一个- Condition对象,用于线程的等待和唤醒。当线程调用- await()方法时,会将线程放入这个- Condition的等待队列中。
- private int count;:当前已经到达屏障点的线程数量,初始化为- parties,每次有一个线程到达屏障点时,- count减1。当- count变为0时,所有线程都到达了屏障点。
- private Generation generation = new Generation();:用于记录- CyclicBarrier的代数。- CyclicBarrier是可重复使用的,每次屏障被打破(either due to thread interruption or timeout)时,- generation会更新,防止线程在旧的屏障上等待。
2.3 CyclicBarrier 的 await() 方法
CyclicBarrier的核心方法是await(),它实现了线程的等待和同步。下面我们来详细分析await()方法的实现:
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(true, 0L);
    } catch (TimeoutException toe) {
        throw new AssertionError(toe); // cannot happen
    }
}
public int await(long timeout, TimeUnit unit)
        throws InterruptedException, BrokenBarrierException, TimeoutException {
    return dowait(false, unit.toNanos(timeout));
}
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        if (g.broken) // 检查屏障是否已损坏
            throw new BrokenBarrierException();
        if (Thread.interrupted()) {
            breakBarrier(); // 中断线程,损坏屏障
            throw new InterruptedException();
        }
        int index = --count; // 递减计数器
        if (index == 0) { // 最后一个到达的线程
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run(); // 执行 barrierAction
                ranAction = true;
                nextGeneration(); // 创建新的一代
                return 0;
            } finally {
                if (!ranAction) // 如果 barrierAction 抛出异常,则损坏屏障
                    breakBarrier();
            }
        } else {
            // 还有线程未到达屏障点
            for (;;) {
                try {
                    if (timed)
                        nanos = trip.awaitNanos(nanos); // 等待,设置超时时间
                    else
                        trip.await(); // 等待
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier(); // 中断线程,损坏屏障
                        throw ie;
                    } else {
                        // We're broken, so must reinterrupt.
                        Thread.currentThread().interrupt();
                    }
                }
                // 检查屏障是否已损坏
                if (g.broken)
                    throw new BrokenBarrierException();
                // 检查是否到达新的一代
                if (g != generation)
                    return index; // 返回当前线程在屏障中的索引
                if (timed && nanos <= 0L)
                {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        }
    } finally {
        lock.unlock();
    }
}
我们来逐步分析dowait()方法的逻辑:
- 加锁:首先,使用ReentrantLock获取锁,确保线程安全。
- 检查屏障是否损坏:如果generation.broken为 true,说明屏障已经被损坏,抛出BrokenBarrierException。
- 检查中断:如果当前线程被中断,则调用breakBarrier()方法损坏屏障,并抛出InterruptedException。
- 递减计数器:count递减,表示有一个线程到达了屏障点。
- 判断是否是最后一个到达的线程:- 如果是最后一个到达的线程(index == 0):- 执行barrierCommand(如果存在)。
- 调用nextGeneration()方法,创建新的一代。
- 唤醒所有等待在trip上的线程。
- 返回0,表示当前线程是最后一个到达的线程。
 
- 执行
- 如果不是最后一个到达的线程:- 调用trip.awaitNanos()(如果设置了超时时间)或trip.await()方法,使当前线程进入等待状态,直到以下情况之一发生:- 所有线程都到达屏障点(count变为0)。
- 发生中断,调用breakBarrier()方法,损坏屏障,并抛出InterruptedException。
- 超时,调用breakBarrier()方法,损坏屏障,并抛出TimeoutException。
 
- 所有线程都到达屏障点(
- 在等待期间,不断检查屏障是否损坏,以及是否到达新的一代。
- 如果被唤醒,返回当前线程在屏障中的索引。
 
- 调用
 
- 如果是最后一个到达的线程(
- 解锁:在finally块中释放锁。
2.4 核心流程总结
- 线程调用 await() 方法:线程尝试获取锁。如果获取成功,递减count计数器。如果count变为0,则表示最后一个线程到达屏障点,执行barrierCommand,然后唤醒所有等待的线程。如果count不为0,则当前线程在trip上等待。
- 线程等待:等待期间,如果发生中断或超时,则损坏屏障,并抛出异常。如果所有线程都到达屏障点,则唤醒所有等待的线程,并返回每个线程在屏障中的索引。
- 屏障重置:nextGeneration()方法会创建新的一代,重置count,并释放lock锁,使得CyclicBarrier可以被重复使用。
2.5 与 AQS 的关系
CyclicBarrier虽然没有直接继承 AQS,但它使用了 AQS 提供的Condition接口。trip对象实际上是一个Condition实例,它利用了 AQS 提供的线程等待队列和唤醒机制。await()方法中的trip.await()和trip.awaitNanos()方法,就是通过 AQS 提供的Condition来实现线程的阻塞和唤醒。ReentrantLock用于保护对CyclicBarrier状态的访问,确保线程安全。
三、CyclicBarrier 与 CountDownLatch 的区别
CyclicBarrier和CountDownLatch都是 Java 并发包中常用的同步工具类,但它们的功能和使用场景有所不同。理解它们之间的区别,有助于你更好地选择合适的工具类来解决并发问题。
3.1 功能差异
- CyclicBarrier:- 可重复使用:CyclicBarrier是可以重复使用的,在所有线程释放后,可以再次使用。这使得它非常适合于需要多次同步的场景。
- 同步多个线程:CyclicBarrier用于同步多个线程,等待所有线程都到达屏障点后,再一起执行。
- 屏障动作:CyclicBarrier可以指定一个barrierAction,当所有线程到达屏障点时,会执行这个Runnable任务。
 
- 可重复使用:
- CountDownLatch:- 不可重复使用:CountDownLatch是不可重复使用的,一旦计数器变为0,就无法再次使用。如果需要重复使用,需要重新创建一个新的CountDownLatch。
- 等待一个或多个线程完成:CountDownLatch用于等待一个或多个线程完成某个操作后,主线程或其他线程才能继续执行。
- 计数器:CountDownLatch使用一个计数器,当计数器变为0时,所有等待的线程被唤醒。
 
- 不可重复使用:
3.2 实现原理差异
- CyclicBarrier:基于ReentrantLock和Condition实现,await()方法使用Condition的await()方法阻塞线程,当所有线程到达屏障点时,使用Condition的signalAll()方法唤醒所有线程。
- CountDownLatch:基于AQS实现,维护一个state变量,表示计数器的值。countDown()方法使用CAS操作递减state的值,await()方法使用AQS的acquireShared()方法阻塞线程,当state变为0时,唤醒所有等待的线程。
3.3 使用场景差异
- CyclicBarrier:适用于需要多个线程相互等待,直到所有线程都到达某个屏障点后,再一起执行的场景。例如,并发计算、游戏开发、数据同步等。
- CountDownLatch:适用于一个或多个线程等待其他线程完成某个操作后,才能继续执行的场景。例如,主线程等待多个子线程完成初始化工作,或者等待多个线程下载文件等。
3.4 关键区别总结
| 特性 | CyclicBarrier | CountDownLatch | 
|---|---|---|
| 可重复使用性 | 可重复使用 | 不可重复使用 | 
| 同步线程数量 | 同步多个线程 | 等待一个或多个线程完成 | 
| 屏障动作 | 有 barrierAction,当所有线程到达屏障点时执行 | 无 | 
| 实现原理 | 基于 ReentrantLock 和 Condition | 基于 AQS | 
| 计数器 | 隐含的计数器,由到达屏障点的线程数量决定 | 显式的计数器,可以初始化 | 
| 使用场景 | 并发计算、游戏开发、数据同步等 | 主线程等待子线程完成初始化、等待线程下载文件等 | 
四、CyclicBarrier 的注意事项
在使用CyclicBarrier时,需要注意以下几点:
- 异常处理:await()方法会抛出InterruptedException和BrokenBarrierException。InterruptedException表示线程在等待过程中被中断,BrokenBarrierException表示屏障被损坏。在使用时,需要进行适当的异常处理。
- 屏障损坏:当一个线程在await()方法中被中断或超时,或者barrierAction抛出异常时,屏障会被损坏。一旦屏障被损坏,所有调用await()方法的线程都会抛出BrokenBarrierException。
- 线程数量:CyclicBarrier的构造方法中指定的线程数量是固定的。如果实际参与同步的线程数量与parties不一致,会导致线程一直阻塞,无法继续执行。
- reset() 方法:谨慎使用reset()方法。reset()方法会重置屏障,但是所有正在等待的线程都会抛出BrokenBarrierException。如果需要重置屏障,建议在屏障损坏的情况下使用,或者在确保所有线程都安全的情况下使用。
- 与线程池结合使用:当与线程池结合使用时,需要特别注意。因为线程池中的线程可能被复用,如果在barrierAction中使用了线程局部变量,需要确保在每次屏障被打开时,线程局部变量被正确地初始化或清理。
五、总结
今天,我们深入探讨了CyclicBarrier的内部实现机制,以及它与CountDownLatch的区别。我们了解到,CyclicBarrier基于ReentrantLock和Condition实现,通过await()方法实现线程的等待和同步。与CountDownLatch相比,CyclicBarrier更适合于需要多个线程相互等待,直到所有线程都到达某个屏障点后,再一起执行的场景。希望通过今天的分享,你对CyclicBarrier的理解能够更上一层楼,在实际的并发编程中,能够更加灵活地运用它。
如果你喜欢这篇文章,请点个赞,也欢迎在评论区留下你的想法和问题。我们下次再见!

