HOOOS

Java 多线程进阶:CountDownLatch 在任务调度中的实战与技巧

0 75 老码农 Java多线程CountDownLatch并发编程
Apple

你好,我是老码农,今天咱们聊聊 Java 并发编程里的一个实用小工具——CountDownLatch。别看名字挺唬人,其实它就像一个倒计时器,用来协调多个线程的执行。如果你经常需要处理并发任务,特别是那些需要等待其他任务完成后才能继续执行的场景,那么 CountDownLatch 绝对是个好帮手。

为什么需要 CountDownLatch?

在多线程编程中,经常会遇到这样的情况:你需要启动多个线程去执行不同的任务,但是这些任务之间可能存在依赖关系。例如,你需要先从数据库读取数据,然后才能对数据进行处理;或者,你需要等待多个服务都启动完毕后,才能启动你的主应用程序。这时候,CountDownLatch 就派上用场了。

它主要解决的问题是:如何让一个或多个线程等待其他线程完成操作后再继续执行。 简单来说,就是实现一种“等待所有线程都准备好,或者等待所有线程都执行完毕”的同步机制。

CountDownLatch 的基本概念

CountDownLatch 位于 java.util.concurrent 包下,它是一个同步辅助类。它通过一个计数器来实现等待和通知机制。

  • 计数器 (Count): CountDownLatch 内部维护一个计数器,计数器的值在创建 CountDownLatch 对象时被初始化。
  • countDown() 方法: 每当一个线程完成一个任务后,可以调用 countDown() 方法使计数器的值减 1。
  • await() 方法: 调用 await() 方法的线程会一直阻塞,直到计数器的值为 0。

你可以把 CountDownLatch 想象成一个闸门。计数器的初始值就是闸门的数量。当计数器不为 0 时,闸门关闭,所有调用 await() 方法的线程都会被阻塞;当计数器变为 0 时,闸门打开,所有阻塞的线程都会被唤醒。

CountDownLatch 的使用场景

CountDownLatch 的应用场景非常广泛,下面列举几个常见的场景:

  1. 并发任务的协调: 例如,你需要启动多个线程处理一批数据,每个线程处理一部分。你需要确保所有线程都处理完毕后,才能进行汇总或者下一步操作。
  2. 等待多个服务启动: 在系统启动时,你可能需要等待多个服务都启动完毕后,才能启动主应用程序。可以使用 CountDownLatch 来协调各个服务的启动顺序。
  3. 测试并发性能: 在进行并发性能测试时,可以使用 CountDownLatch 来确保所有测试线程都准备好后,才开始测试。这样可以避免测试结果受到启动时间的影响。

CountDownLatch 的代码示例

为了更好地理解 CountDownLatch 的用法,我们来看几个代码示例。

示例 1:等待多个线程完成任务

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchExample {

    public static void main(String[] args) throws InterruptedException {
        int workerCount = 3; // 模拟三个工作线程
        CountDownLatch latch = new CountDownLatch(workerCount);
        ExecutorService executor = Executors.newFixedThreadPool(workerCount);

        for (int i = 0; i < workerCount; i++) {
            final int workerId = i + 1;
            executor.submit(() -> {
                try {
                    System.out.println("Worker " + workerId + " started.");
                    Thread.sleep((long) (Math.random() * 2000)); // 模拟任务执行时间
                    System.out.println("Worker " + workerId + " finished.");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // 任务完成后,计数器减 1
                }
            });
        }

        System.out.println("Main thread waiting for workers to finish...");
        latch.await(); // 主线程阻塞,等待计数器变为 0
        System.out.println("All workers finished. Main thread continuing...");

        executor.shutdown(); // 关闭线程池
    }
}

在这个例子中,我们创建了三个工作线程。每个线程模拟执行一个任务,并在任务完成后调用 latch.countDown() 方法。主线程调用 latch.await() 方法,阻塞等待所有工作线程完成。当所有工作线程都完成后,计数器变为 0,主线程被唤醒。

示例 2:模拟并发场景,统计执行时间

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class ConcurrentTestExample {

    public static void main(String[] args) throws InterruptedException {
        int threadCount = 10; // 模拟 10 个并发线程
        int taskCount = 1000; // 每个线程执行 1000 个任务
        CountDownLatch startLatch = new CountDownLatch(1); // 启动门闩,用于统一启动线程
        CountDownLatch endLatch = new CountDownLatch(threadCount); // 结束门闩,用于等待线程结束
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        AtomicLong totalTime = new AtomicLong(0); // 统计总耗时

        for (int i = 0; i < threadCount; i++) {
            executor.submit(() -> {
                try {
                    startLatch.await(); // 等待启动信号
                    long startTime = System.currentTimeMillis();
                    for (int j = 0; j < taskCount; j++) {
                        // 模拟任务执行
                        Thread.sleep(1); // 模拟任务耗时
                    }
                    long endTime = System.currentTimeMillis();
                    long elapsedTime = endTime - startTime;
                    totalTime.addAndGet(elapsedTime);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    endLatch.countDown(); // 线程完成,计数器减 1
                }
            });
        }

        long startTime = System.currentTimeMillis();
        startLatch.countDown(); // 释放启动信号,所有线程开始执行
        endLatch.await(); // 等待所有线程结束
        long endTime = System.currentTimeMillis();
        long totalElapsedTime = endTime - startTime;

        System.out.println("Total execution time: " + totalElapsedTime + " ms");
        System.out.println("Average execution time per thread: " + (double) totalTime.get() / threadCount + " ms");

        executor.shutdown();
    }
}

在这个例子中,我们模拟了一个并发场景。我们创建了 10 个线程,每个线程执行 1000 个任务。我们使用了两个 CountDownLatch

  • startLatch:用于统一启动所有线程。主线程调用 startLatch.countDown() 方法,释放启动信号,所有线程开始执行。
  • endLatch:用于等待所有线程结束。主线程调用 endLatch.await() 方法,阻塞等待所有线程完成。

通过这种方式,我们可以精确地统计并发任务的执行时间。

CountDownLatch 的工作原理

CountDownLatch 的实现原理非常简单,它内部维护了一个计数器和一个 LockSupport 阻塞队列。

  • countDown() 方法: 这个方法会原子性地将计数器的值减 1。如果计数器的值变为 0,则会唤醒所有在 await() 方法上阻塞的线程。
  • await() 方法: 这个方法会检查计数器的值。如果计数器的值大于 0,则调用 LockSupport.park() 方法将当前线程阻塞;如果计数器的值为 0,则直接返回。

LockSupport 是一个非常底层的线程阻塞和唤醒工具,它提供了 park()unpark() 两个方法。

  • park() 方法:阻塞当前线程。
  • unpark(Thread thread) 方法:唤醒指定的线程。

CountDownLatch 使用 LockSupport 来实现线程的阻塞和唤醒,从而实现等待和通知机制。

CountDownLatch 的注意事项

在使用 CountDownLatch 时,需要注意以下几点:

  1. 计数器只能被设置一次: CountDownLatch 的计数器在创建时被初始化,之后不能被修改。这意味着你不能动态地改变需要等待的线程数量。
  2. await() 方法的阻塞是不可中断的: 调用 await() 方法的线程会被一直阻塞,直到计数器的值为 0。await() 方法不支持中断操作,即使调用了线程的 interrupt() 方法,await() 方法也不会抛出 InterruptedException。不过,await(long timeout, TimeUnit unit) 方法支持超时设置,可以在指定的时间内等待。
  3. countDown() 方法的调用顺序不影响结果: countDown() 方法的调用顺序不影响最终的结果。只要所有线程都调用了 countDown() 方法,计数器就会变为 0,所有阻塞的线程都会被唤醒。
  4. 避免死锁: 在使用 CountDownLatch 时,需要小心避免死锁。例如,如果一个线程调用了 await() 方法,并且在等待过程中需要获取某个锁,而持有该锁的线程正在等待 CountDownLatch 的计数器变为 0,就会发生死锁。

CountDownLatch 与其他并发工具的比较

CountDownLatch 只是 Java 并发编程中的一个工具,还有其他一些类似的工具,例如 CyclicBarrierSemaphore。它们之间有什么区别呢?

  • CountDownLatch: 用于一个或多个线程等待其他线程完成操作后再继续执行。计数器只能递减到 0,不能重置。
  • CyclicBarrier: 用于一组线程互相等待,直到所有线程都到达某个屏障点后,再一起继续执行。计数器可以重置,可以循环使用。
  • Semaphore: 用于控制同时访问某个资源的线程数量。它维护一个许可数量,线程获取许可才能访问资源,释放许可后,其他线程才能获取许可。
特性 CountDownLatch CyclicBarrier Semaphore
场景 等待其他线程完成操作 一组线程互相等待,达到屏障点后一起执行 控制同时访问资源的线程数量
计数器 只能递减到 0,不可重置 可重置,可循环使用 许可数量,可增加和减少
线程阻塞 await() 方法 await() 方法 acquire() 方法
线程唤醒 countDown() 方法 达到屏障点后,所有线程一起唤醒 release() 方法
可重用性 不可重用 可重用 可重用
应用 任务协调、启动等待、并发测试 并发计算、多阶段任务同步 资源控制、限流

选择哪个工具取决于你的具体需求。如果只是需要等待其他线程完成操作,那么 CountDownLatch 是一个不错的选择。如果需要一组线程互相等待,那么 CyclicBarrier 更适合。如果需要控制同时访问资源的线程数量,那么 Semaphore 更合适。

CountDownLatch 的进阶用法和优化

1. 超时等待

CountDownLatchawait() 方法可以设置超时时间,避免无限期地等待。这在某些场景下非常有用,例如,如果某个线程执行时间过长,或者出现异常导致无法完成任务,那么主线程可以设置超时时间,并在超时后采取相应的处理措施。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CountDownLatchTimeout {

    public static void main(String[] args) throws InterruptedException {
        int workerCount = 3;
        CountDownLatch latch = new CountDownLatch(workerCount);
        ExecutorService executor = Executors.newFixedThreadPool(workerCount);

        for (int i = 0; i < workerCount; i++) {
            final int workerId = i + 1;
            executor.submit(() -> {
                try {
                    System.out.println("Worker " + workerId + " started.");
                    Thread.sleep((long) (Math.random() * 5000)); // 模拟任务执行时间,可能超过超时时间
                    System.out.println("Worker " + workerId + " finished.");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            });
        }

        System.out.println("Main thread waiting for workers to finish...");
        boolean success = latch.await(3, TimeUnit.SECONDS); // 设置超时时间为 3 秒

        if (success) {
            System.out.println("All workers finished within the timeout.");
        } else {
            System.out.println("Some workers timed out. Main thread continuing...");
        }

        executor.shutdown();
    }
}

在这个例子中,我们设置了 3 秒的超时时间。如果所有工作线程在 3 秒内都完成了任务,那么 await() 方法返回 true;否则,返回 false。主线程可以根据返回值来判断是否发生了超时。

2. 使用线程池优化

在实际开发中,我们通常会使用线程池来管理线程。使用线程池可以有效地复用线程,减少线程的创建和销毁开销。结合 CountDownLatch,我们可以更灵活地控制并发任务的执行。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchWithThreadPool {

    public static void main(String[] args) throws InterruptedException {
        int workerCount = 5;
        CountDownLatch latch = new CountDownLatch(workerCount);
        ExecutorService executor = Executors.newFixedThreadPool(workerCount);

        for (int i = 0; i < workerCount; i++) {
            final int workerId = i + 1;
            executor.submit(() -> {
                try {
                    System.out.println("Worker " + workerId + " started (Thread: " + Thread.currentThread().getName() + ").");
                    Thread.sleep((long) (Math.random() * 1000));
                    System.out.println("Worker " + workerId + " finished (Thread: " + Thread.currentThread().getName() + ").");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            });
        }

        System.out.println("Main thread waiting for workers to finish...");
        latch.await();
        System.out.println("All workers finished. Main thread continuing...");

        executor.shutdown(); // 关闭线程池
    }
}

在这个例子中,我们使用了 Executors.newFixedThreadPool(workerCount) 创建了一个固定大小的线程池。工作线程从线程池中获取线程来执行任务。这提高了程序的效率,并更好地控制了线程的数量。

3. 异常处理

在多线程编程中,异常处理非常重要。我们需要确保即使某个线程发生了异常,也不会影响其他线程的执行,也不会导致程序崩溃。 在使用 CountDownLatch 时,我们需要在 try-catch 块中调用 countDown() 方法,以确保即使发生了异常,计数器也能正确地减 1。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchExceptionHandling {

    public static void main(String[] args) throws InterruptedException {
        int workerCount = 3;
        CountDownLatch latch = new CountDownLatch(workerCount);
        ExecutorService executor = Executors.newFixedThreadPool(workerCount);

        for (int i = 0; i < workerCount; i++) {
            final int workerId = i + 1;
            executor.submit(() -> {
                try {
                    System.out.println("Worker " + workerId + " started.");
                    if (workerId == 2) {
                        throw new RuntimeException("Worker " + workerId + " failed!"); // 模拟异常
                    }
                    Thread.sleep((long) (Math.random() * 2000));
                    System.out.println("Worker " + workerId + " finished.");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (RuntimeException e) {
                    System.err.println(e.getMessage());
                } finally {
                    latch.countDown(); // 确保计数器减 1
                }
            });
        }

        System.out.println("Main thread waiting for workers to finish...");
        latch.await();
        System.out.println("All workers finished. Main thread continuing...");

        executor.shutdown();
    }
}

在这个例子中,我们模拟了其中一个工作线程抛出异常。即使发生了异常,finally 块中的 latch.countDown() 方法也会被执行,确保计数器正确地减 1,主线程能够正常地等待所有线程完成。

4. 避免过早启动

在某些情况下,我们需要确保所有线程都准备好后,才开始执行任务。可以使用 CountDownLatch 来实现这种同步机制。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchStartSignal {

    public static void main(String[] args) throws InterruptedException {
        int workerCount = 3;
        CountDownLatch startSignal = new CountDownLatch(1); // 启动信号
        CountDownLatch doneSignal = new CountDownLatch(workerCount);
        ExecutorService executor = Executors.newFixedThreadPool(workerCount);

        for (int i = 0; i < workerCount; i++) {
            final int workerId = i + 1;
            executor.submit(() -> {
                try {
                    System.out.println("Worker " + workerId + " is ready.");
                    startSignal.await(); // 等待启动信号
                    System.out.println("Worker " + workerId + " started.");
                    Thread.sleep((long) (Math.random() * 2000));
                    System.out.println("Worker " + workerId + " finished.");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    doneSignal.countDown();
                }
            });
        }

        System.out.println("Main thread preparing workers...");
        Thread.sleep(2000); // 模拟准备时间
        System.out.println("Main thread starting workers...");
        startSignal.countDown(); // 发送启动信号
        doneSignal.await(); // 等待所有线程完成
        System.out.println("All workers finished. Main thread continuing...");

        executor.shutdown();
    }
}

在这个例子中,我们使用了两个 CountDownLatchstartSignal 用于控制线程的启动,doneSignal 用于等待线程的完成。主线程先准备好工作线程,然后发送启动信号,所有工作线程才开始执行任务。

总结

CountDownLatch 是 Java 并发编程中一个非常实用的工具,它可以帮助我们协调多个线程的执行,实现等待和通知机制。通过本文的介绍,相信你对 CountDownLatch 的基本概念、使用场景、工作原理和注意事项有了更深入的理解。 记住,在实际开发中,要结合线程池、异常处理、超时等待等技巧,才能更好地发挥 CountDownLatch 的作用。希望这些知识对你有所帮助!

如果你在学习和使用 CountDownLatch 的过程中遇到任何问题,或者有任何疑问,欢迎随时提问,我们一起探讨和学习!

祝你编程愉快!

点评评价

captcha
健康