JUC锁机制
我们都知道,synchronized是 Java
关于锁的一个关键字,那么,它自动进行锁的释放的,所以只存在两种释放锁的情况:
- 获取了对应方法锁的对象执行完了该方法
- 线程执行异常或被中断,JVM会让线程自动释放锁
不仅如此,synchronized 的局限性还在于
- 无法中断等待锁的线程,不可响应中断。
- 无法设置超时,只能无限等待。
- 无法尝试获取锁而不阻塞,非阻塞尝试。
- 读写无法分离
它不够灵活,而且容易出现阻塞,因为方法或者代码块一旦执行,就独占该锁,直到结束才返回锁,此时就出现了JUC中的各种锁,它们灵活且强大
locks包概述
对于 Lock 接口本身,就有一些需要做铺垫的内容讲解,这里我们先看看 JUC 的锁机制主要包中都有什么内容,先了解一下
前面说过,Lock 锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作,所以说,它这个接口是代表着什么呢,首先看看 JUC lock 包下的内容
其中接口有这三个:
Lock就是锁的接口,定义了显式加锁和解锁的规范,是锁的顶层接口,所有具体锁都实现这个接口。Condition是Lock的配套接口,用于实现 条件等待 等协调线程用的监视器ReadWriteLock是读写锁接口,允许同时多个读操作,但只允许一个写操作。
对于抽象类和具体的类,也简单说一下:
AbstractOwnableSynchronizer,就是提供对“拥有者线程”信息的支持。维护一个Thread owner字段,表示当前持有锁的线程。
LockSupport不熟,看了一眼,大概就是用于实现线程阻塞和唤醒的工具,基于Unsafe实现的Object.wait()/notify()的轻量级替代品AbstractQueuedLongSynchronizer和AbstractQueuedSynchronizer,这俩就是 AQS,是整个 JUC 锁体系的核心框架,大部分锁都是基于 AQS 实现的,没有 Long 的那个就只不过是用于int类型的 state,两者几乎一样ReentrantLock这个和下面的三个锁都是重点要说的锁,它是可重入互斥锁,功能比synchronized强大。ReentrantReadWriteLock是读写分离的可重入锁,实现了ReadWriteLock接口StampedLock是对ReentrantReadWriteLock的性能提升版本,所以它也是读写分离锁,它支持乐观读,无需阻塞,提升吞吐
Lock 接口
synchronized 是 JVM 层面的隐式锁;Lock 是
API 层面的显式锁。
首先,Lock 接口存在这些方法
1 | public interface Lock { |
对于这些方法,其中,大部分看名字都能猜出来,JUC 中的锁大部分都实现了这个接口
void lock():获取锁。如果锁不可用,当前线程阻塞直到获得锁。- 不可中断,即使调用
thread.interrupt()也不会抛出异常 - 而且必须配合
try-finally使用,确保释放锁。如果忘记unlock(),会导致死锁
- 不可中断,即使调用
void lockInterruptibly() throws InterruptedException:可中断地获取锁,也就是说,除非当前线程被中断了,否则请求获取锁。- 如果线程在等待锁的过程中被中断了,会立即抛出
InterruptedException,并退出等待。在入口处若已中断,也会抛异常。 - 一般在长时间等待锁时,外部可以中断线程以避免资源浪费。
- 如果线程在等待锁的过程中被中断了,会立即抛出
boolean tryLock():立即尝试获取锁,不阻塞。- 避免死锁、实现非阻塞算法、高并发下的乐观策略。别忘了解锁
boolean tryLock(long time, TimeUnit unit) throws InterruptedException:加中断和超时的版本,在指定时间内尝试获取锁- 成功获取锁 → 返回
true - 等待期间被中断 → 抛
InterruptedException - 超时仍未获取 → 返回
false
- 成功获取锁 → 返回
void unlock():释放锁- 只有持有锁的线程才能调用
unlock(),否则抛异常,通常只在finally块中调用。 - 对于可重入锁,
lock()调用 n 次,就需要unlock()n 次才能真正释放。
- 只有持有锁的线程才能调用
Condition newCondition():创建一个与该Lock绑定的Condition对象,后面细说
对于 Lock 接口,官方有内存同步语义的指定
所有
Lock实现必须保证与synchronized相同的内存可见性语义
这意味着使用 Lock
也能保证线程间的数据可见性,lock()
成功等价于进入到 synchronized
块,也就是happens-before
获取锁的线程看到之前释放锁的线程的所有写操作,unlock()
成功等价于退出 synchronized
块,释放锁前的修改对下一个获取锁的线程可见
源码注释中还多次强调了这些事:
某些实现可以检测死锁并抛出异常
中断支持不是强制的,但若支持,必须遵守语义
不要用
Lock对象本身作为synchronized的监视器:1
synchronized(lock) { ... } // 真有人会这么写吗
参数为
null时应抛NullPointerException。慎防NPE
可重入锁:ReentrantLock
使用ReentrantLock
java.util.concurrent.locks包提供的ReentrantLock用于替代synchronized加锁
对于传统synchronized代码,我们大概需要这样写
1 | public class Counter { |
如果用ReentrantLock替代,可以把代码改造为
1 | public class Counter { |
因为synchronized是 Java 语言层面提供的语法,是 Java
对并发编程的一个支持,所以我们不需要考虑异常等其他情况,而ReentrantLock是
Java
代码实现的锁,我们就必须先获取锁,然后在finally中正确释放锁。
ReentrantLock是可重入锁,默认是非公平锁,它和synchronized一样,一个线程可以多次获取同一个锁。但是和synchronized不同的是,ReentrantLock可以尝试获取锁,也就是说,ReentrantLock是对上面
Lock 接口中提到的tryLock方法有所实现
1 | if (lock.tryLock(1, TimeUnit.SECONDS)) { |
上述代码在尝试获取锁的时候,最多等待1秒。如果1秒后仍未获取到锁,tryLock()返回false,程序就可以做一些额外处理,而不是无限等待下去。
所以,使用ReentrantLock比直接使用synchronized更安全,线程在tryLock()失败的时候不会导致死锁。
其中,ReentrantLock中的可中断锁在任务取消中的价值也很明显
1 | public void doWork() { |
这样就支持了用户取消操作这样的业务
分析ReentrantLock
结构分析
首先,ReentrantLock 实现了 Lock 接口,整体架构使用了Sync + AQS,至于这二位高手我下面会细说,这里只是简单的说一下了
ReentrantLock
本身不直接实现锁逻辑,而是将所有操作委托给内部的
Sync 对象。
而Sync 是一个抽象静态内部类,继承自
AbstractQueuedSynchronizer(AQS),至于
AQS 就是并发同步器框架,它是锁机制的具体实现
可以看到,Sync
中存在大量的模板方法,它们真正的实现了ReentrantLock
中锁机制的真正内容,ReentrantLock 通过继承 AQS
并通过各种各样的 Sync
实现其模板方法,将真正的锁机制逻辑进行委托,来定义自己的同步语义。
那么,它还通过state
表示锁的持有次数,来实现的可重入性
- 对于
ReentrantLock,state == 0表示锁未被持有;state > 0表示锁已被持有,数值 = 当前线程的重入次数;同时,AQS 还维护一个exclusiveOwnerThread字段,记录当前持有锁的线程。这正是可重入性的实现基础
对于如何上锁,就例如我们看看 sync.tryLock()
方法,它是非公平尝试获取锁,可以看到,就算强如 JUC,也离不开 CAS
1 |
|
锁的获取
然后来看看锁的获取流程,以 lock() 为例
零帧起手进行委托
进入 Sync.lock()
它会调用对应公平和非公平的策略,非公平直接抢,公平先看队列
失败则进入 AQS 的 acquire 流程,自旋喽喽喽
acquire(1)是 AQS 的模板方法,会调用子类实现的tryAcquire
如果
tryAcquire返回false,AQS 会将当前线程封装为Node加入CLH 等待队列,这也就是为毛ReentrantLock的源码中一堆 Node,它会阻塞当前线程,等待被前面的节点唤醒
那么来看看带超时的重试锁,tryLock(timeout, unit)
的实现
零帧起手内部调用内部sync的tryLockNanos(long nanos),可以看到这是支持中断的,而且,先尝试,失败了进入
AQS 处理
锁的释放
对于锁的释放,还是零帧起手调用sync.release(1);,但是,它调用的是
AQS 的 release(1)
进而调用 Sync.tryRelease(1):
如果 free == true,AQS
会唤醒等待队列中的下一个线程,否则自旋
对 Condition 的支持
还是委托给对应的sync,然后再返回 AQS 内部的
ConditionObject,支持一些Condition的操作,例如等待和唤醒,这个下面细说
公平 vs 非公平锁
对于ReentrantLock构造函数默认是非公平锁,当然,你可以指定,内置了两种
Sync 的实现,也就是说,用了组件化的设计模式
非公平锁是默认的,它更高性能,新来的线程可以直接插队抢锁,即使等待队列中有其他线程,也就是抢占型的,所以它吞吐量高,但是可能引起饥饿
看
NonfairSync.initialTryLock(),非公平锁初始化尝试锁的方法
第一步就是用 CAS 无条件尝试抢锁,不管有没有人在排队!
公平锁也就是
ReentrantLock中FairSync的实现,它是公平锁,只有当等待队列为空 或 当前线程是队首 时,才允许获取锁。保证线程按请求顺序获得锁,避免饥饿,但是性能差了点看
FairSync.initialTryLock()
但是
tryLock()不遵守公平策略!即使你创建的是公平锁,tryLock()也会直接尝试获取,或许是为了灵活性吧,谁知道呢
ReadWriteLock 接口
在传统互斥锁,如 synchronized 或
ReentrantLock中:
- 无论读还是写,都只能一个线程访问。
- 但读操作是安全的:多个线程同时读同一份数据很多情况下不会引发竞态条件(当然前提是无写入)
ReadWriteLock 接口就是读写锁的思想,它把读写两个操作的锁进行分离,然后可以自己去分配,这特别适用于 读多写少 的场景
- 允许多个读线程并发执行,那么写操作独占来保证数据一致性就够了
- 读与写互斥,写与写互斥
接口定义极其简洁但语义丰富
1 | public interface ReadWriteLock { |
| 锁类型 | 并发性 | 互斥关系 |
|---|---|---|
| 读锁(readLock) | 多个线程可同时持有 | 与写锁互斥 |
| 写锁(writeLock) | 仅一个线程可持有 | 与读锁和其他写锁互斥 |
对于内存可见性保证,读写锁一样遵循
成功获取读锁的线程,一定能看到之前所有写锁释放前的修改
这意味着写锁的 unlock()操作具有 happens-before
的性质,关系到后续读锁的 lock() 操作;而且读写锁天然提供了
volatile 语义,无需额外使用 volatile
字段。
1 | // 线程 A 写 |
ReadWriteLock
接口本身不规定行为细节,留给具体实现决定。
而且一般情况下,这里会涉及到锁升级降级的问题,持有写锁的线程可获取读锁,这是锁降级,但是持有读锁的线程不能获取写锁,这是升级,因为升级很容易有死锁
而且读写分离锁的公平性策略也更复杂一些,这个公平性和上面提到的ReentrantLock的不太一样,这里是当写锁释放时,如果有等待的读者和写者,优先唤醒谁?而对于获取锁的公平情况,一般来说,可以切换
对于可重入性,具体实现有区别,下面细说
以 ReentrantReadWriteLock
为例,先来看看如何使用读写锁,因为它跟上面的和之前的synchronized不太一样
1 | public class Cache<K, V> { |
读写分离的可重入锁:ReentrantReadWriteLock
如何使用
ReentrantReadWriteLock 实现了 ReadWriteLock
接口,内部维护两把锁:
- 读锁(ReadLock):共享锁(Shared Lock),允许多个线程同时持有。
- 写锁(WriteLock):独占锁(Exclusive Lock),同一时间只能被一个线程持有。
读操作不修改数据,因此可以并发;写操作会改变数据,必须互斥。
所以可以分成以下这四种情况的特性:
- 读读不互斥:多个线程可以同时获取读锁,进行并发读取。
- 读写互斥:有线程持有读锁时,其他线程无法获取写锁。有线程持有写锁时,其他线程无法获取读锁或写锁。
- 写写互斥:任何时候只有一个线程能持有写锁。
- 可重入:
- 不支持锁升级:持有读锁的线程不能直接获取写锁
来看如何使用它
1 | public class CacheExample { |
然后,持有写锁获取读锁是可以的
1 | writeLock.lock(); |
源码分析
结构分析
来看看它的源码,别忘了,它是在保证线程安全的前提下,最大化读操作的并发度。所以肯定倾向于读
ReadLock和WriteLock是两个独立的Lock实现,但共享同一个Sync同步器。和ReentrantLock一样,所有复杂的并发控制逻辑都封装在Sync及其子类中
其中,构造函数可指定是否为公平锁:
1 | new ReentrantReadWriteLock(); // 非公平(默认) |
对于它的公平和非公平含义,有一些区别
- 非公平:写线程总是可以插队。读线程只有当队列头是写线程时才阻塞
- 公平:无论读写,要同步队列中有等待者(
hasQueuedPredecessors()),就必须排队。
来看它的核心的数据结构。AQS 状态编码。无数人认为这是
ReentrantReadWriteLock 最精妙的设计之一。
实际上,它就是利用一个 int(32位)表示两种锁的状态
- 写锁:用
state低 16 位表示,值为 0 表示无写锁,>0 表示写锁重入次数; - 读锁:用
state高 16 位表示,值为 0 表示无读锁,>0 表示读锁总持有数(多个线程的读锁总和)。
这样,他就不要维护读写两个变量了,能直接通过 CAS 原子地更新整个锁状态,巧妙,所以说,它的最大读/写锁重入次数均为 65535。(真的有这么多的时候吗)
读锁的获取与释放
读锁是共享锁,同一时间可被多个线程持有,支持重入,且写锁持有者可以获取读锁
读锁的获取,核心方法是tryAcquireShared +
fullTryAcquireShared,线程调用
readLock().lock() 时,最终调用 Sync 类的
tryAcquireShared 方法,返回 1 成功,-1 失败,逻辑如下:
1 | protected final int tryAcquireShared(int unused) { |
- 写锁被其他线程持有 → 读锁获取失败;
- 写锁被当前线程持有 → 允许获取读锁
- 通过
ThreadLocal<HoldCounter>记录每个线程的读锁重入数
对于fullTryAcquireShared当 tryAcquireShared
失败时,会进入该方法,通过自旋重试 CAS,同时处理重入逻辑,核心逻辑与
tryAcquireShared 一致,只是增加了循环重试
读锁的释放,核心:tryReleaseShared 方法,线程调用
readLock().unlock() 时,最终调用 Sync 类的
tryReleaseShared 方法,逻辑如下:
1 | protected final boolean tryReleaseShared(int unused) { |
- 先通过
ThreadLocal/HoldCounter减少当前线程的读锁重入数,再通过 CAS 循环更新 state,只有当 state 整个都变为 0 时,才返回 true,表示读写锁都释放
读锁重入计数的管理
对于读锁,由于是共享的,所以需要为每个线程单独记录其持有的读锁数量。
这个相关的重要字段在 Sync 中
对于HoldCounter 结构如下
那么,为什么要这样设计?首先,firstReader保证当有且只有一个线程读时,无需访问
ThreadLocal,性能极高,然后,使用cachedHoldCounter在多线程交替读的场景下,大概率命中缓存,避免频繁
ThreadLocal.get()。readHolds作为兜底方案,存储所有非首个和非最近的线程的计数,前面都找不到的时候来这里找
写锁的获取与释放
写锁是独占锁(排他锁),同一时间只能有一个线程持有,支持重入。所以它的实现和上面的可重入锁区别不大
对于写锁的获取,核心是tryAcquire 方法,线程调用
writeLock().lock() 时,最终会调用 Sync 类的
tryAcquire 方法
1 | protected final boolean tryAcquire(int acquires) { |
- 有读锁存在时,写锁获取失败,而或者写锁被其他线程持有,获取失败;
写锁的释放,核心在tryRelease 方法,线程调用
writeLock().unlock() 时,最终调用 Sync 类的
tryRelease 方法,逻辑如下
1 | protected final boolean tryRelease(int releases) { |
- 因为写锁支持重入,每次 unlock 只会将 state 减 1,直到低 16 位为 0 才真正释放;然后写锁释放完成后会清空写锁持有者,允许其他线程竞争锁。
邮戳锁:StampedLock
使用StampedLock
前面介绍的ReadWriteLock可以解决多线程同时读,但只有一个线程能写的问题。
如果我们深入分析ReadWriteLock,会发现它有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。
要是想进一步提升在读多写少下的并发执行效率,Java 8
引入了新的读写锁:StampedLock。它对写锁独占,对于悲观读锁是共享的,这和ReentrantReadWriteLock是一样的,但是StampedLock多了个乐观读锁,是基于无锁读加校验实现的
StampedLock和ReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。
它乐观在估计读的过程中大概率不会有写入,所以说需要再读一次的这种检测手段
来看使用例子,我们模拟一个
坐标点(Point)对象,写操作就是移动点的位置(move()),读操作就是计算该点到原点的距离(distanceFromOrigin())
这是一个典型的 “读多写少” 场景,非常适合
StampedLock。
1 | import java.util.concurrent.locks.StampedLock; |
和ReadWriteLock相比,写入的加锁是完全一样的,不同的是读取。
注意到首先我们通过tryOptimisticRead()获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后,我们通过validate()去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。
写线程获取写锁后,其他所有读/写线程都会被阻塞,这是肯定的,然后读线程大多数时候直接通过 乐观读 快速返回结果,极其偶尔的情况下会出现降级为悲观读锁的情况。所以说,在没有写操作的间隙中,读线程几乎无开销地并发执行,只有在真正发生冲突时,才退化为传统读锁,保证正确性。
StampedLock把读锁细分为乐观读和悲观读,这样是能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是StampedLock是不可重入锁,不能在一个线程中反复获取同一个锁。
嗯,别忘了
- 必须用
stamp解锁:unlockWrite(stamp)/unlockRead(stamp) - 乐观读后必须
validate:if (!lock.validate(stamp)) { ... } - 不能重入:同一线程不要重复获取写锁
StampedLock还提供了更复杂的将悲观读锁升级为写锁的功能
源码分析
结构分析
它的核心思想其实比较简单,就是用邮戳Stamp作为版本号,来实现读的乐观锁,代替传统的锁持有
三种模式:
- Writing (写):独占模式,类似
ReentrantReadWriteLock的写锁。 - Reading (悲观读):共享模式,类似
ReentrantReadWriteLock的读锁。 - Optimistic Reading (乐观读):无锁读!通过版本校验来判断读取期间是否有写入发生。
首先,它是一个比较独立的锁,它的实现既不依赖上面说的哪些接口,也不依赖 AQS
那么大伙问题可能就来了,那么它是如何实现的锁核心机制包括 CAS 的,别急,随便看一个可能会涉及到 CAS 的方法,例如悲观读的解锁,他需要内部通过 CAS 来减少读者计数
红框的语句的这个地方调用了实际的 CAS 的逻辑来进行原子更新
?有思路,好熟悉,你是不能忘记的方法)))))))
大彻大悟,原来你内部是使用的
Unsafe,而且StampedLock
对内存可见性的控制极为严谨,大量使用了 Unsafe
的内存屏障指令。
U.storeStoreFence():- 在
writeLock成功后调用。 - 确保临界区内的写操作不会被重排到获取写锁之前。
- 在
U.loadFence():- 在
validate()开始时调用。 - 确保临界区内的读操作不会被重排到
validate()之后。
- 在
这些屏障是 StampedLock 能提供正确
happens-before 语义的关键。
而对于当快速路径(Fast Path)失败时,StampedLock
也会进入基于 CLH 队列的等待逻辑。
- 多个连续的读者请求会被合并成一个节点加入队列。
- 当这个“读者组”的头节点被唤醒时,它会一次性唤醒所有组内成员。
- 目的:减少上下文切换,提高吞吐量,实现批量唤醒。吞吐量优先,非绝对公平
对于其核心的数据结构,如下
1 | private static final int LG_READERS = 7; // 2^7 - 1 = 127, 最大读者数 |
上面的内容不太重要的,但是这些内容构成了StampedLock的
state的核心数据结构
1 | | 63 ... 8 | 7 | 6 ... 0 | |
- 最低 7 位
(
[6:0]):当前读者数量(Reader Count),最大为 127 (RFULL)。 - 第 7 位
(
[7]):写锁标志(Write Bit)。如果为 1,表示当前有写锁。 - 高位
(
[63:8]):版本序列号(Sequence Number)。每次写锁的获取和释放都会递增它。
对于初始状态
乐观读和悲观读
首先,来讲乐观读,核心方法是tryOptimisticRead() &
validate()
- 首先,
tryOptimisticRead()返回一个只包含序列号的stamp。对于无锁读取,线程可以自由地读取共享变量这个邮戳stamp,乐观校验就调用validate(stamp)。- 如果成功,说明从
tryOptimisticRead到validate之间没有任何写操作发生,读取的数据是一致的快照。 - 如果失败,说明期间有写操作,数据可能不一致,通常降级为悲观读。
- 如果成功,说明从
乐观读期间不能有任何同步操作(如
synchronized,
Lock),否则会破坏其无锁语义。
对于悲观读,核心方法是readLock() &
unlockRead()
1 | public long readLock() { |
- 行为与
ReentrantReadWriteLock的读锁不能说是非常相似,只能说是一模一样 - 只不过,它使用
stamp作为凭证,而不是隐式的线程所有权,内部使用 CAS
那么,发生了什么样的情况,或者我们调用了什么方法,才会让乐观读锁降级变成悲观读锁
从源码和实际使用来看,有两类核心场景会触发这种切换,本质都是 “乐观读的有效性被破坏”:
乐观读的
validate()验证失败这是最常见的触发条件,对应源码中
validate()方法返回false,此时必须切换为悲观读锁才能保证数据一致性。调用
tryOptimisticRead()获取乐观戳记时,如果此时有线程持有写锁(state & WBIT != 0)或者 state 版本号对不上,直接返回0,戳记无效,必须切换为悲观读1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26double distanceFromOrigin() {
// 1. 尝试获取乐观读戳记
long stamp = sl.tryOptimisticRead();
try {
retryHoldingLock: for (;; stamp = sl.readLock()) {
// 乐观读戳记无效(tryOptimisticRead返回0),直接走悲观读
if (stamp == 0L)
continue retryHoldingLock;
// 2. 无锁读取数据(乐观读的核心:无阻塞)
double currentX = x;
double currentY = y;
// 3. 验证乐观读戳记:失败则重新循环,调用readLock()获取悲观读锁
if (!sl.validate(stamp))
continue retryHoldingLock;
// 验证成功,返回计算结果
return Math.hypot(currentX, currentY);
}
} finally {
// 4. 若最终用了悲观读锁,释放锁
if (StampedLock.isReadLockStamp(stamp))
sl.unlockRead(stamp);
}
}主动转换
源码中提供了
tryConvertToReadLock(long stamp)方法,允许主动将 “乐观读戳记” 转换为 “悲观读戳记”,本质也是从乐观读切换为悲观读:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public long tryConvertToReadLock(long stamp) {
long a, s, nextState;
while (((s = state) & SBITS) == (stamp & SBITS)) {
if ((a = stamp & ABITS) >= WBIT) {
// 写锁戳记:释放写锁,获取读锁
if (s != stamp) break;
nextState = state = unlockWriteState(s) + RUNIT;
signalNext(head);
return nextState;
} else if (a == 0L) { // 乐观读戳记(核心分支)
// 读锁计数未溢出,尝试CAS获取悲观读锁
if ((s & ABITS) < RFULL) {
if (casState(s, nextState = s + RUNIT))
return nextState; // 转换成功,返回悲观读戳记
} else if ((nextState = tryIncReaderOverflow(s)) != 0L)
return nextState;
} else {
// 已持有悲观读锁,直接返回戳记
if ((s & ABITS) == 0L) break;
return stamp;
}
}
return 0L; // 转换失败
}- 当我们调用
tryConvertToReadLock(stamp)且传入的是乐观读戳记 - 此时如果锁状态允许,无写锁、读锁计数未溢出,会通过
casState原子更新state,返回悲观读戳记,完成从乐观读到悲观读的切换 - 如果转换失败(比如此时有写锁),返回
0,此时我们会主动调用readLock()获取悲观读锁。
- 当我们调用
写锁的操作
对于写,区别更不大了,方法是writeLock() &
unlockWrite()
1 | public long writeLock() { |
获取写锁时,会将
WBIT位置 1,意思是写锁被获取,并阻塞所有读和写。释放写锁时,会递增序列号(
unlockWriteState方法),这正是乐观读校验的关键。
Condition 接口
接口分析
在 Java 并发编程的早期,线程协作依赖synchronized +
Object.wait/notify,但是一个对象只有一个
wait-set,那么它就无法区分不同条件,不能中断等待、不能灵活设置超时,还必须配合
synchronized 使用,气鼓包了
Condition将Object的 monitor 方法(wait/notify)拆出来,变成独立对象,并与任意Lock实现绑定。
核心思想:
- 一把
Lock可以关联 多个Condition,每个代表一种等待条件。 - 每个
Condition拥有独立的等待队列(wait-set)。 - 支持更丰富的等待/唤醒语义,中断、超时、绝对时间等
Condition 接口的源代码如下,定义了这些方法,其中能做这些事情
1 | public interface Condition { |
无论是 await() 还是
signal(),调用线程必须已经持有与该
Condition 关联的 Lock。否则会抛出
IllegalMonitorStateException
1 | lock.lock(); |
那么,它是如何原子性的释放锁然后挂起线程实现等待的,当你调用
condition.await() 时,发生以下原子操作
- 自动释放与该
Condition关联的Lock; - 将当前线程加入该 Condition 的等待队列;
- 挂起线程(进入 waiting 状态)。
这保证了,不会因先释放锁再挂起而导致错过 signal
而当线程被唤醒(signal)或超时后,必须重新获取锁才能从
await() 返回。所以 await()
返回时,一定持有锁!
还有一点,文档明确指出:
“spurious wakeup is permitted… always wait in a loop”
即使没人调用
signal(),线程也可能被虚假唤醒(spurious
wakeup)
所以永远不要用 if检查条件,要用
while
1 | // ❌ 错误 |
使用Condition
上面说了,使用ReentrantLock比直接使用synchronized更安全,可以替代synchronized进行线程同步。
但是,synchronized可以配合wait和notify实现线程在条件不满足时等待,条件满足时唤醒,用ReentrantLock我们怎么编写wait和notify的功能呢?
那么,可以祭出Condition接口了,可以使用Condition对象来实现wait和notify的功能。
以TaskQueue为例,把前面用synchronized实现的功能通过ReentrantLock和Condition来实现
1 | class TaskQueue { |
getTask()内部先判断队列是否为空,如果为空,线程就进行等待,直到另一个线程往队列中放入了一个任务唤醒了它,然后while()循环处理,就可以返回队列的元素了。- 线程1可以调用
addTask()不断往队列中添加任务,放入一个任务,就唤醒一下getTask()处理的线程让他从等待中苏醒过来处理任务
可以发现,使用起来完全和notifyAll(),wait()没啥差别,也满足了多线程协调运行的原则,当条件不满足时,线程进入等待状态;当条件满足时,线程被唤醒,继续执行任务。
可见,使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。
Condition提供的await()、signal()、signalAll()原理和synchronized锁对象的wait()、notify()、notifyAll()是一致的,并且其行为也是一样的:
await()会释放当前锁,进入等待状态;signal()会唤醒某个等待线程;signalAll()会唤醒所有等待线程;- 唤醒线程从
await()返回后需要重新获得锁。
此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()或signalAll()唤醒,可以自己醒来
1 | if (condition.await(1, TimeUnit.SECOND)) { |
不仅如此,synchronized 只有一个隐式条件队列(通过
wait/notify),而 ReentrantLock
等锁很多都支持多个独立的
Condition,操作极为灵活,针对并发的操作可以跟线程隔开
1 | private final Lock lock = new ReentrantLock(); |
这是生产者-消费者模型的多线程并发标准写法
ReentrantReadWriteLock是读写分离锁,读共享、写独占,它的writeLock()支持创建Condition,而读锁不支持,因为读锁是共享锁,无法保证条件等待的原子性。那么,读写分离缓存也可以使用类似的思路实现
1 | import java.util.HashMap; |
信号量:Semaphore
使用信号量
前面我们讲了各种锁的实现,本质上锁的目的是保护一种受限资源,保证同一时刻只有一个线程能访问(ReentrantLock),或者只有一个线程能写入(ReadWriteLock)。
多线程开发的过程中还有一种类型的受限资源,它需要保证同一时刻最多有N个线程能访问,比如同一时刻最多创建100个数据库连接,最多允许10个用户下载等。
这种限制数量的锁,如果用Lock数组来实现,就太麻烦了,针对于这种情况,我们可以使用信号量机制,对应 Java 里的实现就是 Semaphore
这种情况就可以使用Semaphore,它是一种计数器表示的信号量,用来保护一个或者多个共享资源的访问。如果线程要访问一个资源就必须先获得信号量。如果信号量内部计数器大于0,信号量减1,然后允许共享这个资源;否则,如果信号量的计数器等于0,信号量将会把线程置入休眠直至计数器大于0。当一个线程使用完这个信号量的时候,必须释放掉该信号量然后还给Semaphore。
Semaphore最常见的就是可以用来做流量分流,特别是对公共资源有限的场景,比如数据库连接。假设有这个的需求,读取几万个文件的数据到数据库中,由于文件读取是IO密集型任务,可以启动几十个线程并发读取,但是数据库连接数只有10个,这时就必须控制最多只有10个线程能够拿到数据库连接进行操作。这个时候,就可以使用Semaphore做流量控制。
1 | public class SemaphoreTest { |
- 上述代码创建了40个线程,但是只能允许有10个线程并发执行。
Semaphores在本质上是非可重入的
,这意味着我们不能在同一个 线程 中第二次获得
Semaphore,试图这样做会导致死锁,假设你创建了一个
Semaphore(1),然后这样
1 | Semaphore sem = new Semaphore(1); |
- 线程A自己把自己阻塞了!这说明
Semaphore不具备可重入性,它不会因为是同一个线程就允许重复获取。 - 而只要我在同一个线程里用
acquire/release,就是可重入这种想法是完全错误的,可重入的核心是“无需额外许可即可重复进入”。你看Semahpone做得到吗
因为Semaphore
没有线程所有权概念,它只维护一个许可计数器,任何线程都可以调用
release(),即使它从未调用过 acquire(),这是
Semaphore 的特性
可重入是允许一个线程使用lock()
等方法多次锁定一个特定的资源,锁内部会记录持有者线程和获取次数,只有释放次数等于获取次数时,锁才真正被释放。
对于如何正确使用Condition
必须在锁块中使用:
调用
await()/signal()前必须先调用lock()加锁,否则会抛出IllegalMonitorStateException,和 Object 的wait()/notify()必须在synchronized块中同理。读锁不能创建 Condition:
ReentrantReadWriteLock的读锁调用newCondition()会抛异常,只有写锁可以。处理中断异常:
await()会抛出InterruptedException,需要捕获并处理(比如恢复中断状态Thread.currentThread().interrupt())。避免死锁:
多个 Condition 交叉等待时,要保证锁的获取顺序一致;读写锁场景下,读转写必须先释放读锁。
源码分析
上面提到,Semaphore 是一种用于控制多个线程对共享资源访问数量的同步工具,那么,它的结构是这样的,很明显,他也不是两大锁接口下的锁类
它内部维护一个“许可”(permit)计数器。AQS ,用一个
int state表示同步状态,而在Semaphore中,它就用 AQS 中的state就表示当前可用的 permit 数量
因此,线程在访问资源前必须先 获取(acquire) 一个或多个许可;使用完资源后必须 释放(release) 相应数量的许可;如果当前没有足够许可,
acquire()会阻塞超时等,直到有许可可用。
和上面的ReentrantLock等锁是一样的,Semaphore
的所有同步逻辑都委托给内部类 Sync,而 Sync
继承自
AbstractQueuedSynchronizer,也就是说,信号量基于AQS实现
而且 Sync也分公平和不公平的两种情况。根据其构造函数
有一个挺好笑的事情,初始 permits 可以为负数!此时必须先
release()才能让acquire()成功,这种情况可以存在,但是应该不太好用
对于非公平情况,会调用
nonfairTryAcquireShared(int acquires),直接尝试 CAS 修改 state,不检查等待队列,插队
公平版本会通过调用
FairSync.tryAcquireShared先检查是否有前驱节点
只有队列为空或自己是队首时,才尝试获取许可
即使设置了公平模式,
tryAcquire()方法 仍然允许插队!这是为了性能考虑。如需严格公平,应使用tryAcquire(0, SECONDS)。
与
ReentrantLock的独占模式不同,Semaphore使用
共享模式。多个线程可以同时持有许可,只要总数不超过上限。所以信号量内部获取和释放锁,它调用的是
AQS 的 acquireShared / releaseShared
系列方法
对于其内存可见性Java 文档明确指出:
Actions in a thread prior to calling
release()happen-before actions following a successfulacquire()in another thread.
这意味着:
- 线程 A 在
release()之前对共享变量的修改, - 对线程 B 在
acquire()之后的操作 一定是可见的。 - 这是通过 AQS 内部的 volatile 读写和 CAS 保证的。





