AQS
04月 13日
AQS 概述
AQS 本身是一个抽象类,为开发者实现同步器提供了一个通用的执行框架,主要定义了资源获取和释放的通用流程,而具体的资源获取逻辑则由具体同步器通过重写模版方法来实现。对于 AQS 来说,它的核心思想就是如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态;如果被请求的共享资源被占用,那么就需要一套线程阻塞及被唤醒时锁分配的机制,这个机制 AQS 是基于 CLH 锁进一步优化实现的。所以 AQS 核心就两样东西,一个 volatile int 类型的 state 变量,一个双向链表实现的 FIFO 同步队列。
首先我们来介绍 state 变量,它更多的是在表示一种同步状态,来展示当前临界资源的获取情况,通过内置的 FIFO 线程等待/等待队列 来完成获取资源线程的排队工作。
/**
* The synchronization state.
*/
private volatile int state;
state 的所有获取设置的方法都通过 final 关键字修饰,无法被子类重写
AQS 核心架构
flowchart LR
subgraph AQS [AbstractQueuedSynchronizer]
direction TB
State[volatile int state<br/>同步状态]
Queue[FIFO 同步队列<br/>双向链表]
State ---|CAS 操作| StateOps[getState/setState/<br/>compareAndSetState]
Queue --- QueueHead[head<br/>哑节点/已获取线程]
Queue --- QueueTail[tail<br/>队列尾部]
QueueHead -.->|prev/next| Node1[Node 1<br/>thread=A<br/>waitStatus]
QueueTail -.->|prev/next| Node2[Node 2<br/>thread=B<br/>waitStatus]
Node1 -.->|prev/next| Node2
State ---|语义由子类定义| Examples[ReentrantLock: 重入次数<br/>Semaphore: 可用许可数<br/>CountDownLatch: 倒计数]
end
Subclass[ReentrantLock/Semaphore/...] ---|继承| AQS
Subclass ---|重写| Template[tryAcquire/tryRelease<br/>tryAcquireShared/tryReleaseShared]
style State fill:#FFE4B5
style Queue fill:#E6E6FA
style Node1 fill:#90EE90
style Node2 fill:#FFB6C1
style Template fill:#FFD700
AQS = state + 队列 + 模板方法
- state:同步状态,由子类定义语义
- 队列:FIFO 双向链表,管理等待线程
- 模板方法:由子类实现具体的获取/释放逻辑
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
接着我们来介绍 AQS 的双向队列,AQS 的双向队列有一个 Node 类型的 head 节点和 Node 类型的 tail 节点。其中 AQS 会将每条请求共享资源的线程封装成 CLH 队列变体的一个 Node 节点来实现锁的分配。
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
其中对于 Node 节点主要关注以下字段需要注意
- volatile Node prev,指向前驱节点
- volatile Node next;,指向后驱节点
- volatile Thread thread,当前请求共享资源的线程
- volatile int waitStatus,节点的状态。
这里对于 Node 节点,就有以下几种状态
| Node 节点状态 | 值 | 含义 |
|---|---|---|
| CANCELLED | 1 | 表示线程已经取消获取锁。线程在等待获取资源时被中断,等待资源超时会更新为该状态。 |
| SIGNAL | -1 | 表示后继节点需要当前节点唤醒。在当前线程节点释放锁之后,需要对后继节点进行唤醒。 |
| CONDITION | -2 | 表示节点在等待 Condition。当其他线程调用了 Condition 的 signal() 方法后,节点会从等待队列转移到同步队列中等待获取资源。 |
| PROPAGATE | -3 | 用于共享模式。在共享模式下,可能会出现线程在队列中无法被唤醒的情况,因此引入了 PROPAGATE 状态来解决这个问题。 |
| (初始状态) | 0 | 加入队列的新节点的初始状态。 |
在 AQS 的源码中,经常使用 > 0 、 < 0 来对 waitStatus 进行判断。
如果 waitStatus > 0 ,表明节点的状态已经取消等待获取资源。
如果 waitStatus < 0 ,表明节点的状态处于正常的状态,即没有取消等待。
其中 SIGNAL 状态是最重要的,节点状态流转以及对应操作如下:
| 状态流转 | 对应操作 |
|---|---|
| 0 | 新节点入队时,初始状态为 0。 |
| 0 -> SIGNAL | 新节点入队时,它的前继节点状态会由 0 更新为 SIGNAL。SIGNAL 状态表明该节点的后续节点需要被唤醒。 |
| SIGNAL -> 0 | 在唤醒后继节点时,需要清除当前节点的状态。通常发生在 head 节点,比如 head 节点的状态由 SIGNAL 更新为 0,表示已经对 head 节点的后继节点唤醒了。 |
| 0 -> PROPAGATE | AQS 内部引入了 PROPAGATE 状态,为了解决并发场景下,可能造成的线程节点无法唤醒的情况。(在 AQS 共享模式获取资源的源码分析会讲到) |
stateDiagram-v2
[*] --> 0: 新节点创建\n入队时初始化
0 --> SIGNAL: 后继节点入队\nCAS 设置前驱状态
SIGNAL --> 0: head 节点唤醒后继\n清除 SIGNAL 标志
0 --> CANCELLED: 线程中断/超时\n主动取消
SIGNAL --> CANCELLED: 线程中断/超时\n主动取消
0 --> CONDITION: await()\n进入条件队列
CONDITION --> 0: signal()\n转移回同步队列
0 --> PROPAGATE: 共享模式\ndoReleaseShared
PROPAGATE --> 0: 共享释放\n唤醒传播
note right of 0
初始状态
无特殊含义
end note
note right of SIGNAL
最重要状态
表示"释放时要唤醒后继"
end note
note right of CANCELLED
终态,不可逆
waitStatus > 0
end note
note right of CONDITION
仅用于条件队列
不在同步队列中
end note
note right of PROPAGATE
仅用于共享模式
确保唤醒传播
end note
前面我们说过 AQS 是定义了资源获取和释放的通用流程,而具体的资源获取逻辑则由具体同步器通过重写模版方法来实现,主要有以下模版方法:
| 方法 | 作用 | 返回值含义 |
|---|---|---|
| tryAcquire | 独占模式获取 | true 成功,false 失败 |
| tryRelease | 独占模式释放 | true 完全释放,false 还有重入 |
| tryAcquireShared | 共享模式获取 | 负数失败,非负数成功 |
| tryReleaseShared | 共享模式释放 | true 允许唤醒后继 |
| isHeldExclusively | 是否独占持有 | 用于 Condition |
// Main exported methods
/**
* Attempts to acquire in exclusive mode. This method should query
* if the state of the object permits it to be acquired in the
* exclusive mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread. This can be used
* to implement method {@link Lock#tryLock()}.
*
* <p>The default
* implementation throws {@link UnsupportedOperationException}.
*
* @param arg the acquire argument. This value is always the one
* passed to an acquire method, or is the value saved on entry
* to a condition wait. The value is otherwise uninterpreted
* and can represent anything you like.
* @return {@code true} if successful. Upon success, this object has
* been acquired.
* @throws IllegalMonitorStateException if acquiring would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if exclusive mode is not supported
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
* Attempts to set the state to reflect a release in exclusive
* mode.
*
* <p>This method is always invoked by the thread performing release.
*
* <p>The default implementation throws
* {@link UnsupportedOperationException}.
*
* @param arg the release argument. This value is always the one
* passed to a release method, or the current state value upon
* entry to a condition wait. The value is otherwise
* uninterpreted and can represent anything you like.
* @return {@code true} if this object is now in a fully released
* state, so that any waiting threads may attempt to acquire;
* and {@code false} otherwise.
* @throws IllegalMonitorStateException if releasing would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if exclusive mode is not supported
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* Attempts to acquire in shared mode. This method should query if
* the state of the object permits it to be acquired in the shared
* mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread.
*
* <p>The default implementation throws {@link
* UnsupportedOperationException}.
*
* @param arg the acquire argument. This value is always the one
* passed to an acquire method, or is the value saved on entry
* to a condition wait. The value is otherwise uninterpreted
* and can represent anything you like.
* @return a negative value on failure; zero if acquisition in shared
* mode succeeded but no subsequent shared-mode acquire can
* succeed; and a positive value if acquisition in shared
* mode succeeded and subsequent shared-mode acquires might
* also succeed, in which case a subsequent waiting thread
* must check availability. (Support for three different
* return values enables this method to be used in contexts
* where acquires only sometimes act exclusively.) Upon
* success, this object has been acquired.
* @throws IllegalMonitorStateException if acquiring would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if shared mode is not supported
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* Attempts to set the state to reflect a release in shared mode.
*
* <p>This method is always invoked by the thread performing release.
*
* <p>The default implementation throws
* {@link UnsupportedOperationException}.
*
* @param arg the release argument. This value is always the one
* passed to a release method, or the current state value upon
* entry to a condition wait. The value is otherwise
* uninterpreted and can represent anything you like.
* @return {@code true} if this release of shared mode may permit a
* waiting acquire (shared or exclusive) to succeed; and
* {@code false} otherwise
* @throws IllegalMonitorStateException if releasing would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if shared mode is not supported
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* Returns {@code true} if synchronization is held exclusively with
* respect to the current (calling) thread. This method is invoked
* upon each call to a non-waiting {@link ConditionObject} method.
* (Waiting methods instead invoke {@link #release}.)
*
* <p>The default implementation throws {@link
* UnsupportedOperationException}. This method is invoked
* internally only within {@link ConditionObject} methods, so need
* not be defined if conditions are not used.
*
* @return {@code true} if synchronization is held exclusively;
* {@code false} otherwise
* @throws UnsupportedOperationException if conditions are not supported
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
从 ReentrantLock 来看资源获取和释放的源码
这里我们定义一份代码块,我们用 IDE 一步一步来看 AQS 关于资源获取和释放的流程
public class AQSRead {
public static void main(String[] args) {
Lock lock=new ReentrantLock();
lock.lock();
lock.unlock();
}
}
首先我们来从 lock.lock() 进入源码阅读,看如何加锁
资源获取流程
整体流程概览
flowchart TD
Start([线程调用 lock.lock]) --> FastCAS{CAS 尝试获取锁<br/>state: 0 → 1}
FastCAS -->|成功| SetOwner[设置 owner 线程] --> Success([获取锁成功])
FastCAS -->|失败| Acquire[调用 acquire 1]
Acquire --> TryAcquire1{tryAcquire<br/>第1次尝试}
TryAcquire1 -->|成功| Success
TryAcquire1 -->|失败| AddWaiter[addWaiter<br/>封装为 Node 节点]
AddWaiter --> TailNull{tail == null?}
TailNull -->|是| EnqInit[enq 初始化队列<br/>创建 dummy head] --> EnqAdd[enq 加入队列]
TailNull -->|否| EnqAdd
EnqAdd --> AcquireQueued[acquireQueued 入队循环]
AcquireQueued --> CheckPred{前驱节点是 head?}
CheckPred -->|是| TryAcquire2{tryAcquire<br/>第2次尝试}
CheckPred -->|否| CheckPark
TryAcquire2 -->|成功| SetHead[setHead<br/>当前节点变为 head] --> Success
TryAcquire2 -->|失败| CheckPark
CheckPark{shouldParkAfterFailedAcquire<br/>检查前驱 waitStatus}
CheckPark -->|waitStatus > 0| SkipCancel[跳过已取消节点] --> AcquireQueued
CheckPark -->|waitStatus == 0| SetSignal[CAS 设置 SIGNAL] --> AcquireQueued
CheckPark -->|waitStatus == SIGNAL| Park[parkAndCheckInterrupt<br/>LockSupport.park 阻塞]
Park --> Wake([被唤醒后继续循环]) --> AcquireQueued
style Success fill:#90EE90
style Park fill:#FFB6C1
style FastCAS fill:#FFE4B5
style TryAcquire1 fill:#FFE4B5
style TryAcquire2 fill:#FFE4B5
关键点:
- 黄色框:两次"快速尝试"获取锁的机会(入队前 + 刚入队时)
- 绿色框:获取锁成功
- 粉色框:线程阻塞,等待唤醒
ReentrantLock 入口 AQS 逻辑
IDE 点击lock.lock()的 lock()进入如下代码

点击继承按钮进入

来到

接着点击lock()进入

我们选择效率更高的非公平锁进入查看

此时可以看到代码ReentrantLock.java下的static final class NonfairSync extends Sync有如下加锁逻辑
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
首先注意一个点,这段逻辑是在ReentrantLock 中的,它
- 首先是尝试利用 CAS 机制看能否获取到锁,如果获取到了,则将当前对象的 owner 线程设置为自身线程
- 否则,调用
acquire()方法,这个方法在 AQS 中实现
AQS 中的 acquire 方法调用
我们在 IDE 中查看这个方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
可以看到其中只有一个 if 判断,判断条件我们每个函数依次来看
- !tryAcquire(arg),尝试获取锁
- addWaiter(Node.EXCLUSIVE)
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
tryAcquire(arg)
我们进入tryAcquire(arg),发现如下

说明这个方法是需要子类来实现的,这个在 AQS 概述中也说过
我们选择ReentrantLock 的非公平锁实现

会看到代码ReentrantLock .java 的内部类 NonfairSync 实现
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
接着进入,代码如下
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
在这里可以看到我们的 nonfairTryAcquire方法中其实主要就做了几个操作
- 首先得到当前运行的线程,得到当前 AQS 中的
state状态 - 若
state==0,表示当前共享资源没有被占用,尝试 CAS 操作进行资源占用 - 否则,state!=0,判断当前占用共享资源的线程是不是当前线程(这里也说明 ReentrantLock 是可重入锁)
- 是的话,则增加重入次数,返回 true(获得了锁)
- 否则,说明当前共享资源被其他线程占用,返回 false
接着我们再来看 addWaiter方法
addWaiter(Node.EXCLUSIVE)
其中static final Node EXCLUSIVE = null;
这里注意 addWaiter 方法再 AQS 中实现的
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
从注释来看,这个方法是用于:为当前线程和给定模式创建并排队节点。
其过程如下:
- 创建新的 Node 节点
- 找到当前 AQS 双向链表的尾节点
- 如果当前尾节点不为 null,则直接将当前新建的 Node 节点加入链表中去作为尾节点,返回这个新建 Node 节点
- 如果当前尾节点为 null,则说明当前双向链表还未初始化,调用
enq(node)方法进行初始化双向链表 - 最后都会返回新增的 Node 节点
接下来我们看看 enq方法
enq(node)
这个enq方法也是在 AQS 中实现的
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
根据其注释,其方法作用是:将节点插入队列,必要时进行初始化
根据其代码来看其流程就是
- 若链表 为 null,则先初始化
- 不为 null 后,将传入节点设为尾节点
- 返回尾节点
在返回之后节点会进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg)函数。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
其中acquireQueued(addWaiter(Node.EXCLUSIVE), arg)源代码如下,位于 AQS 中
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
根据其注释,其函数作用是:以独占不可中断模式获取已在队列中的线程的资源(条件等待方法和获取方法都会用到此方法)。
其代码流程为:
- 先把
failed置为true,表示当前获取锁流程默认失败,防止中途异常时节点没清理。 - 进入死循环,不断尝试获取锁。
- 每次先拿到当前节点的前驱节点
p。 - 如果前驱节点就是
head,说明自己已经排到队首了,这时调用tryAcquire(arg)尝试抢锁。- 这里有一个需要注意的点,我们的 双向链表 head 节点中的 thread 会被置为 null,也就是其中不再有线程,转而变为了一个哨兵节点。当一个节点的线程抢夺到了锁资源时,其自身节点往往会被置为 head 节点。理解这里,对后续资源释放时,节点唤醒处有帮助。
- 如果抢锁成功:
- 把当前节点设为新的
head - 原头节点的
next置空,帮助 GC - 标记
failed = false - 返回在等待过程中是否被中断过
interrupted
- 把当前节点设为新的
- 如果前驱不是
head,或者虽然到队首了但抢锁失败,- 就调用
shouldParkAfterFailedAcquire(p, node)判断当前线程是否应该阻塞。 - 如果可以阻塞,再调用
parkAndCheckInterrupt()挂起当前线程,并记录挂起期间是否被中断。
- 就调用
- 整个过程一直循环,直到成功拿到锁。
finally里如果最终没有成功获取锁,就执行cancelAcquire(node)取消当前节点的排队,避免队列里留下脏节点。
在这里我们还需要注意一个内容,即如果前驱节点不是 head,则将当前及诶点进行阻塞的判断
shouldParkAfterFailedAcquire(p, node)
这里查看其在 AQS 中的源代码为
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
根据其注释,函数意义时: 判断当前线程获取锁失败后,是否可以安心挂起;如果前驱节点状态不合适,就先修正队列状态,而不是立刻阻塞。
流程如下:
- 先拿到前驱节点的
waitStatus。 - 如果前驱节点状态是
SIGNAL:- 说明前驱节点已经承诺,自己释放锁时会唤醒当前节点。
- 当前线程这时就可以安全地
park。 - 直接返回
true。
- 如果前驱节点状态
ws > 0:- 说明前驱节点已经取消了排队,属于无效节点。
- 这时不能挂在它后面等唤醒。
- 所以要一直向前跳过这些被取消的节点,找到一个有效前驱。
- 并重新建立前驱后继关系
pred.next = node。 - 返回
false,表示这次先不挂起,下一轮再重试。
- 如果前驱节点状态是
0或PROPAGATE:- 说明前驱节点还没设置成“释放后要通知后继”的状态。
- 先通过 CAS 把前驱节点状态改成
SIGNAL。 - 但此时仍然不能立刻挂起,因为刚改完状态,还要再尝试一次获取锁,避免错过抢锁机会。
- 返回
false。
只有当前驱节点的状态已经是
SIGNAL时,当前线程才会真正挂起;如果前驱失效,就跳过失效节点;如果前驱还没准备好发通知,就先把它改成SIGNAL,下一轮再决定是否阻塞。这个方法本质上是在做”挂起前检查”。它确保当前节点挂起前,前驱节点一定处于释放后会唤醒自己的状态;否则就先修正前驱状态或跳过取消节点,避免线程挂起后没人唤醒,造成死等。
如果前驱节点状态为 SIGNAL时,则返回 true,接着就会调用parkAndCheckInterrupt()方法,阻塞当前节点
parkAndCheckInterrupt()
其源代码为
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
在调用这个函数后,在执行到LockSupport.park(this);时,线程被阻塞,后续的代码 return Thread.interrupted(); 不再执行,直到这个节点被唤醒。
这里我们先假设它被唤醒,看看唤醒后会在当前的 parkAndCheckInterrupt(),acquireQueued(addWaiter(Node.EXCLUSIVE), arg)函数中发生什么。
当被唤醒后,当前节点会继续执行parkAndCheckInterrupt()函数中的Thread.interrupted(),这个函数会返回一个 true,表明这个 Node 节点中的线程被阻塞过,但是需要注意的是Thread.interrupted()虽然会返回 true,表明线程被阻塞过,但是返回过程中也会清楚原本的中断标识位。
所以在返回结果后,回到acquireQueued(addWaiter(Node.EXCLUSIVE), arg)函数的循环中会有这样一段 代码
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
让外层的interrupted = true;记录下这个线程被中断过。
parkAndCheckInterrupt() 返回的是”这一次有没有被中断”,而 interrupted = true 是把”历史上有没有被中断过”永久记录下来,避免被 Thread.interrupted() 清掉后信息丢失。
即获取锁过程中不响应中断(不中断流程),但不能丢中断信息。
接着所有函数执行完毕,我们会回到acquire 函数中,有如下
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
为什么最后还要 selfInterrupt()
因为在排队过程中,parkAndCheckInterrupt() 内部调用了 Thread.interrupted(),会清除中断标志。但 AQS 的 acquire() 是一种不响应中断的获取方式:
- 中断来了,不会立刻退出获取锁流程
- 但中断信息也不能丢
- 所以等最终拿到锁后,再通过
selfInterrupt()把中断状态恢复
到此获取资源的流程就已经大致清楚了。
接下来我们结合看下资源释放的流程
资源释放流程
ReentrantLock 入口 AQS 逻辑
原本的调用代码为
public static void main(String[] args) {
Lock lock=new ReentrantLock();
lock.lock();
lock.unlock();
}
现在释放锁会进入lock.unlock()函数

然后会进入一个

接着就来到了 AQS 的
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
资源释放流程图
flowchart TD
Start([线程调用 lock.unlock]) --> Release[调用 release 1]
Release --> TryRelease{tryRelease<br/>子类实现}
TryRelease -->|state == 0| FullyReleased[完全释放<br/>free = true]
TryRelease -->|state > 0| PartialRelease[部分释放<br/>重入次数减一<br/>free = false]
FullyReleased --> ClearOwner[清空 owner 线程] --> CheckHead
PartialRelease --> ReturnFalse[返回 false<br/>不唤醒后继]
CheckHead{head != null<br/>& waitStatus != 0?}
CheckHead -->|否| NoWaiter[没有等待节点<br/>或无需唤醒] --> ReturnTrue([返回 true])
CheckHead -->|是| UnparkSuccessor[unparkSuccessor<br/>唤醒后继节点]
UnparkSuccessor --> FindSuccessor{找到有效后继节点}
FindSuccessor -->|next 正常| WakeNext[LockSupport.unpark<br/>唤醒 next 线程]
FindSuccessor -->|next 为 null/已取消| ScanFromTail[从 tail 向前扫描<br/>找到有效节点] --> WakeNext
WakeNext --> ReturnTrue
WakeNext -.->|被唤醒线程| WakeThread([从 parkAndCheckInterrupt 返回]) --> AcquireLoop[继续 acquireQueued 循环<br/>尝试获取锁]
style FullyReleased fill:#90EE90
style WakeNext fill:#FFE4B5
style ReturnFalse fill:#FFB6C1
style AcquireLoop fill:#E6E6FA
关键点:
- 绿色框:锁完全释放,可以唤醒后继
- 黄色框:执行 unpark 唤醒操作
- 粉色框:未完全释放(重入),不唤醒
- 虚线箭头:被唤醒线程回到获取流程
逻辑主线很简单:先尝试释放资源,释放成功后再决定要不要唤醒后继节点。
流程
- 调用
tryRelease(arg)尝试释放锁。- 这个方法由子类实现,比如
ReentrantLock会在这里做 state 减少、线程占有者置空等操作。 - 如果返回
false,说明这次释放还没彻底完成,比如重入锁还没完全释放干净,直接返回false。
- 这个方法由子类实现,比如
- 如果
tryRelease(arg)返回true:- 说明锁已经真正释放成功了。
- 取出头节点
head。
- 判断
head != null && h.waitStatus != 0head != null:说明同步队列已经存在等待节点。head.waitStatus != 0:说明头节点后面大概率有线程在等,并且需要被唤醒。
- 满足条件就调用
unparkSuccessor(h)- 唤醒头节点的后继节点,让它继续去竞争锁。
- 最后返回
true,表示本次释放成功。
接下来我们就来看tryRelease(arg)的实现
tryRelease(arg)

我们依旧以ReentrantLock 为例
其代码如下
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
流程简述
- 先根据当前锁的状态减去释放次数,本质就是减少重入次数。
- 校验当前线程是否是锁的持有者,如果不是,直接抛异常,禁止非法释放。
- 判断减少之后状态是否为 0:
- 如果是 0,说明锁已经完全释放,同时清空持有线程标记。
- 如果不是 0,说明只是释放了一层重入,还没彻底释放。
- 更新锁的状态值。
- 返回是否完全释放成功(只有 state 为 0 才算 true)。
这里就是释放锁的完整流程。
总结
通过对 AQS 源码的分析,可以发现其核心本质可以归纳为三个关键点:状态管理、队列排队、线程阻塞与唤醒机制。
首先,AQS 通过一个 volatile 的 state 变量来表示同步状态,不同的同步器对 state 赋予不同语义,例如在可重入锁中表示重入次数。所有对 state 的修改都通过 CAS 保证原子性,从而实现线程安全的状态竞争。
其次,AQS 基于 CLH 变种实现了一个 FIFO 的双向同步队列。当线程获取锁失败时,会被封装为 Node 节点加入队列尾部,通过排队机制保证线程获取锁的公平性(或近似公平)。队列中的每个节点通过 prev/next 维护前驱后继关系,通过 waitStatus 控制节点的行为状态。
在资源获取流程中,线程首先尝试通过 tryAcquire 直接获取锁,失败后进入同步队列,并通过自旋 + 阻塞的方式等待锁释放。只有当前驱节点为 head 时才有资格再次尝试获取锁,否则通过 shouldParkAfterFailedAcquire 判断是否需要阻塞,避免无效自旋。阻塞采用 LockSupport.park 实现,被唤醒后继续参与竞争。
刚加入队列阶段一共会有 两次获取锁 的机会:一次在入队前的快速尝试,一次是入队后如果正好排到队首时的立即重试。
第 1 次尝试(入队之前)
在
acquire()里一开始就会:
- 调用
tryAcquire- 能拿到锁就直接走人,不进队列
这是队列外的一次尝试
第 2 次尝试(刚入队后第一轮循环)
线程入队后进入
acquireQueued:第一轮循环:
- 如果发现自己前驱是
head- 会立刻再尝试一次
tryAcquire这是刚入队后的第一次(也是关键的一次)尝试
为什么只有这两次是”特殊的”?
因为这两次是:
- 没有阻塞的机会成本最低的尝试
- 避免”刚好轮到当前线程却傻乎乎去阻塞”的情况
特别是第二次:
防止这种尴尬:
- 线程刚入队
- 前面线程刚释放锁
- 结果线程直接 park 睡了
后面就进入正常节奏了:
- 不在队首 → 不尝试,准备睡
- 在队首但失败 → 可能再试一次
- 被唤醒 → 再试一次
就变成”唤醒驱动的尝试”,不再是”立即多次尝试”
在资源释放流程中,线程通过 tryRelease 释放资源,只有在 state 完全释放(例如重入次数归零)时,才会唤醒后继节点。唤醒操作通过 unpark 实现,保证队列中的线程能够继续竞争锁。
此外,AQS 在设计上采用“不中断获取,但不丢失中断信号”的策略。线程在等待过程中即使被中断,也不会立即退出获取流程,而是记录中断状态,在最终获取锁后再恢复中断标志。这种设计保证了同步语义的一致性。
AQS 本质上是一个基于 state + FIFO 队列 + park/unpark 的同步框架,通过”失败入队、自旋判断、条件阻塞、释放唤醒”这一套机制,实现线程安全的资源竞争。