类结构
我们依然是线看其整个类内部机构:
- final int capacity; // 队列的容量
- final AtomicInteger count = new AtomicInteger(); // 当前元素的数量
- Node
head; // 头结点 - Node
last; // 尾节点 - ReentrantLock takeLock = new ReentrantLock(); // 出队锁
- Condition notEmpty = takeLock.newCondition(); // 出队条件变量
- ReentrantLock putLock = new ReentrantLock(); // 入队锁
- Condition notFull = putLock.newCondition(); // 入队条件变量 由上可以LinkedBlockingQueue使用了两把锁,为什么这么做呢,为何不像双向队列那样只使用一把锁两个条件变量呢?
1
2
3
4
5static class Node<E> { // 就是一个单向链表
E item;
Node<E> next;
Node(E x) { item = x; }
}
我们带着疑问往下看。
方法介绍
和双向队列一样,只要是队列,离不开offer、poll、peek这几个重要方法。
入队
其入队操作也有两种put、offer、add
- offer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // 获取到锁,其他入队操作将被阻塞
try {
if (count.get() < capacity) {
enqueue(node); // 未满则加入队列
c = count.getAndIncrement(); // 并增加队列数量
if (c + 1 < capacity)
notFull.signal(); // 如果还没满,继续唤醒其他入队线程
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty(); // c==0说明之前队列时空的,那么入队之后就不空了,因此唤醒出队线程,但为什么是==0,而不是>= 0呢?
return c >= 0;
}
public boolean add(E e) { // add 方法继承自父类,调用的是offer方法
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
} - put 上面代码要点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly(); // 响应中断
try {
while (count.get() == capacity) { // 已经满了,则等待直到可以入队
notFull.await();
}
enqueue(node); // 入队
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
- 入队操作使用 putLock锁,也就是说不不会阻塞出队操作
- 入队成功且如果队列还是未满,则释放之前因为队列满了而阻塞的入队线程
- put和offer的区别之一:put操作是一个阻塞操作,如果队列满则会等待直到可以插入,所以put的返回值是void永不失败,而offer的返回值是boolean,可能因为队列满了而失败
- put和offer的区别之二:put会响应中断
- 回到上面那个问题,为什么c==0 的时候才唤醒出队线程呢?
这是因为如果c=0,则表示在这次入队之前队列时空的,那么所有的出队操作都将被阻塞。这个时候需要入队线程来唤醒。那么为什么c>0的时候不唤醒?其实这个问题我们应该抛开之前讲过的双向队列的思路,LinkedBlockingDeque是使用一个锁同时控制读写,所以读写互相通知,但这里不一样,用了两把锁。再往上看代码notFull.signal(),发现了吗,写线程会通知写线程。这就会大大提升性能,读和写互补干扰,只有在队列为空,或者队列满时才有交流。
可以看出put和offer的不同之处在于,put是一定会成功的,而offer则因为队列满了而失败。
出队
同入队方式相对应也有三种方式poll、take、remove,remove也是父类的方法,调用的是子类的poll方法。
- poll take()方法与put方法类似,相对poll,会阻塞直到成功,不会返回null,且响应中断。代码这里不再展示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0) // 如果队列为空,返回null
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue(); // 出队
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal(); // 通知下一个出队线程
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull(); // 如果之前满了,则释放一个入队操作。
return x;
}获取元素
从代码可以看出逻辑非常简单,就是takelock来实现并发阻塞出队操作。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}其他方法
- size
1
2
3public int size() {
return count.get(); // 这是一个原子类,所以是一个精确值
} - contains
public boolean contains(Object o) { if (o == null) return false; fullyLock(); // 读写锁同时锁住,所以也是一个精确值 try { for (Node<E> p = head.next; p != null; p = p.next) if (o.equals(p.item)) return true; return false; } finally { fullyUnlock(); } }
将这两个函数,主要是与ConcurrentLinkedQueue作对比,ConcurrentLinkedQueue采用的非阻塞的方式实现了入队出队的操作,但其size()和contains()方法并没有实现同步,因此不精确,但是入队出队的效率更高。