九、并发队列之LinkedBlockingQueue源码解析

类结构

我们依然是线看其整个类内部机构:

  • final int capacity; // 队列的容量
  • final AtomicInteger count = new AtomicInteger(); // 当前元素的数量
  • Node head; // 头结点
  • Node last; // 尾节点
  • ReentrantLock takeLock = new ReentrantLock(); // 出队锁
  • Condition notEmpty = takeLock.newCondition(); // 出队条件变量
  • ReentrantLock putLock = new ReentrantLock(); // 入队锁
  • Condition notFull = putLock.newCondition(); // 入队条件变量
    1
    2
    3
    4
    5
    static class Node<E> {   // 就是一个单向链表
    E item;
    Node<E> next;
    Node(E x) { item = x; }
    }
    由上可以LinkedBlockingQueue使用了两把锁,为什么这么做呢,为何不像双向队列那样只使用一把锁两个条件变量呢?
    我们带着疑问往下看。

方法介绍

和双向队列一样,只要是队列,离不开offer、poll、peek这几个重要方法。

入队

其入队操作也有两种put、offer、add

  • offer
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
    return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // 获取到锁,其他入队操作将被阻塞
    try {
    if (count.get() < capacity) {
    enqueue(node); // 未满则加入队列
    c = count.getAndIncrement(); // 并增加队列数量
    if (c + 1 < capacity)
    notFull.signal(); // 如果还没满,继续唤醒其他入队线程
    }
    } finally {
    putLock.unlock();
    }
    if (c == 0)
    signalNotEmpty(); // c==0说明之前队列时空的,那么入队之后就不空了,因此唤醒出队线程,但为什么是==0,而不是>= 0呢?
    return c >= 0;
    }
    public boolean add(E e) { // add 方法继承自父类,调用的是offer方法
    if (offer(e))
    return true;
    else
    throw new IllegalStateException("Queue full");
    }
  • put
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly(); // 响应中断
    try {
    while (count.get() == capacity) { // 已经满了,则等待直到可以入队
    notFull.await();
    }
    enqueue(node); // 入队
    c = count.getAndIncrement();
    if (c + 1 < capacity)
    notFull.signal();
    } finally {
    putLock.unlock();
    }
    if (c == 0)
    signalNotEmpty();
    }
    上面代码要点:
  1. 入队操作使用 putLock锁,也就是说不不会阻塞出队操作
  2. 入队成功且如果队列还是未满,则释放之前因为队列满了而阻塞的入队线程
  3. put和offer的区别之一:put操作是一个阻塞操作,如果队列满则会等待直到可以插入,所以put的返回值是void永不失败,而offer的返回值是boolean,可能因为队列满了而失败
  4. put和offer的区别之二:put会响应中断
  5. 回到上面那个问题,为什么c==0 的时候才唤醒出队线程呢?

    这是因为如果c=0,则表示在这次入队之前队列时空的,那么所有的出队操作都将被阻塞。这个时候需要入队线程来唤醒。那么为什么c>0的时候不唤醒?其实这个问题我们应该抛开之前讲过的双向队列的思路,LinkedBlockingDeque是使用一个锁同时控制读写,所以读写互相通知,但这里不一样,用了两把锁。再往上看代码notFull.signal(),发现了吗,写线程会通知写线程。这就会大大提升性能,读和写互补干扰,只有在队列为空,或者队列满时才有交流。

可以看出put和offer的不同之处在于,put是一定会成功的,而offer则因为队列满了而失败。

出队

同入队方式相对应也有三种方式poll、take、remove,remove也是父类的方法,调用的是子类的poll方法。

  • poll
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0) // 如果队列为空,返回null
    return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
    if (count.get() > 0) {
    x = dequeue(); // 出队
    c = count.getAndDecrement();
    if (c > 1)
    notEmpty.signal(); // 通知下一个出队线程
    }
    } finally {
    takeLock.unlock();
    }
    if (c == capacity)
    signalNotFull(); // 如果之前满了,则释放一个入队操作。
    return x;
    }
    take()方法与put方法类似,相对poll,会阻塞直到成功,不会返回null,且响应中断。代码这里不再展示

    获取元素

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public E peek() {
    if (count.get() == 0)
    return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
    Node<E> first = head.next;
    if (first == null)
    return null;
    else
    return first.item;
    } finally {
    takeLock.unlock();
    }
    }
    从代码可以看出逻辑非常简单,就是takelock来实现并发阻塞出队操作。

    其他方法

  • size
    1
    2
    3
    public int size() {
    return count.get(); // 这是一个原子类,所以是一个精确值
    }
  • contains
    public boolean contains(Object o) {
        if (o == null) return false;
        fullyLock();        // 读写锁同时锁住,所以也是一个精确值
        try {
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            fullyUnlock();
        }
    }

将这两个函数,主要是与ConcurrentLinkedQueue作对比,ConcurrentLinkedQueue采用的非阻塞的方式实现了入队出队的操作,但其size()和contains()方法并没有实现同步,因此不精确,但是入队出队的效率更高。