JUC锁机制

image-20260210113908812

我们都知道,synchronized是 Java 关于锁的一个关键字,那么,它自动进行锁的释放的,所以只存在两种释放锁的情况

  • 获取了对应方法锁的对象执行完了该方法
  • 线程执行异常或被中断,JVM会让线程自动释放锁

不仅如此,synchronized 的局限性还在于

  • 无法中断等待锁的线程,不可响应中断。
  • 无法设置超时,只能无限等待。
  • 无法尝试获取锁而不阻塞,非阻塞尝试。
  • 读写无法分离

它不够灵活,而且容易出现阻塞,因为方法或者代码块一旦执行,就独占该锁,直到结束才返回锁,此时就出现了JUC中的各种锁,它们灵活且强大

locks包概述

对于 Lock 接口本身,就有一些需要做铺垫的内容讲解,这里我们先看看 JUC 的锁机制主要包中都有什么内容,先了解一下

前面说过,Lock 锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作,所以说,它这个接口是代表着什么呢,首先看看 JUC lock 包下的内容

image-20260210120942555

其中接口有这三个:

  • Lock 就是锁的接口,定义了显式加锁和解锁的规范,是锁的顶层接口,所有具体锁都实现这个接口。
  • ConditionLock 的配套接口,用于实现 条件等待 等协调线程用的监视器
  • ReadWriteLock是读写锁接口,允许同时多个读操作,但只允许一个写操作。

对于抽象类和具体的类,也简单说一下:

  • AbstractOwnableSynchronizer,就是提供对“拥有者线程”信息的支持。维护一个 Thread owner 字段,表示当前持有锁的线程。

    image-20260210121229305
  • LockSupport不熟,看了一眼,大概就是用于实现线程阻塞和唤醒的工具,基于 Unsafe 实现的Object.wait() / notify() 的轻量级替代品

  • AbstractQueuedLongSynchronizerAbstractQueuedSynchronizer,这俩就是 AQS,是整个 JUC 锁体系的核心框架,大部分锁都是基于 AQS 实现的,没有 Long 的那个就只不过是用于 int 类型的 state,两者几乎一样

  • ReentrantLock这个和下面的三个锁都是重点要说的锁,它是可重入互斥锁,功能比 synchronized 强大。

  • ReentrantReadWriteLock是读写分离的可重入锁,实现了 ReadWriteLock 接口

  • StampedLock是对ReentrantReadWriteLock的性能提升版本,所以它也是读写分离锁,它支持乐观读,无需阻塞,提升吞吐

Lock 接口

synchronized 是 JVM 层面的隐式锁;Lock 是 API 层面的显式锁。

首先,Lock 接口存在这些方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface Lock {
void lock();

void lockInterruptibly() throws InterruptedException;

boolean tryLock();

boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

void unlock();

Condition newCondition();
}

对于这些方法,其中,大部分看名字都能猜出来,JUC 中的锁大部分都实现了这个接口

  • void lock():获取锁。如果锁不可用,当前线程阻塞直到获得锁。
    • 不可中断,即使调用 thread.interrupt() 也不会抛出异常
    • 而且必须配合 try-finally 使用,确保释放锁。如果忘记 unlock(),会导致死锁
  • void lockInterruptibly() throws InterruptedException:可中断地获取锁,也就是说,除非当前线程被中断了,否则请求获取锁。
    • 如果线程在等待锁的过程中被中断了,会立即抛出 InterruptedException,并退出等待。在入口处若已中断,也会抛异常。
    • 一般在长时间等待锁时,外部可以中断线程以避免资源浪费。
  • boolean tryLock()立即尝试获取锁,不阻塞
    • 避免死锁、实现非阻塞算法、高并发下的乐观策略。别忘了解锁
  • boolean tryLock(long time, TimeUnit unit) throws InterruptedException:加中断和超时的版本,在指定时间内尝试获取锁
    • 成功获取锁 → 返回 true
    • 等待期间被中断 → 抛 InterruptedException
    • 超时仍未获取 → 返回 false
  • void unlock():释放锁
    • 只有持有锁的线程才能调用 unlock(),否则抛异常,通常只在 finally 块中调用。
    • 对于可重入锁,lock() 调用 n 次,就需要 unlock() n 次才能真正释放。
  • Condition newCondition():创建一个与该 Lock 绑定的 Condition 对象,后面细说

对于 Lock 接口,官方有内存同步语义的指定

所有 Lock 实现必须保证与 synchronized 相同的内存可见性语义

这意味着使用 Lock 也能保证线程间的数据可见性lock() 成功等价于进入到 synchronized 块,也就是happens-before 获取锁的线程看到之前释放锁的线程的所有写操作,unlock() 成功等价于退出 synchronized 块,释放锁前的修改对下一个获取锁的线程可见

源码注释中还多次强调了这些事:

  1. 某些实现可以检测死锁并抛出异常

  2. 中断支持不是强制的,但若支持,必须遵守语义

  3. 不要用 Lock 对象本身作为 synchronized 的监视器

    1
    synchronized(lock) { ... }   // 真有人会这么写吗
  4. 参数为 null 时应抛 NullPointerException。慎防NPE

可重入锁:ReentrantLock

使用ReentrantLock

java.util.concurrent.locks包提供的ReentrantLock用于替代synchronized加锁

对于传统synchronized代码,我们大概需要这样写

1
2
3
4
5
6
7
8
9
public class Counter {
private int count;

public void add(int n) {
synchronized(this) {
count += n;
}
}
}

如果用ReentrantLock替代,可以把代码改造为

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Counter {
private final Lock lock = new ReentrantLock();
private int count;

public void add(int n) {
lock.lock();
try {
count += n;
} finally {
lock.unlock();
}
}
}

因为synchronized是 Java 语言层面提供的语法,是 Java 对并发编程的一个支持,所以我们不需要考虑异常等其他情况,而ReentrantLock是 Java 代码实现的锁,我们就必须先获取锁,然后在finally中正确释放锁。

ReentrantLock是可重入锁,默认是非公平锁,它和synchronized一样,一个线程可以多次获取同一个锁。但是和synchronized不同的是,ReentrantLock可以尝试获取锁,也就是说,ReentrantLock是对上面 Lock 接口中提到的tryLock方法有所实现

1
2
3
4
5
6
7
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
...
} finally {
lock.unlock();
}
}

上述代码在尝试获取锁的时候,最多等待1秒。如果1秒后仍未获取到锁,tryLock()返回false,程序就可以做一些额外处理,而不是无限等待下去。

所以,使用ReentrantLock比直接使用synchronized更安全,线程在tryLock()失败的时候不会导致死锁。

其中,ReentrantLock中的可中断锁在任务取消中的价值也很明显

1
2
3
4
5
6
7
8
9
10
11
12
13
public void doWork() {
try {
lock.lockInterruptibly(); // 等待锁时可被中断
try {
// 临界区
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
// 可在此处记录日志或清理资源
}
}

这样就支持了用户取消操作这样的业务

分析ReentrantLock

结构分析

首先,ReentrantLock 实现了 Lock 接口,整体架构使用了Sync + AQS,至于这二位高手我下面会细说,这里只是简单的说一下了

image-20260210135605042

ReentrantLock 本身不直接实现锁逻辑,而是将所有操作委托给内部的 Sync 对象

Sync 是一个抽象静态内部类,继承自 AbstractQueuedSynchronizer(AQS),至于 AQS 就是并发同步器框架,它是锁机制的具体实现

image-20260210135631696

可以看到,Sync 中存在大量的模板方法,它们真正的实现了ReentrantLock 中锁机制的真正内容,ReentrantLock 通过继承 AQS 并通过各种各样的 Sync 实现其模板方法,将真正的锁机制逻辑进行委托,来定义自己的同步语义。

image-20260210135822303

那么,它还通过state 表示锁的持有次数,来实现的可重入性

image-20260210135939001
  • 对于 ReentrantLockstate == 0 表示锁未被持有;state > 0 表示锁已被持有,数值 = 当前线程的重入次数;同时,AQS 还维护一个 exclusiveOwnerThread 字段,记录当前持有锁的线程。这正是可重入性的实现基础

对于如何上锁,就例如我们看看 sync.tryLock() 方法,它是非公平尝试获取锁,可以看到,就算强如 JUC,也离不开 CAS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@ReservedStackAccess
final boolean tryLock() {
Thread current = Thread.currentThread();
int c = getState(); // 获取当前 state(即重入次数)

if (c == 0) {
// 锁空闲,尝试 CAS 获取
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(current); // 记录持有者
return true;
}
// 如果当前线程就是锁的持有者,就允许再次 lock(),只是把 state 加 1。
} else if (getExclusiveOwnerThread() == current) {
// 当前线程 already 持有锁 → 可重入!
if (++c < 0) throw new Error("Maximum lock count exceeded");
setState(c); // 重入次数 +1
return true;
}
return false;
}
锁的获取

然后来看看锁的获取流程,以 lock() 为例

零帧起手进行委托

image-20260210141223004

进入 Sync.lock()

image-20260210141405496
  • 它会调用对应公平和非公平的策略,非公平直接抢,公平先看队列

  • 失败则进入 AQS 的 acquire 流程,自旋喽喽喽

    • acquire(1) 是 AQS 的模板方法,会调用子类实现的 tryAcquire

      image-20260210141543067
      image-20260210141620220

      如果 tryAcquire 返回 false,AQS 会将当前线程封装为 Node 加入CLH 等待队列,这也就是为毛 ReentrantLock的源码中一堆 Node,它会阻塞当前线程,等待被前面的节点唤醒

那么来看看带超时的重试锁,tryLock(timeout, unit) 的实现

image-20260210142151004

零帧起手内部调用内部synctryLockNanos(long nanos),可以看到这是支持中断的,而且,先尝试,失败了进入 AQS 处理

image-20260210142337845
锁的释放

对于锁的释放,还是零帧起手调用sync.release(1);,但是,它调用的是 AQS 的 release(1)

image-20260210141916168

进而调用 Sync.tryRelease(1)

image-20260210142058908

如果 free == true,AQS 会唤醒等待队列中的下一个线程,否则自旋

对 Condition 的支持

还是委托给对应的sync,然后再返回 AQS 内部的 ConditionObject,支持一些Condition的操作,例如等待和唤醒,这个下面细说

image-20260210142438483

公平 vs 非公平锁

对于ReentrantLock构造函数默认是非公平锁,当然,你可以指定,内置了两种 Sync 的实现,也就是说,用了组件化的设计模式

image-20260210140556570
  • 非公平锁是默认的,它更高性能,新来的线程可以直接插队抢锁,即使等待队列中有其他线程,也就是抢占型的,所以它吞吐量高,但是可能引起饥饿

    • NonfairSync.initialTryLock(),非公平锁初始化尝试锁的方法

      image-20260210140839412

      第一步就是用 CAS 无条件尝试抢锁,不管有没有人在排队!

  • 公平锁也就是ReentrantLockFairSync的实现,它是公平锁,只有当等待队列为空当前线程是队首 时,才允许获取锁。保证线程按请求顺序获得锁,避免饥饿,但是性能差了点

    • FairSync.initialTryLock()

      image-20260210141111159

      但是tryLock() 不遵守公平策略!即使你创建的是公平锁,tryLock() 也会直接尝试获取,或许是为了灵活性吧,谁知道呢

ReadWriteLock 接口

在传统互斥锁,如 synchronizedReentrantLock中:

  • 无论读还是写,都只能一个线程访问
  • 读操作是安全的:多个线程同时读同一份数据很多情况下不会引发竞态条件(当然前提是无写入)

ReadWriteLock 接口就是读写锁的思想,它把读写两个操作的锁进行分离,然后可以自己去分配,这特别适用于 读多写少 的场景

  • 允许多个读线程并发执行,那么写操作独占来保证数据一致性就够了
  • 读与写互斥,写与写互斥

接口定义极其简洁但语义丰富

1
2
3
4
public interface ReadWriteLock {
Lock readLock(); // 获取读锁
Lock writeLock(); // 获取写锁
}
锁类型 并发性 互斥关系
读锁(readLock) 多个线程可同时持有 与写锁互斥
写锁(writeLock) 仅一个线程可持有 与读锁和其他写锁互斥

对于内存可见性保证,读写锁一样遵循

成功获取读锁的线程,一定能看到之前所有写锁释放前的修改

这意味着写锁的 unlock()操作具有 happens-before 的性质,关系到后续读锁的 lock() 操作;而且读写锁天然提供了 volatile 语义,无需额外使用 volatile 字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 线程 A 写
writeLock.lock();
try {
data = newValue; // 修改共享数据
} finally {
writeLock.unlock(); // 释放写锁 → 内存屏障
}

// 线程 B 读
readLock.lock();
try {
use(data); // 一定能读到 newValue!
} finally {
readLock.unlock();
}

ReadWriteLock 接口本身不规定行为细节,留给具体实现决定。

而且一般情况下,这里会涉及到锁升级降级的问题,持有写锁的线程可获取读锁,这是锁降级,但是持有读锁的线程不能获取写锁,这是升级,因为升级很容易有死锁

而且读写分离锁的公平性策略也更复杂一些,这个公平性和上面提到的ReentrantLock的不太一样,这里是当写锁释放时,如果有等待的读者和写者,优先唤醒谁?而对于获取锁的公平情况,一般来说,可以切换

对于可重入性,具体实现有区别,下面细说

ReentrantReadWriteLock 为例,先来看看如何使用读写锁,因为它跟上面的和之前的synchronized不太一样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Cache<K, V> {
private final Map<K, V> cache = new HashMap<>();
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock(); // 读锁
private final Lock writeLock = rwLock.writeLock(); // 写锁

public V get(K key) {
readLock.lock();
try {
return cache.get(key);
} finally {
readLock.unlock();
}
}

public void put(K key, V value) {
writeLock.lock();
try {
cache.put(key, value);
} finally {
writeLock.unlock();
}
}
}

读写分离的可重入锁:ReentrantReadWriteLock

如何使用

ReentrantReadWriteLock 实现了 ReadWriteLock 接口,内部维护两把锁:

  • 读锁(ReadLock):共享锁(Shared Lock),允许多个线程同时持有。
  • 写锁(WriteLock):独占锁(Exclusive Lock),同一时间只能被一个线程持有。

读操作不修改数据,因此可以并发;写操作会改变数据,必须互斥。

所以可以分成以下这四种情况的特性:

  1. 读读不互斥:多个线程可以同时获取读锁,进行并发读取。
  2. 读写互斥:有线程持有读锁时,其他线程无法获取写锁。有线程持有写锁时,其他线程无法获取读锁或写锁。
  3. 写写互斥:任何时候只有一个线程能持有写锁。
  4. 可重入:
  5. 不支持锁升级:持有读锁的线程不能直接获取写锁

来看如何使用它

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class CacheExample {
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();

private volatile String data;
private volatile boolean cacheValid = false;

public String getData() {
readLock.lock();
try {
if (cacheValid) {
return data;
}
} finally {
readLock.unlock();
}

// 缓存失效,需更新
writeLock.lock();
try {
if (!cacheValid) { // 再次检查(防止多个线程重复写)
data = fetchDataFromDB(); // 模拟耗时操作
cacheValid = true;
}
// 降级:先获取读锁
readLock.lock();
} finally {
writeLock.unlock(); // 释放写锁
}

try {
return data; // 安全读取
} finally {
readLock.unlock();
}
}

private String fetchDataFromDB() {
// 模拟数据库查询
return "fresh data";
}
}

然后,持有写锁获取读锁是可以的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
writeLock.lock();
try {
// 修改数据
data = newData;
cacheValid = true;

// 降级:先获取读锁(此时仍持有写锁)
readLock.lock();
} finally {
writeLock.unlock(); // 释放写锁,但仍持有读锁
}
// 此时其他线程仍无法写,但可以并发读(包括本线程)
try {
// 使用数据(安全,因为已降级)
use(data);
} finally {
readLock.unlock();
}

源码分析

结构分析

来看看它的源码,别忘了,它是在保证线程安全的前提下,最大化读操作的并发度。所以肯定倾向于读

image-20260211000704441
  • ReadLockWriteLock 是两个独立的 Lock 实现,但共享同一个 Sync 同步器。和 ReentrantLock一样,所有复杂的并发控制逻辑都封装在 Sync 及其子类中

其中,构造函数可指定是否为公平锁

1
2
new ReentrantReadWriteLock();        // 非公平(默认)
new ReentrantReadWriteLock(true); // 公平

对于它的公平和非公平含义,有一些区别

  • 非公平:写线程总是可以插队。读线程只有当队列头是写线程时才阻塞
  • 公平:无论读写,要同步队列中有等待者(hasQueuedPredecessors()),就必须排队。

来看它的核心的数据结构。AQS 状态编码。无数人认为这是 ReentrantReadWriteLock 最精妙的设计之一。

实际上,它就是利用一个 int(32位)表示两种锁的状态

image-20260211000847074
  • 写锁:用 state 低 16 位表示,值为 0 表示无写锁,>0 表示写锁重入次数;
  • 读锁:用 state 高 16 位表示,值为 0 表示无读锁,>0 表示读锁总持有数(多个线程的读锁总和)。

这样,他就不要维护读写两个变量了,能直接通过 CAS 原子地更新整个锁状态,巧妙,所以说,它的最大读/写锁重入次数均为 65535。(真的有这么多的时候吗)

读锁的获取与释放

读锁是共享锁,同一时间可被多个线程持有,支持重入,且写锁持有者可以获取读锁

读锁的获取,核心方法是tryAcquireShared + fullTryAcquireShared,线程调用 readLock().lock() 时,最终调用 Sync 类的 tryAcquireShared 方法,返回 1 成功,-1 失败,逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 情况1:有写锁且不是当前线程持有 → 读锁获取失败
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;

int r = sharedCount(c); // 提取读锁计数(高16位)
// 情况2:无需阻塞 + 读锁未超限 + CAS更新state成功
if (!readerShouldBlock() && r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 处理读锁重入计数(记录当前线程的读锁持有数)
if (r == 0) { // 第一个获取读锁的线程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 第一个线程重入
firstReaderHoldCount++;
} else { // 其他线程,通过ThreadLocal记录重入数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != Thread.currentThread().getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 情况3:CAS失败/需要阻塞 → 进入完整重试逻辑
return fullTryAcquireShared(current);
}
  • 写锁被其他线程持有 → 读锁获取失败;
  • 写锁被当前线程持有 → 允许获取读锁
  • 通过 ThreadLocal<HoldCounter> 记录每个线程的读锁重入数

对于fullTryAcquireSharedtryAcquireShared 失败时,会进入该方法,通过自旋重试 CAS,同时处理重入逻辑,核心逻辑与 tryAcquireShared 一致,只是增加了循环重试

image-20260211101435971

读锁的释放,核心:tryReleaseShared 方法,线程调用 readLock().unlock() 时,最终调用 Sync 类的 tryReleaseShared 方法,逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 1. 减少当前线程的读锁重入计数
if (firstReader == current) { // 第一个读线程
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else { // 其他读线程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != Thread.currentThread().getId())
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove(); // 重入数为0,移除ThreadLocal
if (count <= 0)
throw new IllegalMonitorStateException();
}
rh.count--;
}

// 2. CAS循环更新state(高16位-1)
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT; // 读锁计数减1(SHARED_UNIT=65536)
if (compareAndSetState(c, nextc)) {
// 返回是否完全释放(state=0)
return nextc == 0;
}
}
}
  • 先通过ThreadLocal/HoldCounter减少当前线程的读锁重入数,再通过 CAS 循环更新 state,只有当 state 整个都变为 0 时,才返回 true,表示读写锁都释放
读锁重入计数的管理

对于读锁,由于是共享的,所以需要为每个线程单独记录其持有的读锁数量。

这个相关的重要字段在 Sync

image-20260211002211130

对于HoldCounter 结构如下

image-20260211002300332

那么,为什么要这样设计?首先,firstReader保证当有且只有一个线程读时,无需访问 ThreadLocal,性能极高,然后,使用cachedHoldCounter在多线程交替读的场景下,大概率命中缓存,避免频繁 ThreadLocal.get()readHolds作为兜底方案,存储所有非首个和非最近的线程的计数,前面都找不到的时候来这里找

写锁的获取与释放

写锁是独占锁(排他锁),同一时间只能有一个线程持有,支持重入。所以它的实现和上面的可重入锁区别不大

对于写锁的获取,核心是tryAcquire 方法,线程调用 writeLock().lock() 时,最终会调用 Sync 类的 tryAcquire 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState(); // 获取当前state
int w = exclusiveCount(c); // 提取写锁计数(低16位)

// 情况1:state≠0(有读锁或写锁)
if (c != 0) {
// 子情况1:有读锁(w=0但c≠0),或写锁被其他线程持有 → 获取失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 子情况2:写锁重入次数超过最大值(65535)→ 抛异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 子情况3:当前线程持有写锁 → 重入,更新state
setState(c + acquires);
return true;
}

// 情况2:state=0(无读写锁)
// writerShouldBlock():公平/非公平策略判断是否阻塞
// 非公平:返回false(允许插队);公平:判断是否有等待队列 → 有则阻塞
if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
return false;
// CAS成功 → 设置当前线程为写锁持有者
setExclusiveOwnerThread(current);
return true;
}
  • 有读锁存在时,写锁获取失败,而或者写锁被其他线程持有,获取失败;

写锁的释放,核心在tryRelease 方法,线程调用 writeLock().unlock() 时,最终调用 Sync 类的 tryRelease 方法,逻辑如下

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
// 校验:当前线程未持有写锁 → 抛异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases; // state减释放数
// 判断写锁是否完全释放(低16位为0)
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null); // 释放写锁持有者
setState(nextc); // 更新state
return free; // 返回是否完全释放
}
  • 因为写锁支持重入,每次 unlock 只会将 state 减 1,直到低 16 位为 0 才真正释放;然后写锁释放完成后会清空写锁持有者,允许其他线程竞争锁。

邮戳锁:StampedLock

使用StampedLock

前面介绍的ReadWriteLock可以解决多线程同时读,但只有一个线程能写的问题。

如果我们深入分析ReadWriteLock,会发现它有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。

要是想进一步提升在读多写少下的并发执行效率,Java 8 引入了新的读写锁:StampedLock。它对写锁独占,对于悲观读锁是共享的,这和ReentrantReadWriteLock是一样的,但是StampedLock多了个乐观读锁,是基于无锁读加校验实现的

StampedLockReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。

它乐观在估计读的过程中大概率不会有写入,所以说需要再读一次的这种检测手段

来看使用例子,我们模拟一个 坐标点(Point)对象,写操作就是移动点的位置(move()),读操作就是计算该点到原点的距离(distanceFromOrigin()

这是一个典型的 “读多写少” 场景,非常适合 StampedLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import java.util.concurrent.locks.StampedLock;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StampedLockDemo {

static class Point {
private double x, y;
private final StampedLock lock = new StampedLock();

// 写操作:独占写锁
public void move(double deltaX, double deltaY) {
long stamp = lock.writeLock(); // 获取写锁(阻塞式)
try {
System.out.println(Thread.currentThread().getName() + " 获取写锁,正在移动点...");
x += deltaX;
y += deltaY;

Thread.sleep(500);
System.out.println("点已移动至 (" + x + ", " + y + ")");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlockWrite(stamp); // 解除写锁,必须传入 stamp
System.out.println(Thread.currentThread().getName() + " 释放写锁");
}
}

// 读操作:使用 乐观读+降级到悲观读
public double distanceFromOrigin() {
// 尝试乐观读(不加锁!)
long stamp = lock.tryOptimisticRead();
double currentX = x;
double currentY = y;

// 注意:乐观读期间不能有阻塞或复杂逻辑!
try {
Thread.sleep(10); // 极短时间,模拟快速读
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

// 验证,在读取期间是否有写操作发生?
if (!lock.validate(stamp)) {
System.out.println(Thread.currentThread().getName() + " 乐观读失效,降级为悲观读锁...");

// 降级,获取悲观读锁,会阻塞写线程,退化为ReentrantReadWriteLock
stamp = lock.readLock();
try {
currentX = x; // 重新读取
currentY = y;
} finally {
lock.unlockRead(stamp); // 传入悲观读锁解锁
}
}

return Math.sqrt(currentX * currentX + currentY * currentY);
}
}

public static void main(String[] args) throws InterruptedException {
Point point = new Point();

ExecutorService executor = Executors.newFixedThreadPool(4);

// 启动 2 个写线程(每 2 秒移动一次)
for (int i = 0; i < 2; i++) {
final int id = i;
executor.submit(() -> {
for (int j = 0; j < 3; j++) {
// Writer-0 移动 (1,1), Writer-1 移动 (2,2)
point.move(id + 1, id + 1);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}

// 启动 3 个读线程并发
for (int i = 0; i < 3; i++) {
executor.submit(() -> {
for (int j = 0; j < 5; j++) {
double dist = point.distanceFromOrigin();
System.out.println(Thread.currentThread().getName() + " 读取距离: " + String.format("%.2f", dist));
try {
Thread.sleep(300);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}

executor.shutdown();
executor.awaitTermination(15, TimeUnit.SECONDS);
System.out.println("\n所有任务完成!");
}
}

ReadWriteLock相比,写入的加锁是完全一样的,不同的是读取。

注意到首先我们通过tryOptimisticRead()获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后,我们通过validate()去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。

写线程获取写锁后,其他所有读/写线程都会被阻塞,这是肯定的,然后读线程大多数时候直接通过 乐观读 快速返回结果,极其偶尔的情况下会出现降级为悲观读锁的情况。所以说,在没有写操作的间隙中,读线程几乎无开销地并发执行,只有在真正发生冲突时,才退化为传统读锁,保证正确性。

StampedLock把读锁细分为乐观读和悲观读,这样是能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是StampedLock是不可重入锁,不能在一个线程中反复获取同一个锁。

嗯,别忘了

  • 必须用 stamp 解锁unlockWrite(stamp) / unlockRead(stamp)
  • 乐观读后必须 validateif (!lock.validate(stamp)) { ... }
  • 不能重入:同一线程不要重复获取写锁

StampedLock还提供了更复杂的将悲观读锁升级为写锁的功能

源码分析

结构分析

它的核心思想其实比较简单,就是用邮戳Stamp作为版本号,来实现读的乐观锁,代替传统的锁持有

三种模式:

  • Writing (写):独占模式,类似 ReentrantReadWriteLock 的写锁。
  • Reading (悲观读):共享模式,类似 ReentrantReadWriteLock 的读锁。
  • Optimistic Reading (乐观读)无锁读!通过版本校验来判断读取期间是否有写入发生。

首先,它是一个比较独立的锁,它的实现既不依赖上面说的哪些接口,也不依赖 AQS

image-20260211004952241

那么大伙问题可能就来了,那么它是如何实现的锁核心机制包括 CAS 的,别急,随便看一个可能会涉及到 CAS 的方法,例如悲观读的解锁,他需要内部通过 CAS 来减少读者计数

image-20260211005419521

红框的语句的这个地方调用了实际的 CAS 的逻辑来进行原子更新

image-20260211005437531

?有思路,好熟悉,你是不能忘记的方法)))))))

image-20260211005518571

大彻大悟,原来你内部是使用的 Unsafe,而且StampedLock 对内存可见性的控制极为严谨,大量使用了 Unsafe 的内存屏障指令。

  • U.storeStoreFence()
    • writeLock 成功后调用。
    • 确保临界区内的写操作不会被重排到获取写锁之前。
  • U.loadFence()
    • validate() 开始时调用。
    • 确保临界区内的读操作不会被重排validate() 之后。

这些屏障是 StampedLock 能提供正确 happens-before 语义的关键。

而对于当快速路径(Fast Path)失败时,StampedLock 也会进入基于 CLH 队列的等待逻辑。

image-20260211005738892
  • 多个连续的读者请求会被合并成一个节点加入队列。
  • 当这个“读者组”的头节点被唤醒时,它会一次性唤醒所有组内成员。
  • 目的:减少上下文切换,提高吞吐量,实现批量唤醒。吞吐量优先,非绝对公平

对于其核心的数据结构,如下

1
2
3
4
5
6
7
private static final int LG_READERS = 7; // 2^7 - 1 = 127, 最大读者数
private static final long RUNIT = 1L; // 读者单位
private static final long WBIT = 1L << LG_READERS; // 128, 写标志位
private static final long RBITS = WBIT - 1L; // 127, 读者计数掩码 (0x7F)
private static final long RFULL = RBITS - 1L; // 126, 读者计数满值
private static final long ABITS = RBITS | WBIT; // 127+128=255, 锁模式掩码
private static final long SBITS = ~RBITS; // ...1111111110000000, 序列号部分

上面的内容不太重要的,但是这些内容构成了StampedLockstate的核心数据结构

1
2
3
| 63 ... 8 | 7 | 6 ... 0 |
|----------|---|---------|
| 序列号 | W | 读者计数|
  • 最低 7 位 ([6:0]):当前读者数量(Reader Count),最大为 127 (RFULL)。
  • 第 7 位 ([7])写锁标志(Write Bit)。如果为 1,表示当前有写锁。
  • 高位 ([63:8])版本序列号(Sequence Number)。每次写锁的获取和释放都会递增它。

对于初始状态

image-20260211004718700
乐观读和悲观读

首先,来讲乐观读,核心方法是tryOptimisticRead() & validate()

image-20260211004911216
  • 首先,tryOptimisticRead() 返回一个只包含序列号stamp。对于无锁读取,线程可以自由地读取共享变量这个邮戳stamp,乐观校验就调用 validate(stamp)
    • 如果成功,说明从 tryOptimisticReadvalidate 之间没有任何写操作发生,读取的数据是一致的快照。
    • 如果失败,说明期间有写操作,数据可能不一致,通常降级为悲观读。

乐观读期间不能有任何同步操作(如 synchronized, Lock),否则会破坏其无锁语义。

对于悲观读,核心方法是readLock() & unlockRead()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public long readLock() {
long s = U.getLongOpaque(this, STATE) & RSAFE, nextState;
// 尝试快速路径:直接 CAS 增加读者计数
if (casState(s, nextState = s + RUNIT))
return nextState;
else
return acquireRead(false, false, 0L); // 进入完整队列等待逻辑
}

public void unlockRead(long stamp) {
long s, m;
// 校验 stamp 的读者部分是否有效,并且序列号匹配
if ((stamp & RBITS) != 0L) {
while (((s = state) & SBITS) == (stamp & SBITS) &&
((m = s & RBITS) != 0L)) {
if (m < RFULL) {
// CAS快速路径来减少读者计数
if (casState(s, s - RUNIT)) {
// 如果是最后一个读者,唤醒写者
if (m == RUNIT) signalNext(head);
return;
}
} else if (tryDecReaderOverflow(s) != 0L) // 处理读者溢出
return;
}
}
throw new IllegalMonitorStateException();
}
  • 行为与 ReentrantReadWriteLock 的读锁不能说是非常相似,只能说是一模一样
  • 只不过,它使用 stamp 作为凭证,而不是隐式的线程所有权,内部使用 CAS

那么,发生了什么样的情况,或者我们调用了什么方法,才会让乐观读锁降级变成悲观读锁

从源码和实际使用来看,有两类核心场景会触发这种切换,本质都是 “乐观读的有效性被破坏”:

  • 乐观读的 validate() 验证失败

    这是最常见的触发条件,对应源码中 validate() 方法返回 false,此时必须切换为悲观读锁才能保证数据一致性。

    调用 tryOptimisticRead() 获取乐观戳记时,如果此时有线程持有写锁(state & WBIT != 0)或者 state 版本号对不上,直接返回 0,戳记无效,必须切换为悲观读

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    double distanceFromOrigin() {
    // 1. 尝试获取乐观读戳记
    long stamp = sl.tryOptimisticRead();
    try {
    retryHoldingLock: for (;; stamp = sl.readLock()) {
    // 乐观读戳记无效(tryOptimisticRead返回0),直接走悲观读
    if (stamp == 0L)
    continue retryHoldingLock;

    // 2. 无锁读取数据(乐观读的核心:无阻塞)
    double currentX = x;
    double currentY = y;

    // 3. 验证乐观读戳记:失败则重新循环,调用readLock()获取悲观读锁
    if (!sl.validate(stamp))
    continue retryHoldingLock;

    // 验证成功,返回计算结果
    return Math.hypot(currentX, currentY);
    }
    } finally {
    // 4. 若最终用了悲观读锁,释放锁
    if (StampedLock.isReadLockStamp(stamp))
    sl.unlockRead(stamp);
    }
    }
  • 主动转换

    源码中提供了 tryConvertToReadLock(long stamp) 方法,允许主动将 “乐观读戳记” 转换为 “悲观读戳记”,本质也是从乐观读切换为悲观读:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public long tryConvertToReadLock(long stamp) {
    long a, s, nextState;
    while (((s = state) & SBITS) == (stamp & SBITS)) {
    if ((a = stamp & ABITS) >= WBIT) {
    // 写锁戳记:释放写锁,获取读锁
    if (s != stamp) break;
    nextState = state = unlockWriteState(s) + RUNIT;
    signalNext(head);
    return nextState;
    } else if (a == 0L) { // 乐观读戳记(核心分支)
    // 读锁计数未溢出,尝试CAS获取悲观读锁
    if ((s & ABITS) < RFULL) {
    if (casState(s, nextState = s + RUNIT))
    return nextState; // 转换成功,返回悲观读戳记
    } else if ((nextState = tryIncReaderOverflow(s)) != 0L)
    return nextState;
    } else {
    // 已持有悲观读锁,直接返回戳记
    if ((s & ABITS) == 0L) break;
    return stamp;
    }
    }
    return 0L; // 转换失败
    }
    • 当我们调用 tryConvertToReadLock(stamp) 且传入的是乐观读戳记
    • 此时如果锁状态允许,无写锁、读锁计数未溢出,会通过 casState 原子更新 state,返回悲观读戳记,完成从乐观读到悲观读的切换
    • 如果转换失败(比如此时有写锁),返回 0,此时我们会主动调用 readLock() 获取悲观读锁。
写锁的操作

对于写,区别更不大了,方法是writeLock() & unlockWrite()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public long writeLock() {
long s = U.getLongOpaque(this, STATE) & ~ABITS, nextState;
// 尝试快速路径:如果锁空闲 (ABITS==0),直接设置 WBIT
if (casState(s, nextState = s | WBIT)) {
U.storeStoreFence(); // 存储屏障,保证后续写操作不会重排到此之前
return nextState;
}
return acquireWrite(false, false, 0L); // 进入完整队列等待逻辑
}

public void unlockWrite(long stamp) {
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException();
releaseWrite(stamp); // 释放并唤醒下一个等待者
}
  • 获取写锁时,会将 WBIT 位置 1,意思是写锁被获取,并阻塞所有读和写

  • 释放写锁时,会递增序列号(unlockWriteState 方法),这正是乐观读校验的关键。

    image-20260211005627643

Condition 接口

接口分析

在 Java 并发编程的早期,线程协作依赖synchronized + Object.wait/notify,但是一个对象只有一个 wait-set,那么它就无法区分不同条件,不能中断等待、不能灵活设置超时,还必须配合 synchronized 使用,气鼓包了

ConditionObject 的 monitor 方法(wait/notify)拆出来,变成独立对象,并与任意 Lock 实现绑定。

核心思想:

  • 一把 Lock 可以关联 多个 Condition,每个代表一种等待条件。
  • 每个 Condition 拥有独立的等待队列(wait-set)
  • 支持更丰富的等待/唤醒语义,中断、超时、绝对时间等
image-20260211104452708

Condition 接口的源代码如下,定义了这些方法,其中能做这些事情

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface Condition {
// 类似Object.wait(),等待,直到被 signal 或中断
void await() throws InterruptedException;
// 等待,忽略中断,它会保留中断状态,但是不抛异常
void awaitUninterruptibly();
// 等待指定纳秒数,返回返回剩余时间
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 等待指定时间,返回 true 表示被 signal,false 表示超时
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 等待到指定截止时间
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个等待线程,类似 Object.notify()
void signal();
// 唤醒所有等待线程,类似 Object.notifyAll()
void signalAll();
}

无论是 await() 还是 signal()调用线程必须已经持有与该 Condition 关联的 Lock。否则会抛出 IllegalMonitorStateException

1
2
3
4
5
6
7
8
9
10
lock.lock();
try {
while (!conditionMet) {
condition.await(); // 自动释放锁,等待
}
// 条件满足,执行操作
condition.signal(); // 唤醒他人
} finally {
lock.unlock();
}

那么,它是如何原子性的释放锁然后挂起线程实现等待的,当你调用 condition.await() 时,发生以下原子操作

  1. 自动释放与该 Condition 关联的 Lock
  2. 将当前线程加入该 Condition 的等待队列
  3. 挂起线程(进入 waiting 状态)。

这保证了,不会因先释放锁再挂起而导致错过 signal

而当线程被唤醒(signal)或超时后,必须重新获取锁才能从 await() 返回。所以 await() 返回时,一定持有锁!

还有一点,文档明确指出:

spurious wakeup is permitted… always wait in a loop

即使没人调用 signal(),线程也可能被虚假唤醒(spurious wakeup)

所以永远不要用 if检查条件,要用 while

1
2
3
4
5
// ❌ 错误
if (buffer.isEmpty()) condition.await();

// ✅ 正确
while (buffer.isEmpty()) condition.await();

使用Condition

上面说了,使用ReentrantLock比直接使用synchronized更安全,可以替代synchronized进行线程同步。

但是,synchronized可以配合waitnotify实现线程在条件不满足时等待,条件满足时唤醒,用ReentrantLock我们怎么编写waitnotify的功能呢?

那么,可以祭出Condition接口了,可以使用Condition对象来实现waitnotify的功能。

TaskQueue为例,把前面用synchronized实现的功能通过ReentrantLockCondition来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class TaskQueue {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Queue<String> queue = new LinkedList<>();

public void addTask(String s) {
lock.lock();
try {
queue.add(s);
condition.signalAll(); // this.notifyAll();
} finally {
lock.unlock();
}
}

public String getTask() {
lock.lock();
try {
while (queue.isEmpty()) {
condition.await(); // this.wait();
}
return queue.remove();
} finally {
lock.unlock();
}
}
}
  • getTask()内部先判断队列是否为空,如果为空,线程就进行等待,直到另一个线程往队列中放入了一个任务唤醒了它,然后while()循环处理,就可以返回队列的元素了。
  • 线程1可以调用addTask()不断往队列中添加任务,放入一个任务,就唤醒一下getTask()处理的线程让他从等待中苏醒过来处理任务

可以发现,使用起来完全和notifyAll()wait()没啥差别,也满足了多线程协调运行的原则,当条件不满足时,线程进入等待状态;当条件满足时,线程被唤醒,继续执行任务。

可见,使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。

Condition提供的await()signal()signalAll()原理和synchronized锁对象的wait()notify()notifyAll()是一致的,并且其行为也是一样的:

  • await()会释放当前锁,进入等待状态;
  • signal()会唤醒某个等待线程;
  • signalAll()会唤醒所有等待线程;
  • 唤醒线程从await()返回后需要重新获得锁。

此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()signalAll()唤醒,可以自己醒来

1
2
3
4
5
6
if (condition.await(1, TimeUnit.SECOND)) {
// 被其他线程唤醒
} else {
// 指定时间内没有被其他线程唤醒
}

不仅如此,synchronized 只有一个隐式条件队列(通过 wait/notify),而 ReentrantLock 等锁很多都支持多个独立的 Condition,操作极为灵活,针对并发的操作可以跟线程隔开

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private final Lock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

// 生产者
public void put(Item item) throws InterruptedException {
lock.lock();
try {
while (queue.isFull()) {
notFull.await(); // 等待“不满”条件
}
queue.add(item);
notEmpty.signal(); // 唤醒消费者
} finally {
lock.unlock();
}
}

// 消费者
public Item take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 等待“不空”条件
}
Item item = queue.remove();
notFull.signal(); // 唤醒生产者
return item;
} finally {
lock.unlock();
}
}

这是生产者-消费者模型的多线程并发标准写法

ReentrantReadWriteLock是读写分离锁,读共享、写独占,它的writeLock()支持创建Condition,而读锁不支持,因为读锁是共享锁,无法保证条件等待的原子性。那么,读写分离缓存也可以使用类似的思路实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* ReentrantReadWriteLock + Condition
* 读锁不能创建Condition,只有写锁可以
*/
public class ReadWriteLockConditionDemo {
// 缓存容器
private final Map<String, String> cache = new HashMap<>();
// 标记缓存是否初始化完成
private boolean cacheInited = false;

// 读写锁
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
// 读锁
private final ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
// 写锁
private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();

// 条件:缓存初始化完成
private final Condition cacheInitCondition = writeLock.newCondition();

// 初始化缓存,写线程调用
public void initCache() throws InterruptedException {
// 写锁加锁
writeLock.lock();
try {
if (cacheInited) {
System.out.println("缓存已初始化,无需重复初始化");
return;
}

// 模拟初始化缓存
System.out.println("开始初始化缓存...");
Thread.sleep(1000);
cache.put("key1", "value1");
cache.put("key2", "value2");
cacheInited = true;

// 唤醒所有等待“缓存初始化”的线程
cacheInitCondition.signalAll();
System.out.println("缓存初始化完成,唤醒所有等待线程");
} finally {
writeLock.unlock();
}
}

// 读取缓存,读线程调用
public String getCache(String key) throws InterruptedException {
// 先加读锁
readLock.lock();
try {
// 检查缓存是否初始化:未初始化则释放读锁,获取写锁等待
if (!cacheInited) {
// 必须先释放读锁,再获取写锁,否则会死锁:读锁不释放,写锁拿不到
readLock.unlock();
writeLock.lock();
try {
// 双重检查,防止多个线程同时进入写锁
if (!cacheInited) {
System.out.println(Thread.currentThread().getName() + " 缓存未初始化,等待...");
// 等待缓存初始化条件
cacheInitCondition.await();
}
// 重新获取读锁
readLock.lock();
} finally {
// 释放写锁
writeLock.unlock();
}
}

// 读取缓存
String value = cache.get(key);
System.out.println(Thread.currentThread().getName() + " 读取缓存:" + key + "=" + value);
return value;
} finally {
// 释放读锁
readLock.unlock();
}
}

// 测试代码
public static void main(String[] args) {
ReadWriteLockConditionDemo demo = new ReadWriteLockConditionDemo();

// 启动5个读线程
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
demo.getCache("key1");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "读线程-" + i).start();
}

// 延迟1秒启动写线程
new Thread(() -> {
try {
Thread.sleep(1000);
demo.initCache();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "写线程-初始化").start();
}
}

信号量:Semaphore

使用信号量

前面我们讲了各种锁的实现,本质上锁的目的是保护一种受限资源,保证同一时刻只有一个线程能访问(ReentrantLock),或者只有一个线程能写入(ReadWriteLock)。

多线程开发的过程中还有一种类型的受限资源,它需要保证同一时刻最多有N个线程能访问,比如同一时刻最多创建100个数据库连接,最多允许10个用户下载等。

这种限制数量的锁,如果用Lock数组来实现,就太麻烦了,针对于这种情况,我们可以使用信号量机制,对应 Java 里的实现就是 Semaphore

这种情况就可以使用Semaphore,它是一种计数器表示的信号量,用来保护一个或者多个共享资源的访问。如果线程要访问一个资源就必须先获得信号量。如果信号量内部计数器大于0,信号量减1,然后允许共享这个资源;否则,如果信号量的计数器等于0,信号量将会把线程置入休眠直至计数器大于0。当一个线程使用完这个信号量的时候,必须释放掉该信号量然后还给Semaphore

Semaphore最常见的就是可以用来做流量分流,特别是对公共资源有限的场景,比如数据库连接。假设有这个的需求,读取几万个文件的数据到数据库中,由于文件读取是IO密集型任务,可以启动几十个线程并发读取,但是数据库连接数只有10个,这时就必须控制最多只有10个线程能够拿到数据库连接进行操作。这个时候,就可以使用Semaphore做流量控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class SemaphoreTest {
private static final int THREAD_COUNT = 40;
private static ExecutorService threadPool = Executors
.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10,true);

public static void main(String[] args){
for(int i=0; i<THREAD_COUNT; i++){
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程" + Thread.currentThread().getName() +" 读取文件");
s.acquire();
System.out.println("线程" + Thread.currentThread().getName() +" 存储文件");
Thread.sleep(5000);
}catch (InterruptedException e){
e.printStackTrace();
}finally {
s.release();
}
}
});
}
threadPool.shutdown();
}
}
  • 上述代码创建了40个线程,但是只能允许有10个线程并发执行。

Semaphores在本质上是非可重入的 ,这意味着我们不能在同一个 线程 中第二次获得 Semaphore,试图这样做会导致死锁,假设你创建了一个 Semaphore(1),然后这样

1
2
3
Semaphore sem = new Semaphore(1);
sem.acquire(); // 线程A获取许可,permits=0
sem.acquire(); // 线程A再次acquire → 阻塞!因为没有可用许可了
  • 线程A自己把自己阻塞了!这说明 Semaphore 不具备可重入性,它不会因为是同一个线程就允许重复获取。
  • 而只要我在同一个线程里用 acquire/release,就是可重入这种想法是完全错误的,可重入的核心是“无需额外许可即可重复进入”。你看Semahpone做得到吗

因为Semaphore 没有线程所有权概念,它只维护一个许可计数器,任何线程都可以调用 release(),即使它从未调用过 acquire(),这是 Semaphore 的特性

可重入是允许一个线程使用lock() 等方法多次锁定一个特定的资源,锁内部会记录持有者线程和获取次数,只有释放次数等于获取次数时,锁才真正被释放。

对于如何正确使用Condition

  1. 必须在锁块中使用

    调用await()/signal()前必须先调用lock()加锁,否则会抛出IllegalMonitorStateException,和 Object 的wait()/notify()必须在 synchronized 块中同理。

  2. 读锁不能创建 Condition

    ReentrantReadWriteLock的读锁调用newCondition()会抛异常,只有写锁可以。

  3. 处理中断异常

    await()会抛出InterruptedException,需要捕获并处理(比如恢复中断状态Thread.currentThread().interrupt())。

  4. 避免死锁

    多个 Condition 交叉等待时,要保证锁的获取顺序一致;读写锁场景下,读转写必须先释放读锁。

源码分析

上面提到,Semaphore 是一种用于控制多个线程对共享资源访问数量的同步工具,那么,它的结构是这样的,很明显,他也不是两大锁接口下的锁类

image-20260211102905073
  • 它内部维护一个“许可”(permit)计数器。AQS ,用一个 int state 表示同步状态,而在 Semaphore 中,它就用 AQS 中的state 就表示当前可用的 permit 数量

    image-20260211102852965
    image-20260211102842633
  • 因此,线程在访问资源前必须先 获取(acquire) 一个或多个许可;使用完资源后必须 释放(release) 相应数量的许可;如果当前没有足够许可,acquire() 会阻塞超时等,直到有许可可用。

image-20260211102654543
image-20260211102700719

和上面的ReentrantLock等锁是一样的,Semaphore 的所有同步逻辑都委托给内部类 Sync,而 Sync 继承自 AbstractQueuedSynchronizer,也就是说,信号量基于AQS实现

而且 Sync也分公平和不公平的两种情况。根据其构造函数

image-20260211103237052

有一个挺好笑的事情,初始 permits 可以为负数!此时必须先 release() 才能让 acquire() 成功,这种情况可以存在,但是应该不太好用

  • 对于非公平情况,会调用nonfairTryAcquireShared(int acquires),直接尝试 CAS 修改 state,不检查等待队列,插队

    image-20260211103011737
  • 公平版本会通过调用FairSync.tryAcquireShared先检查是否有前驱节点

    image-20260211103132931

    只有队列为空或自己是队首时,才尝试获取许可

    即使设置了公平模式,tryAcquire() 方法 仍然允许插队!这是为了性能考虑。如需严格公平,应使用 tryAcquire(0, SECONDS)

ReentrantLock的独占模式不同,Semaphore使用 共享模式。多个线程可以同时持有许可,只要总数不超过上限。所以信号量内部获取和释放锁,它调用的是 AQS 的 acquireShared / releaseShared 系列方法

image-20260211103437155
image-20260211103447391

对于其内存可见性Java 文档明确指出:

Actions in a thread prior to calling release() happen-before actions following a successful acquire() in another thread.

这意味着:

  • 线程 A 在 release() 之前对共享变量的修改,
  • 对线程 B 在 acquire() 之后的操作 一定是可见的
  • 这是通过 AQS 内部的 volatile 读写和 CAS 保证的。