独占锁ReentrantLock
ReentrantLock是一个可重入的独占锁,独占锁说明同一个时间,只有一个线程占用,用的场景非常多,也是最常见的一个中锁,其state在0时表示当前锁没有别线程占有,其他值则表示重入次数。首先看一下ReentrantLock的类图:
从图中可以看出,其内部用的同步锁Sync也是继承自AQS,所以我们大胆推测,其原理与AQS一本相同。
我们注意到,ReentrantLock有一个构造函数,也就是入参是否公平锁,从Sync子类也可看出,ReentrantLock提供了公平和非公平的实现。下面我们跟着函数源码去了解其原理
获取锁lock
当一个线程调用lock方法,其尝试获取锁
1 | public void lock() { |
其实现是由子类来实现的:
1 | final void lock() { // 非公平锁 |
如上,公平锁和非公平锁不同之处,在于非公平锁会首先直接取用CAS去抢占,抢占不到再调用acquire(1).这里怎么理解呢?原因是,非公平的概念是不管先来后到,能拿到锁的就是好汉,所以,非公平锁会直接去拿。后面的tryAcquire也是同样的道理。
tryAcquire
AQS中的acquire会首先调用子类的tryAcquire来判断是否拿到锁,没拿到再放入等待循环请求或等待。
重点关注子类的实现。这里有一点要说明:非公平锁的tryAcquire放在了Sync类里实现,而公平锁则由其子类FairSync来实现。原因是无论公平锁还是非公平锁,调用tryLock时,调用的是同一个方法也即是父类Sync的nonfairTryAcquire方法,也就是说无论公平还是非公平锁,使用tryLock函数时都是采用的非公平策略,线程空闲则直接获取,否则失败。
我们首先关注下nonfairTryAcquire方法,它是一个非阻塞的方法,如果获取到则返回true,获取不到则返回false。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 这里直接cas和非公平锁lock方法逻辑一样
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果不是0,则判断是否是同一个线程进行重入
else if (current == getExclusiveOwnerThread()) {
// zhon
int nextc = c + acquires; // 可重入此时增加
if (nextc < 0) // overflow 说明次数溢出,超出限制
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}再来看下公平策略下的抢占方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 其他代码都是一样的,只有这里稍有不同,也就是判断前驱节点是否存在,不存在才能抢占,也就是先来后到不能插队。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}回到acquire,如果抢占失败则执行AQS的acquireQueued(addWaiter(Node.EXCLUSIVE), arg))系列方法放入同步队列并等待。这里不再详述。
获取锁unlock
释放锁unlock() 调用的是
sync.release(1);
,这个也是AQS的方法,其也是先调用子类的tryRelease(1),如果成功,则判断是否需要唤醒后继节点,如果是则唤醒。我们重点讲下子类的tryRelease方法,它的实现实在Sync里的,所以公平和非公平策略都一样。
tryRelease
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 状态值减1,也即是可重入次数。
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 如果状态值为0,代表该线程已经完全释放锁了。
free = true;
setExclusiveOwnerThread(null); // 清空当前锁的线程
}
setState(c);
return free;// 返回释放结果
}
```
可以看到,这里释放主要是修改状态值,如果状态值达到0了,则需要进行下一步也就是唤醒
## 案例介绍
CopyOnWriteList
# 读写锁ReentrantReadWriteLock
ReentrantLock锁可以解决大部分并发问题,但是它是一个独占锁,也就是同一时刻只有一个线程持有,这在实际场景中可能性能不佳。其中读多写少的场景就无法满足,因此ReentrantReadWriteLock就产生了,这是一个单线程写,多线程多的实现。
其类图如下:
![读写锁.PNG](/images/thread/读写锁.png)
其类图比较复杂,但是其实核心还是Sync,只是,读写锁包含两把锁readerLock和writerLock来控制读写
,其sync作用只是用来作为参数构造这两把锁。同独占锁一样,读写锁也支持公平和非公平策略,所以其也有带参数的构造函数。
接下来我们跟着代码看
## 结构
CopyOnWriteList 结构其实很简单
1、属性主要是readerLock和writerLock,而它们的核心是Sync。
2、方法基本都是getXXX方法
3、ReadLock、WriteLock也很简单
+ 构造函数public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this);
}
1
2从函数里面看出来,readerLock和writerLock以同一个sync(从这可以看出来这个类非常重要)作为参数进行构建,FairSync和NonfairSync方法大都来自父类Sync,只有writerShouldBlock和readerShouldBlock稍有不同。
+ ReadLock// 忽略相似的响应超时和中断的lock方法
public static class ReadLock implements Lock, java.io.Serializable {private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock() { sync.acquireShared(1); // 共享锁的方式 } public boolean tryLock() { return sync.tryReadLock(); // 读锁 } public void unlock() { sync.releaseShared(1); } public Condition newCondition() { throw new UnsupportedOperationException(); // 共享锁不支持条件队列 }
}
1
+ ReadLock
// 忽略相似的响应超时和中断的lock方法
public static class WriteLock implements Lock, java.io.Serializable {private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock() { sync.acquire(1); // 独占方式 } public boolean tryLock() { return sync.tryWriteLock(); // 写锁 } public void unlock() { sync.release(1); } public Condition newCondition() { throw new UnsupportedOperationException(); }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17看了上面的代码发现,两个锁都很简单,一个就是独占方式用来写数据,此操作不支持同时,一个就是共享方式来实现多线程读,实现的逻辑都是其引用变量sync来实现的,很容易理解。关键的是它们的核心公共参数Sync。
## 核心类Sync
但其实Sync说难也不难,一共就400行代码,接下来我们一起逐行来分析源码
### 属性
+ static final int SHARED_SHIFT = 16;
+ static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 1左移16位=1*2^16=65536
+ static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; 65535
+ static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; 16个1
+ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 共享锁数量
+ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // 独占锁可重入次数
>注意 c>>>SHARED_SHIFT c是一个4字节32位整型 >>>表示无符号右移,右移16位,其实就是获取其高16位的值
再看 c & EXCLUSIVE_MASK,这是一个短连接,也就是获取其后面16位。我们知道AQS将由此可见,读写锁采用将int的前16位作为读的状态,后16位作为写的状态(可结合后文中代码理解)。
> 这些变量的作用是什么呢?
### 独占锁方法
写锁中的lock调用的是Sync中的acquire,这个方法来自于AQS,所以再重新回忆一下aquire(1)对应的AQS方法public final void acquire(int arg) {
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();
}
1
2可以看出它跟其他所不同之处还是tryAcquire方法,释放unlock同样使用了AQS的release(1),其子类核心是tryReleaseShared。
+ tryAcquireprotected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //获取状态值的独占状态值,也就是可重入个数 if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire // 走到这里说明是该线程占有锁,且可重入次数没有达到限制,重入次数+1 setState(c + acquires); return true; } // 走到这里说明锁还没有被占有,则判断以下两个条件: // 1. 写的时候是否等待,公平锁则需要等待前驱节点,非公平锁不需要直接return false // 2. 尝试CAS,如果失败,说明其他线程拿到锁了 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //走到这,说明当前线程竞争锁成功。 setExclusiveOwnerThread(current); return true;
}
1
2再次结合AQS回一下整个过程,首先去尝试获取锁,如果成功则返回,如果失败则加入同步队列等待唤醒(park),或自旋获取锁(头结点的下个节点)。这里的写锁与前面讲到的独占锁几乎一样。
+ tryReleaseprotected final boolean tryRelease(int releases) {
if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free;
}
1
2
3这一段也不再赘述,逻辑与独占锁一模一样
+ tryWriteLock()
还是与独占锁对比(这两个几乎一样),上面说了,独占锁中的trylock对于公平和非公平没有区别都是用非公平的策略去实现的。这里同样如此final boolean tryWriteLock() {
Thread current = Thread.currentThread(); int c = getState(); if (c != 0) { int w = exclusiveCount(c); if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); // 不再有前驱节点的判断 return true;
}
1
2
3
4
5
6
7
8
9
10
11
12有一点要注意ReentrantLock中trylock和lock共用了nofairTryAquire方法,但公平和非公平的lock方法不一致。
而这里是ReentrantReadWriteLock中的写锁正好相反,也就是公平和非公平的lock方法共用逻辑,但是tryWriteLock和lock的方法不一致。
但其实原理是一样的。
### 共享锁方法
因为我们文中前面部分已经讲过独占锁,所以写锁再理解起来就非常简单了,但是读锁的具体实现我们还没讲过,不过原理大体是差不多的。
记得我们在[AQS源码解析文章](https://www.jianshu.com/p/63cf39b4dc15)中讲过,共享锁和独占所得区别:
1. 状态操作不一样,共享锁是判断state>0 ,则表示还可以被持有
2. 获取到锁后以及释放时的操作不一样,共享锁还会尝试去扩散通知其他节点
3. 共享锁没有条件队列
我们接下来跟诊源码看一下具体实现
+ tryAcquireSharedprotected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread(); int c = getState();// 获取当前状态和当前线程 // 判断写锁是否被占用,记住写的时候是不能读的。 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); // 三个条件: 1.读的时候是否阻塞 2.判断是否达到共享数量最大值 3.CAS成功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) {// 如果为0则说明该线程是第一个读线程,则记录 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { // 到这说明不是第一个读线程 HoldCounter rh = cachedHoldCounter; // 不是当前线程缓存值或者没有缓存则设置缓存 if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) // // 7. 为什么要 count == 0 时进行 ThreadLocal.set? 因为上面tryReleaseShared方法 中当 count == 0 时, 进行了ThreadLocal.remove readHolds.set(rh); rh.count++; } return 1; } // 到这说明没有获取到锁 return fullTryAcquireShared(current); // 自旋获取锁
}
1
2+ fullTryAcquireShared
这个方法其实是 tryAcquireShared 的冗余(redundant)方法, 主要补足 readerShouldBlock 导致的获取等待 和 CAS 修改 AQS 中 state 值失败进行的修补工作final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) // 到这里说明有线程正在写,直接返回加入AQS队列等待 return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // 公平锁表示如果有前驱节点,非公平锁表示头节点是否是写锁 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { // 到这说明不是第一个线程 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } // 到这里说明aqs 同步队列里面有 获取 readLock 的node 或 head.next 是获取 writeLock 的节点 if (rh.count == 0) // 为真说明不是重入,且cas失败 return -1; } } // 到这里说明不需要等待,只要能cas成功就行 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } }
}
这段代码我们重点看下什么情况进入重复循环,这才是这个方法的意义,首先下列条件会返回-1: 1. 如果有写锁或者头节点的下一个节点想获取写锁 2.