了解 AQS
什么是 AQS
我倒是觉得我应该把这个放在前面
AQS 的全称为 AbstractQueuedSynchronizer
,翻译过来的意思就是抽象队列同步器。这个类在
java.util.concurrent.locks 包下面。
AQS 就是一个抽象类,主要用来构建锁和同步器。为他们提供一个底层的、通用的同步机制,它帮你处理了复杂的线程排队、阻塞、唤醒等底层细节,你只需要按需实现一些关键的方法,就能定制出自己的同步工具。
AQS 是 JUC 锁的“大脑”,它管理着线程的排队、阻塞和唤醒。它的核心思想我先放到这里
如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。
如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁 实现的,即将暂时获取不到锁的线程加入到队列中。
AQS 为构建锁和同步器提供了一些通用功能的实现。因此,使用 AQS
能简单且高效地构造出应用广泛的大量的锁,比如我们前面提到的
ReentrantLock,Semaphore这种,其他的诸如
ReentrantReadWriteLock读写分离锁,SynchronousQueue等等皆是基于
AQS 的
在真正讲解 AQS 源码之前,需要对 AQS 有一个整体层面的认识。也就是从整体层面上认识 AQS,了解 AQS 在整个 Java 并发中所位于的层面
AQS的作用是什么
正如上述所说,AQS 解决了开发者在实现同步器时的复杂性问题。
它提供了一个通用框架,用于实现各种同步器,它通过对底层的线程同步机制管理的各种方法的封装,为 Java 中的同步器提供了一个底层的、通用的同步机制,AQS 将复杂的线程管理逻辑隐藏起来,使开发者只需专注于具体的同步逻辑。
简单来说,AQS 是一个抽象类,为同步器提供了通用的 执行框架。它定义了 资源获取和释放的通用流程,而具体的资源获取和释放的详细逻辑则由具体同步器通过重写模板方法来实现。 因此,可以将 AQS 看作是同步器的 基础“底座”,而同步器则是基于 AQS 实现的 具体“应用”。
什么是 CLH 锁队列
CLH = Craig, Landin, and Hagersten
这三位学者在 1993 年提出了一种公平、可扩展的自旋锁算法,称为 CLH Lock。
原始的 CLH 是一种基于链表的自旋锁,它是一种基于 自旋锁 的优化实现,用于多处理器系统,保证 FIFO 公平性,避免线程饥饿。 但注意,Java AQS 中的“CLH 队列”并不是原始 CLH 锁,而是其变体(variant)!
自旋锁通过线程不断对一个原子变量执行 CAS
操作来尝试获取锁,直到该方法返回成功时即为成功获取锁。在高并发场景下,多个线程会同时竞争同一个原子变量,容易造成某个线程的
CAS 操作长时间失败导致饥饿,而且很明显的性能不好。
关于性能不好,这是因为自旋锁锁状态中心化,在竞争激烈的情况下,锁状态变更会导致多个 CPU 的高速缓存的频繁同步,从而拖慢 CPU 效率
因此自旋锁适用于锁竞争不激烈、锁持有时间短的场景。
那么 CLH 锁是对自旋锁的一个改进,它把这些线程组织成了一个队列(单链表实现),在多线程场景下,会将请求获取锁的线程通过单链表组织成一个单向队列,每个等待的线程会通过自旋访问前一个线程节点的状态,前一个节点释放锁之后,当前节点才可以获取锁,它是这样对上面描述自旋锁进行了改进的:
- 每个线程会作为一个节点加入到队列中,并通过自旋监控前一个线程节点的状态,而不是直接竞争共享变量。
- 线程按顺序排队,确保公平性,先请求的线程先获得锁,从而避免了 “饥饿” 问题。
CLH 锁 的队列结构如下图所示。
CLH 锁数据结构很简单,类似一个链表队列,所有请求获取锁的线程会排列在链表队列中,自旋访问队列中前一个节点的状态。当一个节点释放锁时,只有它的后一个节点才可以得到锁。CLH 锁本身有一个队尾指针 Tail,它是一个原子变量,指向队列最末端的 CLH 节点。每一个 CLH 节点有两个属性:所代表的线程和标识是否持有锁的状态变量。当一个线程要获取锁时,它会对 Tail 进行一个 getAndSet 的原子操作。该操作会返回 Tail 当前指向的节点,也就是当前队尾节点,然后使 Tail 指向这个线程对应的 CLH 节点,成为新的队尾节点。入队成功后,该线程会轮询上一个队尾节点的状态变量,当上一个节点释放锁后,它将得到这个锁。
很明显,CLH 队列的提出了一种公平性的锁实现,严格 FIFO,先到先得,而我们上篇内容提到的不同锁的公平锁都是直接依赖此队列顺序的
AQS 为什么使用 CLH 锁队列的变体?
普通的 CLH 它也有两个缺点:第一是因为有自旋操作,当锁持有时间长时会带来较大的 CPU 开销。第二是基本的 CLH 锁功能单一,不改造不能支持复杂的功能。
AQS(AbstractQueuedSynchronizer)在 CLH 锁的基础上进一步优化,形成了其内部的 CLH 队列变体。但是核心思路没变,还是会暂时获取不到锁的线程将被加入到该队列中
主要改进点有以下三方面
自旋 + 阻塞: CLH 锁使用纯自旋方式等待锁的释放,但大量的自旋操作会占用过多的 CPU 资源。AQS 引入了 自旋 + 阻塞 的混合机制:
- 如果线程获取锁失败,会先短暂自旋尝试获取锁;
- 如果仍然失败,则线程会进入阻塞状态,等待被唤醒,从而减少 CPU 的浪费。
单向队列改为双向队列:CLH 锁使用单向队列,节点只知道前驱节点的状态,而当某个节点释放锁时,需要通过队列唤醒后续节点。AQS 将队列改为 双向队列,新增了
next指针,使得节点不仅知道前驱节点,也可以直接唤醒后继节点,从而简化了队列操作,提高了唤醒效率,而且双向结构允许在 O(1) 时间内断开被取消的节点,这样,就实现了显式的维护前驱节点和后继节点扩展每个节点的状态:AQS 每个节点的状态如下所示,在源码中如下所示
节点中的状态变量为什么用 volatile 修饰?
这个状态变量只会被持有该状态变量的线程写入,只会被队列中该线程的后驱节点对应的线程读,而且后者会轮询读取。因此,可见性问题不会影响锁的正确性,那么,还为什么要写 volatile
实际上,要实现一个可以在多线程程序中正确执行的锁,别忘了还需要解决重排序问题。
而自定义互斥锁就需要自己保证这一规则的成立,因此上述代码通过 volatile 的 Happens-Before(先行发生)规则来解决重排序问题。JMM 的 Happens-Before(先行发生)规则有一条针对 volatile 关键字的规则:“volatile 变量的写操作发生在该变量的后续读之前”。
辅助GC:JVM 的垃圾回收机制使开发者无需手动释放对象。但在 AQS 中需要在释放锁时显式的设置为 null,避免引用的残留,辅助垃圾回收。
AQS 将每条请求共享资源的线程封装成一个 CLH 变体队列的一个结点(Node)来实现锁的分配。在 CLH 变体队列中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)
最后对比一下两者,原始的 CLH
锁是忙等待,它的等待方式是自旋,队列结构是单向链表,逻辑上是队列的形式,而且节点状态仅仅就是前面节点是否持有锁,而
JDK 针对 AQS 中的 CLH
队列,它的目的是实现阻塞式同步器,所以说,它的等待方式是阻塞,数据结构也是双向链表,节点状态是一种表示state的int,因此可以表示比较复杂的内容
二编:AQS 的 CLH 队列 = CLH 思想 + 双向链表 + 阻塞 + 状态机
具体体现简单看一下,是这样的,AQS 内部维护一个 FIFO 双向链表,用于存放获取同步状态失败的线程。
emmm,和我们学习的链表一样,
head节点是一个哑节点,不关联任何线程,仅作为队列起点。真正等待的线程从head.next开始
为什么 AQS 的性能较好?
因为 AQS 内部大量使用了 CAS 操作。
AQS 内部通过 CLH 队列来存储等待的线程节点。由于队列是共享资源,在多线程场景下,需要保证队列的同步访问。
AQS 内部通过 CAS
操作来控制队列的同步访问,CAS 操作主要用于控制
队列初始化 、 线程节点入队
这两个操作的并发安全。虽然利用 CAS
控制并发安全可以保证比较好的性能,但同时会带来比较高的
编码复杂度
在 AQS 中 CAS 是无锁竞争,先自旋再阻塞,避免一上来就挂线程,这是 AQS 性能碾压传统管程的关键
- 对于传统锁,竞争失败就会被阻塞,它设涉及到上下文切换
- 对于 AQS 实现的锁机制,它先尝试用 CAS 抢锁,抢不到就自旋一小段时间继续抢,如果自旋还是抢不到,才进入CLH队列阻塞
那么,这样的结果是,很多多线程的线程竞争中,一个线程在获取之后根本不会持有共享资源太久,传统情况下大量短时间的竞争完全可以被避免,自旋就节省了大量的短持有时间的竞争中,对于线程需要阻塞的开销
大量短时间竞争的场景,线程根本不会进入阻塞,直接在用户态搞定。吞吐量直接起飞。
而对于队列,上面也说了,AQS 没有用真实的阻塞队列,用的是 CLH,入队只用一次 CAS,出队正常操作指针,全程无加锁、无扩容、无迭代、不抽象
而且对于唤醒,AQS
只会唤醒后继节点,不全部唤醒,在notifyAll()
会唤醒所有的等待线程,大伙都醒了然后又开始强锁,无数线程抢不到又要滚回去睡觉阻塞,而
AQS 只唤醒 队列里的下一个节点,每次只唤醒一个线程,而且
AQS
的阻塞唤醒方法LockSupport.park()/LockSupport.unpark()也轻量级了太多,他们不需要锁块就能调用
AQS 中为什么 Node 节点需要不同的状态
如果没有 status,AQS 将面临以下问题:
- 无法知道后继是否需要被唤醒,那按这样每次释放锁都要遍历整个队列
- 无法处理线程中断或者超时,被中断的线程会永远阻塞,死锁了就,而且队列中很容易存在僵尸节点
- 共享模式下唤醒型号无法传递,多个许可无法连续分配
waitStatus 的本质是把原来 CLH 的单布尔变量变成一个
int,使用这个来完整的表达一个入队节点被阻塞的线程的生命周期和状态意图
所以说,AQS 中的 waitStatus 状态类似于
状态机 ,通过不同状态来表明 Node
节点的不同含义,并且根据不同操作,来控制状态之间的流转。
Node.status 的几种状态如下:
WAITING:值为 1,标记线程需要被唤醒(park 状态),也就是节点等待被唤醒的状态,需要前驱节点释放锁后调用方法唤醒后面的CANCELLED:值为Integer.MIN_VALUE,值为负数,标记节点被取消,大部分情况是给中断和超时用的,所以说,源码中是通过status < 0快速判断节点是否被取消的。它的含义是取消获取锁,这种状态的节点是异常的,无法被唤醒,也无法唤醒后继节点COND:值为2,标记节点处于 Condition 条件队列中,而非 AQS 主等待队列,也就是说,线程正在等待 Condition 的signal()/signalAll()唤醒。
注意:早期 JDK 版本的 AQS Node 还有
SIGNAL(对应这里的WAITING)、PROPAGATE等状态,我这里是 JDK21
Node 的状态不是单一值,而是位掩码(bitmask) 设计,多个状态可以通过按位与 / 或组合,所以才能用一个
int字段同时表示多种状态
AQS 核心思想
如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。
如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁 进一步优化实现的,即将暂时获取不到锁的线程加入到队列中。
这就是 AQS 的核心思想,我觉得应该多看几次 CLH 锁那部分,所以涉及到 CLH 的我就不说了,因为本来这部分 CLH 的移到上面去了
AQS(AbstractQueuedSynchronizer)的核心原理图如下
AQS 使用 int 成员变量 state
表示同步状态,通过内置的 FIFO
线程等待/等待队列 来完成获取资源线程的排队工作。
其中,int的state表示如下,它和status完全完全是两个东西,千万不要混淆
state是表示同步状态的,比如锁是否被持有啊、重入次数啊等,属于AbstractQueuedSynchronizer类本身status是表示CLH队列中入队线程节点的状态,属于AbstractQueuedSynchronizer.Node内部类
state变量由volatile修饰,用于展示当前临界资源的获取情况。
另外,状态信息 state 可以通过 protected
类型的getState()、setState()和compareAndSetState()
进行操作state。并且,这几个方法都是 final
修饰的,在子类中无法被重写,保证底层操作的一致性,我相信大伙读名就知道这些是干什么的
state是 AQS 最核心的成员变量,它的设计直接决定了 AQS
能实现独占锁、共享锁、计数类同步器(分别如ReentrantLock、Semaphore、CountDownLatch)。
那么,AQS 把所有同步逻辑的核心,都抽象成对 state
的操作,不同的同步器,本质上就是对 state 的语义定义不同
子类如何利用 state?很简单,自定义
tryAcquire/tryRelease
如果在你这
state是要表示共享锁的,那么就需要变成这样的重写方法
以可重入的互斥锁 ReentrantLock 为例,它的内部维护了一个
state 变量,用来表示锁的占用状态。state
的初始值为 0,表示锁处于未锁定状态。当线程 A 调用 lock()
方法时,会尝试通过 tryAcquire() 方法独占该锁,并让
state 的值加 1。如果成功了,那么线程 A
就获取到了锁。如果失败了,那么线程 A 就会被加入到一个等待队列(CLH
变体队列)中,直到其他线程释放该锁。假设线程 A
获取锁成功了,释放锁之前,A
线程自己是可以重复获取此锁的(state 会累加)。
前面也说了,这就是ReentrantLock可重入性的体现:一个线程可以多次获取同一个锁而不会被阻塞。但是,这也意味着,一个线程必须释放与获取的次数相同的锁,才能让
state 的值回到
0,也就是让锁恢复到未锁定状态。只有这样,其他等待的线程才能有机会获取该锁。
可见,一个锁,它的状态,它的定性,它的实现,都严格依赖state,因为state本质上就是对线程同步状态或者整个同步器的标志变量,而锁的根本就是管理线程的同步状态
再以倒计时器 CountDownLatch 以例,任务分为 N
个子线程去执行,state 也初始化为 N(注意 N
要与线程个数一致)。这 N
个子线程开始执行任务,每执行完一个子线程,就调用一次
countDown() 方法。该方法会尝试使用 CAS 操作,让
state 的值减少 1。当所有的子线程都执行完毕后(即
state 的值变为 0),CountDownLatch 会调用
unpark() 方法,唤醒主线程。这时,主线程就可以从
await() 方法(CountDownLatch
中的await() 方法而非 AQS
中的)返回,继续执行后续的操作,信号量Semaphone跟这个差不多
AQS 资源获取方式分析
AQS 定义两种资源共享方式
Exclusive:也就是独占锁,只有一个线程能执行Share:也就是共享锁,多个线程可同时执行
一般来说,自定义同步器的共享方式要么是独占,要么是共享,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但
AQS
也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock,他不就是读共享,写独占
独占模式
AQS 中对应的 tryAcquire()
模板方法如下,不提供默认实现
分析一下调用链路,对于很多锁实现tryAcquire()
,它们都在其中包裹了最核心的调用入口,是AQS 的
acquire(int arg)
方法,它是独占模式获取锁的核心入口,所以说,我们之前分析的锁,例如
ReentrantLock ,他的加锁方法都是委托
Sync,在尝试获取独占锁的时候,直接一个return sync.tryAcquire(1);,然后线程在调用
tryAcquire(arg) 尝试获取锁
在 acquire()
中,线程会先尝试获取共享资源;如果获取失败,会将线程封装为 Node
节点加入到 AQS
的等待队列中,加入队列之后,会让等待队列中的线程尝试获取资源,并且会对线程进行阻塞操作。
若返回 false(获取失败),则进入
acquire(...) 方法执行入队和阻塞逻辑。
以前有个
addWaiter()是当获取锁失败了,自旋也失败了的情况下,需要将线程封装为
Node 加入等待队列这个过程,这个方法在我这版 JDK21 没了,而是内联在核心的
acquire(Node node, int arg, ...) 方法中
在 acquire(...) 方法的无限循环中,当
tryAcquire() 失败后,会执行以下步骤封装 Node 并加入 CLH
队列:
1 | final int acquire(Node node, int arg, boolean shared, |
通过
new ExclusiveNode()/new SharedNode()创建节点,并用node.waiter = current绑定当前线程;若tail == null,调用tryInitializeHead()创建空的头节点,然后通过casTail(t, node)原子性地将新节点设为尾节点,保证多线程下入队的线程安全;再设置新节点的prev指向原尾节点,原尾节点的next指向新节点,形成 CLH 双向链表初始化的队列后如图
而且传统 acquireQueued() 的阻塞线程的逻辑(阻塞线程 +
唤醒后重试 tryAcquire())同样内联在
acquire(...) 方法中,核心逻辑如下:
1 | final int acquire(Node node, int arg, boolean shared, |
- 通过
LockSupport.park(this)阻塞当前线程(底层调用 Unsafe 的park方法,挂起线程),然后阻塞前将node.status设为WAITING,标记线程处于等待状态,线程被唤醒后(其他线程释放锁时调用signalNext()执行LockSupport.unpark()),会再次进入循环,调用tryAcquire()重试获取锁;若线程被中断,根据interruptible参数决定是否退出循环。
最后,它还有个取消获取的逻辑cancelAcquire,当线程被中断(interruptible=true)或超时(timed=true)时,执行
cancelAcquire
1 | private int cancelAcquire(Node node, boolean interrupted, |
对于这个cleanQueue()方法,也没什么太多好说的,双向链表的断开删节点,懂得都懂
共享模式
差别不大,只不过是通过
tryAcquireShared,然后同样的模板方法,然后子类实现也是内部调用acquire(int arg),整体流程一致,只不过走的是判断为共享锁那边的情况,使用共享锁的逻辑,多了一点点无可厚非的处理
AQS 中以共享模式获取资源的入口方法是 acquireShared()
,如下
在 acquireShared()
方法中,会先尝试获取共享锁,如果获取失败,则将当前线程加入到队列中阻塞,等待唤醒后尝试获取共享锁,分别对应上面
acquireShared()
中的两个方法:tryAcquireShared() 和
acquire(....)
其中 tryAcquireShared() 方法是 AQS
提供的模板方法,由同步器来实现具体逻辑。因此这里以
Semaphore 为例,来分析共享模式下,如何获取资源
Semaphore
中实现了公平锁和非公平锁,接下来以非公平锁为例来分析
tryAcquireShared()
Semaphore 中重写的 tryAcquireShared()
方法会调用下边的 nonfairTryAcquireShared() 方法
- 默认情况下,Semaphone 采用非公平模式获取资源
在共享模式下,AQS 中的 state 值表示共享资源的数量。
在 nonfairTryAcquireShared()
方法中,会在死循环中不断尝试获取资源
如果 「剩余资源数不足」 或者 「当前线程成功获取资源」 ,就退出死循环。如果正常获取,方法返回 剩余的资源数量 ,根据返回值的不同,分为 3 种情况:
- 剩余资源数量 > 0 :表示成功获取资源,并且后续的线程也可以成功获取资源。
- 剩余资源数量 = 0 :表示成功获取资源,但是后续的线程无法成功获取资源。
- 剩余资源数量 < 0 :表示获取资源失败
对于acquire(....) ,在获取失败的情况下,将当前线程加入到
CLH
队列阻塞,等待唤醒后尝试获取共享锁,他的实现和上述独占模式并无太大差异,只不过是针对共享模式的一些判断罢了
所以,我们能说,这个
acquire方法是 AQS 「获取资源」的统一底层实现,同时支持共享模式和独占模式,可中断(interruptible=true)和不可中断获取,定时(timed=true)和非定时获取,处理队列初始化、节点入队、线程阻塞 / 唤醒、取消获取等全流程,对 OOM 等异常场景的鲁棒性设计
对于acquire(...) 是 AQS
内部核心方法(你之前看过的源码),这里第三个参数
shared=true 标识共享模式,会将线程封装为
SharedNode 入队并阻塞。
完整的放一下这个方法吧,上面说的好像不怎么清楚
1 | final int acquire(Node node, int arg, boolean shared, |
然后,它会判断当前节点是否是队列中第一个等待节点
1 | if (!first && (pred = (node == null) ? null : node.prev) != null && |
接着,它会尝试获取资源
1 | if (first || pred == null) { |
- 当线程被唤醒并成功获取资源后,会调用
signalNextIfShared(node)唤醒后续的共享节点,实现链式唤醒,保证多个线程能同时获取资源,比如 Semaphore 释放多个许可时,多个等待线程能依次被唤醒。
二遍:这里举个例子
当线程 A 释放资源(
releaseShared),会调用signalNext(head)唤醒头节点的后继节点(线程 B);线程 B 被唤醒后,执行
tryAcquireShared获取资源成功;线程 B 会调用
signalNextIfShared(node)唤醒自己的后继节点(线程 C);线程 C 重复步骤 2-3,直到没有可获取资源的线程。
1 | private static void signalNextIfShared(Node h) { |
- 独占模式只会唤醒一个节点,而共享模式会链式唤醒所有可获取资源的节点。
然后是一个队列初始化,接着就是区分共享和独占的核心内容了,创建节点
1 | else if (node == null) { // 节点还未创建 |
继续是 CAS 保证节点原子性入队,其实就是双向链表的尾插
1 | else if (pred == null) { // 节点已创建但未入队 |
- 共享模式下,线程会被封装为
SharedNode入队
自旋等待,减少不公平性
1 | else if (first && spins != 0) { |
最后才是设置节点状态 & 线程阻塞
1 | else if (node.status == 0) { |
- 先将节点状态设为
WAITING:标记节点可被唤醒,然后就是LockSupport.park调用这个方法进行阻塞
Semaphore 的 acquire()
实际调用的是这个方法,只不过是支持中断了,AQS 中的源码如下:
1 | public final void acquireSharedInterruptibly(int arg) |
那么Semaphore中的尝试获取,也就是非阻塞获取,也离不开上面的acquire方法,对应
Semaphore 的
tryAcquire(long timeout, TimeUnit unit),支持超时获取,核心逻辑如下,它支持中断,超时时间到仍未获取资源,返回
false,剩下没啥好说的
1 | public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) |
所以,完整的调用链路如下
1 | Semaphore.acquire() |
AQS 资源释放方式分析
独占模式
AQS 中以独占模式释放资源的入口方法是 release()
,代码如下:
这是独占锁释放的顶层入口方法(对应各种 Lock.unlock()
的底层实现),分析一下
1 | public final boolean release(int arg) { |
tryRelease也是 AQS 留给子类的抽象方法,子类需实现释放锁的状态修改逻辑,比如ReentrantLock中会减少重入次数,直到 state=0 表示完全释放
只有
tryRelease返回true,表示锁已完全释放,才会执行signalNext唤醒后继线程。
以 ReentrantLock 的 Sync 为例
1 | // ReentrantLock 中非公平锁 Sync 中的 tryRelease 实现 |
tryRelease的返回值决定是否唤醒后继线程:只有返回true(完全释放)才会唤醒,其中包装的核心逻辑一般都是修改 state + 校验释放合法性
唤醒后继线程:signalNext (Node h),这是 AQS
内部的核心唤醒方法,源码如下:
1 | private static void signalNext(Node h) { |
二编:别忘
s = h.next是真正等待锁的第一个有效节点;
接下来进入 LockSupport.unpark
方法查看如何唤醒后继节点
虚拟线程就用虚拟线程自己对unpark节点唤醒的实现,如果是真实的线程,就使用
Unsafe,来唤醒该节点的后继节点对应的线程
最后唤醒的逻辑交给了 native 方法,给 Java 外部实现
AQS 中 Node 的核心状态常量上面也提到过,为了方便看,再放一下
1 | static final int WAITING = 1; // 线程等待被唤醒 |
其中,上面signalNext (Node h)中的getAndUnsetStatus(WAITING)就是原子清除节点的
WAITING 状态,避免重复唤醒;
只有状态不为 CANCELLED(<0) 且不为 0 的节点,才是有效等待节点。
以 ReentrantLock 为例,锁释放设计 AQS 的调用链路如下
1 | Lock.unlock() |
共享模式
AQS 中以共享模式释放资源的入口方法是 releaseShared()
,代码如下
- 可以看到,其中,它调用了
tryReleaseShared()方法和singalNext()
其中 tryReleaseShared() 方法是 AQS 提供的模板方法
这里同样以 Semaphore 来讲解,那么就是如下
首先Semaphore 调用release来释放资源
那么它就会调用tryReleaseShared
在 Semaphore 实现的 tryReleaseShared()
方法中,会在死循环内不断尝试释放资源,即通过 CAS 操作来更新
state 值。
如果更新成功,则证明资源释放成功,会进入到 singalNext()
方法。singalNext()方法和在在前文共享模式下获取资源的部分提到的singalNextIfShared()方法的逻辑是差不多的,只是缺了个检查,在上面我已进行了详细的源码分析,此处不再重复。
Condition 条件队列的释放
若独占锁结合 Condition 使用,释放逻辑会涉及「条件队列 →
同步队列」的节点转移,核心方法是
ConditionObject.signal():
1 | public final void signal() { |
对于doSignal(ConditionNode first, boolean all)
1 | private void doSignal(ConditionNode first, boolean all) { |
- Condition 的
signal()会将条件队列的节点转移到 AQS 同步队列; - 转移后的节点需等待 AQS 同步队列的
signalNext唤醒,它的本质还是依赖release方法的唤醒逻辑,复用就完事了
自定义同步器
基于 AQS 可以实现自定义的同步器, AQS 提供了 5 个模板方法和其他各种东西。如果需要自定义同步器一般的方式是这样,这是模板设计模式很好的一个应用
别急,别忘了 AQS 能帮你做了哪些比较底层的事情,AQS 的源码结构决定了自定义同步器的分工
- 维护 CLH
- 线程挂起与唤醒
- 原子操作
state - 中断与超时的处理
所以说,我们需要做的事情,也就是需要重写的方法,那么先自定义的同步器继承
AbstractQueuedSynchronizer ,然后按需要重写 AQS
暴露的如下模板方法
| 方法 | 作用 | 适用模式 | 核心逻辑 |
|---|---|---|---|
tryAcquire(int arg) |
尝试独占式获取资源 | 独占锁(如锁) | 判断 state 是否允许获取,允许则通过 CAS 修改
state,返回 true/false |
tryRelease(int arg) |
尝试独占式释放资源 | 独占锁 | 修改 state 为释放状态,返回是否完全释放(true
则唤醒队列线程) |
tryAcquireShared(int arg) |
尝试共享式获取资源 | 共享锁(如信号量) | 返回负数(失败)/0(成功但无剩余资源)/ 正数(成功且有剩余资源) |
tryReleaseShared(int arg) |
尝试共享式释放资源 | 共享锁 | 修改 state,返回是否释放成功(true
则唤醒后续共享线程) |
isHeldExclusively() |
判断是否当前线程独占资源 | 条件变量(Condition) | 实现 Condition 时需要,返回 true/false |
然后,state 是 AQS
唯一的核心变量(private volatile int),你需要为它定义业务语义
- 比如实现「不可重入独占锁」:
state=0表示锁空闲,state=1表示锁被持有; - 比如实现「二元信号量」:
state=0无许可,state=1有许可; - 所有对
state的修改,必须通过 AQS 提供的getState()/compareAndSetState()等方法,保证线程安全。
什么是钩子方法呢?
钩子方法是一种被声明在抽象类中的方法,一般使用 protected
关键字修饰,它可以是空方法,由子类实现,也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现。
除了上面提到的五个钩子方法之外,AQS 类中的其他方法都是
final ,所以无法被其他类重写。
我们先实现一个最简单的独占锁:同一时间只有一个线程能获取锁,释放后其他线程才能获取。
定义同步器类,继承 AQS
重写
tryAcquire/tryRelease/isHeldExclusively,定义state语义1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Lock;
// 1. 自定义独占锁(实现 Lock 接口,让用法和原生锁一致)
public class MyExclusiveLock implements Lock {
// 2. 内部类Sync继承 AQS,重写钩子方法
private static class Sync extends AbstractQueuedSynchronizer {
/**
* 尝试获取锁:state=0 表示空闲,CAS 修改为 1 则获取成功
* @param arg 传 1(表示获取 1 把锁)
*/
protected boolean tryAcquire(int arg) {
// 必须用 CAS 保证原子性(多线程竞争时)
if (compareAndSetState(0, 1)) {
// 记录当前持有锁的线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false; // state≠0,获取失败
}
/**
* 尝试释放锁:把 state 置为 0,释放锁
* @param arg 传 1(表示释放 1 把锁)
*/
protected boolean tryRelease(int arg) {
// 只有持有锁的线程能释放
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException("当前线程未持有锁");
}
// 释放锁,清空持有线程,state 置 0
setExclusiveOwnerThread(null);
setState(0); // 独占锁释放无需 CAS,只有持有线程能执行
return true; // 返回 true,AQS 会唤醒队列中的线程
}
/**
* 判断是否当前线程独占锁(Condition 需要)
*/
protected boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 创建 Condition
public Condition newCondition() {
return new ConditionObject(); // 复用 AQS 的 Condition 实现
}
}
// 3. 封装 Sync,对外提供 Lock 接口的方法
private final Sync sync = new Sync();
// 获取锁
// 它们看起来是调用 AQS 的 final acquire 方法,但是实际上内部会调用我们重写的 tryAcquire
public void lock() {
sync.acquire(1); // arg=1 表示获取 1 把锁
}
// 尝试获取锁(直接调用 tryAcquire)
public boolean tryLock() {
return sync.tryAcquire(1);
}
// 释放锁(调用 AQS 的 release 方法,内部调用 tryRelease)
public void unlock() {
sync.release(1);
}
// 以下是 Lock 接口的默认实现,直接复用 Sync 的能力
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long time, java.util.concurrent.TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
public java.util.concurrent.locks.Condition newCondition() {
return sync.newCondition();
}
}来测试一下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28public class MyLockTest {
private static final MyExclusiveLock lock = new MyExclusiveLock();
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
// 启动 10 个线程,每个线程累加 1000 次
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
lock.lock(); // 获取自定义锁
try {
count++;
} finally {
lock.unlock(); // 释放锁
}
}
});
threads[i].start();
}
// 等待所有线程执行完毕
for (Thread t : threads) {
t.join();
}
System.out.println("最终计数:" + count); // 预期输出 10000(线程安全)
}
}
那么,共享锁如何自定义,共享锁的特点是「多个线程可同时获取资源」,比如实现一个「最多允许 3 个线程同时访问」的信号量
先定义共享同步器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77import java.util.concurrent.locks.AbstractQueuedSynchronizer;
// 自定义共享信号量
public class MySharedSemaphore {
private static class Sync extends AbstractQueuedSynchronizer {
// 初始化信号量的许可数(state = 许可数)
public Sync(int permits) {
setState(permits); // AQS 的 setState 方法,设置初始 state
}
/**
* 尝试共享式获取许可:剩余许可 > 0 则获取
* @param arg 获取的许可数(这里固定为 1)
* @return 负数:失败;0:成功但无剩余许可;正数:成功且有剩余许可
*/
protected int tryAcquireShared(int arg) {
// 循环 CAS 保证多线程下修改 state 成功
for (;;) {
int current = getState(); // 获取当前剩余许可
int remaining = current - arg; // 剩余许可 = 当前 - 要获取的
// 剩余许可 < 0 则失败;否则 CAS 修改 state
if (remaining < 0 || compareAndSetState(current, remaining)) {
return remaining; // 返回剩余许可(负数=失败,非负=成功)
}
}
}
/**
* 尝试共享式释放许可
* @param arg 释放的许可数(这里固定为 1)
* @return 是否释放成功(true 则 AQS 唤醒后续线程)
*/
protected boolean tryReleaseShared(int arg) {
// 循环 CAS 修改 state
for (;;) {
int current = getState();
int next = current + arg; // 释放后许可数 +=1
if (next < current) { // 检查溢出
throw new Error("Maximum permit count exceeded");
}
if (compareAndSetState(current, next)) {
return true; // 释放成功,AQS 会唤醒队列中的共享线程
}
}
}
// 获取当前剩余许可数
public int getPermits() {
return getState();
}
}
private final Sync sync;
// 构造方法:初始化许可数
public MySharedSemaphore(int permits) {
if (permits < 0) throw new IllegalArgumentException("许可数不能为负");
this.sync = new Sync(permits);
}
// 获取许可(阻塞)
public void acquire() {
sync.acquireShared(1); // AQS 的共享获取方法
}
// 释放许可
public void release() {
sync.releaseShared(1); // AQS 的共享释放方法
}
// 获取剩余许可数
public int availablePermits() {
return sync.getPermits();
}
}测试共享信号量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29public class MySemaphoreTest {
private static final MySharedSemaphore semaphore = new MySharedSemaphore(3); // 最多 3 个线程同时执行
private static final AtomicInteger count = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
semaphore.acquire(); // 获取许可
try {
System.out.println(Thread.currentThread().getName() + " 获取许可,剩余:" + semaphore.availablePermits());
Thread.sleep(1000); // 模拟业务逻辑
count.incrementAndGet();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 释放许可
System.out.println(Thread.currentThread().getName() + " 释放许可,剩余:" + semaphore.availablePermits());
}
}, "线程" + i);
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
System.out.println("最终计数:" + count); // 输出 10(所有线程都执行了)
}
}
为了修改state操作保证的线程安全,修改 state
时,独占锁可用 setState()(只有持有线程操作),共享锁必须用
compareAndSetState(),而且和原生锁一样,获取资源后要在
finally
中释放,避免死锁;而且tryAcquire/tryRelease
等方法要轻量,只做 state 判断和修改,阻塞逻辑由 AQS 的
acquire()/release() 处理,要不然 AQS
就白用了





