| 学习路线指引(点击解锁) | 知识定位 | 人群定位 |
|---|---|---|
| 🧡 Python实战微信订餐小程序 🧡 | 进阶级 | 本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。 |
| 💛Python量化交易实战💛 | 入门级 | 手把手带你打造一个易扩展、更安全、效率更高的量化交易系统 |
AQS 是一个同步框架,关于同步在操作系统(一)—— 进程同步 中对进程同步做了些概念性的介绍,我们了解到进程(线程同理,本文基于 JVM 讲解,故下文只称线程)同步的工具有很多:Mutex、Semaphore、Monitor。但是Mutex 和 Semaphore 作为低级通信存在不少缺点,Monitor 机制解决了上面部分缺憾,但是仍然存在问题,AQS 的出现很好的解决了这些问题。
Monitor 解决了上述几个问题,但在 HotSpot 中底层是基于 Mutex 做的线程同步,在 1.6 之前且还没有进行优化,每次锁竞争都需要经历两次上下文切换严重影响性能。第二个问题是 HotSpot 实现的是精简的 Mesa 语义,不支持多个条件变量。
AQS 即一个类,java.util.concurrent.AbstractQueuedSynchronized.Class,这个类作为一个构造同步器的框架。AQS 本身并不提供 API 给程序员用于直接的同步控制,它是一个抽象类,通过实现它的抽象方法来构建同步工具,如 ReentrantLock、CountDownLatch 等。也就是说它不是一个面向业务开发使用的工具,是构建同步器所复用的一套机制抽取出来形成的框架,这个框架我们自己也可以通过实现它的抽象方法来构建自己的同步器。
其实通过观察同步器的各种实现都是相似的,我们会发现锁的实现通常需要以下三要素:
状态管理在信号量中是整型值,在 Mutex 中就是 Owner,在 Synchronized (Monitor) 中也是 Owner,这些机制都有对应的操作来加锁和解锁,通常加锁/解锁操作也对应着线程的阻塞(等待)/唤醒,这些线程没有获取到锁后需要等待,有的实现是自旋,有的实现是挂起。不论是 Synchronized 还是 AQS 都有等待队列供未获取到锁的线程进入,直到锁释放。
同理,AQS 所做的主要的三件事也就是上面三件,只是每一项的实现细节可能不同,支持的功能更广泛。
下文会先列出 AQS 的设计,也就是实现了哪些特性,接下来就会通过看源码和图示来了解这些设计的具体实现。
独占模式即同一时刻只能有一个线程可以通过阻塞点,共享模式下可以同时有多个线程在执行。
以上都是 AQS 需要支持的功能,基于模板模式的设计,AQS 提供了以下方法供子类继承重新实现:

关于同步器的实现思路,首先需要有一个状态来标明锁是否可用,在 AQS 的实现中,其维护了一个变量 state,这个变量使用 volatile 关键字进行标识,保证其在线程之间的可见性。加锁解锁操作简化来看就是只需要把这个状态更改,且标明当前线程占有锁。在独占模式下,加锁操作必须互斥,也就是在同一时刻只能有一个线程加锁成功,AQS 使用 CAS 原子指令来保证 State 的互斥访问。在一个线程成功加锁(改变锁的状态 State 的值,该值具体如何改变取决于同步工具如何实现)之后,其他线程尝试 CAS 则会加锁失败,那么加锁失败的线程该如何呢?这些线程可以自旋等待或者阻塞,AQS 提供 CLH 队列将这些阻塞的线程管理起来,CLH 是一个先进先出的队列,加锁失败的线程会被进入队列阻塞。因此加锁和解锁操作并不仅仅是简单的修改锁状态,在这之后还需要维护队列。AQS 的主要原理也就是围绕着队列进行入,加锁解锁功能由继承 AQS 的同步工具实现,在调用加锁操作之后,AQS 来维护线程入队,并且将线程阻塞,在调用解锁操作后,AQS 将队列中的线程按规则出队并且唤醒。
static final class Node {
static final Node SHARED = new Node(); // 共享模式下的节点
static final Node EXCLUSIVE = null; // 独占模式下的节点
static final int CANCELLED = 1; // 被取消了的状态
static final int SIGNAL = -1; // 处于park() 状态等待被unpark()唤醒的状态
static final int CONDITION = -2; // 处于等待 condition 的状态
static final int PROPAGATE = -3; // 共享模式下需要继续传播的状态
volatile int waitStatus; // 当前节点的状态
volatile Node prev; // 前驱节点指针
volatile Node next; // 后继节点指针
volatile Thread thread; // 抢占锁的线程
Node nextWaiter; // 指向下一个也处于等待的节点
}
上面提到 AQS 有一个阻塞队列,这个队列的具体实现是双向链表,队列中的节点是对线程进行封装后的 Node 类,这个类的每个字段含义如下:
为了更好的讲解原理,下面直接模拟线程竞争锁的场景来进行分析,我们首先看两个线程在独占锁模式下加锁和解锁的源码,再看共享模式下的场景。
场景:线程 T1 和线程 T2 竞争锁
acquire(int arg) 是独占模式下加锁的入口方法
public final void acquire(int arg) {
// tryAcquire(arg) 是继承 AQS 实现在独占模式下的同步工具需要重写的方法。当这个方法返回true,就表示当前线程获得了锁。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
流程如下:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
private Node addWaiter(Node mode) {
// 根据当前线程实例化一个独占式的 Node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
/**
* 1. 将尾部赋给一个临时变量,注意此时tail为空,因为tail此时还没有指向Node对象
* 2. tail == null,因此执行 enq(node) 方法,这个node是创建的当前线程的node
*/
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 这个CAS方法的含义是,判断当前AQS的尾部节点是pred,如果是则重新设置为当前node
// 设定成功,则退出
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果挂载队尾节点也存在竞争 则使用无限CAS自旋方式设置队尾
enq(node);
return node;
}
对上述流程第4点拆解来看,T2 首先要执行addWaiter(Node.EXCLUSIVE)进行入队,我们看下执行流程:

private Node enq(final Node node) {
// 死循环判断
for (;;) {
/**
* 第一次循环:
* tail赋值给临时变量t, 注意此时tail仍然是null, 进入if块
* 调用compareAndSetHead(new Node()) 创建了一个新的节点 ,这个节点的结构如下:
* Node() { // Used to establish initial head or SHARED marker
* }
* 该节点中的线程为 NULL,也就是我们下文所称的哑节点
* 此时队列里有了第一个Node
*/
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
/**
* 第二次循环:在第一次循环时head赋给了tail,此时tail 不为空,
* 进入else块,将线程T1入队,也就是维护链表关系
*/
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq() 算是上述整个流程的一个分支,为后续讲解更清楚,有必要先详细讲解此方法,画出入队情形下的队列图,流程如下:

下图是 T2 入队成功后队列图:

对上述流程第4点进行拆解,执行完毕 addWaiter() 返回当前节点 T2 Node,接着就进入acquireQueued 方法,我们来看该方法的流程:
final boolean acquireQueued(final Node node, int arg) {
// 标记自己是否拿到了资源 true为没有获取到。
boolean failed = true;
try {
// 标记等待过程中是否被中断过
boolean interrupted = false;
for (;;) {
// 拿到前驱节点
final Node p = node.predecessor();
// 判断上一个节点是否是头部,即当前节点为队列中第二个节点,tryAcquire()尝试获取锁
if (p == head && tryAcquire(arg)) {
// 将自己设置为头结点
setHead(node);
// 断开原先的哑节点与当前节点的next连接
p.next = null; // help GC
// 标记已经获取到节点
failed = false;
// 返回线程中断标记
return interrupted;
}
/**
* 判断在一次自旋加锁失败后是否需要睡眠
* 自旋第一次时shouldParkAfterFailedAcquire(p, node)返回false, 不会进入if块
* 自旋第二次时,shouldParkAfterFailedAcquire(p, node)返回true,
* 此时进入parkAndCheckInterrupt()方法,此时线程阻塞在 parkAndCheckInterrupt()
* 如果线程休息过程中被中断过(Thread.interrupted();), interrupted 返回true
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
该方法有两个标记,一个是 failed 标记是否拿到了资源,注意默认为 true 的时候是没有拿到,另一个是中断标记 interrupted 默认为 false, 该标记的含义不是该线程是否已经被打断过,而是是否需要被打断。我们先看 for 循环中的逻辑再看 finally 块。
private void setHead(Node node) {
head = node;
// 当前节点的线程字段置为空,也就是这个节点替代了原先的哑节点变成了新的哑节点
node.thread = null;
// 断开与原哑节点的prev连接
node.prev = null;
}

/**
* 检查当前节点并更新状态,返回是否需要被阻塞
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status 前置节点
* @param node the node 当前节点
* @return {@code true} if thread should block 是否需要被阻塞
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前置节点的状态,ws默认值为0
int ws = pred.waitStatus;
// SIGNAL默认值为1
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 前置节点处于取消状态
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
* 如果前置节点不是正常的等待状态那么就继续往前找直到找到一个正在等待装填的节点。将其后置节点断开接上当前节点。GC会回收一堆相互引用又没有外部引用的节点。
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
接上述流程,T2 线程获取资源失败,因而走此方法逻辑,此时传进该方法的参数 p 是 T2 Node 的前置节点,也就是哑节点(注意因没获取到资源没有进入if块,哑节点还是最先创建的那个哑节点)。这个方法就是在线程没有获取资源的情况下不断自旋直到进入阻塞状态或者是加锁成功,shouldParkAfterFailedAcquire 就是返回一个布尔值判断当前线程是否需要进行阻塞。

流程如下:


private final boolean parkAndCheckInterrupt() {
// 线程阻塞在这,被打断后才会往下执行, 此时打断标记为 true
LockSupport.park(this);
// 返回打断标记并重置为false
return Thread.interrupted();
}
这个方法比较简单,就是在上一个方法判断要阻塞之后调用这个方法,通过执行 park() 方法将线程阻塞在此处,直到有另一个线程将其唤醒。唤醒的方式有两种,一是调用 unpark() 方法,二是使用 interrupt() 方法置中断标记为 true。T2 Node 阻塞在此处被唤醒后就继续往下走执行 Thread.interrupted(), 该方法会重置中断标记并且给出是否被中断,因此此方法最终会返回一个 bool 值,表明该线程是否需要被打断。
T2 Node 执行到此处则一直阻塞在这了,为了讲清楚这些细节,我们按实际场景讲述,这个线程就阻塞在这了, T2 Node 被唤醒必然是另一个线程对其执行了 unpark() 或者 interrupt() 方法,我们先来看调用 unpark() 的情况,也就是线程 T1 解锁的流程。
书接上回,T2 阻塞住了,此时 T1 执行完毕临界区,开始进行解锁,解锁操作首先是恢复锁状态 State 为可用,其中是否重入等细节由继承AQS 的同步工具实现(待讲 ReentrantLock 再讲此处细节),此处略过不表,我们重点关注 AQS 的锁释放原理,解锁过程(独占模式下)最终会调用到 AQS 的 release(int arg) 方法,流程如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 头结点存在 且头结点状态不为0
// 如果头结点不为初始化状态则唤醒队列下一个等待的线程
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
* 获取当前节点状态
*/
int ws = node.waitStatus;
// 该节点状态是有效的 就将其设置为初始化状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
* 从尾节点往前找, 找到一个waitStatus 小于0 的Node
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
此时流程如下:

上面的解锁流程的最后,T2 线程被唤醒,继续往下执行, 注意这里我看其他博客说的有点问题,在被 unpark 唤醒的情况下是不会修改中断标记的,因此这里返回的是 false,所以回到 acquireQueued 方法中并不会执行下面这行代码,关于 unpark() 和 interrupt() 我单独开篇博客讲下。

T2 线程被唤醒后就开始有了竞争锁的资格,这时候 for 循环又开始执行,即再次竞争锁,假如这次加锁成功(加锁失败就又继续阻塞,咱就没必要继续讲了,为了闭环我们假设这次加锁成功),那么返回中断标记 interrupted,这个值为 false。

此时也就完成了锁的更替 由 T1 变成了 T2,整个环节就闭环了,在这上面的讲解过程中没有引入更多的线程是为了避免讨论复杂化,在 T3、T4 甚至更多线程来临时同样是遵循的上述代码流程,只是因为多线程并发,这其中哪个线程能获取到锁、链表的维护是否正确,唤醒的时候唤醒的是哪个线程这些事并不可控,因而也没有必要一个个场景来复现,只要弄清楚原理想必足够了。





研读源码过程中发现所需时间耗费过长,在这个以敏捷为王的时代,偶尔也疑虑如此费劲周章地研习源码能有何收获,成本与收获似不成正比。故而 AQS 的源码暂且看到这,想必知晓原理已足够,后续若有闲思再补充共享式加锁和解锁流程细节。