AQS

04月 13日

AQS 概述

AQS 本身是一个抽象类,为开发者实现同步器提供了一个通用的执行框架,主要定义了资源获取和释放的通用流程,而具体的资源获取逻辑则由具体同步器通过重写模版方法来实现。对于 AQS 来说,它的核心思想就是如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态;如果被请求的共享资源被占用,那么就需要一套线程阻塞及被唤醒时锁分配的机制,这个机制 AQS 是基于 CLH 锁进一步优化实现的。所以 AQS 核心就两样东西,一个 volatile int 类型的 state 变量,一个双向链表实现的 FIFO 同步队列。

首先我们来介绍 state 变量,它更多的是在表示一种同步状态,来展示当前临界资源的获取情况,通过内置的 FIFO 线程等待/等待队列 来完成获取资源线程的排队工作。

/**
     * The synchronization state.
     */
private volatile int state;

state 的所有获取设置的方法都通过 final 关键字修饰,无法被子类重写

AQS 核心架构

flowchart LR
    subgraph AQS [AbstractQueuedSynchronizer]
        direction TB
        State[volatile int state<br/>同步状态]
        Queue[FIFO 同步队列<br/>双向链表]

        State ---|CAS 操作| StateOps[getState/setState/<br/>compareAndSetState]

        Queue --- QueueHead[head<br/>哑节点/已获取线程]
        Queue --- QueueTail[tail<br/>队列尾部]

        QueueHead -.->|prev/next| Node1[Node 1<br/>thread=A<br/>waitStatus]
        QueueTail -.->|prev/next| Node2[Node 2<br/>thread=B<br/>waitStatus]

        Node1 -.->|prev/next| Node2

        State ---|语义由子类定义| Examples[ReentrantLock: 重入次数<br/>Semaphore: 可用许可数<br/>CountDownLatch: 倒计数]
    end

    Subclass[ReentrantLock/Semaphore/...] ---|继承| AQS
    Subclass ---|重写| Template[tryAcquire/tryRelease<br/>tryAcquireShared/tryReleaseShared]

    style State fill:#FFE4B5
    style Queue fill:#E6E6FA
    style Node1 fill:#90EE90
    style Node2 fill:#FFB6C1
    style Template fill:#FFD700

AQS = state + 队列 + 模板方法

  • state:同步状态,由子类定义语义
  • 队列:FIFO 双向链表,管理等待线程
  • 模板方法:由子类实现具体的获取/释放逻辑
//返回同步状态的当前值
protected final int getState() {
     return state;
}
 // 设置同步状态的值
protected final void setState(int newState) {
     state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
      return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

接着我们来介绍 AQS 的双向队列,AQS 的双向队列有一个 Node 类型的 head 节点和 Node 类型的 tail 节点。其中 AQS 会将每条请求共享资源的线程封装成 CLH 队列变体的一个 Node 节点来实现锁的分配。

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

其中对于 Node 节点主要关注以下字段需要注意

  • volatile Node prev,指向前驱节点
  • volatile Node next;,指向后驱节点
  • volatile Thread thread,当前请求共享资源的线程
  • volatile int waitStatus,节点的状态。

这里对于 Node 节点,就有以下几种状态

Node 节点状态 含义
CANCELLED 1 表示线程已经取消获取锁。线程在等待获取资源时被中断,等待资源超时会更新为该状态。
SIGNAL -1 表示后继节点需要当前节点唤醒。在当前线程节点释放锁之后,需要对后继节点进行唤醒
CONDITION -2 表示节点在等待 Condition。当其他线程调用了 Condition 的 signal() 方法后,节点会从等待队列转移到同步队列中等待获取资源。
PROPAGATE -3 用于共享模式。在共享模式下,可能会出现线程在队列中无法被唤醒的情况,因此引入了 PROPAGATE 状态来解决这个问题。
(初始状态) 0 加入队列的新节点的初始状态。

在 AQS 的源码中,经常使用 > 0< 0 来对 waitStatus 进行判断。

如果 waitStatus > 0 ,表明节点的状态已经取消等待获取资源。

如果 waitStatus < 0 ,表明节点的状态处于正常的状态,即没有取消等待。

其中 SIGNAL 状态是最重要的,节点状态流转以及对应操作如下:

状态流转 对应操作
0 新节点入队时,初始状态为 0。
0 -> SIGNAL 新节点入队时,它的前继节点状态会由 0 更新为 SIGNAL。SIGNAL 状态表明该节点的后续节点需要被唤醒。
SIGNAL -> 0 在唤醒后继节点时,需要清除当前节点的状态。通常发生在 head 节点,比如 head 节点的状态由 SIGNAL 更新为 0,表示已经对 head 节点的后继节点唤醒了。
0 -> PROPAGATE AQS 内部引入了 PROPAGATE 状态,为了解决并发场景下,可能造成的线程节点无法唤醒的情况。(在 AQS 共享模式获取资源的源码分析会讲到)
stateDiagram-v2
    [*] --> 0: 新节点创建\n入队时初始化
    0 --> SIGNAL: 后继节点入队\nCAS 设置前驱状态
    SIGNAL --> 0: head 节点唤醒后继\n清除 SIGNAL 标志
    0 --> CANCELLED: 线程中断/超时\n主动取消
    SIGNAL --> CANCELLED: 线程中断/超时\n主动取消
    0 --> CONDITION: await()\n进入条件队列
    CONDITION --> 0: signal()\n转移回同步队列
    0 --> PROPAGATE: 共享模式\ndoReleaseShared
    PROPAGATE --> 0: 共享释放\n唤醒传播

    note right of 0
        初始状态
        无特殊含义
    end note

    note right of SIGNAL
        最重要状态
        表示"释放时要唤醒后继"
    end note

    note right of CANCELLED
        终态,不可逆
        waitStatus > 0
    end note

    note right of CONDITION
        仅用于条件队列
        不在同步队列中
    end note

    note right of PROPAGATE
        仅用于共享模式
        确保唤醒传播
    end note

前面我们说过 AQS 是定义了资源获取和释放的通用流程,而具体的资源获取逻辑则由具体同步器通过重写模版方法来实现,主要有以下模版方法:

方法 作用 返回值含义
tryAcquire 独占模式获取 true 成功,false 失败
tryRelease 独占模式释放 true 完全释放,false 还有重入
tryAcquireShared 共享模式获取 负数失败,非负数成功
tryReleaseShared 共享模式释放 true 允许唤醒后继
isHeldExclusively 是否独占持有 用于 Condition
// Main exported methods

    /**
     * Attempts to acquire in exclusive mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire.  If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread. This can be used
     * to implement method {@link Lock#tryLock()}.
     *
     * <p>The default
     * implementation throws {@link UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait.  The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return {@code true} if successful. Upon success, this object has
     *         been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to set the state to reflect a release in exclusive
     * mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait.  The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this object is now in a fully released
     *         state, so that any waiting threads may attempt to acquire;
     *         and {@code false} otherwise.
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to acquire in shared mode. This method should query if
     * the state of the object permits it to be acquired in the shared
     * mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire.  If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread.
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait.  The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return a negative value on failure; zero if acquisition in shared
     *         mode succeeded but no subsequent shared-mode acquire can
     *         succeed; and a positive value if acquisition in shared
     *         mode succeeded and subsequent shared-mode acquires might
     *         also succeed, in which case a subsequent waiting thread
     *         must check availability. (Support for three different
     *         return values enables this method to be used in contexts
     *         where acquires only sometimes act exclusively.)  Upon
     *         success, this object has been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to set the state to reflect a release in shared mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait.  The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this release of shared mode may permit a
     *         waiting acquire (shared or exclusive) to succeed; and
     *         {@code false} otherwise
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns {@code true} if synchronization is held exclusively with
     * respect to the current (calling) thread.  This method is invoked
     * upon each call to a non-waiting {@link ConditionObject} method.
     * (Waiting methods instead invoke {@link #release}.)
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}. This method is invoked
     * internally only within {@link ConditionObject} methods, so need
     * not be defined if conditions are not used.
     *
     * @return {@code true} if synchronization is held exclusively;
     *         {@code false} otherwise
     * @throws UnsupportedOperationException if conditions are not supported
     */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }


从 ReentrantLock 来看资源获取和释放的源码

这里我们定义一份代码块,我们用 IDE 一步一步来看 AQS 关于资源获取和释放的流程

public class AQSRead {
    public static void main(String[] args) {
        
        Lock lock=new ReentrantLock();
        lock.lock();
        lock.unlock();
    }
}

首先我们来从 lock.lock() 进入源码阅读,看如何加锁

资源获取流程

整体流程概览

flowchart TD
    Start([线程调用 lock.lock]) --> FastCAS{CAS 尝试获取锁<br/>state: 0 → 1}

    FastCAS -->|成功| SetOwner[设置 owner 线程] --> Success([获取锁成功])
    FastCAS -->|失败| Acquire[调用 acquire 1]

    Acquire --> TryAcquire1{tryAcquire<br/>第1次尝试}

    TryAcquire1 -->|成功| Success
    TryAcquire1 -->|失败| AddWaiter[addWaiter<br/>封装为 Node 节点]

    AddWaiter --> TailNull{tail == null?}
    TailNull -->|是| EnqInit[enq 初始化队列<br/>创建 dummy head] --> EnqAdd[enq 加入队列]
    TailNull -->|否| EnqAdd

    EnqAdd --> AcquireQueued[acquireQueued 入队循环]

    AcquireQueued --> CheckPred{前驱节点是 head?}
    CheckPred -->|是| TryAcquire2{tryAcquire<br/>第2次尝试}
    CheckPred -->|否| CheckPark

    TryAcquire2 -->|成功| SetHead[setHead<br/>当前节点变为 head] --> Success
    TryAcquire2 -->|失败| CheckPark

    CheckPark{shouldParkAfterFailedAcquire<br/>检查前驱 waitStatus}
    CheckPark -->|waitStatus > 0| SkipCancel[跳过已取消节点] --> AcquireQueued
    CheckPark -->|waitStatus == 0| SetSignal[CAS 设置 SIGNAL] --> AcquireQueued
    CheckPark -->|waitStatus == SIGNAL| Park[parkAndCheckInterrupt<br/>LockSupport.park 阻塞]

    Park --> Wake([被唤醒后继续循环]) --> AcquireQueued

    style Success fill:#90EE90
    style Park fill:#FFB6C1
    style FastCAS fill:#FFE4B5
    style TryAcquire1 fill:#FFE4B5
    style TryAcquire2 fill:#FFE4B5

关键点:

  • 黄色框:两次"快速尝试"获取锁的机会(入队前 + 刚入队时)
  • 绿色框:获取锁成功
  • 粉色框:线程阻塞,等待唤醒

ReentrantLock 入口 AQS 逻辑

IDE 点击lock.lock()lock()进入如下代码

1774343546896-f8676197-50a4-4fbd-9b24-29e8260501f6.png

点击继承按钮进入

1774343580421-8a88c1a4-5d44-4e3c-901d-095eebd21d13.png

来到

1774343614685-dc83fae3-979f-4f21-bdf8-9f9236ff6b0c.png

接着点击lock()进入

1774343674922-15ded1e0-fffa-4350-b4e0-29d27b8799b7.png

我们选择效率更高的非公平锁进入查看

1774343708513-9c8ca151-ea2c-464f-8bbe-32af1c0b3b19.png

此时可以看到代码ReentrantLock.java下的static final class NonfairSync extends Sync有如下加锁逻辑

final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

首先注意一个点,这段逻辑是在ReentrantLock 中的,它

  • 首先是尝试利用 CAS 机制看能否获取到锁,如果获取到了,则将当前对象的 owner 线程设置为自身线程
  • 否则,调用 acquire() 方法,这个方法在 AQS 中实现

AQS 中的 acquire 方法调用

我们在 IDE 中查看这个方法

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

可以看到其中只有一个 if 判断,判断条件我们每个函数依次来看

  • !tryAcquire(arg),尝试获取锁
  • addWaiter(Node.EXCLUSIVE)
  • acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

tryAcquire(arg)

我们进入tryAcquire(arg),发现如下

1774344631468-df8ed1f1-4163-4c3d-9ef0-e90bc5a71e3d.png

说明这个方法是需要子类来实现的,这个在 AQS 概述中也说过

我们选择ReentrantLock 的非公平锁实现

1774344729703-0528e5d5-c393-4dbe-bdd2-b6755de94303.png

会看到代码ReentrantLock .java 的内部类 NonfairSync 实现

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

接着进入,代码如下

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

在这里可以看到我们的 nonfairTryAcquire方法中其实主要就做了几个操作

  • 首先得到当前运行的线程,得到当前 AQS 中的state状态
  • state==0 ,表示当前共享资源没有被占用,尝试 CAS 操作进行资源占用
  • 否则,state!=0,判断当前占用共享资源的线程是不是当前线程(这里也说明 ReentrantLock 是可重入锁)
    • 是的话,则增加重入次数,返回 true(获得了锁)
  • 否则,说明当前共享资源被其他线程占用,返回 false

接着我们再来看 addWaiter方法

addWaiter(Node.EXCLUSIVE)

其中static final Node EXCLUSIVE = null;

这里注意 addWaiter 方法再 AQS 中实现的

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

从注释来看,这个方法是用于:为当前线程和给定模式创建并排队节点。

其过程如下:

  • 创建新的 Node 节点
  • 找到当前 AQS 双向链表的尾节点
  • 如果当前尾节点不为 null,则直接将当前新建的 Node 节点加入链表中去作为尾节点,返回这个新建 Node 节点
  • 如果当前尾节点为 null,则说明当前双向链表还未初始化,调用enq(node)方法进行初始化双向链表
  • 最后都会返回新增的 Node 节点

接下来我们看看 enq方法

enq(node)

这个enq方法也是在 AQS 中实现的

/**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

根据其注释,其方法作用是:将节点插入队列,必要时进行初始化

根据其代码来看其流程就是

  • 若链表 为 null,则先初始化
  • 不为 null 后,将传入节点设为尾节点
  • 返回尾节点

在返回之后节点会进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg)函数。

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

其中acquireQueued(addWaiter(Node.EXCLUSIVE), arg)源代码如下,位于 AQS 中

/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

根据其注释,其函数作用是:以独占不可中断模式获取已在队列中的线程的资源(条件等待方法和获取方法都会用到此方法)。

其代码流程为:

  • 先把 failed 置为 true,表示当前获取锁流程默认失败,防止中途异常时节点没清理。
  • 进入死循环,不断尝试获取锁。
  • 每次先拿到当前节点的前驱节点 p
  • 如果前驱节点就是 head,说明自己已经排到队首了,这时调用 tryAcquire(arg) 尝试抢锁。
    • 这里有一个需要注意的点,我们的 双向链表 head 节点中的 thread 会被置为 null,也就是其中不再有线程,转而变为了一个哨兵节点。当一个节点的线程抢夺到了锁资源时,其自身节点往往会被置为 head 节点。理解这里,对后续资源释放时,节点唤醒处有帮助。
  • 如果抢锁成功:
    • 把当前节点设为新的 head
    • 原头节点的 next 置空,帮助 GC
    • 标记 failed = false
    • 返回在等待过程中是否被中断过 interrupted
  • 如果前驱不是 head,或者虽然到队首了但抢锁失败,
    • 就调用 shouldParkAfterFailedAcquire(p, node) 判断当前线程是否应该阻塞。
    • 如果可以阻塞,再调用 parkAndCheckInterrupt() 挂起当前线程,并记录挂起期间是否被中断。
  • 整个过程一直循环,直到成功拿到锁。
  • finally 里如果最终没有成功获取锁,就执行 cancelAcquire(node) 取消当前节点的排队,避免队列里留下脏节点。

在这里我们还需要注意一个内容,即如果前驱节点不是 head,则将当前及诶点进行阻塞的判断

shouldParkAfterFailedAcquire(p, node)

这里查看其在 AQS 中的源代码为

/**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

根据其注释,函数意义时: 判断当前线程获取锁失败后,是否可以安心挂起;如果前驱节点状态不合适,就先修正队列状态,而不是立刻阻塞。

流程如下:

  1. 先拿到前驱节点的 waitStatus
  2. 如果前驱节点状态是 SIGNAL
    • 说明前驱节点已经承诺,自己释放锁时会唤醒当前节点。
    • 当前线程这时就可以安全地 park
    • 直接返回 true
  3. 如果前驱节点状态 ws > 0
    • 说明前驱节点已经取消了排队,属于无效节点。
    • 这时不能挂在它后面等唤醒。
    • 所以要一直向前跳过这些被取消的节点,找到一个有效前驱。
    • 并重新建立前驱后继关系 pred.next = node
    • 返回 false,表示这次先不挂起,下一轮再重试。
  4. 如果前驱节点状态是 0PROPAGATE
    • 说明前驱节点还没设置成“释放后要通知后继”的状态。
    • 先通过 CAS 把前驱节点状态改成 SIGNAL
    • 但此时仍然不能立刻挂起,因为刚改完状态,还要再尝试一次获取锁,避免错过抢锁机会。
    • 返回 false

只有当前驱节点的状态已经是 SIGNAL 时,当前线程才会真正挂起;如果前驱失效,就跳过失效节点;如果前驱还没准备好发通知,就先把它改成 SIGNAL,下一轮再决定是否阻塞。

这个方法本质上是在做”挂起前检查”。它确保当前节点挂起前,前驱节点一定处于释放后会唤醒自己的状态;否则就先修正前驱状态或跳过取消节点,避免线程挂起后没人唤醒,造成死等。

如果前驱节点状态为 SIGNAL时,则返回 true,接着就会调用parkAndCheckInterrupt()方法,阻塞当前节点

parkAndCheckInterrupt()

其源代码为

/**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

在调用这个函数后,在执行到LockSupport.park(this);时,线程被阻塞,后续的代码 return Thread.interrupted(); 不再执行,直到这个节点被唤醒。

这里我们先假设它被唤醒,看看唤醒后会在当前的 parkAndCheckInterrupt(),acquireQueued(addWaiter(Node.EXCLUSIVE), arg)函数中发生什么。

当被唤醒后,当前节点会继续执行parkAndCheckInterrupt()函数中的Thread.interrupted(),这个函数会返回一个 true,表明这个 Node 节点中的线程被阻塞过,但是需要注意的是Thread.interrupted()虽然会返回 true,表明线程被阻塞过,但是返回过程中也会清楚原本的中断标识位。

所以在返回结果后,回到acquireQueued(addWaiter(Node.EXCLUSIVE), arg)函数的循环中会有这样一段 代码

if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;

让外层的interrupted = true;记录下这个线程被中断过。

parkAndCheckInterrupt() 返回的是”这一次有没有被中断”,而 interrupted = true 是把”历史上有没有被中断过”永久记录下来,避免被 Thread.interrupted() 清掉后信息丢失。

即获取锁过程中不响应中断(不中断流程),但不能丢中断信息。

接着所有函数执行完毕,我们会回到acquire 函数中,有如下

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

为什么最后还要 selfInterrupt()

因为在排队过程中,parkAndCheckInterrupt() 内部调用了 Thread.interrupted(),会清除中断标志。但 AQS 的 acquire() 是一种不响应中断的获取方式:

  • 中断来了,不会立刻退出获取锁流程
  • 但中断信息也不能丢
  • 所以等最终拿到锁后,再通过 selfInterrupt() 把中断状态恢复

到此获取资源的流程就已经大致清楚了。

接下来我们结合看下资源释放的流程

资源释放流程

ReentrantLock 入口 AQS 逻辑

原本的调用代码为

public static void main(String[] args) {

        Lock lock=new ReentrantLock();
        lock.lock();
        lock.unlock();
    }

现在释放锁会进入lock.unlock()函数

1776069644148-ab9016e0-b88b-4dfd-933f-778eac7a2239.png

然后会进入一个

1776069674429-84aafee9-4c0b-4091-91bd-08bcf48faa5d.png

接着就来到了 AQS 的

/**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }


资源释放流程图

flowchart TD
    Start([线程调用 lock.unlock]) --> Release[调用 release 1]

    Release --> TryRelease{tryRelease<br/>子类实现}

    TryRelease -->|state == 0| FullyReleased[完全释放<br/>free = true]
    TryRelease -->|state > 0| PartialRelease[部分释放<br/>重入次数减一<br/>free = false]

    FullyReleased --> ClearOwner[清空 owner 线程] --> CheckHead
    PartialRelease --> ReturnFalse[返回 false<br/>不唤醒后继]

    CheckHead{head != null<br/>& waitStatus != 0?}
    CheckHead -->|否| NoWaiter[没有等待节点<br/>或无需唤醒] --> ReturnTrue([返回 true])
    CheckHead -->|是| UnparkSuccessor[unparkSuccessor<br/>唤醒后继节点]

    UnparkSuccessor --> FindSuccessor{找到有效后继节点}
    FindSuccessor -->|next 正常| WakeNext[LockSupport.unpark<br/>唤醒 next 线程]
    FindSuccessor -->|next 为 null/已取消| ScanFromTail[从 tail 向前扫描<br/>找到有效节点] --> WakeNext

    WakeNext --> ReturnTrue

    WakeNext -.->|被唤醒线程| WakeThread([从 parkAndCheckInterrupt 返回]) --> AcquireLoop[继续 acquireQueued 循环<br/>尝试获取锁]

    style FullyReleased fill:#90EE90
    style WakeNext fill:#FFE4B5
    style ReturnFalse fill:#FFB6C1
    style AcquireLoop fill:#E6E6FA

关键点:

  • 绿色框:锁完全释放,可以唤醒后继
  • 黄色框:执行 unpark 唤醒操作
  • 粉色框:未完全释放(重入),不唤醒
  • 虚线箭头:被唤醒线程回到获取流程

逻辑主线很简单:先尝试释放资源,释放成功后再决定要不要唤醒后继节点。

流程

  1. 调用 tryRelease(arg) 尝试释放锁。
    • 这个方法由子类实现,比如 ReentrantLock 会在这里做 state 减少、线程占有者置空等操作。
    • 如果返回 false,说明这次释放还没彻底完成,比如重入锁还没完全释放干净,直接返回 false
  2. 如果 tryRelease(arg) 返回 true
    • 说明锁已经真正释放成功了。
    • 取出头节点 head
  3. 判断 head != null && h.waitStatus != 0
    • head != null:说明同步队列已经存在等待节点。
    • head.waitStatus != 0:说明头节点后面大概率有线程在等,并且需要被唤醒。
  4. 满足条件就调用 unparkSuccessor(h)
    • 唤醒头节点的后继节点,让它继续去竞争锁。
  5. 最后返回 true,表示本次释放成功。

接下来我们就来看tryRelease(arg)的实现

tryRelease(arg)

1776070092317-3ce6d2d6-6254-4cb4-bd44-723ea123262a.png

我们依旧以ReentrantLock 为例

其代码如下

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

流程简述

  1. 先根据当前锁的状态减去释放次数,本质就是减少重入次数。
  2. 校验当前线程是否是锁的持有者,如果不是,直接抛异常,禁止非法释放。
  3. 判断减少之后状态是否为 0:
    • 如果是 0,说明锁已经完全释放,同时清空持有线程标记。
    • 如果不是 0,说明只是释放了一层重入,还没彻底释放。
  4. 更新锁的状态值。
  5. 返回是否完全释放成功(只有 state 为 0 才算 true)。

这里就是释放锁的完整流程。

总结

通过对 AQS 源码的分析,可以发现其核心本质可以归纳为三个关键点:状态管理、队列排队、线程阻塞与唤醒机制

首先,AQS 通过一个 volatile 的 state 变量来表示同步状态,不同的同步器对 state 赋予不同语义,例如在可重入锁中表示重入次数。所有对 state 的修改都通过 CAS 保证原子性,从而实现线程安全的状态竞争。

其次,AQS 基于 CLH 变种实现了一个 FIFO 的双向同步队列。当线程获取锁失败时,会被封装为 Node 节点加入队列尾部,通过排队机制保证线程获取锁的公平性(或近似公平)。队列中的每个节点通过 prev/next 维护前驱后继关系,通过 waitStatus 控制节点的行为状态。

在资源获取流程中,线程首先尝试通过 tryAcquire 直接获取锁,失败后进入同步队列,并通过自旋 + 阻塞的方式等待锁释放。只有当前驱节点为 head 时才有资格再次尝试获取锁,否则通过 shouldParkAfterFailedAcquire 判断是否需要阻塞,避免无效自旋。阻塞采用 LockSupport.park 实现,被唤醒后继续参与竞争。

刚加入队列阶段一共会有 两次获取锁 的机会:一次在入队前的快速尝试,一次是入队后如果正好排到队首时的立即重试。

第 1 次尝试(入队之前)

acquire() 里一开始就会:

  • 调用 tryAcquire
  • 能拿到锁就直接走人,不进队列

这是队列外的一次尝试

第 2 次尝试(刚入队后第一轮循环)

线程入队后进入 acquireQueued

第一轮循环:

  • 如果发现自己前驱是 head
  • 会立刻再尝试一次 tryAcquire

这是刚入队后的第一次(也是关键的一次)尝试

为什么只有这两次是”特殊的”?

因为这两次是:

  • 没有阻塞的机会成本最低的尝试
  • 避免”刚好轮到当前线程却傻乎乎去阻塞”的情况

特别是第二次:

防止这种尴尬:

  • 线程刚入队
  • 前面线程刚释放锁
  • 结果线程直接 park 睡了

后面就进入正常节奏了:

  • 不在队首 → 不尝试,准备睡
  • 在队首但失败 → 可能再试一次
  • 被唤醒 → 再试一次

就变成”唤醒驱动的尝试”,不再是”立即多次尝试”

在资源释放流程中,线程通过 tryRelease 释放资源,只有在 state 完全释放(例如重入次数归零)时,才会唤醒后继节点。唤醒操作通过 unpark 实现,保证队列中的线程能够继续竞争锁。

此外,AQS 在设计上采用“不中断获取,但不丢失中断信号”的策略。线程在等待过程中即使被中断,也不会立即退出获取流程,而是记录中断状态,在最终获取锁后再恢复中断标志。这种设计保证了同步语义的一致性。

AQS 本质上是一个基于 state + FIFO 队列 + park/unpark 的同步框架,通过”失败入队、自旋判断、条件阻塞、释放唤醒”这一套机制,实现线程安全的资源竞争。

参考资料