六、并发包中其他锁

独占锁ReentrantLock

ReentrantLock是一个可重入的独占锁,独占锁说明同一个时间,只有一个线程占用,用的场景非常多,也是最常见的一个中锁,其state在0时表示当前锁没有别线程占有,其他值则表示重入次数。首先看一下ReentrantLock的类图:
ReentrantLock类图
从图中可以看出,其内部用的同步锁Sync也是继承自AQS,所以我们大胆推测,其原理与AQS一本相同。
我们注意到,ReentrantLock有一个构造函数,也就是入参是否公平锁,从Sync子类也可看出,ReentrantLock提供了公平和非公平的实现。下面我们跟着函数源码去了解其原理

获取锁lock

当一个线程调用lock方法,其尝试获取锁

1
2
public void lock() {
sync.lock();// 调用同步器的lock方法}

其实现是由子类来实现的:

1
2
3
4
5
6
7
8
9
final void lock() {  // 非公平锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
final void lock() { // 公平锁
acquire(1); // 这个方法来自AQS,忘记的可以往前翻翻
}

如上,公平锁和非公平锁不同之处,在于非公平锁会首先直接取用CAS去抢占,抢占不到再调用acquire(1).这里怎么理解呢?原因是,非公平的概念是不管先来后到,能拿到锁的就是好汉,所以,非公平锁会直接去拿。后面的tryAcquire也是同样的道理。

  • tryAcquire
    AQS中的acquire会首先调用子类的tryAcquire来判断是否拿到锁,没拿到再放入等待循环请求或等待。
    重点关注子类的实现。

    这里有一点要说明:非公平锁的tryAcquire放在了Sync类里实现,而公平锁则由其子类FairSync来实现。原因是无论公平锁还是非公平锁,调用tryLock时,调用的是同一个方法也即是父类Sync的nonfairTryAcquire方法,也就是说无论公平还是非公平锁,使用tryLock函数时都是采用的非公平策略,线程空闲则直接获取,否则失败。

    我们首先关注下nonfairTryAcquire方法,它是一个非阻塞的方法,如果获取到则返回true,获取不到则返回false。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) { // 这里直接cas和非公平锁lock方法逻辑一样
    if (compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    // 如果不是0,则判断是否是同一个线程进行重入
    else if (current == getExclusiveOwnerThread()) {
    // zhon
    int nextc = c + acquires; // 可重入此时增加
    if (nextc < 0) // overflow 说明次数溢出,超出限制
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }

    再来看下公平策略下的抢占方式:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    // 其他代码都是一样的,只有这里稍有不同,也就是判断前驱节点是否存在,不存在才能抢占,也就是先来后到不能插队。
    if (!hasQueuedPredecessors() &&
    compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0)
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }

    回到acquire,如果抢占失败则执行AQS的acquireQueued(addWaiter(Node.EXCLUSIVE), arg))系列方法放入同步队列并等待。这里不再详述。

    获取锁unlock

    释放锁unlock() 调用的是sync.release(1);,这个也是AQS的方法,其也是先调用子类的tryRelease(1),如果成功,则判断是否需要唤醒后继节点,如果是则唤醒。我们重点讲下子类的tryRelease方法,它的实现实在Sync里的,所以公平和非公平策略都一样。

  1. tryRelease

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
        protected final boolean tryRelease(int releases) {
    int c = getState() - releases; // 状态值减1,也即是可重入次数。
    if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) { // 如果状态值为0,代表该线程已经完全释放锁了。
    free = true;
    setExclusiveOwnerThread(null); // 清空当前锁的线程
    }
    setState(c);
    return free;// 返回释放结果
    }
    ```
    可以看到,这里释放主要是修改状态值,如果状态值达到0了,则需要进行下一步也就是唤醒
    ## 案例介绍
    CopyOnWriteList
    # 读写锁ReentrantReadWriteLock
    ReentrantLock锁可以解决大部分并发问题,但是它是一个独占锁,也就是同一时刻只有一个线程持有,这在实际场景中可能性能不佳。其中读多写少的场景就无法满足,因此ReentrantReadWriteLock就产生了,这是一个单线程写,多线程多的实现。
    其类图如下:
    ![读写锁.PNG](/images/thread/读写锁.png)
    其类图比较复杂,但是其实核心还是Sync,只是,读写锁包含两把锁readerLock和writerLock来控制读写
    ,其sync作用只是用来作为参数构造这两把锁。同独占锁一样,读写锁也支持公平和非公平策略,所以其也有带参数的构造函数。
    接下来我们跟着代码看
    ## 结构
    CopyOnWriteList 结构其实很简单
    1、属性主要是readerLock和writerLock,而它们的核心是Sync。
    2、方法基本都是getXXX方法
    3、ReadLock、WriteLock也很简单
    + 构造函数

    public ReentrantReadWriteLock(boolean fair) {

    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);

    }

    1
    2
        从函数里面看出来,readerLock和writerLock以同一个sync(从这可以看出来这个类非常重要)作为参数进行构建,FairSync和NonfairSync方法大都来自父类Sync,只有writerShouldBlock和readerShouldBlock稍有不同。
    + ReadLock

    // 忽略相似的响应超时和中断的lock方法
    public static class ReadLock implements Lock, java.io.Serializable {

    private final Sync sync;
    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    public void lock() {
        sync.acquireShared(1);  // 共享锁的方式
    }
    public boolean tryLock() {
        return sync.tryReadLock();  // 读锁
    }
    public void unlock() {
        sync.releaseShared(1);
    }
    
    public Condition newCondition() {
        throw new UnsupportedOperationException(); // 共享锁不支持条件队列
    }

    }

    1
    + ReadLock

    // 忽略相似的响应超时和中断的lock方法
    public static class WriteLock implements Lock, java.io.Serializable {

    private final Sync sync;
    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    public void lock() {
        sync.acquire(1);  // 独占方式
    }
    public boolean tryLock() {
        return sync.tryWriteLock(); // 写锁
    }
    public void unlock() {
         sync.release(1);
    }
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }

    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    看了上面的代码发现,两个锁都很简单,一个就是独占方式用来写数据,此操作不支持同时,一个就是共享方式来实现多线程读,实现的逻辑都是其引用变量sync来实现的,很容易理解。关键的是它们的核心公共参数Sync。
    ## 核心类Sync
    但其实Sync说难也不难,一共就400行代码,接下来我们一起逐行来分析源码
    ### 属性
    + static final int SHARED_SHIFT = 16;
    + static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 1左移16位=1*2^16=65536
    + static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; 65535
    + static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; 16个1
    + static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 共享锁数量
    + static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // 独占锁可重入次数

    >注意 c>>>SHARED_SHIFT c是一个4字节32位整型 >>>表示无符号右移,右移16位,其实就是获取其高16位的值
    再看 c & EXCLUSIVE_MASK,这是一个短连接,也就是获取其后面16位。我们知道AQS将由此可见,读写锁采用将int的前16位作为读的状态,后16位作为写的状态(可结合后文中代码理解)。

    > 这些变量的作用是什么呢?
    ### 独占锁方法
    写锁中的lock调用的是Sync中的acquire,这个方法来自于AQS,所以再重新回忆一下aquire(1)对应的AQS方法

    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&

    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();

    }

    1
    2
    可以看出它跟其他所不同之处还是tryAcquire方法,释放unlock同样使用了AQS的release(1),其子类核心是tryReleaseShared。
    + tryAcquire

    protected final boolean tryAcquire(int acquires) {

    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);  //获取状态值的独占状态值,也就是可重入个数
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        // 走到这里说明是该线程占有锁,且可重入次数没有达到限制,重入次数+1
        setState(c + acquires);
        return true;
    }
    // 走到这里说明锁还没有被占有,则判断以下两个条件:
    // 1. 写的时候是否等待,公平锁则需要等待前驱节点,非公平锁不需要直接return false
    // 2. 尝试CAS,如果失败,说明其他线程拿到锁了 
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    //走到这,说明当前线程竞争锁成功。
    setExclusiveOwnerThread(current);
    return true;

    }

    1
    2
        再次结合AQS回一下整个过程,首先去尝试获取锁,如果成功则返回,如果失败则加入同步队列等待唤醒(park),或自旋获取锁(头结点的下个节点)。这里的写锁与前面讲到的独占锁几乎一样。
    + tryRelease

    protected final boolean tryRelease(int releases) {

    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);   
    return free;

    }

    1
    2
    3
        这一段也不再赘述,逻辑与独占锁一模一样
    + tryWriteLock()
    还是与独占锁对比(这两个几乎一样),上面说了,独占锁中的trylock对于公平和非公平没有区别都是用非公平的策略去实现的。这里同样如此

    final boolean tryWriteLock() {

    Thread current = Thread.currentThread();
    int c = getState();
    if (c != 0) {
        int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current); // 不再有前驱节点的判断
    return true;

    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
        有一点要注意ReentrantLock中trylock和lock共用了nofairTryAquire方法,但公平和非公平的lock方法不一致。  
    而这里是ReentrantReadWriteLock中的写锁正好相反,也就是公平和非公平的lock方法共用逻辑,但是tryWriteLock和lock的方法不一致。
    但其实原理是一样的。
    ### 共享锁方法
    因为我们文中前面部分已经讲过独占锁,所以写锁再理解起来就非常简单了,但是读锁的具体实现我们还没讲过,不过原理大体是差不多的。
    记得我们在[AQS源码解析文章](https://www.jianshu.com/p/63cf39b4dc15)中讲过,共享锁和独占所得区别:
    1. 状态操作不一样,共享锁是判断state>0 ,则表示还可以被持有
    2. 获取到锁后以及释放时的操作不一样,共享锁还会尝试去扩散通知其他节点
    3. 共享锁没有条件队列

    我们接下来跟诊源码看一下具体实现
    + tryAcquireShared

    protected final int tryAcquireShared(int unused) {

    Thread current = Thread.currentThread();
    int c = getState();// 获取当前状态和当前线程
    // 判断写锁是否被占用,记住写的时候是不能读的。
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);  
    // 三个条件: 1.读的时候是否阻塞 2.判断是否达到共享数量最大值 3.CAS成功
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) { 
        if (r == 0) {// 如果为0则说明该线程是第一个读线程,则记录
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            // 到这说明不是第一个读线程
            HoldCounter rh = cachedHoldCounter; // 不是当前线程缓存值或者没有缓存则设置缓存
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                // // 7. 为什么要 count == 0 时进行 ThreadLocal.set? 因为上面tryReleaseShared方法 中当 count == 0 时, 进行了ThreadLocal.remove
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    // 到这说明没有获取到锁
    return fullTryAcquireShared(current); // 自旋获取锁

    }

    1
    2
    + fullTryAcquireShared  
    这个方法其实是 tryAcquireShared 的冗余(redundant)方法, 主要补足 readerShouldBlock 导致的获取等待 和 CAS 修改 AQS 中 state 值失败进行的修补工作

    final int fullTryAcquireShared(Thread current) {

    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                // 到这里说明有线程正在写,直接返回加入AQS队列等待
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // 公平锁表示如果有前驱节点,非公平锁表示头节点是否是写锁
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                // 到这说明不是第一个线程
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 到这里说明aqs 同步队列里面有 获取 readLock 的node 或 head.next 是获取 writeLock 的节点
                if (rh.count == 0) // 为真说明不是重入,且cas失败
                    return -1;
            }
        }
        // 到这里说明不需要等待,只要能cas成功就行
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }

    }

    这段代码我们重点看下什么情况进入重复循环,这才是这个方法的意义,首先下列条件会返回-1:  
    1. 如果有写锁或者头节点的下一个节点想获取写锁
    2.