HOOOS

Java并发编程进阶:Semaphore与ReentrantLock、CountDownLatch组合拳出击

0 61 码农老兵 Java并发编程多线程
Apple

Java并发编程进阶:Semaphore与ReentrantLock、CountDownLatch组合拳出击

大家好,我是你们的并发编程引路人,码农老兵。

在Java并发编程的世界里,我们经常会遇到各种各样的“拦路虎”,比如资源竞争、线程同步等等。为了解决这些问题,Java提供了丰富的并发工具类,其中SemaphoreReentrantLockCountDownLatch就是三位“猛将”。今天,咱们就来聊聊如何将这三位猛将组合起来,打出一套漂亮的“组合拳”,解决更复杂的并发问题。

1. 认识三位“猛将”

在“出招”之前,我们先来简单认识一下这三位“猛将”:

  • Semaphore(信号量):控制同时访问特定资源的线程数量。你可以把它想象成一个“许可证管理员”,它手里握着一定数量的“许可证”,只有拿到许可证的线程才能访问资源。如果许可证被发完了,其他线程就只能乖乖排队等待。
  • ReentrantLock(可重入锁):一种更灵活的锁机制,与synchronized关键字类似,但提供了更多的功能,比如可中断锁、公平锁、非公平锁等。你可以把它想象成一把“万能钥匙”,只有持有这把钥匙的线程才能进入“房间”(临界区)。
  • CountDownLatch(倒计时门闩):允许一个或多个线程等待其他线程完成操作。你可以把它想象成一个“发令枪”,只有当所有参赛选手都准备好后,裁判员才会扣动扳机,比赛才能开始。

2. 为什么要“组合出拳”?

单独使用这三位“猛将”中的任何一位,都能解决一些常见的并发问题。但是,在现实世界的“战场”上,情况往往更加复杂,单一的“招式”可能难以应对。这时,我们就需要将它们组合起来,发挥出更大的威力。

举个例子:假设我们有一个数据库连接池,最多允许10个线程同时访问数据库。同时,为了保证数据的一致性,我们还需要对某些关键操作加锁。此外,我们还希望在所有线程都完成数据库操作后,再执行一些后续处理。这时,单独使用SemaphoreReentrantLockCountDownLatch都无法完美解决问题,我们需要将它们“组合出拳”。

3. “组合拳”实战演练

下面,我们就通过几个具体的场景,来演示如何将SemaphoreReentrantLockCountDownLatch组合起来,解决复杂的并发问题。

3.1 场景一:数据库连接池限流与关键操作加锁

问题描述:

我们有一个数据库连接池,最多允许10个线程同时访问数据库。为了保证数据的一致性,我们需要对某些关键操作(比如更新操作)加锁。

解决方案:

我们可以使用Semaphore来限制同时访问数据库的线程数量,使用ReentrantLock来对关键操作加锁。

代码示例:

import java.util.concurrent.*;
import java.util.concurrent.locks.*;

public class DatabaseConnectionPool {

    private static final int MAX_CONNECTIONS = 10;
    private final Semaphore semaphore = new Semaphore(MAX_CONNECTIONS, true); // 使用公平信号量
    private final ReentrantLock lock = new ReentrantLock();
    private final Connection[] connections = new Connection[MAX_CONNECTIONS];

    public DatabaseConnectionPool() {
        // 初始化连接池
        for (int i = 0; i < MAX_CONNECTIONS; i++) {
            connections[i] = new Connection();
        }
    }

    public Connection getConnection() throws InterruptedException {
        semaphore.acquire(); // 获取许可证
        return getAvailableConnection();
    }

    public void releaseConnection(Connection connection) {
        if (markAsUnused(connection)) {
            semaphore.release(); // 释放许可证
        }
    }

     public void updateData(Connection connection,String data) {

        lock.lock();
        try {
            //模拟数据库更新
            System.out.println(Thread.currentThread().getName() + " 正在更新数据...");
            connection.update(data);
            Thread.sleep(1000); // 模拟耗时操作
            System.out.println(Thread.currentThread().getName() + " 更新数据完成");
        } catch(InterruptedException e){
             e.printStackTrace();
        }finally {
            lock.unlock(); // 释放锁
        }
    }


    private synchronized Connection getAvailableConnection() {
        for (int i = 0; i < MAX_CONNECTIONS; i++) {
            if (connections[i].isAvailable) {
                connections[i].isAvailable = false;
                return connections[i];
            }
        }
        return null; // 理论上不会执行到这里,因为Semaphore已经控制了数量
    }

    private synchronized boolean markAsUnused(Connection connection) {
        for (int i = 0; i < MAX_CONNECTIONS; i++) {
            if (connections[i] == connection) {
                connections[i].isAvailable = true;
                return true;
            }
        }
        return false;
    }

    private static class Connection {
         boolean isAvailable = true;

         public void update(String data){
            //真实数据库更新操作
         }
    }


    public static void main(String[] args) {
        DatabaseConnectionPool pool = new DatabaseConnectionPool();
        ExecutorService executor = Executors.newFixedThreadPool(20); // 创建20个线程

        for (int i = 0; i < 20; i++) {
            executor.execute(() -> {
                try {
                    Connection connection = pool.getConnection();
                    System.out.println(Thread.currentThread().getName() + " 获取到连接");
                    //模拟一些操作
                    Thread.sleep(500);
                    //更新数据
                    pool.updateData(connection,"new Data");

                    pool.releaseConnection(connection);
                    System.out.println(Thread.currentThread().getName() + " 释放连接");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
    }
}

代码解读:

  1. Semaphore:初始化时设置许可证数量为MAX_CONNECTIONS(10),表示最多允许10个线程同时访问数据库。getConnection()方法中使用semaphore.acquire()获取许可证,releaseConnection()方法中使用semaphore.release()释放许可证。
  2. ReentrantLock:在updateData方法中使用,对更新数据的操作加锁,确保同一时刻只有一个线程能执行更新操作,保证原子性。
  3. Connection类中的isAvailable属性,用于标记连接是否可用。 getAvailableConnection()markAsUnused使用了synchronized保证原子性。
  4. main函数中,模拟20个线程去获取并操作数据库,可以观察到同时执行的线程数不会超过10个。

通过这个例子,我们可以看到,SemaphoreReentrantLock的组合使用,既实现了连接池的限流,又保证了关键操作的原子性。

3.2 场景二:多线程协作完成任务

问题描述:

假设我们有一个复杂的任务,需要多个线程协作完成。每个线程负责任务的一部分,我们需要等待所有线程都完成后,再进行下一步处理。

解决方案:

我们可以使用Semaphore来控制并发线程数量,使用CountDownLatch来等待所有线程完成任务。

代码示例:

import java.util.concurrent.*;

public class MultiThreadedTask {

    private static final int THREAD_COUNT = 5;
    private static final int MAX_CONCURRENT = 3; // 最大并发数

    public static void main(String[] args) throws InterruptedException {

        Semaphore semaphore = new Semaphore(MAX_CONCURRENT, true);
        CountDownLatch latch = new CountDownLatch(THREAD_COUNT);

        for (int i = 0; i < THREAD_COUNT; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 获取许可证
                    System.out.println("Thread " + Thread.currentThread().getName() + " 正在执行任务 " + taskId);
                    Thread.sleep(1000); // 模拟任务执行
                    System.out.println("Thread " + Thread.currentThread().getName() + " 完成任务 " + taskId);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release(); // 释放许可证
                    latch.countDown(); // 倒计时减一
                }
            }).start();
        }

        latch.await(); // 等待所有线程完成
        System.out.println("所有任务执行完毕,进行下一步处理");
    }
}

代码解读:

  1. Semaphore:初始化时设置许可证数量为MAX_CONCURRENT(3),表示最多允许3个线程同时执行任务。
  2. CountDownLatch:初始化时设置计数器为THREAD_COUNT(5),表示需要等待5个线程完成任务。每个线程完成任务后,调用latch.countDown()将计数器减一。latch.await()方法会阻塞当前线程,直到计数器变为0。

通过这个例子,我们可以看到,SemaphoreCountDownLatch的组合使用,既控制了并发线程数量,又实现了多线程协作。

3.3 场景三:资源竞争与优先级控制

问题描述:
假设我们有多个线程竞争同一资源,我们希望某些线程具有更高的优先级,可以优先获取资源。

解决方案:
我们可以将Semaphore的公平性设置为false,创建非公平信号量,并结合ReentrantLock和线程的优先级设置来实现。

代码示例:

import java.util.concurrent.*;
import java.util.concurrent.locks.*;

public class PriorityResourceAccess {

    private static final int MAX_RESOURCES = 2;
    private final Semaphore semaphore = new Semaphore(MAX_RESOURCES, false); // 非公平信号量
    private final ReentrantLock lock = new ReentrantLock();

    public void accessResource(String threadName) throws InterruptedException {

        semaphore.acquire();
        lock.lock();

        try {
            System.out.println(threadName + " 正在访问资源");
            Thread.sleep(1000); // 模拟资源访问
             System.out.println(threadName + " 完成资源访问");
        } finally {
            lock.unlock();
            semaphore.release();
        }
    }

    public static void main(String[] args) {
        PriorityResourceAccess resourceAccess = new PriorityResourceAccess();

        Thread highPriorityThread = new Thread(() -> {
            try {
                resourceAccess.accessResource(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "HighPriorityThread");
        highPriorityThread.setPriority(Thread.MAX_PRIORITY); // 设置高优先级


         Thread lowPriorityThread1 = new Thread(() -> {
            try {
                resourceAccess.accessResource(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "LowPriorityThread1");
         lowPriorityThread1.setPriority(Thread.MIN_PRIORITY); // 设置低优先级

         Thread lowPriorityThread2 = new Thread(() -> {
            try {
                resourceAccess.accessResource(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "LowPriorityThread2");
         lowPriorityThread2.setPriority(Thread.MIN_PRIORITY); // 设置低优先级


        lowPriorityThread1.start();
        lowPriorityThread2.start();
         try{
            Thread.sleep(100);
        }catch(InterruptedException e){}
        highPriorityThread.start();

    }
}

代码解读:

  1. Semaphore设置为非公平信号量,当有多个线程在等待许可时,不保证按照请求的顺序授予许可。
  2. ReentrantLock用于进一步控制对资源的同步访问。
  3. main方法中创建了不同优先级的线程,并让低优先级的线程先启动。
  4. 因为Semaphore设置为非公平,且线程有优先级,所以高优先级的线程有更大概率先获得许可。

请注意: 线程优先级并不能保证绝对的执行顺序,它只是给线程调度器一个提示。在不同的操作系统和JVM实现中,线程优先级的效果可能会有所不同。而且过多的依赖线程优先级可能会导致代码难以调试和维护。

4. 总结

通过以上几个场景的演练,相信你已经对SemaphoreReentrantLockCountDownLatch的“组合拳”有了一定的了解。在实际开发中,我们可以根据具体的需求,灵活地将它们组合起来,解决各种复杂的并发问题。

最后,提醒大家,并发编程是一门“易学难精”的学问,需要不断地学习和实践。希望今天的分享能对你有所帮助,如果你有任何问题或想法,欢迎在评论区留言交流。

记住,熟能生巧,多写多练,你也能成为并发编程高手!

点评评价

captcha
健康