Java并发编程进阶:Semaphore与ReentrantLock、CountDownLatch组合拳出击
大家好,我是你们的并发编程引路人,码农老兵。
在Java并发编程的世界里,我们经常会遇到各种各样的“拦路虎”,比如资源竞争、线程同步等等。为了解决这些问题,Java提供了丰富的并发工具类,其中Semaphore
、ReentrantLock
和CountDownLatch
就是三位“猛将”。今天,咱们就来聊聊如何将这三位猛将组合起来,打出一套漂亮的“组合拳”,解决更复杂的并发问题。
1. 认识三位“猛将”
在“出招”之前,我们先来简单认识一下这三位“猛将”:
- Semaphore(信号量):控制同时访问特定资源的线程数量。你可以把它想象成一个“许可证管理员”,它手里握着一定数量的“许可证”,只有拿到许可证的线程才能访问资源。如果许可证被发完了,其他线程就只能乖乖排队等待。
- ReentrantLock(可重入锁):一种更灵活的锁机制,与
synchronized
关键字类似,但提供了更多的功能,比如可中断锁、公平锁、非公平锁等。你可以把它想象成一把“万能钥匙”,只有持有这把钥匙的线程才能进入“房间”(临界区)。 - CountDownLatch(倒计时门闩):允许一个或多个线程等待其他线程完成操作。你可以把它想象成一个“发令枪”,只有当所有参赛选手都准备好后,裁判员才会扣动扳机,比赛才能开始。
2. 为什么要“组合出拳”?
单独使用这三位“猛将”中的任何一位,都能解决一些常见的并发问题。但是,在现实世界的“战场”上,情况往往更加复杂,单一的“招式”可能难以应对。这时,我们就需要将它们组合起来,发挥出更大的威力。
举个例子:假设我们有一个数据库连接池,最多允许10个线程同时访问数据库。同时,为了保证数据的一致性,我们还需要对某些关键操作加锁。此外,我们还希望在所有线程都完成数据库操作后,再执行一些后续处理。这时,单独使用Semaphore
、ReentrantLock
或CountDownLatch
都无法完美解决问题,我们需要将它们“组合出拳”。
3. “组合拳”实战演练
下面,我们就通过几个具体的场景,来演示如何将Semaphore
、ReentrantLock
和CountDownLatch
组合起来,解决复杂的并发问题。
3.1 场景一:数据库连接池限流与关键操作加锁
问题描述:
我们有一个数据库连接池,最多允许10个线程同时访问数据库。为了保证数据的一致性,我们需要对某些关键操作(比如更新操作)加锁。
解决方案:
我们可以使用Semaphore
来限制同时访问数据库的线程数量,使用ReentrantLock
来对关键操作加锁。
代码示例:
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
public class DatabaseConnectionPool {
private static final int MAX_CONNECTIONS = 10;
private final Semaphore semaphore = new Semaphore(MAX_CONNECTIONS, true); // 使用公平信号量
private final ReentrantLock lock = new ReentrantLock();
private final Connection[] connections = new Connection[MAX_CONNECTIONS];
public DatabaseConnectionPool() {
// 初始化连接池
for (int i = 0; i < MAX_CONNECTIONS; i++) {
connections[i] = new Connection();
}
}
public Connection getConnection() throws InterruptedException {
semaphore.acquire(); // 获取许可证
return getAvailableConnection();
}
public void releaseConnection(Connection connection) {
if (markAsUnused(connection)) {
semaphore.release(); // 释放许可证
}
}
public void updateData(Connection connection,String data) {
lock.lock();
try {
//模拟数据库更新
System.out.println(Thread.currentThread().getName() + " 正在更新数据...");
connection.update(data);
Thread.sleep(1000); // 模拟耗时操作
System.out.println(Thread.currentThread().getName() + " 更新数据完成");
} catch(InterruptedException e){
e.printStackTrace();
}finally {
lock.unlock(); // 释放锁
}
}
private synchronized Connection getAvailableConnection() {
for (int i = 0; i < MAX_CONNECTIONS; i++) {
if (connections[i].isAvailable) {
connections[i].isAvailable = false;
return connections[i];
}
}
return null; // 理论上不会执行到这里,因为Semaphore已经控制了数量
}
private synchronized boolean markAsUnused(Connection connection) {
for (int i = 0; i < MAX_CONNECTIONS; i++) {
if (connections[i] == connection) {
connections[i].isAvailable = true;
return true;
}
}
return false;
}
private static class Connection {
boolean isAvailable = true;
public void update(String data){
//真实数据库更新操作
}
}
public static void main(String[] args) {
DatabaseConnectionPool pool = new DatabaseConnectionPool();
ExecutorService executor = Executors.newFixedThreadPool(20); // 创建20个线程
for (int i = 0; i < 20; i++) {
executor.execute(() -> {
try {
Connection connection = pool.getConnection();
System.out.println(Thread.currentThread().getName() + " 获取到连接");
//模拟一些操作
Thread.sleep(500);
//更新数据
pool.updateData(connection,"new Data");
pool.releaseConnection(connection);
System.out.println(Thread.currentThread().getName() + " 释放连接");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
代码解读:
Semaphore
:初始化时设置许可证数量为MAX_CONNECTIONS
(10),表示最多允许10个线程同时访问数据库。getConnection()
方法中使用semaphore.acquire()
获取许可证,releaseConnection()
方法中使用semaphore.release()
释放许可证。ReentrantLock
:在updateData
方法中使用,对更新数据的操作加锁,确保同一时刻只有一个线程能执行更新操作,保证原子性。Connection
类中的isAvailable
属性,用于标记连接是否可用。getAvailableConnection()
和markAsUnused
使用了synchronized
保证原子性。- 在
main
函数中,模拟20个线程去获取并操作数据库,可以观察到同时执行的线程数不会超过10个。
通过这个例子,我们可以看到,Semaphore
和ReentrantLock
的组合使用,既实现了连接池的限流,又保证了关键操作的原子性。
3.2 场景二:多线程协作完成任务
问题描述:
假设我们有一个复杂的任务,需要多个线程协作完成。每个线程负责任务的一部分,我们需要等待所有线程都完成后,再进行下一步处理。
解决方案:
我们可以使用Semaphore
来控制并发线程数量,使用CountDownLatch
来等待所有线程完成任务。
代码示例:
import java.util.concurrent.*;
public class MultiThreadedTask {
private static final int THREAD_COUNT = 5;
private static final int MAX_CONCURRENT = 3; // 最大并发数
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(MAX_CONCURRENT, true);
CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
final int taskId = i;
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可证
System.out.println("Thread " + Thread.currentThread().getName() + " 正在执行任务 " + taskId);
Thread.sleep(1000); // 模拟任务执行
System.out.println("Thread " + Thread.currentThread().getName() + " 完成任务 " + taskId);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可证
latch.countDown(); // 倒计时减一
}
}).start();
}
latch.await(); // 等待所有线程完成
System.out.println("所有任务执行完毕,进行下一步处理");
}
}
代码解读:
Semaphore
:初始化时设置许可证数量为MAX_CONCURRENT
(3),表示最多允许3个线程同时执行任务。CountDownLatch
:初始化时设置计数器为THREAD_COUNT
(5),表示需要等待5个线程完成任务。每个线程完成任务后,调用latch.countDown()
将计数器减一。latch.await()
方法会阻塞当前线程,直到计数器变为0。
通过这个例子,我们可以看到,Semaphore
和CountDownLatch
的组合使用,既控制了并发线程数量,又实现了多线程协作。
3.3 场景三:资源竞争与优先级控制
问题描述:
假设我们有多个线程竞争同一资源,我们希望某些线程具有更高的优先级,可以优先获取资源。
解决方案:
我们可以将Semaphore的公平性设置为false,创建非公平信号量,并结合ReentrantLock
和线程的优先级设置来实现。
代码示例:
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
public class PriorityResourceAccess {
private static final int MAX_RESOURCES = 2;
private final Semaphore semaphore = new Semaphore(MAX_RESOURCES, false); // 非公平信号量
private final ReentrantLock lock = new ReentrantLock();
public void accessResource(String threadName) throws InterruptedException {
semaphore.acquire();
lock.lock();
try {
System.out.println(threadName + " 正在访问资源");
Thread.sleep(1000); // 模拟资源访问
System.out.println(threadName + " 完成资源访问");
} finally {
lock.unlock();
semaphore.release();
}
}
public static void main(String[] args) {
PriorityResourceAccess resourceAccess = new PriorityResourceAccess();
Thread highPriorityThread = new Thread(() -> {
try {
resourceAccess.accessResource(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "HighPriorityThread");
highPriorityThread.setPriority(Thread.MAX_PRIORITY); // 设置高优先级
Thread lowPriorityThread1 = new Thread(() -> {
try {
resourceAccess.accessResource(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "LowPriorityThread1");
lowPriorityThread1.setPriority(Thread.MIN_PRIORITY); // 设置低优先级
Thread lowPriorityThread2 = new Thread(() -> {
try {
resourceAccess.accessResource(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "LowPriorityThread2");
lowPriorityThread2.setPriority(Thread.MIN_PRIORITY); // 设置低优先级
lowPriorityThread1.start();
lowPriorityThread2.start();
try{
Thread.sleep(100);
}catch(InterruptedException e){}
highPriorityThread.start();
}
}
代码解读:
Semaphore
设置为非公平信号量,当有多个线程在等待许可时,不保证按照请求的顺序授予许可。ReentrantLock
用于进一步控制对资源的同步访问。- 在
main
方法中创建了不同优先级的线程,并让低优先级的线程先启动。 - 因为
Semaphore
设置为非公平,且线程有优先级,所以高优先级的线程有更大概率先获得许可。
请注意: 线程优先级并不能保证绝对的执行顺序,它只是给线程调度器一个提示。在不同的操作系统和JVM实现中,线程优先级的效果可能会有所不同。而且过多的依赖线程优先级可能会导致代码难以调试和维护。
4. 总结
通过以上几个场景的演练,相信你已经对Semaphore
、ReentrantLock
和CountDownLatch
的“组合拳”有了一定的了解。在实际开发中,我们可以根据具体的需求,灵活地将它们组合起来,解决各种复杂的并发问题。
最后,提醒大家,并发编程是一门“易学难精”的学问,需要不断地学习和实践。希望今天的分享能对你有所帮助,如果你有任何问题或想法,欢迎在评论区留言交流。
记住,熟能生巧,多写多练,你也能成为并发编程高手!