七、并发队列之ConcurrentLinkedQueue源码分析

结构

  • Node first 头节点
  • Node last 尾节点
  • int count 节点数量
  • int capacity 容量
  • ReentrantLock lock = new ReentrantLock(); 锁
  • Condition notEmpty = lock.newCondition(); 非空条件变量
  • Condition notFull = lock.newCondition(); 未满条件变量

由上可以看出,LinkedBlockingDeque有一个独占锁,且包括它的两个条件变量,其实这就是一个生产者-消费者模型。
再来看其内部Node类

1
2
3
4
5
6
7
8
static final class Node<E> {
E item;
Node<E> prev;
Node<E> next;
Node(E x) {
item = x;
}
}

由上可以看出与ConcurrentLinkedQueue不同,其内部是一个双端队列。
到此我们值了解这么多,重要的操作原理我们往下看源码

重要方法

因为是队列,离不开offer、poll、peek这几个重要方法,不过由于LinkedBlockingDeque是一个双向队列所以还有对应相反的方法。

入队操作

入队方法有如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public boolean add(E e) {
addLast(e);
return true;
}
public void addFirst(E e) {
if (!offerFirst(e))
throw new IllegalStateException("Deque full");
}
public void addLast(E e) {
if (!offerLast(e))
throw new IllegalStateException("Deque full");
}
public boolean offer(E e) {
return offerLast(e);
}
public boolean offerFirst(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁
try {
return linkFirst(node); // 直接返回linkFirst
} finally {
lock.unlock();
}
}
public boolean offerLast(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
return linkLast(node);
} finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
putLast(e);
}
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁
try {
while (!linkFirst(node)) // 如果失败,则调用notFull.await,也就是,等待未满条件
notFull.await();
} finally {
lock.unlock();
}
}
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}

从代码中我们可以看到,一共有三类:add、offer、put,然后分别有对应last和first的方法。只有三点不同:

  1. put中加个while循环,如果失败,则进行等待,只有link成功的条件才往下执行,而offer直接返回link的结果
  2. 它们的返回参数不一样,offer的是boolean,而put则是void
  3. add函数在失败后会抛”Deque full”的异常
    于是我们知道,offer不保证成功,put保证一定能插入元素且是阻塞的方法。
    接着我们继续看他们的核心函数link:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private boolean linkFirst(Node<E> node) {
    // assert lock.isHeldByCurrentThread();
    if (count >= capacity)
    return false; // 数量达到限制,则返回false
    Node<E> f = first; // 头节点
    node.next = f; // 当前节点放到头节点,成为新的头节点
    first = node;
    if (last == null)
    last = node; // 如果没有尾节点,则也置为尾节点
    else
    f.prev = node;
    ++count;
    notEmpty.signal(); // 唤醒出队操作,代表现在队列不是空
    return true;
    }
    上面的函数比较简单,就做了两件事:
  4. 重置头结点
  5. 唤醒出队线程,因为是一个入队操作,那么至少队列的数量有一个,也就是不可能为空了,表示现在可以出队了。

linkLast也是一样,其实由于前面已经加锁了,不会产生并发,所以逻辑非常简单,就是双向队列的入队操作。

出队操作

与入队对应,出队操作也有三种,remove、poll、take。。
看一下三者的区别:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public E remove() {
return removeFirst();
}
public E removeLast() {
E x = pollLast(); // 其实就是调用的poll
if (x == null) throw new NoSuchElementException(); 只不过为空的时候会抛错
return x;
}
public E pollLast() {
final ReentrantLock lock = this.lock;
lock.lock(); // 采用独占锁进行同步
try {
return unlinkLast();
} finally {
lock.unlock();
}
}
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkLast()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}

可以看出,与入队几乎一样,两点不同:

  1. take中加个while循环,如果失败,则进行等待,只有unlink成功的条件才往下执行,而poll直接返回unlink的结果
  2. remove函数在失败后会抛错NoSuchElementException的异常

其核心函数还是unlink :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private E unlinkLast() {
// assert lock.isHeldByCurrentThread();
Node<E> l = last;
if (l == null)
return null;
Node<E> p = l.prev;
E item = l.item; // 保存值用来返回
l.item = null;
l.prev = l; // 指向自己延后释放
last = p; // 重新指定队尾
if (p == null)
first = null; // 如果队列为空就没必要再指定下一个节点了
else
p.next = null;
--count;
notFull.signal(); // 至少有一个空余,唤醒某个入队线程,使其加入同步队列
return item;
}

上面的函数比较简单,就做了两件事:

  1. 释放尾节点、并充值last节点
  2. 唤醒入队线程,因为是一个出队操作,那么至少队列的数量减一,表示现在可以入队了。

获取元素

该操作与出列非常相似,只是没有移除的步骤,获取操作有两种:get、和peek

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public E getFirst() {
E x = peekFirst();
if (x == null) throw new NoSuchElementException();
return x;
}
public E peekFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (first == null) ? null : first.item;
} finally {
lock.unlock();
}
}

可以看出唯一的区别就是为空时抛不抛异常。