HOOOS

深入揭秘 CyclicBarrier:从 AQS 实现到与 CountDownLatch 的差异

0 55 老码农 Java并发编程CyclicBarrier
Apple

你好,我是老码农。今天我们来聊聊 Java 并发编程中一个非常实用的工具类:CyclicBarrier。它就像一个“循环栅栏”,可以让你在多线程协作时,等待所有线程都到达某个屏障点后,再一起继续执行。对于CyclicBarrier,你可能已经很熟悉它的基本用法了,但你是否真正了解它内部是如何实现线程同步的呢?特别是,它与我们熟知的CountDownLatch有什么本质区别?今天,我们就一起深入挖掘CyclicBarrier的内部机制,特别是它如何利用AbstractQueuedSynchronizer(AQS)来实现线程同步,并对比CountDownLatch,希望能帮助你更深入地理解 Java 并发编程的底层原理。

一、CyclicBarrier 的基本概念和应用场景

首先,我们来快速回顾一下CyclicBarrier的基本概念和常见应用场景,方便你更好地理解后面的内容。

1.1 CyclicBarrier 的定义

CyclicBarrier(循环栅栏)是 Java 并发包java.util.concurrent中的一个同步辅助类。它允许一组线程在达到一个共同的屏障点(barrier point)时互相等待,直到所有线程都到达该屏障点,然后所有线程才能继续执行。CyclicBarrier的名字中的“Cyclic”表明了它的可重用性,即在所有线程释放后,可以重复使用。

1.2 CyclicBarrier 的构造方法

CyclicBarrier提供了两个构造方法:

// 创建一个 CyclicBarrier,指定需要等待的线程数量
public CyclicBarrier(int parties)

// 创建一个 CyclicBarrier,指定需要等待的线程数量,以及当所有线程到达屏障点时,要执行的 Runnable 任务
public CyclicBarrier(int parties, Runnable barrierAction)
  • parties:表示需要等待的线程数量。当parties个线程都调用了await()方法后,屏障点才会被打开。
  • barrierAction:一个Runnable对象,当所有线程到达屏障点时,CyclicBarrier会执行这个Runnable任务。这个任务通常用于汇总数据、处理结果等。

1.3 CyclicBarrier 的主要方法

  • await():调用此方法的线程将在此处等待,直到所有线程都到达屏障点。
  • await(long timeout, TimeUnit unit):与await()类似,但设置了超时时间。如果超时时间内,所有线程没有到达屏障点,则抛出TimeoutException
  • getNumberWaiting():获取当前在屏障点等待的线程数量。
  • getParties():获取CyclicBarrier初始化时指定的线程数量。
  • reset():将屏障重置为初始状态。如果正在等待的线程,会抛出BrokenBarrierException
  • isBroken():判断屏障是否处于损坏状态。当其中一个线程中断或超时时,屏障会被损坏。

1.4 CyclicBarrier 的应用场景

CyclicBarrier非常适合以下场景:

  • 并发计算:例如,将一个大的计算任务拆分成多个子任务,分配给多个线程并发执行,每个线程负责计算一部分数据。当所有线程都完成计算后,需要将计算结果汇总,这时就可以使用CyclicBarrier
  • 游戏开发:在多人游戏中,CyclicBarrier可以用于等待所有玩家都准备好后,再开始游戏。
  • 数据同步:例如,多个线程需要从不同的数据源读取数据,当所有线程都完成数据读取后,再进行数据处理或分析。
  • 测试框架:在自动化测试中,CyclicBarrier可以用于等待所有测试用例都准备好后,再一起开始测试。

下面是一个简单的例子,演示了如何使用CyclicBarrier

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {

    public static void main(String[] args) {
        // 定义需要等待的线程数量
        int parties = 3;

        // 创建 CyclicBarrier,指定等待的线程数量和屏障动作
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            System.out.println("所有线程都到达屏障点,开始执行汇总操作");
        });

        // 创建并启动多个线程
        for (int i = 0; i < parties; i++) {
            int threadId = i + 1;
            new Thread(() -> {
                System.out.println("线程 " + threadId + " 开始工作...");
                try {
                    // 模拟线程工作
                    Thread.sleep((long) (Math.random() * 2000));
                    System.out.println("线程 " + threadId + " 完成工作,等待其他线程...");
                    // 调用 await() 方法,等待其他线程
                    barrier.await();
                    System.out.println("线程 " + threadId + " 继续执行...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

在这个例子中,我们创建了3个线程,每个线程模拟完成一些工作后,调用barrier.await()方法,在屏障点等待。当所有3个线程都到达屏障点后,CyclicBarrier会执行barrierAction,打印“所有线程都到达屏障点,开始执行汇总操作”,然后所有线程继续执行后面的代码。

二、CyclicBarrier 的内部实现:基于 AQS 的线程同步

现在,我们来深入CyclicBarrier的内部,看看它是如何实现线程同步的。CyclicBarrier的核心实现依赖于AbstractQueuedSynchronizer(AQS)。

2.1 AQS 简介

AbstractQueuedSynchronizer(AQS)是 Java 并发包java.util.concurrent中的一个抽象类,它提供了一种构建锁和同步器的框架。许多并发工具类,如ReentrantLockSemaphoreCountDownLatch等,都是基于 AQS 实现的。AQS 内部维护了一个状态变量state),用于表示同步状态,以及一个FIFO 线程等待队列,用于管理等待获取同步状态的线程。AQS 提供了几个关键方法:

  • getState():获取当前同步状态。
  • setState(int newState):设置同步状态。
  • compareAndSetState(int expect, int update):CAS 操作,尝试原子性地更新同步状态。
  • acquire(int arg):尝试以独占模式获取同步状态,如果获取失败,则将当前线程加入等待队列并阻塞。
  • release(int arg):以独占模式释放同步状态,唤醒等待队列中的一个或多个线程。

2.2 CyclicBarrier 的核心成员变量

CyclicBarrier内部主要维护了以下几个核心成员变量:

  • private final int parties;:表示需要等待的线程数量,在构造方法中初始化,不可变。
  • private final Runnable barrierCommand;:当所有线程到达屏障点时,需要执行的Runnable任务,在构造方法中初始化,可以为 null。
  • private final ReentrantLock lock = new ReentrantLock();:一个可重入锁,用于保护对CyclicBarrier状态的访问。
  • private final Condition trip = lock.newCondition();:一个Condition对象,用于线程的等待和唤醒。当线程调用await()方法时,会将线程放入这个Condition的等待队列中。
  • private int count;:当前已经到达屏障点的线程数量,初始化为parties,每次有一个线程到达屏障点时,count减1。当count变为0时,所有线程都到达了屏障点。
  • private Generation generation = new Generation();:用于记录CyclicBarrier的代数。CyclicBarrier是可重复使用的,每次屏障被打破(either due to thread interruption or timeout)时,generation会更新,防止线程在旧的屏障上等待。

2.3 CyclicBarrier 的 await() 方法

CyclicBarrier的核心方法是await(),它实现了线程的等待和同步。下面我们来详细分析await()方法的实现:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(true, 0L);
    } catch (TimeoutException toe) {
        throw new AssertionError(toe); // cannot happen
    }
}

public int await(long timeout, TimeUnit unit)
        throws InterruptedException, BrokenBarrierException, TimeoutException {
    return dowait(false, unit.toNanos(timeout));
}

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken) // 检查屏障是否已损坏
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier(); // 中断线程,损坏屏障
            throw new InterruptedException();
        }

        int index = --count; // 递减计数器
        if (index == 0) { // 最后一个到达的线程
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run(); // 执行 barrierAction
                ranAction = true;
                nextGeneration(); // 创建新的一代
                return 0;
            } finally {
                if (!ranAction) // 如果 barrierAction 抛出异常,则损坏屏障
                    breakBarrier();
            }
        } else {
            // 还有线程未到达屏障点
            for (;;) {
                try {
                    if (timed)
                        nanos = trip.awaitNanos(nanos); // 等待,设置超时时间
                    else
                        trip.await(); // 等待
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier(); // 中断线程,损坏屏障
                        throw ie;
                    } else {
                        // We're broken, so must reinterrupt.
                        Thread.currentThread().interrupt();
                    }
                }
                // 检查屏障是否已损坏
                if (g.broken)
                    throw new BrokenBarrierException();

                // 检查是否到达新的一代
                if (g != generation)
                    return index; // 返回当前线程在屏障中的索引

                if (timed && nanos <= 0L)
                {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        }
    } finally {
        lock.unlock();
    }
}

我们来逐步分析dowait()方法的逻辑:

  1. 加锁:首先,使用ReentrantLock获取锁,确保线程安全。
  2. 检查屏障是否损坏:如果generation.broken为 true,说明屏障已经被损坏,抛出BrokenBarrierException
  3. 检查中断:如果当前线程被中断,则调用breakBarrier()方法损坏屏障,并抛出InterruptedException
  4. 递减计数器count递减,表示有一个线程到达了屏障点。
  5. 判断是否是最后一个到达的线程
    • 如果是最后一个到达的线程index == 0):
      • 执行barrierCommand(如果存在)。
      • 调用nextGeneration()方法,创建新的一代。
      • 唤醒所有等待在trip上的线程。
      • 返回0,表示当前线程是最后一个到达的线程。
    • 如果不是最后一个到达的线程
      • 调用trip.awaitNanos()(如果设置了超时时间)或trip.await()方法,使当前线程进入等待状态,直到以下情况之一发生:
        • 所有线程都到达屏障点(count变为0)。
        • 发生中断,调用breakBarrier()方法,损坏屏障,并抛出InterruptedException
        • 超时,调用breakBarrier()方法,损坏屏障,并抛出TimeoutException
      • 在等待期间,不断检查屏障是否损坏,以及是否到达新的一代。
      • 如果被唤醒,返回当前线程在屏障中的索引。
  6. 解锁:在finally块中释放锁。

2.4 核心流程总结

  1. 线程调用 await() 方法:线程尝试获取锁。如果获取成功,递减count计数器。如果count变为0,则表示最后一个线程到达屏障点,执行barrierCommand,然后唤醒所有等待的线程。如果count不为0,则当前线程在trip上等待。
  2. 线程等待:等待期间,如果发生中断或超时,则损坏屏障,并抛出异常。如果所有线程都到达屏障点,则唤醒所有等待的线程,并返回每个线程在屏障中的索引。
  3. 屏障重置nextGeneration()方法会创建新的一代,重置count,并释放lock锁,使得CyclicBarrier可以被重复使用。

2.5 与 AQS 的关系

CyclicBarrier虽然没有直接继承 AQS,但它使用了 AQS 提供的Condition接口。trip对象实际上是一个Condition实例,它利用了 AQS 提供的线程等待队列和唤醒机制。await()方法中的trip.await()trip.awaitNanos()方法,就是通过 AQS 提供的Condition来实现线程的阻塞和唤醒。ReentrantLock用于保护对CyclicBarrier状态的访问,确保线程安全。

三、CyclicBarrier 与 CountDownLatch 的区别

CyclicBarrierCountDownLatch都是 Java 并发包中常用的同步工具类,但它们的功能和使用场景有所不同。理解它们之间的区别,有助于你更好地选择合适的工具类来解决并发问题。

3.1 功能差异

  • CyclicBarrier
    • 可重复使用CyclicBarrier是可以重复使用的,在所有线程释放后,可以再次使用。这使得它非常适合于需要多次同步的场景。
    • 同步多个线程CyclicBarrier用于同步多个线程,等待所有线程都到达屏障点后,再一起执行。
    • 屏障动作CyclicBarrier可以指定一个barrierAction,当所有线程到达屏障点时,会执行这个Runnable任务。
  • CountDownLatch
    • 不可重复使用CountDownLatch是不可重复使用的,一旦计数器变为0,就无法再次使用。如果需要重复使用,需要重新创建一个新的CountDownLatch
    • 等待一个或多个线程完成CountDownLatch用于等待一个或多个线程完成某个操作后,主线程或其他线程才能继续执行。
    • 计数器CountDownLatch使用一个计数器,当计数器变为0时,所有等待的线程被唤醒。

3.2 实现原理差异

  • CyclicBarrier:基于ReentrantLockCondition实现,await()方法使用Conditionawait()方法阻塞线程,当所有线程到达屏障点时,使用ConditionsignalAll()方法唤醒所有线程。
  • CountDownLatch:基于AQS实现,维护一个state变量,表示计数器的值。countDown()方法使用CAS操作递减state的值,await()方法使用AQSacquireShared()方法阻塞线程,当state变为0时,唤醒所有等待的线程。

3.3 使用场景差异

  • CyclicBarrier:适用于需要多个线程相互等待,直到所有线程都到达某个屏障点后,再一起执行的场景。例如,并发计算、游戏开发、数据同步等。
  • CountDownLatch:适用于一个或多个线程等待其他线程完成某个操作后,才能继续执行的场景。例如,主线程等待多个子线程完成初始化工作,或者等待多个线程下载文件等。

3.4 关键区别总结

特性 CyclicBarrier CountDownLatch
可重复使用性 可重复使用 不可重复使用
同步线程数量 同步多个线程 等待一个或多个线程完成
屏障动作 有 barrierAction,当所有线程到达屏障点时执行
实现原理 基于 ReentrantLock 和 Condition 基于 AQS
计数器 隐含的计数器,由到达屏障点的线程数量决定 显式的计数器,可以初始化
使用场景 并发计算、游戏开发、数据同步等 主线程等待子线程完成初始化、等待线程下载文件等

四、CyclicBarrier 的注意事项

在使用CyclicBarrier时,需要注意以下几点:

  1. 异常处理await()方法会抛出InterruptedExceptionBrokenBarrierExceptionInterruptedException表示线程在等待过程中被中断,BrokenBarrierException表示屏障被损坏。在使用时,需要进行适当的异常处理。
  2. 屏障损坏:当一个线程在await()方法中被中断或超时,或者barrierAction抛出异常时,屏障会被损坏。一旦屏障被损坏,所有调用await()方法的线程都会抛出BrokenBarrierException
  3. 线程数量CyclicBarrier的构造方法中指定的线程数量是固定的。如果实际参与同步的线程数量与parties不一致,会导致线程一直阻塞,无法继续执行。
  4. reset() 方法:谨慎使用reset()方法。reset()方法会重置屏障,但是所有正在等待的线程都会抛出BrokenBarrierException。如果需要重置屏障,建议在屏障损坏的情况下使用,或者在确保所有线程都安全的情况下使用。
  5. 与线程池结合使用:当与线程池结合使用时,需要特别注意。因为线程池中的线程可能被复用,如果在barrierAction中使用了线程局部变量,需要确保在每次屏障被打开时,线程局部变量被正确地初始化或清理。

五、总结

今天,我们深入探讨了CyclicBarrier的内部实现机制,以及它与CountDownLatch的区别。我们了解到,CyclicBarrier基于ReentrantLockCondition实现,通过await()方法实现线程的等待和同步。与CountDownLatch相比,CyclicBarrier更适合于需要多个线程相互等待,直到所有线程都到达某个屏障点后,再一起执行的场景。希望通过今天的分享,你对CyclicBarrier的理解能够更上一层楼,在实际的并发编程中,能够更加灵活地运用它。

如果你喜欢这篇文章,请点个赞,也欢迎在评论区留下你的想法和问题。我们下次再见!

点评评价

captcha
健康