你好,我是老码农,今天咱们聊聊 Java 并发编程里的一个实用小工具——CountDownLatch
。别看名字挺唬人,其实它就像一个倒计时器,用来协调多个线程的执行。如果你经常需要处理并发任务,特别是那些需要等待其他任务完成后才能继续执行的场景,那么 CountDownLatch
绝对是个好帮手。
为什么需要 CountDownLatch?
在多线程编程中,经常会遇到这样的情况:你需要启动多个线程去执行不同的任务,但是这些任务之间可能存在依赖关系。例如,你需要先从数据库读取数据,然后才能对数据进行处理;或者,你需要等待多个服务都启动完毕后,才能启动你的主应用程序。这时候,CountDownLatch
就派上用场了。
它主要解决的问题是:如何让一个或多个线程等待其他线程完成操作后再继续执行。 简单来说,就是实现一种“等待所有线程都准备好,或者等待所有线程都执行完毕”的同步机制。
CountDownLatch 的基本概念
CountDownLatch
位于 java.util.concurrent
包下,它是一个同步辅助类。它通过一个计数器来实现等待和通知机制。
- 计数器 (Count):
CountDownLatch
内部维护一个计数器,计数器的值在创建CountDownLatch
对象时被初始化。 countDown()
方法: 每当一个线程完成一个任务后,可以调用countDown()
方法使计数器的值减 1。await()
方法: 调用await()
方法的线程会一直阻塞,直到计数器的值为 0。
你可以把 CountDownLatch
想象成一个闸门。计数器的初始值就是闸门的数量。当计数器不为 0 时,闸门关闭,所有调用 await()
方法的线程都会被阻塞;当计数器变为 0 时,闸门打开,所有阻塞的线程都会被唤醒。
CountDownLatch 的使用场景
CountDownLatch
的应用场景非常广泛,下面列举几个常见的场景:
- 并发任务的协调: 例如,你需要启动多个线程处理一批数据,每个线程处理一部分。你需要确保所有线程都处理完毕后,才能进行汇总或者下一步操作。
- 等待多个服务启动: 在系统启动时,你可能需要等待多个服务都启动完毕后,才能启动主应用程序。可以使用
CountDownLatch
来协调各个服务的启动顺序。 - 测试并发性能: 在进行并发性能测试时,可以使用
CountDownLatch
来确保所有测试线程都准备好后,才开始测试。这样可以避免测试结果受到启动时间的影响。
CountDownLatch 的代码示例
为了更好地理解 CountDownLatch
的用法,我们来看几个代码示例。
示例 1:等待多个线程完成任务
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int workerCount = 3; // 模拟三个工作线程
CountDownLatch latch = new CountDownLatch(workerCount);
ExecutorService executor = Executors.newFixedThreadPool(workerCount);
for (int i = 0; i < workerCount; i++) {
final int workerId = i + 1;
executor.submit(() -> {
try {
System.out.println("Worker " + workerId + " started.");
Thread.sleep((long) (Math.random() * 2000)); // 模拟任务执行时间
System.out.println("Worker " + workerId + " finished.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 任务完成后,计数器减 1
}
});
}
System.out.println("Main thread waiting for workers to finish...");
latch.await(); // 主线程阻塞,等待计数器变为 0
System.out.println("All workers finished. Main thread continuing...");
executor.shutdown(); // 关闭线程池
}
}
在这个例子中,我们创建了三个工作线程。每个线程模拟执行一个任务,并在任务完成后调用 latch.countDown()
方法。主线程调用 latch.await()
方法,阻塞等待所有工作线程完成。当所有工作线程都完成后,计数器变为 0,主线程被唤醒。
示例 2:模拟并发场景,统计执行时间
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public class ConcurrentTestExample {
public static void main(String[] args) throws InterruptedException {
int threadCount = 10; // 模拟 10 个并发线程
int taskCount = 1000; // 每个线程执行 1000 个任务
CountDownLatch startLatch = new CountDownLatch(1); // 启动门闩,用于统一启动线程
CountDownLatch endLatch = new CountDownLatch(threadCount); // 结束门闩,用于等待线程结束
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
AtomicLong totalTime = new AtomicLong(0); // 统计总耗时
for (int i = 0; i < threadCount; i++) {
executor.submit(() -> {
try {
startLatch.await(); // 等待启动信号
long startTime = System.currentTimeMillis();
for (int j = 0; j < taskCount; j++) {
// 模拟任务执行
Thread.sleep(1); // 模拟任务耗时
}
long endTime = System.currentTimeMillis();
long elapsedTime = endTime - startTime;
totalTime.addAndGet(elapsedTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown(); // 线程完成,计数器减 1
}
});
}
long startTime = System.currentTimeMillis();
startLatch.countDown(); // 释放启动信号,所有线程开始执行
endLatch.await(); // 等待所有线程结束
long endTime = System.currentTimeMillis();
long totalElapsedTime = endTime - startTime;
System.out.println("Total execution time: " + totalElapsedTime + " ms");
System.out.println("Average execution time per thread: " + (double) totalTime.get() / threadCount + " ms");
executor.shutdown();
}
}
在这个例子中,我们模拟了一个并发场景。我们创建了 10 个线程,每个线程执行 1000 个任务。我们使用了两个 CountDownLatch
:
startLatch
:用于统一启动所有线程。主线程调用startLatch.countDown()
方法,释放启动信号,所有线程开始执行。endLatch
:用于等待所有线程结束。主线程调用endLatch.await()
方法,阻塞等待所有线程完成。
通过这种方式,我们可以精确地统计并发任务的执行时间。
CountDownLatch 的工作原理
CountDownLatch
的实现原理非常简单,它内部维护了一个计数器和一个 LockSupport
阻塞队列。
countDown()
方法: 这个方法会原子性地将计数器的值减 1。如果计数器的值变为 0,则会唤醒所有在await()
方法上阻塞的线程。await()
方法: 这个方法会检查计数器的值。如果计数器的值大于 0,则调用LockSupport.park()
方法将当前线程阻塞;如果计数器的值为 0,则直接返回。
LockSupport
是一个非常底层的线程阻塞和唤醒工具,它提供了 park()
和 unpark()
两个方法。
park()
方法:阻塞当前线程。unpark(Thread thread)
方法:唤醒指定的线程。
CountDownLatch
使用 LockSupport
来实现线程的阻塞和唤醒,从而实现等待和通知机制。
CountDownLatch 的注意事项
在使用 CountDownLatch
时,需要注意以下几点:
- 计数器只能被设置一次:
CountDownLatch
的计数器在创建时被初始化,之后不能被修改。这意味着你不能动态地改变需要等待的线程数量。 await()
方法的阻塞是不可中断的: 调用await()
方法的线程会被一直阻塞,直到计数器的值为 0。await()
方法不支持中断操作,即使调用了线程的interrupt()
方法,await()
方法也不会抛出InterruptedException
。不过,await(long timeout, TimeUnit unit)
方法支持超时设置,可以在指定的时间内等待。countDown()
方法的调用顺序不影响结果:countDown()
方法的调用顺序不影响最终的结果。只要所有线程都调用了countDown()
方法,计数器就会变为 0,所有阻塞的线程都会被唤醒。- 避免死锁: 在使用
CountDownLatch
时,需要小心避免死锁。例如,如果一个线程调用了await()
方法,并且在等待过程中需要获取某个锁,而持有该锁的线程正在等待CountDownLatch
的计数器变为 0,就会发生死锁。
CountDownLatch 与其他并发工具的比较
CountDownLatch
只是 Java 并发编程中的一个工具,还有其他一些类似的工具,例如 CyclicBarrier
和 Semaphore
。它们之间有什么区别呢?
CountDownLatch
: 用于一个或多个线程等待其他线程完成操作后再继续执行。计数器只能递减到 0,不能重置。CyclicBarrier
: 用于一组线程互相等待,直到所有线程都到达某个屏障点后,再一起继续执行。计数器可以重置,可以循环使用。Semaphore
: 用于控制同时访问某个资源的线程数量。它维护一个许可数量,线程获取许可才能访问资源,释放许可后,其他线程才能获取许可。
特性 | CountDownLatch | CyclicBarrier | Semaphore |
---|---|---|---|
场景 | 等待其他线程完成操作 | 一组线程互相等待,达到屏障点后一起执行 | 控制同时访问资源的线程数量 |
计数器 | 只能递减到 0,不可重置 | 可重置,可循环使用 | 许可数量,可增加和减少 |
线程阻塞 | await() 方法 |
await() 方法 |
acquire() 方法 |
线程唤醒 | countDown() 方法 |
达到屏障点后,所有线程一起唤醒 | release() 方法 |
可重用性 | 不可重用 | 可重用 | 可重用 |
应用 | 任务协调、启动等待、并发测试 | 并发计算、多阶段任务同步 | 资源控制、限流 |
选择哪个工具取决于你的具体需求。如果只是需要等待其他线程完成操作,那么 CountDownLatch
是一个不错的选择。如果需要一组线程互相等待,那么 CyclicBarrier
更适合。如果需要控制同时访问资源的线程数量,那么 Semaphore
更合适。
CountDownLatch 的进阶用法和优化
1. 超时等待
CountDownLatch
的 await()
方法可以设置超时时间,避免无限期地等待。这在某些场景下非常有用,例如,如果某个线程执行时间过长,或者出现异常导致无法完成任务,那么主线程可以设置超时时间,并在超时后采取相应的处理措施。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class CountDownLatchTimeout {
public static void main(String[] args) throws InterruptedException {
int workerCount = 3;
CountDownLatch latch = new CountDownLatch(workerCount);
ExecutorService executor = Executors.newFixedThreadPool(workerCount);
for (int i = 0; i < workerCount; i++) {
final int workerId = i + 1;
executor.submit(() -> {
try {
System.out.println("Worker " + workerId + " started.");
Thread.sleep((long) (Math.random() * 5000)); // 模拟任务执行时间,可能超过超时时间
System.out.println("Worker " + workerId + " finished.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});
}
System.out.println("Main thread waiting for workers to finish...");
boolean success = latch.await(3, TimeUnit.SECONDS); // 设置超时时间为 3 秒
if (success) {
System.out.println("All workers finished within the timeout.");
} else {
System.out.println("Some workers timed out. Main thread continuing...");
}
executor.shutdown();
}
}
在这个例子中,我们设置了 3 秒的超时时间。如果所有工作线程在 3 秒内都完成了任务,那么 await()
方法返回 true
;否则,返回 false
。主线程可以根据返回值来判断是否发生了超时。
2. 使用线程池优化
在实际开发中,我们通常会使用线程池来管理线程。使用线程池可以有效地复用线程,减少线程的创建和销毁开销。结合 CountDownLatch
,我们可以更灵活地控制并发任务的执行。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchWithThreadPool {
public static void main(String[] args) throws InterruptedException {
int workerCount = 5;
CountDownLatch latch = new CountDownLatch(workerCount);
ExecutorService executor = Executors.newFixedThreadPool(workerCount);
for (int i = 0; i < workerCount; i++) {
final int workerId = i + 1;
executor.submit(() -> {
try {
System.out.println("Worker " + workerId + " started (Thread: " + Thread.currentThread().getName() + ").");
Thread.sleep((long) (Math.random() * 1000));
System.out.println("Worker " + workerId + " finished (Thread: " + Thread.currentThread().getName() + ").");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});
}
System.out.println("Main thread waiting for workers to finish...");
latch.await();
System.out.println("All workers finished. Main thread continuing...");
executor.shutdown(); // 关闭线程池
}
}
在这个例子中,我们使用了 Executors.newFixedThreadPool(workerCount)
创建了一个固定大小的线程池。工作线程从线程池中获取线程来执行任务。这提高了程序的效率,并更好地控制了线程的数量。
3. 异常处理
在多线程编程中,异常处理非常重要。我们需要确保即使某个线程发生了异常,也不会影响其他线程的执行,也不会导致程序崩溃。 在使用 CountDownLatch
时,我们需要在 try-catch
块中调用 countDown()
方法,以确保即使发生了异常,计数器也能正确地减 1。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchExceptionHandling {
public static void main(String[] args) throws InterruptedException {
int workerCount = 3;
CountDownLatch latch = new CountDownLatch(workerCount);
ExecutorService executor = Executors.newFixedThreadPool(workerCount);
for (int i = 0; i < workerCount; i++) {
final int workerId = i + 1;
executor.submit(() -> {
try {
System.out.println("Worker " + workerId + " started.");
if (workerId == 2) {
throw new RuntimeException("Worker " + workerId + " failed!"); // 模拟异常
}
Thread.sleep((long) (Math.random() * 2000));
System.out.println("Worker " + workerId + " finished.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (RuntimeException e) {
System.err.println(e.getMessage());
} finally {
latch.countDown(); // 确保计数器减 1
}
});
}
System.out.println("Main thread waiting for workers to finish...");
latch.await();
System.out.println("All workers finished. Main thread continuing...");
executor.shutdown();
}
}
在这个例子中,我们模拟了其中一个工作线程抛出异常。即使发生了异常,finally
块中的 latch.countDown()
方法也会被执行,确保计数器正确地减 1,主线程能够正常地等待所有线程完成。
4. 避免过早启动
在某些情况下,我们需要确保所有线程都准备好后,才开始执行任务。可以使用 CountDownLatch
来实现这种同步机制。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchStartSignal {
public static void main(String[] args) throws InterruptedException {
int workerCount = 3;
CountDownLatch startSignal = new CountDownLatch(1); // 启动信号
CountDownLatch doneSignal = new CountDownLatch(workerCount);
ExecutorService executor = Executors.newFixedThreadPool(workerCount);
for (int i = 0; i < workerCount; i++) {
final int workerId = i + 1;
executor.submit(() -> {
try {
System.out.println("Worker " + workerId + " is ready.");
startSignal.await(); // 等待启动信号
System.out.println("Worker " + workerId + " started.");
Thread.sleep((long) (Math.random() * 2000));
System.out.println("Worker " + workerId + " finished.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
doneSignal.countDown();
}
});
}
System.out.println("Main thread preparing workers...");
Thread.sleep(2000); // 模拟准备时间
System.out.println("Main thread starting workers...");
startSignal.countDown(); // 发送启动信号
doneSignal.await(); // 等待所有线程完成
System.out.println("All workers finished. Main thread continuing...");
executor.shutdown();
}
}
在这个例子中,我们使用了两个 CountDownLatch
:startSignal
用于控制线程的启动,doneSignal
用于等待线程的完成。主线程先准备好工作线程,然后发送启动信号,所有工作线程才开始执行任务。
总结
CountDownLatch
是 Java 并发编程中一个非常实用的工具,它可以帮助我们协调多个线程的执行,实现等待和通知机制。通过本文的介绍,相信你对 CountDownLatch
的基本概念、使用场景、工作原理和注意事项有了更深入的理解。 记住,在实际开发中,要结合线程池、异常处理、超时等待等技巧,才能更好地发挥 CountDownLatch
的作用。希望这些知识对你有所帮助!
如果你在学习和使用 CountDownLatch
的过程中遇到任何问题,或者有任何疑问,欢迎随时提问,我们一起探讨和学习!
祝你编程愉快!