AQS是什么?
因为 AQS 是抽象类,无法实例化,笔者建议读者结合 ReentrantLock 去阅读和调试 AQS 的源码。
简介
AQS (Abstract Queued Synchronizer) 是 JDK 提供的一套基于 FIFO 同步队列的阻塞锁和相关同步器的一个同步框架,通过 AQS 我们可以很容易地实现我们自己需要的独占锁或共享锁。 在 Java 中,信号量、ReentrantLock、CountDownLatch 等工具都是通过 AQS 来实现的。
AQS中维护了两个队列,同步队列和阻塞队列(ConditionObject
)。同步队列用于锁的实现,是基于链表实现的双向队列,也是CLH锁的变种。阻塞队列也是基于链表实现,但是单向的,和wait
、notify
具有同样功效,可以用于生产者消费者模式的实现,如JUC中部分阻塞队列的实现。
整体介绍
实现一把锁的核心要素
一把锁最基本的功能要有阻塞和唤醒,通常情况下还要设计成可重入的,否则一个线程获取锁进入互斥状态后,再次获取锁将会被阻塞,产生死锁问题。如下是设计一把锁,需要的几个核心要素:
- 需要一个
state
变量,记录锁的状态。state=0
,代表没有线程持有锁,state>0
有线程持有锁,并且state
的大小是重入的次数。state
的操作需要CAS保证线程安全。 - 需要有一个变量记录持锁的当前线程(
exclusiveOwnerThread
)。 - 底层支持一个线程的阻塞(
park
)和唤醒(unpark
)。 - 需要一个队列维护所有阻塞的线程,队列的操作需要CAS保证线程安全。
这几个要素在AQS中都有具体表现。
类图
我们可以看到,AbstractQueuedSynchronizer 继承自抽象类 AbstractOwnableSynchronizer,AbstractOwnableSynchronizer 这个类定义了存储独占当前锁的线程和获取的方法。
1 | /** |
AbstractQueuedSynchronizer 类通过内部类 Node 构成的 FIFO 同步队列来完成线程获取锁的排队工作。 同时,AbstractQueuedSynchronizer 通过 ConditionObject 来构建等待队列。
AQS的双向同步队列
基于链表实现的双向同步队列,在CLH的基础上进行了变种。CLH是单向队列,其主要特点是自旋检查前驱节点的locked
状态。而AQS同步队列是双向队列,每个节点也有状态waitStatus
,而其并不是一直对前驱节点的状态自旋判断,而是自旋一段时间后阻塞让出cpu时间片(上下文切换),等待前驱节点主动唤醒后继节点。
AQS同步队列有些特别,head节点是一个空节点,没有记录线程node.thread=null
,其后继节点才是实质性的有线程的节点,这样做有什么好处呢,当最后一个有线程的节点出队列后,不需要想着清空队列,同时下次有新节点入队列也不需要重新实例化队列。
所以当队列为空head=tail=null
,第一个线程节点入队列时,需要先初始化,head和tail会先指向一个空节点,新节点加到tail的后面,tail指针后移到新节点,而head依然指向一个空节点,且head永远指向一个没有实质线程的空节点。
我们来看一下单个 Node 类的源码。
1 | static final class Node { |
Node 类是一个典型的双向链表元素结构,拥有前驱和后继的引用。 其中 SHARED 和 EXCLUSIVE 常量分别代表共享模式和独占模式:
- 共享模式是一个锁允许多条线程同时操作,如信号量 Semaphore 采用的就是基于 AQS 的共享模式实现的。
- 独占模式则是同一个时间段只能有一个线程对共享资源进行操作,多余的请求线程需要排队等待,如 ReentranLock。
四种状态
同步队列节点有4种状态CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3),代码中经常以waitStatus
>0 代表为取消节点,waitStatus
< 0为正常节点。
- CANCELLED(1):取消状态的节点会被剔除。当获取锁意外中断而导致没有获取锁时,节点会被取消。
- SIGNAL(-1):预示后继节点的线程需要被唤醒,但是节点node在获取锁失败后,检测前驱节点
waitStatus=SIGNAL
,判断为node应该放心阻塞,等待前驱唤醒。 - CONDITION(-2):线程正在等待状态 这个状态只在condition阻塞队列中设置。
- PROPAGATE(-3):只设置在头节点,代表需要继续无条件唤醒后继,只应用于共享模式。
模板方法
AQS 类作为抽象的基础框架类,通过定义模板方法的方式提供了一套实现锁的模板,其最基本的锁实现方式需要子类复写模板:
1 | protected boolean tryAcquire(int arg); // 获取独占锁 |
ReentrantLock、Semaphore、CountDownLatch 等工具都是通过复写上述若干方法实现的。
独占模式获取锁操作 — acquire
AQS 通过 acquire 方法获取锁,独占模式获取锁的原则是:
- 获取锁失败的节点插入队尾
- 队首不存储任何信息
- 队首的后继非 CANCELLED 状态节点是唯一有权利获取锁的节点
1. acquire 方法
1 | public final void acquire(int arg) { |
acquire 方法调用了我们上面已经提到的需要子类复写的获取独占锁方法 tryAcquire,框架只定义了获取失败后如何处理 — addWaiter。
2. 获取锁失败后入队操作 — addWaiter
1 | private Node addWaiter(Node mode) { |
这是一个典型的双向队列节点入队操作,通过我们已经讲到过的 CAS 操作实现了尾节点的判断和更新。
1 | private final boolean compareAndSetTail(Node expect, Node update) { |
而兜底方法 enq 有两种可能被调用:
- 当前队列为空,尚没有任何元素存在
- 并发环境下,原子操作 compareAndSetTail 失败
因此,enq 方法中承担了两部分工作的处理:
1 | private Node enq(final Node node) { |
可以看到,enq 方法中通过原子操作和循环的方式实现了并发环境下尾节点的添加,之所以这部分逻辑从 addWaiter 中抽离,是因为大部分情况下是不会出现上述两种可能的,将正常业务逻辑与特殊且不常用业务逻辑分离,是值得学习的一种代码组织方式。
3. 重新获取锁及更新节点状态 — acquireQueued
如上所述,并发环境下有可能在插入列表之前尚需要等待锁,但在插入列表后,马上又可以获取到锁,因此此时再次获取锁就可以减少不必要的等待。 于是,在上述 addWaiter 方法执行完,又执行了 acquireQueued 方法。
1 | final boolean acquireQueued(final Node node, int arg) { |
最终,通过 shouldParkAfterFailedAcquire 方法判断是否需要让线程挂起,如果需要挂起,则调用 parkAndCheckInterrupt 方法挂起线程。 线程的挂起最终是通过 Unsafe 的 static native 方法 park 实现的。
4. 节点出队 — cancelAcquire
最终,无论是线程获取锁过程中发生异常还是超时唤醒,都需要将当前 node 出队,这就是 cancelAcquire 方法做的事。
1 | private void cancelAcquire(Node node) { |
可以看到,上面这段代码处理了三种情况: 1. node 是队尾节点 2. node 的前驱不是队首节点,node 也不是队尾节点 3. node 的前驱是队首节点
其中,对于情况 1 和情况 2,都进行了 node 的出队,但是第三种情况却没有执行出队方法,这是为什么呢? 原因是当前节点已经被标记为 CANCELLED 状态,那么,在后继节点执行 shouldParkAfterFailedAcquire 方法时,会先让其前驱节点,也就是我们当前的 node 节点出队。
5. 唤醒后继节点 — unparkSuccessor
由上所述,当 node 是队首节点的后继时,执行了 unparkSuccessor 方法。 当前节点是队列中唯一等待锁的节点,所以必须让出获取锁的权限,让他的后继去获取锁,这就是 unparkSuccessor 方法做的事情。
1 | private void unparkSuccessor(Node node) { |
代码通过循环的方式,找到了队列中 node 节点后面第一个状态不为 CANCELLED 的节点,执行 LockSupport.unpark 唤醒了该节点对应的线程。
LockSupport.unpark 最终调用了 UNSAFE 类的 unpark 方法。
1 | public static void unpark(Thread thread) { |
那么,被唤醒的线程执行了什么呢?我们要找到该线程是什么时候被挂起的,那就是在上述的 acquireQueued 方法中,我们回看 acquireQueued 方法,被唤醒的线程在执行完 Thread.interrupted() 方法后继续循环,尝试获取锁,从而保证了锁的独占与竞争。 parkAndCheckInterrupt 方法并没有对 Thread.interrupted() 的返回值做任何处理,由此可见,acquire 方法获取锁失败的线程是不能被 interrupt 方法中断的。
可中断独占锁加锁 — acquireInterruptibly
上面提到,acquire 方法获取锁失败的线程是不能被 interrupt 方法中断的,AQS 还提供了另一个方法 acquireInterruptibly 加锁,从而让获取锁失败等待的线程可以被中断。
1 | public final void acquireInterruptibly(int arg) |
1. 可中断独占锁加锁失败处理 — doAcquireInterruptibly
可以看到,这里获取锁失败后调用了 doAcquireInterruptibly 方法,doAcquireInterruptibly 方法与 acquire 的结构非常像:
1 | private void doAcquireInterruptibly(int arg) |
可以看到,这里仍然是通过独占模式获取锁,但是在 parkAndCheckInterrupt 返回后,与此前 acquire 方法中继续循环的方式不同,取而代之的,抛出了 InterruptedException 异常,中断线程执行。
独占锁的解锁 — release
独占锁的解锁较为简单,因为加锁成功后,该线程对应的节点已经从同步队列中移除,此处如果解锁成功,只需要唤醒下一个节点去竞争锁即可。
1 | public final boolean release(int arg) { |
这里仍然调用了 unparkSuccessor 方法,通过循环的方式,找到了队列中 node 节点后面第一个状态不为 CANCELLED 的节点,执行 LockSupport.unpark 唤醒线程。
共享锁的加锁 — acquireShared
共享锁的加锁主要做了下面三项工作:
- 当线程调用acquireShared()申请获取锁资源时,如果成功,则进入临界区。
- 当获取锁失败时,则创建一个共享类型的节点并进入一个FIFO等待队列,然后被挂起等待唤醒。
- 当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待。
1. 获取锁传递与唤醒 — setHeadAndPropagate
整个流程与独占锁的加锁流程非常类似,最大的区别就是 setHeadAndPropagate 方法,这个方法做的事情就是我们上面提到的:
- 如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待。
1 | //两个入参,一个是当前成功获取共享锁的节点,一个就是tryAcquireShared方法的返回值,注意上面说的,它可能大于0也可能等于0 |
2. 唤醒后继节点 — doReleaseShared
doReleaseShared 方法进行了后续节点的唤醒。
1 | private void doReleaseShared() { |
共享锁的解锁 — releaseShard
1 | public final boolean releaseShared(int arg) { |
结合上面我们已经介绍过的 doReleaseShared 方法源码,共享锁的解锁非常简单,与独占模式的解锁一样,他在解锁成功以后进行了后续节点的唤醒操作,从而保证锁的竞争。