你好,我是老码农张三,一个在并发编程领域摸爬滚打了多年的老家伙。今天,咱们深入探讨一下 Java 并发编程中一个非常重要的工具——Semaphore
,特别是它在公平性和非公平性方面的表现,以及这些特性对并发程序的影响。对于你这样的高级开发者来说,理解这些细节能够帮助你写出更高效、更可靠的并发代码。
什么是 Semaphore?
首先,咱们得确保大家对 Semaphore
有一个共同的认知。简单来说,Semaphore
就像一个停车场,它维护着一定数量的“许可证”(permit)。线程要访问共享资源,就必须先获得一个许可证。如果许可证用完了,线程就得等待,直到有许可证被释放出来。
Semaphore
主要用于限制并发访问资源的线程数量,从而避免资源被过度使用,保证系统的稳定性和性能。比如,你想限制数据库连接的数量,或者控制同时访问某个 API 的线程数,Semaphore
都能派上用场。
Java 中的 Semaphore
类位于 java.util.concurrent
包下,它提供了两种常用的构造函数:
Semaphore(int permits)
:创建一个Semaphore
,并初始化许可证的数量。Semaphore(int permits, boolean fair)
:创建一个Semaphore
,并指定是否为公平模式。fair
参数决定了Semaphore
的公平性。
公平模式 vs 非公平模式
Semaphore
的核心在于它的公平性策略,这直接影响了线程获取许可证的顺序。我们来详细对比一下这两种模式:
公平模式 (Fair Mode)
在公平模式下,Semaphore
会维护一个等待队列。线程在请求许可证时,会按照请求的先后顺序加入到队列中。当有许可证释放时,Semaphore
会从队列的头部开始,依次给线程分配许可证。
优点:
- 避免饥饿: 公平模式保证了先来的线程先获得许可证,避免了某些线程长时间等待而无法获得许可证的情况,也就是所谓的“饥饿”问题。
- 更公平的资源分配: 每个线程都有机会获得许可证,保证了资源的公平分配。
缺点:
- 性能开销: 为了维护等待队列和保证请求的顺序,公平模式的实现会引入额外的开销,例如线程上下文切换,可能会降低性能。
- 吞吐量较低: 由于线程必须按顺序获取许可证,公平模式可能会限制系统的吞吐量。
代码示例:
import java.util.concurrent.Semaphore;
public class FairSemaphoreExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个公平的 Semaphore,初始许可证数量为 1
Semaphore semaphore = new Semaphore(1, true);
// 创建多个线程模拟资源竞争
for (int i = 0; i < 5; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("线程 " + threadId + " 尝试获取许可证");
semaphore.acquire(); // 获取许可证
System.out.println("线程 " + threadId + " 获取到许可证,开始执行任务");
Thread.sleep(2000); // 模拟任务执行时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
System.out.println("线程 " + threadId + " 释放许可证");
semaphore.release(); // 释放许可证
}
}).start();
}
}
}
在这个例子中,我们创建了一个公平的 Semaphore
。你会发现,线程获取许可证的顺序大致上是按照它们启动的顺序进行的。
非公平模式 (Non-Fair Mode)
在非公平模式下,Semaphore
不维护等待队列,或者说,它会优先尝试直接获取许可证。当一个线程请求许可证时,Semaphore
会首先检查是否有可用的许可证。如果有,就直接分配给该线程;如果没有,才将该线程加入等待队列。
优点:
- 性能高: 非公平模式减少了维护等待队列的开销,提高了性能和吞吐量。
- 减少上下文切换: 线程可以直接尝试获取许可证,减少了线程上下文切换的次数。
缺点:
- 可能导致饥饿: 后来的线程可能抢先获得许可证,导致先来的线程长时间等待,甚至发生“饥饿”现象。
- 不公平的资源分配: 某些线程可能频繁地获得许可证,而其他线程则较少获得。
代码示例:
import java.util.concurrent.Semaphore;
public class NonFairSemaphoreExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个非公平的 Semaphore,初始许可证数量为 1
Semaphore semaphore = new Semaphore(1, false);
// 创建多个线程模拟资源竞争
for (int i = 0; i < 5; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("线程 " + threadId + " 尝试获取许可证");
semaphore.acquire(); // 获取许可证
System.out.println("线程 " + threadId + " 获取到许可证,开始执行任务");
Thread.sleep(2000); // 模拟任务执行时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
System.out.println("线程 " + threadId + " 释放许可证");
semaphore.release(); // 释放许可证
}
}).start();
}
}
}
在这个例子中,我们创建了一个非公平的 Semaphore
。你会发现,线程获取许可证的顺序是不确定的,后启动的线程有可能先获得许可证。
如何选择?
选择公平模式还是非公平模式,需要根据具体的应用场景进行权衡:
- 如果你的应用对公平性有严格的要求,或者需要避免线程饥饿,那么应该选择公平模式。 例如,在某些实时系统中,需要保证每个任务都有机会被执行。
- 如果你的应用对性能要求较高,并且可以容忍一定程度的不公平性,那么应该选择非公平模式。 例如,在高并发的 Web 服务器中,优先保证系统的吞吐量。
通常情况下,非公平模式是默认的选择,因为它在大多数场景下都能提供更好的性能。但是,你必须清楚地了解两种模式的优缺点,并根据实际情况做出选择。
技术细节与性能优化
对于高级开发者来说,仅仅了解 Semaphore
的基本概念是不够的。我们还需要深入了解其内部实现,以及如何进行性能优化。
Semaphore
的内部实现
Semaphore
的核心在于它维护的计数器和等待队列。在 Java 的实现中,Semaphore
内部使用了一个 Sync
抽象类,它继承自 AbstractQueuedSynchronizer
(AQS)。AQS 是 Java 并发框架的基础,它提供了一种构建锁和同步器的框架。Semaphore
通过 AQS 来管理线程的阻塞和唤醒。
关键方法:
acquire()
:尝试获取一个许可证。如果许可证不足,线程会被阻塞。在公平模式下,线程会被加入等待队列;在非公平模式下,线程会尝试直接获取许可证,如果失败才加入队列。release()
:释放一个许可证。释放许可证时,Semaphore
会唤醒等待队列中的一个或多个线程。tryAcquire()
:尝试获取一个许可证,如果获取不到则立即返回false
,不会阻塞线程。这在实现非阻塞的并发控制时非常有用。
AQS 的作用:
- 状态管理: AQS 维护了一个整数状态值,
Semaphore
用它来表示可用的许可证数量。 - 线程阻塞与唤醒: AQS 使用
LockSupport
类来实现线程的阻塞和唤醒。当线程无法获取许可证时,会被阻塞;当许可证释放时,会被唤醒。 - 等待队列: AQS 维护了一个等待队列,用于管理被阻塞的线程。
性能优化技巧
在实际应用中,我们可以通过一些技巧来优化 Semaphore
的性能:
- 选择合适的模式: 根据应用场景选择公平模式或非公平模式。如果性能是首要考虑因素,并且可以容忍一定程度的不公平性,那么非公平模式通常是更好的选择。
- 调整许可证数量: 根据系统的负载和资源情况,合理地调整
Semaphore
的初始许可证数量。如果许可证数量设置过小,可能会导致线程频繁阻塞,影响性能;如果许可证数量设置过大,可能会导致资源浪费。 - 使用
tryAcquire()
: 在某些场景下,可以使用tryAcquire()
方法来避免线程阻塞。例如,在处理一些非关键任务时,如果获取不到许可证,可以直接放弃任务,而不是阻塞线程。这可以提高系统的响应速度。 - 避免过度使用:
Semaphore
是一种同步工具,过度使用会降低系统的并发性能。在使用Semaphore
时,应该尽量减少临界区的代码量,避免长时间持有许可证。 - 考虑使用更轻量级的同步工具: 在某些场景下,
Semaphore
并不是最佳的选择。例如,如果只需要控制单个资源的访问,可以使用ReentrantLock
或者AtomicInteger
等更轻量级的同步工具,它们通常具有更好的性能。 - 监控与调优: 持续监控
Semaphore
的使用情况,例如线程的阻塞时间、许可证的利用率等。根据监控结果,进行相应的调整和优化,可以提高系统的整体性能。
实际应用场景
Semaphore
在并发编程中有着广泛的应用,下面列举一些常见的场景:
限制并发访问数据库连接
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.Semaphore;
public class DatabaseConnectionPool {
private static final int MAX_CONNECTIONS = 10;
private final Semaphore semaphore = new Semaphore(MAX_CONNECTIONS);
private final String url;
private final String user;
private final String password;
public DatabaseConnectionPool(String url, String user, String password) {
this.url = url;
this.user = user;
this.password = password;
}
public Connection getConnection() throws InterruptedException, SQLException {
semaphore.acquire(); // 获取许可证,限制并发连接数
try {
return DriverManager.getConnection(url, user, password);
} catch (SQLException e) {
semaphore.release(); // 如果连接失败,释放许可证
throw e;
}
}
public void releaseConnection(Connection connection) {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
// 忽略异常
} finally {
semaphore.release(); // 释放许可证
}
}
}
public static void main(String[] args) throws InterruptedException, SQLException {
DatabaseConnectionPool pool = new DatabaseConnectionPool(
"jdbc:mysql://localhost:3306/test", "root", "password");
// 模拟多个线程获取数据库连接
for (int i = 0; i < 20; i++) {
new Thread(() -> {
Connection connection = null;
try {
connection = pool.getConnection();
System.out.println(Thread.currentThread().getName() + " 获得了连接");
Thread.sleep(2000); // 模拟数据库操作
} catch (InterruptedException | SQLException e) {
e.printStackTrace();
} finally {
pool.releaseConnection(connection);
System.out.println(Thread.currentThread().getName() + " 释放了连接");
}
}, "Thread-" + i).start();
}
}
}
在这个例子中,我们使用 Semaphore
来限制同时连接数据库的线程数量,从而避免数据库连接过多,导致系统崩溃。
限制同时访问 API 的线程数
import java.util.concurrent.Semaphore;
public class ApiRateLimiter {
private final Semaphore semaphore;
public ApiRateLimiter(int permits) {
this.semaphore = new Semaphore(permits);
}
public void executeApiCall() throws InterruptedException {
semaphore.acquire(); // 获取许可证,限制并发访问
try {
// 执行 API 调用
System.out.println(Thread.currentThread().getName() + " 执行 API 调用");
Thread.sleep(1000); // 模拟 API 调用时间
} finally {
semaphore.release(); // 释放许可证
}
}
public static void main(String[] args) throws InterruptedException {
ApiRateLimiter limiter = new ApiRateLimiter(5); // 限制并发访问数量为 5
// 模拟多个线程访问 API
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
limiter.executeApiCall();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Thread-" + i).start();
}
}
}
在这个例子中,我们使用 Semaphore
来限制同时访问某个 API 的线程数量,从而避免 API 被过度请求,导致服务不可用。
实现线程池
虽然 Java 提供了内置的线程池,但是使用 Semaphore
也可以实现一个简单的线程池。
import java.util.concurrent.Semaphore;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CustomThreadPool {
private final Semaphore semaphore;
private final Executor executor;
private final int corePoolSize;
public CustomThreadPool(int corePoolSize, int maxPoolSize) {
this.corePoolSize = corePoolSize;
this.semaphore = new Semaphore(maxPoolSize);
this.executor = Executors.newFixedThreadPool(corePoolSize);
}
public void submit(Runnable task) {
try {
semaphore.acquire();
executor.execute(() -> {
try {
task.run();
} finally {
semaphore.release();
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 处理中断
} catch (RejectedExecutionException e) {
// 线程池已关闭,无法提交任务
semaphore.release(); // 确保释放信号量
System.err.println("任务被拒绝,线程池已满或已关闭");
}
}
public void shutdown() {
((ThreadPoolExecutor) executor).shutdown();
try {
if (!((ThreadPoolExecutor) executor).awaitTermination(60, TimeUnit.SECONDS)) {
((ThreadPoolExecutor) executor).shutdownNow();
}
} catch (InterruptedException e) {
((ThreadPoolExecutor) executor).shutdownNow();
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) throws InterruptedException {
CustomThreadPool threadPool = new CustomThreadPool(2, 5);
for (int i = 0; i < 10; i++) {
final int taskNumber = i;
threadPool.submit(() -> {
System.out.println("执行任务 " + taskNumber + ",线程:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
threadPool.shutdown();
}
}
在这个例子中,我们使用 Semaphore
来限制同时执行的任务数量,从而控制线程池的并发度。
总结
Semaphore
是一个强大的并发工具,它提供了灵活的并发控制机制。理解 Semaphore
的公平性和非公平性,以及它们对性能的影响,对于编写高效、可靠的并发程序至关重要。作为高级开发者,你应该深入了解 Semaphore
的内部实现,掌握性能优化技巧,并在实际应用中灵活运用。希望这篇文章能够帮助你更好地理解和使用 Semaphore
,写出更出色的并发代码!
扩展阅读
- Java 并发编程的艺术: 深入探讨 Java 并发编程的经典书籍,涵盖了 AQS、锁、并发容器等内容。
- Java 并发编程实战: 另一本 Java 并发编程的优秀书籍,提供了丰富的实践案例。
- JDK 源码: 阅读 JDK 源码,可以帮助你深入理解
Semaphore
的实现细节。
记住,并发编程是一个复杂而充满挑战的领域。不断学习,不断实践,才能在这个领域里越走越远。加油,老铁!