十一、并发队列之PriorityBlockingQueue源码解析

前面文章我们深度了解了ConcurrentLinkedQueue、LinkedBlockingQueue、LinkedBlockingDeque、ArrayBlockingQueue。
这里我们再次总结一下,同步队列一共就两种方式:

  • 一种是直接使用自旋结合cas实现同步,性能最好,但复杂度较高,其思想与AQS同步队列有点类似,都是tail入队,head出队,但AQS队列不是严格控制队列的,所以又有区别,重点记住保证入队出队同步,但size、contains不同步。
  • 另一种是使用锁这种类型的所有函数都是严格同步的,但使用锁又有区别, 一种是LinkedBlockingDeque使用两把锁,分别控制生产者消费者,效率较高,所以很多同步场合使用这种,另一种是只使用一把锁,锁住全部,实现简单,如LinkedBlockingDeque、ArrayBlockingQueue。
  • 另外注意使用数组和链表实现的区别,链表采用的是lazySet的方式可能会在GC时产生影响,而Array的方式是采用数组,对于空间的消耗较小(不用构建Node)

PriorityBlockingQueue概述

PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高的元素其内部使用PriorityQueue来实现。

属性和构造函数

  • final int DEFAULT_INITIAL_CAPACITY = 11;
  • Object[] queue; // 内部队列
  • int size; // 元素数量
  • Comparator<? super E> comparator; //比较器
  • ReentrantLock lock; // 独占锁
  • Condition notEmpty; // 非空条件变量
  • volatile int allocationSpinLock; // 用来控制扩容的自旋锁,需要cas来操作
  • PriorityQueue q; // 用来序列化的属性
  • 构造函数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    public PriorityBlockingQueue(int initialCapacity,
    Comparator<? super E> comparator) {
    if (initialCapacity < 1)
    throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
    }
    由上面的信息我们可以总结如下:
  1. 没有notFull条件变量,那就说明这是个无界队列
  2. 只有一把锁,显然出队和入队是同一把,也就是同一时刻只能进或者出
  3. 默认容量是11
  4. 默认比较器是null,也就是说默认自然比较
  5. 核心数据是数组queue,这样看来是使用数组来实现比较,这很容易让我们相对数组形式的堆排序

    操作

    同其他同步队列一样,我们主要还是探究其offer、poll、peek操作

    入队操作

  • offer

    直接贴源码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public boolean offer(E e) {
    if (e == null)
    throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
    tryGrow(array, cap); // 扩容直到成功
    try {
    Comparator<? super E> cmp = comparator;
    if (cmp == null)
    siftUpComparable(n, e, array); // 入队核心逻辑
    else
    siftUpUsingComparator(n, e, array, cmp);
    size = n + 1;
    notEmpty.signal(); // 唤醒下一个等待线程(take方法发现队列为空时会等待)
    } finally {
    lock.unlock(); // 释放锁
    }
    return true;
    }

    从上面的方法里,我们知道,在每次插入元素前会校验一次容量是否足够,不足则进行扩容,然后调用siftUpComparable方法进行入队。

  • siftUpComparable

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
    int parent = (k - 1) >>> 1; 找到父节点
    Object e = array[parent];
    if (key.compareTo((T) e) >= 0) 与父节点比较,如果比父节点大,则结束循环
    break;
    array[k] = e;
    k = parent; 否则将父节点的值赋值到k位置
    }
    array[k] = key; 最后找到正确的位置并放入
    }

    了解过二叉树堆的同学肯定看出来这其实就是一个二叉树堆的入堆操作而且是最小堆,每次从末尾元素加入,然后依次和父节点比较,如果父节点更大,则交换。直到整个堆符合最小堆特征。下面看网上一张图:
    最小堆入队

    • 首先,将元素插入队尾,满嘴最小堆第一个条件:完全二叉树
    • 然后依次与父节点比较,如果更小,则与父节点交换,直到比父节点大。因此达成最小堆第二个条件:父节点比左右子节点都小。

      扩容操作

      上一小结,我们跳过了扩容的阶段,现在我们再回头看:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      private void tryGrow(Object[] array, int oldCap) {
      lock.unlock(); // must release and then re-acquire main lock 释放锁
      Object[] newArray = null;
      if (allocationSpinLock == 0 &&
      UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
      0, 1)) { // 持有扩容锁,成功则向下
      try {
      int newCap = oldCap + ((oldCap < 64) ?
      (oldCap + 2) : // grow faster if small
      (oldCap >> 1)); // 小于64则+2或者扩容50%
      if (newCap - MAX_ARRAY_SIZE > 0) { // 保证不溢出
      int minCap = oldCap + 1;
      if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
      throw new OutOfMemoryError();
      newCap = MAX_ARRAY_SIZE;
      }
      if (newCap > oldCap && queue == array)
      newArray = new Object[newCap];
      } finally {
      allocationSpinLock = 0;
      }
      }
      if (newArray == null) // back off if another thread is allocating
      Thread.yield(); // 到这说明失败了,尝试让出cpu给扩容线程
      lock.lock();
      if (newArray != null && queue == array) {
      // 到这才是真正的扩容,需要使用锁。
      queue = newArray;
      System.arraycopy(array, 0, newArray, 0, oldCap);
      }
      }
      为什么在扩容器要先释放锁,然后使用cas来控制并发?
      显然这里不释放锁一定是没问题,那么作者采用了cas的方式肯定是为了提高性能,如何提高的呢,这里主要是因为扩容所耗费的时间比较长,如果这个时候占有锁,其他入队出队操作都无法工作,这样会大大降低并发性能。

      出队操作

  • poll

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    return dequeue();
    } finally {
    lock.unlock();
    }
    }

    可以看出核心函数是dequeue;

  • dequeue
    既然入队是采用的二叉树堆的方法,不难猜出,我们出列也是使用的二叉树最小堆的出列,
    我们知道这是最小堆,那么出队的肯定是第一个元素,关键是,如何在排除第一个元素之后重新调整使之成为二叉树堆,看下面代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    private E dequeue() {
    int n = size - 1;
    if (n < 0)
    return null;
    else {
    Object[] array = queue;
    E result = (E) array[0]; // 队列第一个元素
    E x = (E) array[n]; // 找到最后一个元素
    array[n] = null; // 释放最后一个元素
    Comparator<? super E> cmp = comparator;
    if (cmp == null)
    siftDownComparable(0, x, array, n); // 重新调整
    else
    siftDownUsingComparator(0, x, array, n, cmp);
    size = n;
    return result;
    }
    }

    从上面可以看出,我们是通过释放最后一个元素(但值是头结点也就是最小的那个),如何做到的继续往下面看siftDownComparable:

  • siftDownComparable

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    private static <T> void siftDownComparable(int k, T x, Object[] array,
    int n) {
    if (n > 0) {
    Comparable<? super T> key = (Comparable<? super T>)x;
    int half = n >>> 1; // loop while a non-leaf 找到尾节点的父节点
    while (k < half) {
    int child = (k << 1) + 1; // 找到k的左右子节点
    Object c = array[child]; // 临时节点C=左节点
    int right = child + 1;
    if (right < n &&
    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
    c = array[child = right]; // 如果右边的小等于右边,则c=右节点
    if (key.compareTo((T) c) <= 0)
    break; // 到这说明x已经小于左右子节点了
    array[k] = c; // 否则将c赋值给第k个节点,也就是将父节点和较小的子节点交换
    k = child;
    }
    array[k] = key;
    }
    }

    上面的逻辑可以使用下图来做推演:
    最小堆出列

    • 在删除最小元素后,我们需要拿最后一个元素往上放,则满足最小堆第一个条件:完全二叉树
    • 接着每次与左右子节点比较,交换最小的子节点,直到比子节点都大,完成最小堆第二个条件:父节点比左右子节点都小。

      其他操作

      首先出队和入队的相关操作与其他队列相似,add和put调用offer,无需阻塞(其他有界队列,会阻塞等待队列有空闲),然后take方法是一个阻塞方法,如果为空,则一直等待直到有元素存在,与poll不同,take不会返回null。
      然后因为使用了锁,其他方法都是严格同步的。

      题外话,其实看过PriorityQueue的源代码就会发现,PriorityBlockingQueue的实现方法与其极其相似,那为什么内部不使用PriorityQueue队列而重新使用array来实现二叉堆呢(下一篇DelayQueue就是基于PriorityQueue实现的)?我的理解是,在PriorityBlockingQueue中锁的粒度比直接是使用PriorityQueue更小,对于扩容,它使用自旋CAS操作来实现同步的,与offer分离了,这一点可以优化性能。