五、AQS源码解读

目录

  1. AbstractQueuedSynchronizer
  2. ConditionObject
  3. 结语

    AbstractQueuedSynchronizer介绍

    刚开始,我学习AQS的时候一团麻,因为里面的函数、内部类ConditionObject比较复杂,一直没有滤清关系。
    等后面看到就发现不能这么一股脑的看,那应该怎么看呢?
  4. 首先你就把它想象成替代synchronized的替代品,synchronized的功能是什么呢,我们回忆一下,就是请求获取内部监视器锁,然后锁住资源,其他线程在请求的时候就会阻塞,直到其执行完并或释放锁。
  5. 了解AQS的结构也就是它的属性,知道它是一个FIFO队列,并用state来维持同步状态
  6. 结合以上,再去理解AQS是如何实现获取锁和释放锁的步骤的。
  7. 理解了以上,我们再去理解ConditionObject,这个内部类同样可以和原生并发中的一一对应,也就是Object的notify和wait。AQS通过ConditionObject实现signal、await

    AQS主要属性

  • volatile Node head 队列头结点
  • volatile Node tail 队列尾节点
  • state 同步状态

AQS实现原理最重要的就是以上三个属性,通过他们来实现加锁、释放、同步、FIFO。

单独说一下state这个属性,因为它比较难理解,但又是核心属性。可以将其理解为资源,也可以理解为钥匙、还可以理解为锁状态。举个例子比如独占锁:当state为0时,表示还没人拿到这把锁,当state为1时,表示锁已经被其他线程持有了,我们就必须等待再请求,我们占据、释放锁,也就是对state进行了CAS原子操作。

Node

Node是同步队列中的元素,就像链表中的元素,结构比较简单,通过next和pre实现一个双向的数据结构。

Node属性

  • thread:用来记录进入AQS队列里面的线程
  • prev: 前驱节点
  • next: 后继节点
  • nextWaiter: 下个等待节点(也就是状态为CONDITION)
  • waitStatus: 记录当前线程等待状态
    • 1 CANCELLED 表示被取消了
    • -1 SIGNAL 表示当前节点的后续节点需要被唤醒,也就是释放此资源,要通知其他人
    • -2 CONDITION 在条件队列里面等待
    • -3 PROPAGATE 释放共享资源时需要通知其他节点,这个是专属共享锁
    • 0 其他状态,在同步队列中,等待acquire
  • static final Node SHARED:用来标记该线程是在获取共享资源时被挂起的
  • static final Node EXCLUSIVE: 用来标记该线程是在获取独占资源时被挂起的
    思考:
  1. next和nextWaiter可能让人有点迷惑
  2. SHARED和EXCLUSIVE为什么类型是Node而不是用int或枚举这种?

Node方法

Node一共就5个方法,其中有3个是构造方法,另外一个是判断是否共享、一个是获取前驱节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean isShared() {
return nextWaiter == SHARED;
}
Node() { // Used to establish initial head or SHARED marker
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node(Thread thread, Node mode) { // Used by addWaiter 同步队列
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition 条件队列
this.waitStatus = waitStatus;
this.thread = thread;
}

如果对上面函数不太理解,可以先放一放,继续往下看

获取锁

如上面所说,获取锁其实就是获取state,请求这个资源。下面我们看具体源代码

  • acquire() 获取入口

    1
    2
    3
    4
    5
    6
    7
    public final void acquire(int arg) {
    if (!tryAcquire(arg) && // 尝试获取锁,如果失败则
    // 1. 调用addWaiter 插入同步队列的,队尾插入
    // 2. 调用acquireQueued,在同步队列中自旋或者挂起。
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }
  • addWaiter 以当前线程作为参数新建节点加入队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
    pred.next = node; // 此处有可能并发,所以并不保证后一个节点。
    return node;
    }
    }
    // 如果失败如其他线程加入,则进行自旋不断尝试加入队尾
    enq(node);
    return node;
    }

    注意失败有两种情况:

    • 一种是队列没有初始化tail为空
    • 一种是cas失败,也就是说其他线程插入到尾节点了
  • enq: 自旋加入队尾

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private Node enq(final Node node) {
    for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
    if (compareAndSetHead(new Node()))// 重点
    tail = head;
    } else {
    node.prev = t;
    if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;
    }
    }
    }
    }

    这段代码重点是初始化队列的方法,使用一个空节点作为head来初始化队列,这就是为什么很多地方处理都是从头结点的下一个节点开始
    头结点是空节点
    头结点是空节点

  • acquireQueued 同样也是一个循环自旋,去获取资源,获取到则进行出队操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false; // 中断标识
    for (;;) {
    final Node p = node.predecessor();
    // 如果前驱节点是头节点则重新设置头节点
    if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
    }
    // 如果不是,则判断该节点是否应该park挂起
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }
    private final boolean parkAndCheckInterrupt() {
    //使用LockSupport,挂起当前线程
    LockSupport.park(this);
    return Thread.interrupted();
    }

    重点分析下头节点的交换过程, p == head && tryAcquire(arg),这句话的表示当前线程的前驱节点是头节点,且当前线程已经获取到锁了则执行如下操作:
    AQS出队

    • head出列,释放head
    • 将当前线程节点作为head,并使之成为一个空节点
    • 这个过程不会产生并发,因为同一时间只有一个线程能获取锁。
  • shouldParkAfterFailedAcquire
    根据节点阻塞状态来判断是否park,进行到这一步,说明上一步失败,有两种情况导致失败,一是当前的前驱节点不是头结点,二是在尝试获取线程失败了,也就是有其他线程抢占到了资源。那么我们就去判断是否需要park(这里和自旋操作不一样了,不再是无限自旋浪费cpu,而是阻塞等待)
    下面的代码比较简单,就是根据node的waitStatus来判断:

    1. 如果前驱节点是无效节点,则放弃前驱节点,向上找一个有效节点作为前驱。

    2. 如果前驱节点是SIGNAL,则安全的park

    3. 其他状态则将前驱节点设为SIGNAL

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
      int ws = pred.waitStatus;
      if (ws == Node.SIGNAL)
      // 当前节点的前驱节点为SIGNAL,说明要唤醒当前线程,则当前线程可以安全的park
      return true;
      if (ws > 0) {
      // 如果节点被取消,则放弃前面的节点直到有效节点
      do {
      node.prev = pred = pred.prev;
      } while (pred.waitStatus > 0);
      pred.next = node;
      } else {
      //其他(这里只有0或者-3),需要设置前驱节点为SIGNAL
      compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
      }
      return false;
      }

      至此,完成一个请求锁(资源)的过程,总结获取锁的三个重要步骤及其原理

  1. 尝试获取锁,CAS操作
  2. 如果获取不到尝试加队列并自旋请求获取锁
  3. 使用LockSupport,减少不停的自旋带来的开销。

获取锁时独占和共享区别

共享模式下调用acquireShared(),步骤大体相似,我们重点看一下不同的地方。

  1. acquireShared
    1
    2
    3
    4
    public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0) // 也是想try,只不过判断方式不一样,一般共享下,state表示剩余占有次数。
    doAcquireShared(arg); // 如果失败则加入同步队列并等待
    }
  2. doAcquireShared
    这个方法和独占锁相对应其实是 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))结合体,它将加入队列和阻塞等待、获取到锁后出列操作放到一起。之所以独占锁分成两个函数,是因为acquireQueued()函数在条件队列里也有调用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED); // 加入同步队列
    boolean failed = true;
    try {
    boolean interrupted = false;
    for (;;) { // 自旋尝试获取锁
    final Node p = node.predecessor();
    if (p == head) {
    int r = tryAcquireShared(arg);
    if (r >= 0) {
    setHeadAndPropagate(node, r); // 出列并唤醒其他等待节点,这里叫传播,因为可能唤醒不止一个节点
    p.next = null; // help GC
    if (interrupted)
    selfInterrupt();
    failed = false;
    return;
    }
    }
    // 同样没有获取到则阻塞
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
  3. setHeadAndPropagate(node, r); 这个函数是重点不一样的地方,前面说过独占模式调用的是setHeader,进行一个头结点出列的处理。但是共享模式不一样,它是一个传播的方式。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    private void setHeadAndPropagate(Node node, int propagate) { // node 前驱节点
    Node h = head; // Record old head for check below
    setHead(node); // 和独占锁一样,头结点出列
    // 但共享锁还有进一步操作,依序获取后继节点,如果是空或者是共享的,则依次释放
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
    doReleaseShared();
    }
    }

释放锁

释放锁过程相对来说更简单一点:

  • release
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public final boolean release(int arg) {
    if (tryRelease(arg)) {
    // 如果释放资源成功,则唤醒有效头结点
    Node h = head;
    if (h != null && h.waitStatus != 0)
    unparkSuccessor(h);
    return true;
    }
    return false;
    }
    这段代码重点关注if (h != null && h.waitStatus != 0),waitStatus在什么时候不为0?-3或者-1,而这两种状态都需要唤醒下一个线程,反过来讲,如果waitStatus为0,代表当前节点后面的节点没有挂起等待,那就不需要unpark了。
  • unparkSuccessor
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    private void unparkSuccessor(Node node) {
    // 将头节点的状态改为初始状态
    int ws = node.waitStatus;
    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);

    //如果头节点的后继节点为无效状态则从队尾开始找并唤醒
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
    s = t;
    }
    if (s != null)
    LockSupport.unpark(s.thread);
    }

释放锁时独占和共享区别

  1. releaseShared

    1
    2
    3
    4
    5
    6
    7
    public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
    doReleaseShared(); // 同样只有稍有不同
    return true;
    }
    return false;
    }
  2. doReleaseShared

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    private void doReleaseShared() {
    for (;;) { // 这里区别比较大,是一个循环,也就是达到传播的目的
    Node h = head;
    if (h != null && h != tail) {
    int ws = h.waitStatus;
    if (ws == Node.SIGNAL) {// 如果是Signal,则重置当前节点并释放下一个节点
    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    continue; // loop to recheck cases
    unparkSuccessor(h);
    }
    else if (ws == 0 &&
    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue; // loop on failed CAS
    }
    if (h == head) // loop if head changed
    break;
    }
    }
  3. 总结
    共享和独占模式下,获取锁和释放锁都需要以传播的方式进行,也就是说只要拿到锁,并且可以共享获取,那么就扩散到其他节点,让其他节点也可以获取。
    借三个问题复习一下:
    ① 为什么从队尾往后找?
    ② 当一个head节点的waitStatus为0说明什么呢?
    ③ 为什么唤醒的是头结点的后继节点而不是头结点?

ConditionObject

正如上面所讲,notify和wait是配合synchronized内置锁实现线程间同步的基础一样,条件变量的signal和await方法也是配合aqs锁来实现线程间同步的。而且AQS的独占模式才支持ConditionObject

属性

1
2
private transient Node firstWaiter;
private transient Node lastWaiter;

只有两个属性,可以看出设计者的思想就是在队首和队尾操作,一个先进先出的队列。
而且注意都是以waiter结尾,其实跟Node中的属性nextWaiter对应,在条件队列中,只有nextWaiter有用,其prev和next都是null

await方法

  1. await

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public final void await() throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    Node node = addConditionWaiter();// 插入一个节点(当前线程)
    int savedState = fullyRelease(node); // 释放锁
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {// 如果不在同步队列中则一直挂起
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    break;
    }
    //通过acquireQueued来获取锁状态直至成功,该方法返回值指示该过程是否被中断
    // 到这一步说明已经在同步队列了,接下来就是去竞争获取锁了。
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
    //signal方法会将节点的nextWaiter字段赋值为null,但有可能是因为中断而取消的
    if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
    //根据中断模式,进行相应的处理
    if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
    }

    由上面可以看出await做了以下事情:

    1. 插入一个ConditionWaiter节点

    2. 释放锁

    3. 循环判断是否在同步队列中(因为有可能唤醒自己或者被其他唤醒),如果不在则park

    4. 如果在同步队列中则调用aqs的acquireQueued方法尝试获取资源(上节已有介绍)

    5. 后续处理

      接下来逐个解析源码

  2. addConditionWaiter
    插入一个节点,将当前线程插入条件队列,状态默认是CONDITION

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
    unlinkCancelledWaiters(); // 如果队尾的节点状态不为CONDITION,触发清理
    t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
    firstWaiter = node;
    else
    t.nextWaiter = node;
    lastWaiter = node;
    return node;
    }

    从上面看到,都没有加锁或者cas操作,那是因为在独占模式下,进入到此处的已经获取锁了,不存在并发情况。

  3. fullyRelease
    释放资源

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    final int fullyRelease(Node node) {
    boolean failed = true;
    try {
    int savedState = getState();
    if (release(savedState)) {// 释放锁,参考aqs解析
    failed = false;
    return savedState;
    } else {
    // 顺便提一下,release的返回值由tryRelease来决定,也就是子类来定义
    throw new IllegalMonitorStateException();
    }
    } finally {
    if (failed)
    node.waitStatus = Node.CANCELLED;
    }
    }
  4. isOnSyncQueue
    判断是否在同步队列中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
     final boolean isOnSyncQueue(Node node) {
    // 只有一种情况同步队列prev==null,也就是头结点,但之前也说过,此时的头结点已经拿到锁了,所以不可能进入此(进入这个函数说明已经释放了锁)
    if (node.waitStatus == Node.CONDITION || node.prev == null)
    return false;
    if (node.next != null) // If has successor, it must be on queue
    return true;
    /*
    * 这里为什么不用node.prev != null,因为插入同步队列的操作是先node.prev =
    * pred; 再compareAndSetTail(pred, node),也就是说cas之前prev是有可能
    * 有值的,这个时候使用node.pre搜出来并不代表它在队列中,
    * 但只有cas成功才算加入同步队列。
    * 同样下面搜索过程因为如果我们正好在cas放置为tail之后,设置pred.next=node之
    * 前,也就是说,如队尾的时候先cas成功,再设置它的前驱节点的next为此节点,那么当
    * 我们搜索的时候正好介于这两个步骤之间如果从前往后搜,则无法搜到
    * 所以需要从后往前搜
    */
    return findNodeFromTail(node);
    }
  5. 响应中断
    transferAfterCancelledWait方法在发生中断时,将节点从condition queue中转移到sync queue中去

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
    0;
    }
    final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
    enq(node);
    return true;
    }
    while (!isOnSyncQueue(node))
    Thread.yield();
    return false;
    }
  6. signal
    将一个节点(线程)从条件变量的阻塞队列(condition queue)中移动到同步队列中(sync queue),让该节点重新尝试获取资源

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public final void signal() {
    if (!isHeldExclusively())//当前线程是否在独占资源
    throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
    doSignal(first); // 总是唤醒队首
    }
    // 首先修改队首,然后调用transferForSignal()(AbstractQueuedSynchronizer方法)到同步队列
    // 如果失败,则说明该节点被取消,则继续循环直到找到一个等待节点
    private void doSignal(Node first) {
    do {
    if ( (firstWaiter = first.nextWaiter) == null)
    lastWaiter = null;
    first.nextWaiter = null;
    } while (!transferForSignal(first) &&
    (first = firstWaiter) != null);
    }
  7. doSignal

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    private void doSignal(Node first) {
    do {
    if ( (firstWaiter = first.nextWaiter) == null)
    lastWaiter = null;
    first.nextWaiter = null;
    } while (!transferForSignal(first) &&
    (first = firstWaiter) != null);
    // 更新头结点,直到成功,或者队列为空
    }
    final boolean transferForSignal(Node node) {
    /**
    * 可能存在竞争:
    * 另一个线程在执行await时,被中断,然后执行transferAfterCancelledWait方法;
    * 当前线程执行signal,然后执行次方法。这两个方法都会通过CAS操作将节的状态从CONDITION改为0
    * 或者节点的状态已经是CANCELLED
    **/
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;
    // 将头结点加入同步队列,并unpark唤醒(记得上面await中的park吗)
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    LockSupport.unpark(node.thread);
    return true;
    }

    结语

    其他方式的唤醒和阻塞等待原理类似。
    再回顾一下重点:

  8. ConditionObject是只有独占锁才有的,因此学习AQS可以先将它放一边;

  9. prev和next是同步队列的,nextWaiter是条件队列的;

  10. 同步队列中,头结点永远是对标拿到锁的当前线程,但是个空节点;

  11. 同步队列,从后往前是可靠的,从前往后是不可靠的;

  12. AQS的实现主要在于锁如何操作,也就是state

作用 wait() park()
如何唤醒 notify,有顺序要求
休眠时状态 waiting
是否释放锁
是否响应中断 报错