AQS源码分析1——独占锁的获取与释放

AbstractQueuedSynchronizer

Posted by Jay on January 6, 2019

AQS源码分析1——独占锁的获取与释放

AQS(AbstractQueuedSynchronizer)是Java中众多锁以及并发工具的基础,其底层采用乐观锁,大量使用了CAS操作, 并且在失败时采用自旋方式重试,以实现轻量级和高效地获取锁。

AQS虽然被定义为抽象类,但事实上它并不包含任何抽象方法。这是因为AQS是被设计来支持多种用途,如果定义抽象方法,则子类在继承时必须要覆写所有的抽象方法,这显然是不合理的。所以AQS将一些需要子类覆写的方法都设计成protected方法,默认实现为抛出UnsupportedOperationException异常。如果子类使用到这些方法,但是没有覆写,则会抛出异常;如果子类没有使用到这些方法,则不需要做任何操作。

AQS中实现了锁的获取框架,锁的实际获取逻辑交由子类去实现,就独占锁的获取操作而言,子类必须重写 tryAcquire方法。

AQS核心实现

状态

在AQS中,状态是由state属性来表示的,是volatile类型的变量。

private volatile int state; // The synchronization state.

该属性的值表示了锁的状态,state为0表示锁没有被占用,state大于0表示当前已经有线程持有该锁,这里之所以说大于0而不说等于1是因为可能存在可重入的情况(独占锁)。

对于独占锁,同一时刻,锁只能被一个线程所持有。通过state变量是否为0,可以分辨当前锁是否被占用,但光知道锁是不是被占用是不够的,并不知道占用锁的线程。在AQS中,通过exclusiveOwnerThread属性来确定:

private transient Thread exclusiveOwnerThread; // 继承自AbstractOwnableSynchronizer

exclusiveOwnerThread属性的值即为当前持有锁的线程。

队列

AQS中,队列的实现是一个双向链表,被称为sync queue,它表示所有等待锁的线程的集合。在并发编程中使用队列,通常是将当前线程包装成某种类型的数据结构并放到等待队列中,先来看看队列中每一个节点的结构:

static final class Node {
    static final Node SHARED = new Node(); // 共享模式
    static final Node EXCLUSIVE = null; // 独占模式

    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

这个结构的属性可以分为下面4类:

// 节点所代表的线程
volatile Thread thread;

// 双向链表,每个节点需要保存自己的前驱节点和后继节点的引用
volatile Node prev;
volatile Node next;

// 线程所处的等待锁的状态,初始化时waitStatus为0
volatile int waitStatus;
static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

// 该属性用于condition条件队列时,表示下一个节点;用于共享锁时,表示节点处于共享模式
Node nextWaiter;

注意,在这个Node类中也有一个状态变量waitStatus,它表示了当前Node所代表的线程等待锁的状态,在独占锁模式下,只需关注CANCELLED SIGNAL两种状态即可。这里还有一个nextWaiter属性,在独占锁模式下永远为null,仅仅起到一个标记作用,没有实际意义。

描述完队列中的节点,对于sync queue,既然是双向链表,操纵它自然只需一个头结点和一个尾节点:

// 头结点,不代表任何线程,是一个哑结点dummy node
private transient volatile Node head;

// 尾节点,每一个请求锁的线程会加到队尾
private transient volatile Node tail;

于是可以得到sync queue的结构

在AQS中的队列是一个CLH队列,它的head节点永远是一个哑结点(dummy node),它不代表任何线程(某些情况下可以看做是代表了当前持有锁的线程),因此head所指向的Node的thread属性永远是null。只有从次节点往后的所有节点才代表了所有等待锁的线程。因此,在当前线程没有抢到锁被包装成Node扔到队列中时,即使队列是空的,它也会排在第二个,我们会在它的前面新建一个dummy节点(具体代码在后面分析源码时再说)。为了便于描述,下文中把除去head节点的队列称作是等待队列,在这个队列中的节点才代表了所有等待锁的线程:

在继续分析之前再对着上图总结一下Node节点各个参数的含义:

  • thread:表示当前Node所代表的线程。
  • waitStatus:表示节点所处的等待状态,独占锁模式下关注SIGNAL CANCELLED 这两种状态,共享锁模式下关注SIGNAL CANCELLED 初始态(0)三种状态。
  • prev、next:当前节点的前驱和后继节点。
  • nextWaiter:作为标记,值永远为null,表示当前处于独占锁模式。
CAS操作

CAS操作大对数是用来改变状态state的,且一般在静态代码块中初始化需要CAS操作的属性的偏移量:

// AQS源码
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}

从这个静态代码块中也可以看出,CAS操作主要针对5个属性,包括AQS的3个属性state/head/tail,以及Node对象的两个属性waitStatus/next,这说明5个属性是会被多个线程同时访问的。

得到了属性的偏移量之后,接下来就是CAS操作本身:

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private static final boolean compareAndSetWaitStatus(Node node, int expect,int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}
private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
    return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}

最终CAS操作调用的是Unsafe类的compareAndSwapXXX方法。

AQS核心属性

前面分析了AQS的状态、队列和CAS操作,对这个工具类有了初步的认识。接下来在进入源码分析之前,先来总结下AQS核心属性:

(1)锁相关的属性

// 锁的状态
private volatile int state; 
// 当前持有锁的线程,注意这个属性是从AbstractOwnableSynchronizer继承而来
private transient Thread exclusiveOwnerThread; 

(2)sync queue相关的属性

private transient volatile Node head; // 队头,为dummy node
private transient volatile Node tail; // 队尾,新入队的节点

(3)队列中的Node中需要关注的属性(三组)

// 节点所代表的线程
volatile Thread thread;

// 双向链表,每个节点需要保存自己的前驱节点和后继节点的引用
volatile Node prev;
volatile Node next;

// 线程所处的等待锁的状态,初始化时为0
volatile int waitStatus;
static final int CANCELLED =  1;
static final int SIGNAL    = -1;

独占锁的获取

前面已经提到,AQS大多数情况下都是通过继承来使用的,子类通过覆写 tryAcquire 来实现自己的获取锁的逻辑,这里以ReentrantLock为例来说明独占锁的获取流程。值得注意的是, ReentrantLock公平锁非公平锁 两种实现,默认实现为非公平锁,这体现在它的构造函数中。FairSync 继承自 Sync,而Sync继承自 AQSReentrantLock获取锁的逻辑是直接调用了 FairSync 或者 NonfairSync的逻辑。这里以FairSync为例, 来逐行分析独占锁的获取:

// ReentrantLock公平锁
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    // 获取锁
    final void lock() {
        acquire(1);
    }
    ...
}

lock()方法调用的 acquire()方法来自父类AQS。下面给出完整的获取锁的流程图,再逐行分析代码。

acquire

acquire定义在AQS类中,描述了获取锁的流程。

// 获取独占锁
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

该方法中涉及了四个方法的调用:

(1)tryAcquire(arg)

该方法由继承AQS的子类实现,为获取锁的具体逻辑。

(2)addWaiter(Node mode)

该方法由AQS实现,在尝试获取锁失败后调用,将当前请求锁的线程包装成Node放到sync queue中去,并返回这个Node。

(3)acquireQueued(final Node node, int arg)

该方法由AQS实现,这个方法比较复杂,主要对上面刚加入队列的Node不断尝试以下两种操作之一:

  • 在前驱节点就是head节点的时候,继续尝试获取锁;
  • 将当前线程挂起,使CPU不再调度它。

(4)selfInterrupt()

该方法由AQS实现,用于中断当前线程。由于在整个抢锁过程中是不响应中断的,如果在抢锁的过程中发生了中断,为了记录下中断的状态,AQS的做法是简单的记录有没有发生过中断,如果返回的时候发现曾经发生过中断,则在退出acquire方法之前,就调用selfInterrupt()进行自我中断,就好像将这个发生在抢锁过程中的中断“推迟”到抢锁结束以后再发生一样。

从上面的介绍中可以看出,除了获取锁的逻辑 tryAcquire(arg)由子类实现外,其余方法均由AQS实现。

tryAcquire

tryAcquire 获取独占锁的逻辑很简单,就是判断当前锁有没有被占用:

  1. 如果锁没有被占用, 尝试以公平的方式获取锁;
  2. 如果锁已经被占用,检查是不是锁重入;
  3. 获取锁成功返回true,失败则返回false
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 首先获取当前锁的状态
    int c = getState(); 
    
    // c=0 说明当前锁是可获取的,没有被任何线程占用,可以尝试获取。
    // 因为是公平锁, 所以在抢占之前首先看看队列中有没有排在自己前面的Node,
    // 如果没有人在排队, 则通过CAS方式获取锁, 就可以直接退出了。
    if (c == 0) {
        if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current); // 将当前线程设置为占用锁的线程
            return true;
        }
    }
    
    // 如果 c>0 说明锁已经被占用了。
    // 对于可重入锁,这个时候检查占用锁的线程是不是就是当前线程,是则说明已经拿到了锁,直接重入。
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    
    // 到这里说明有人占用了锁,并且占用锁的不是当前线程,则获取锁失败。
    return false;
}

从这里可以看出,获取锁的过程主要就是一件事:state状态通过CAS操作由0改写成1

由于是CAS操作,只有一个线程能执行成功。则执行成功的线程即获取了锁,在这之后,才有权利将exclusiveOwnerThread的值设成自己。另外对于可重入锁,如果当前线程已经是获取了锁的线程了,它还要注意增加锁的重入次数。

注意:这里修改state状态的操作,一个用了CAS方法compareAndSetState,一个用了普通的setState方法。这是因为用CAS操作时,当前线程还没有获得锁,所以可能存在多线程同时在竞争锁的情况;而调用setState方法时,是在当前线程已经是持有锁的情况下,因此对state的修改是安全的,只需要普通的方法就可以了。

addWaiter

如果执行到此方法,说明前面尝试获取锁的tryAcquire已经失败了。既然获取锁已经失败了,就要将当前线程包装成Node,加到等待锁的队列中去。因为是FIFO队列,所以自然是直接加在队尾。方法调用为:

addWaiter(Node.EXCLUSIVE) // 添加节点
private Node addWaiter(Node mode) {
    // 将当前线程包装成Node
    Node node = new Node(Thread.currentThread(), mode); 
    // 这里用注释的形式把Node的构造函数贴出来
    // 因为传入的mode值为Node.EXCLUSIVE,所以节点的nextWaiter属性被设为null
    /*
        static final Node EXCLUSIVE = null;
        
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
    */
    Node pred = tail;
    // 如果队列不为空, 则用CAS方式将当前节点设为尾节点(快速尝试)
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    
    // 代码会执行到这里, 只有两种情况:
    //  1. 队列为空
    //  2. CAS失败
    // 注意,这里是并发条件下,所以什么都有可能发生,尤其注意CAS失败后也会来到这里。
    enq(node); // 将节点插入队列
    return node;
}

可见,每一个处于独占锁模式下的节点,它的nextWaiter一定是null。在这个方法中,首先会尝试直接入队,但是因为目前是在并发条件下,所以有可能同一时刻,有多个线程都在尝试入队,导致compareAndSetTail(pred, node)操作失败——因为有可能其他线程已经成为了新的尾节点,导致尾节点不再是我们之前看到的那个pred了。如果入队失败了,接下来我们就需要调用enq(node)方法,在该方法中我们将通过自旋+CAS的方式,确保当前节点入队。

enq

能执行到这个方法,说明当前线程获取锁已经失败了,已经把它包装成一个Node,准备把它放到等待队列中去,但是在这一步又失败了。这个失败的原因可能是以下两种之一:

  1. 等待队列现在是空的,没有线程在等待。
  2. 其他线程在当前线程入队的过程中率先完成了入队,导致尾节点的值已经改变了,CAS操作失败。

enq方法中使用了死循环, 即以自旋方式将节点插入队列,如果失败则不停的尝试,直到成功为止。另外,该方法也负责在队列为空时初始化队列,这也说明,队列是延时初始化的(lazily initialized)

// 入队列
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 如果是空队列, 首先进行初始化
        // 这里也可以看出, 队列不是在构造的时候初始化的, 而是延迟到需要用的时候再初始化, 以提升性能
        if (t == null) { 
            // 注意,初始化时使用new Node()方法新建了一个dummy节点
            if (compareAndSetHead(new Node()))
                tail = head; // 这里仅仅是将尾节点指向dummy节点,并没有返回
        } else {
        	// 到这里说明队列已经不是空的了, 这个时候再继续尝试将节点加到队尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

这里尤其要注意的是,当队列为空时,初始化队列并没有使用当前传进来的节点,而是新建了一个空节点。在新建完空的头节点之后,并没有立即返回,而是将尾节点指向当前的头节点,然后进入下一轮循环。在下一轮循环中,尾节点已经不为null了,此时再将包装了当前线程的Node加到这个空节点后面。 这意味着在这个等待队列中,头结点是一个“哑节点”,它不代表任何等待的线程。(head节点不代表任何线程,它是一个空节点)

尾分叉

在继续往下之前,先分析enq方法中一个比较有趣的现象(尾分叉)。着重看下将当前节点设置成尾节点的操作:

} else {
	// 到这里说明队列已经不是空的了, 这个时候再继续尝试将节点加到队尾
    node.prev = t;
    if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
    }
}

将一个节点node添加到sync queue的尾部需要三步:

  1. 设置node的前驱节点为当前的尾节点:node.prev = t
  2. 修改tail属性,使它指向当前节点;
  3. 修改原来的尾节点,使它的next指向当前节点。

但是需要注意的,这里的三步并不是一个原子操作,第一步很容易成功;而第二步由于是一个CAS操作,在并发条件下有可能失败,第三步只有在第二步成功的条件下才执行。这里的CAS保证了同一时刻只有一个节点能成为尾节点,其他节点将失败,失败后将回到for循环中继续重试。

所以当有大量的线程在同时入队的时候,同一时刻,只有一个线程能完整地完成这三步,而其他线程只能完成第一步,于是就出现了尾分叉:

注意,这里第三步是在第二步执行成功后才执行的,这就意味着,有可能即使已经完成了第二步,将新的节点设置成了尾节点,此时原来旧的尾节点的next值可能还是null(因为还没有来的及执行第三步),所以如果此时有线程恰巧从头节点开始向后遍历整个链表,则它是遍历不到新加进来的尾节点的,但是这显然是不合理的,因为现在的tail已经指向了新的尾节点。 另一方面,当完成了第二步之后,第一步一定是完成了的,所以如果我们从尾节点开始向前遍历,已经可以遍历到所有的节点。这也就是为什么在AQS相关的源码中,有时候常常会出现从尾节点开始逆向遍历链表——因为一个节点要能入队,则它的prev属性一定是有值的,但是它的next属性可能暂时还没有值。

至于那些“分叉”的入队失败的其他节点,在下一轮的循环中,它们的prev属性会重新指向新的尾节点,继续尝试新的CAS操作,最终所有节点都会通过自旋不断的尝试入队,直到成功为止。

addWaiter总结

至此,完成了addWaiter(Node.EXCLUSIVE)方法完整的分析,该方法并不设计到任何关于锁的操作,它就是解决了并发条件下的节点入队问题。具体来说是该方法保证了将当前线程包装成Node节点加入到等待队列的队尾,如果队列为空,则会新建一个哑节点作为头节点,再将当前节点接在头节点的后面。

addWaiter(Node.EXCLUSIVE)方法最终返回了代表了当前线程的Node节点,在返回的那一刻,这个节点必然是当时的sync queue的尾节点。

值得注意的是,enq方法也是有返回值(虽然这里并没有使用它的返回值),但是它返回的是node节点的前驱节点,这个返回值虽然在addWaiter方法中并没有使用,但是在其他地方会被用到。

再回到获取独占锁的逻辑中:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

addWaiter(Node.EXCLUSIVE)执行完毕后,节点现在已经被成功添加到sync queue中了,接下来将执行acquireQueued方法。

acquireQueued

该方法是最复杂的一个方法,看代码之前首先简单的说明几点:

(1) 能执行到该方法,说明addWaiter 方法已经成功将包装了当前Thread的节点添加到了等待队列的队尾; (2) 该方法中将再次尝试去获取锁; (3) 在再次尝试获取锁失败后,判断是否需要把当前线程挂起。

前面获取锁失败了,这里还要再次尝试获取锁:

​ 这里再次尝试获取锁是基于一定的条件的,即当前节点的前驱节点就是head节点。head节点就是个哑节点,它不代表任何线程,或者代表了持有锁的线程,如果当前节点的前驱节点就是head节点,那就说明当前节点已经是排在整个等待队列最前面的了。

// 排队获取锁
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋
        for (;;) {
            // 前驱结点
            final Node p = node.predecessor();
            // 在当前节点的前驱就是head节点时, 再次尝试获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 在获取锁失败后, 判断是否需要把当前线程挂起
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

首先看再次获取锁的代码:

final Node p = node.predecessor();
// 在当前节点的前驱就是head节点时, 再次尝试获取锁
if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
}

首先获取节点的前驱节点,如果前驱节点就是head节点,那说明当前线程已经排在了等待队列的最前面,再试着去获取锁。如果这一次获取成功了,即tryAcquire方法返回了true,则将进入if代码块,调用setHead方法:

// 设置头结点,前头结点出队列
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

这个方法将head指向传进来的node,并且将node的thread和prev属性置为null。

可以看出,这个方法的本质是丢弃原来的head,将head指向已经获得了锁的node。但是接着又将该node的thread属性置为null了,这某种意义上导致了这个新的head节点又成为了一个哑节点,它不代表任何线程。这么做的原因是因为在tryAcquire调用成功后,exclusiveOwnerThread属性已经记录了当前获取锁的线程了,此处没有必要再记录。这某种程度上就是将当前线程从等待队列里面拿出来了,是一个变相的出队操作。

还有另外一个特点是,这个setHead方法只是个普通方法,并没有像之前enq方法中那样采用compareAndSetHead方法,因为这里不会产生竞争。

enq方法中,当我们设置头节点的时候,是新建一个哑节点并将它作为头节点,这个时候,可能多个线程都在执行这一步,因此我们需要通过CAS操作保证只有一个线程能成功。

acquireQueued方法里,由于在调用到setHead的时,已经通过tryAcquire方法获得了锁,这意味着:

  1. 此时没有其他线程在创建新的头节点——因为很明显此时队列并不是空的,不会执行到创建头节点的代码;
  2. 此时能执行setHead的只有一个线程——因为要执行到setHead, 必然是tryAcquire已经返回了true, 而同一时刻,只有一个线程能获取到锁。

综上,在整个if语句内的代码即使不加锁,也是线程安全的,不需要采用CAS操作。

接下来再看另一种情况,即p == head && tryAcquire(arg)返回false,此时需要判断是否需要将当前线程挂起:

shouldParkAfterFailedAcquire

该方法用于决定在获取锁失败后,是否将线程挂起。决定的依据是前驱节点的waitStatus值。

waitStatus有如下状态值:

static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

一共有四种状态,在独占锁的获取过程中,只用到了其中的两个——CANCELLEDSIGNAL。 当然,前面在创建节点的时候并没有给waitStatus赋值,因此每一个节点最开始的时候waitStatus的值都被初始化为0,即不属于上面任何一种状态。

  • CANCELLED: 表示Node所代表的线程已经取消了排队,即放弃获取锁了。

  • SIGNAL: 不是表征当前节点的状态,而是当前节点的下一个节点的状态。

    1.当一个节点的waitStatus被置为SIGNAL,就说明它的下一个节点(即它的后继节点)已经被挂起了(或者马上就要被挂起了),因此在当前节点释放了锁或者放弃获取锁时,如果它的waitStatus属性为SIGNAL,它还要完成一个额外的操作——唤醒它的后继节点。

    2.SIGNAL这个状态的设置常常不是节点自己给自己设的,而是后继节点设置的。当决定要将一个线程挂起之前,首先要确保自己的前驱节点的waitStatusSIGNAL,这就相当于给自己设一个闹钟再去睡,这个闹钟会在恰当的时候叫醒自己。

  • CONDITION: 与Condition相关,该标识的结点处于条件队列中,结点的线程等待在Condition上,当其他线程调用了Conditionsignal()方法后,CONDITION状态的结点将从条件队列转移到同步队列中,等待获取同步锁。

  • PROPAGATE: 在共享模式中头结点有可能处于这种状态,表示锁的下一次获取可以无条件传播。

在理解了这些状态之后,看shouldParkAfterFailedAcquire的执行过程:

// 判断是否阻塞线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // 获得前驱节点的ws
    if (ws == Node.SIGNAL)
        // 前驱节点的状态已经是SIGNAL,可以直接park
        return true;
    if (ws > 0) {
        // 前驱节点的 ws > 0,则为 Node.CANCELLED,说明前驱节点已经取消了等待锁(由于超时或者中断等
        // 原因)
        // 既然前驱节点不等了, 那就继续往前找, 直到找到一个还在等待锁的节点
        // 然后跨过这些不等待锁的节点, 直接排在等待锁的节点的后面(这里存在node节点的调整)
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 前驱节点的状态既不是SIGNAL,也不是CANCELLED
        // 用CAS设置前驱节点的ws为Node.SIGNAL,返回false,再次尝试获取锁
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

可以看出shouldParkAfterFailedAcquire的执行逻辑如下:

  • 如果前驱节点的waitStatus值为 Node.SIGNAL ,则直接返回 true;
  • 如果前驱节点的waitStatus值为 Node.CANCELLED (ws > 0), 则跳过那些ws > 0的节点,重新寻找正常等待中的前驱节点,然后排在它后面,返回false;
  • 其他情况,将前驱节点的状态改为 Node.SIGNAL,返回false。

这个方法只有在当前节点的前驱节点的waitStatus状态本身就是SIGNAL的时候才会返回true,其他时候都会返回false。当shouldParkAfterFailedAcquire返回false后,会继续回到循环中再次尝试获取锁——这是因为此时前驱节点可能已经变了(shouldParkAfterFailedAcquire调整了node节点)。

shouldParkAfterFailedAcquire返回true,即当前节点的前驱节点的waitStatus状态已经设为SIGNAL后,就可以安心的将当前线程挂起了,此时将调用parkAndCheckInterrupt

parkAndCheckInterrupt

到这个函数已经是最后一步了,就是将线程挂起,等待被唤醒。

// 阻塞当前线程,获取中断状态
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 线程被挂起,停在这里不再往下执行
    return Thread.interrupted();
}

注意LockSupport.park(this)执行完成后线程就被挂起了,除非其他线程unpark了当前线程,或者当前线程被中断了,否则代码是不会再往下执行的,后面的Thread.interrupted()也不会执行。

独占锁获取的总结
  1. AQS中用state属性表示锁,如果能成功将state属性通过CAS操作从0设置成1即获取了锁;
  2. 获取了锁的线程才能将exclusiveOwnerThread设置成自己;
  3. addWaiter负责将当前等待锁的线程包装成Node,并成功地添加到队列的末尾,这一点是由它调用的enq方法保证的,enq方法同时还负责在队列为空时初始化队列;
  4. acquireQueued方法用于在Node成功入队后,继续尝试获取锁(取决于Node的前驱节点是不是head),或者将线程挂起;
  5. shouldParkAfterFailedAcquire方法用于保证当前线程的前驱节点的waitStatus属性值为SIGNAL,从而保证了自己挂起后,前驱节点会负责在合适的时候唤醒自己;
  6. parkAndCheckInterrupt方法用于挂起当前线程,并检查中断状态;
  7. 如果最终成功获取了锁,线程会从lock()方法返回,继续往下执行;否则,线程会阻塞等待。

独占锁的释放

Java的内置锁在退出临界区之后自动释放锁,但是ReentrantLock这样的显式锁是需要自己显式的释放的,所以在加锁之后需要在finally块中进行显式的锁释放:

Lock lock = new ReentrantLock();
lock.lock();
try {
    // ...
} finally {
    lock.unlock();
}

下面以ReentrantLock的锁释放为例来分析独占锁的释放过程。由于锁的释放操作对于公平锁和非公平锁都是一样的,所以unlock的逻辑并没有放在 FairSyncNonfairSync 里面,而是直接定义在 ReentrantLock类中:

public void unlock() {
    // 释放独占锁
    sync.release(1);
}
release

release方法定义在AQS类中,描述了释放锁的流程。

// 注意:release()是根据 tryRelease()的返回值 来判断该线程是否已经完全释放掉了资源(注意tryRelease()
// 方法的返回值 int)
public final boolean release(int arg) {
    if (tryRelease(arg)) { // 尝试释放锁
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); // 唤醒后继节点
        return true;
    }
    return false;
}

可以看出,相比于获取锁的acquire方法,释放锁的过程要简单很多,只涉及到两个子函数的调用:

  • tryRelease(arg)——该方法由继承AQS的子类实现,为释放锁的具体逻辑;
  • unparkSuccessor(h)——唤醒后继线程。

下面分别分析这两个子函数的逻辑。

tryRelease

tryRelease方法由ReentrantLock的静态类Sync实现。此外,相比获取锁的操作,这里并没有使用任何CAS操作,也是因为当前线程已经持有了锁,所以可以直接安全的操作,不会产生竞争。

// 注意:release()是根据tryRelease()的返回值来判断该线程是否已经完全释放掉了资源 正值、0、负值
// 完全释放了锁,返回true;否则返回false。
protected final boolean tryRelease(int releases) {
    
    // 首先将当前持有锁的线程状态减1(回溯到调用源头sync.release(1)可知, releases的值为1)
    // 这里的操作主要是针对可重入锁的情况下, c可能大于1
    int c = getState() - releases; 
    
    // 释放锁的线程当前必须是持有锁的线程
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    
    // 如果c为0了, 说明锁已经完全释放了
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
unparkSuccessor
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

锁成功释放之后,接下来就是唤醒后继节点,这个方法同样定义在AQS中。值得注意的是,在成功释放锁之后(tryRelease 返回 true之后),唤醒后继节点只是一个 “附加操作”,无论该操作结果怎样,最后 release操作都会返回 true

接下来是unparkSuccessor的源码:

// 唤醒后继节点
private void unparkSuccessor(Node node) { // node = head
    int ws = node.waitStatus;
    
    // 如果head节点的ws<0, 则直接将它设为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 通常情况下,要唤醒的节点就是自己的后继节点,
    // 如果后继节点存在且也在等待锁, 那就直接唤醒它。
    // 但是有可能存在 后继节点取消等待锁 的情况,
    // 此时 从尾节点开始向前找起 ,直到找到距离head节点最近的ws<=0的节点。
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t; // 注意! 这里找到了之并有return,而是继续向前找
    }
    // 如果找到了还在等待锁的节点,则唤醒它
    if (s != null)
        LockSupport.unpark(s.thread);
}

上文分析了 shouldParkAfterFailedAcquire 方法执行的逻辑,提到了当前节点的前驱节点的 waitStatus 属性,该属性决定了是否要挂起当前线程,并且如果一个线程被挂起了,它的前驱节点的 waitStatus值必然是Node.SIGNAL。在唤醒后继节点的操作中, 也需要依赖于节点的waitStatus值。下面详细分析unparkSuccessor方法的逻辑:

1.首先,传入该方法的参数node就是头节点head,并且条件是h != null && h.waitStatus != 0

  • h != null 条件必须满足;

  • ` h.waitStatus != 0`的含义:

    waitStatus为0的情况:

    shouldParkAfterFailedAcquire 方法中将前驱节点的 waitStatus设为Node.SIGNAL;而新建一个节点的时候,在addWaiter函数中,当将一个新的节点添加进队列或者初始化空队列的时候,都会新建节点,而新建的节点的waitStatus在没有赋值的情况下都会初始化为0。

    所以当一个head节点的waitStatus为0时,说明这个head节点后面没有在挂起等待中的后继节点了(如果有的话,head的ws就会被后继节点设为Node.SIGNAL了),自然也就不要执行 unparkSuccessor 操作了。

2.从尾节点开始逆向查找,而不是直接从head节点往后正向查找的原因:

  • 从后往前找是基于if (s == null || s.waitStatus > 0)条件的,即后继节点不存在或者后继节点取消了排队,这一条件大多数条件下是不满足的。因为虽然后继节点取消排队很正常,但是通过上面介绍的shouldParkAfterFailedAcquire方法可知,节点在挂起前,都会给自己找一个waitStatus状态为SIGNAL的前驱节点,而跳过那些已经cancel掉的节点。所以,这个从后往前找的目的其实是为了照顾刚刚加入到队列中的节点,这个与上面提到的尾分叉有关。
// sync queue添加节点
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); //将当前线程包装成Node
    Node pred = tail;
    // 如果队列不为空, 则用CAS方式将当前节点设为尾节点
    if (pred != null) {
        node.prev = pred; // step 1, 设置前驱节点
        if (compareAndSetTail(pred, node)) { // step2, 将当前节点设置成新的尾节点
            pred.next = node; // step 3, 将前驱节点的next属性指向自己
            return node;
        }
    }
    enq(node); 
    return node;
}

仔细看上面这段代码,可以发现节点入队不是一个原子操作。虽然用了compareAndSetTail操作保证了当前节点被设置成尾节点,但是只能保证此时step1和step2是执行完成的,有可能在step3还没有来的及执行到的时候,unparkSuccessor方法就开始执行了,此时pred.next的值还没有被设置成node,所以从前往后遍历的话是遍历不到尾节点的,但是因为尾节点此时已经设置完成,node.prev = pred操作也被执行过了,也就是说,如果从后往前遍历的话,新加的尾节点就可以遍历到了,并且可以通过它一直往前找。

所以之所以从后往前遍历是因为,处于多线程并发的条件下,如果一个节点的next属性为null, 并不能保证它就是尾节点(可能是因为新加的尾节点还没来得及执行pred.next = node),但是一个节点如果能入队,则它的prev属性一定是有值的,所以反向查找一定是最精确的。

最后在调用了 LockSupport.unpark(s.thread) 后唤醒了线程。回到park的代码:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 唤醒之后继续往下执行
    return Thread.interrupted();
}

线程从LockSupport.park(this);唤醒后将接着往下执行。这里有两个线程:一个是正在释放锁的线程,并调用了LockSupport.unpark(s.thread) 唤醒了另外一个线程; 而这个另外一个线程,就是因为抢锁失败而被阻塞在LockSupport.park(this)处的线程。这个被阻塞的线程被唤醒后,将调用 Thread.interrupted()并返回。

Thread.interrupted()这个函数将返回当前正在执行的线程的中断状态,并清除它。接着再返回到parkAndCheckInterrupt被调用的地方:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true; // 这里
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

返回到这个if语句

if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    interrupted = true;

可见,如果Thread.interrupted()返回true,则 parkAndCheckInterrupt()就返回true,if条件成立,interrupted状态将设为true;如果Thread.interrupted()返回false,则 interrupted 仍为false

再接下来又回到了for(;;) 死循环的开头,进行新一轮的抢锁。假设这次抢到了,将从 return interrupted处返回到acquireQueued的调用处:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

如果acquireQueued的返回值为true,将执行 selfInterrupt()

// 自我中断
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

而它的作用就是中断当前线程。这么做的原因是:并不知道线程被唤醒的原因。具体来说,当从LockSupport.park(this)处被唤醒,并不知道是因为什么原因被唤醒,可能是因为别的线程释放了锁,调用了 LockSupport.unpark(s.thread)也有可能是因为当前线程在等待中被中断了,因此通过Thread.interrupted()方法检查了当前线程的中断标志,并将它记录下来,在最后返回acquire方法后,如果发现当前线程曾经被中断过,那就把当前线程再中断一次。

从上面的代码中知道,即使线程在等待资源的过程中被中断唤醒,它还是会不依不饶的再抢锁,直到它抢到锁为止。也就是说,它是不响应这个中断的,仅仅是记录下自己被人中断过。最后当它抢到锁返回了,如果它发现自己曾经被中断过,它就再中断自己一次,将这个中断补上。

注意:中断对线程来说只是一个建议,一个线程被中断只是其中断状态被设为true,线程可以选择忽略这个中断,中断一个线程并不会影响线程的执行。

最后再说明下,事实上在从return interrupted;处返回时并不是直接返回的,因为还有一个finally代码块:

finally {
    if (failed)
        cancelAcquire(node);
}

它做了一些善后工作,但是条件是failedtrue,而从前面的分析中知道,要从for(;;)中跳出来,只有一种可能,那就是当前线程已经拿到了锁,因为整个争锁过程我们都是不响应中断的,所以不可能有异常抛出,既然是拿到了锁,failed就一定是false,所以这个finally块在这里实际上并没有什么用,它是为响应中断式的抢锁所服务的。

参考