十一、并发队列之PriorityBlockingQueue源码解析

结构

  • DelayQueue // 说明队列元素必须实现Delay接口

  • ReentrantLock lock = new ReentrantLock();

  • PriorityQueue q = new PriorityQueue();

  • Thread leader = null;

  • final Condition available = lock.newCondition();
    延迟并发队列很简单,共4个主要属性,lock和available我们很熟悉,就是锁和同步变量用来控制并发。稍有不同的是q和leader。我们往下看它们的作用

    方法

    入队

  • offer

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
       public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    q.offer(e); // 内部使用了PriorityQueue
    if (q.peek() == e) { // 说明是第一个即将过期的
    leader = null;
    available.signal(); // 唤醒等待的线程(主要是避免队列为空,或者leader还在阻塞,但新加入的元素优先级跟高,其更快到期)
    }
    return true;
    } finally {
    lock.unlock();
    }
    }

    出列操作

  • poll

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    E first = q.peek();
    if (first == null || first.getDelay(NANOSECONDS) > 0)
    return null; // 如果为空,或者第一个还没过期,则不出列
    else
    return q.poll();
    } finally {
    lock.unlock();
    }
    }

    也不难理解,主要是first.getDelay(NANOSECONDS) > 0 这个判断,需要判断是否过期,到 时间了才处理,没有则继续延时。

  • take

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
        public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 响应中断
    try {
    for (;;) {
    E first = q.peek(); // 找到头结点
    if (first == null)
    available.await(); // 如果队列为空就等待
    else {
    long delay = first.getDelay(NANOSECONDS); // 获取过期时间
    if (delay <= 0)
    return q.poll(); // 如果过期了,则直接出列
    first = null; // don't retain ref while waiting
    // 走到这了,代表元素还没过期
    if (leader != null) // leader不为空则等待,说明第一个元素都还没到期
    available.await();
    else {
    Thread thisThread = Thread.currentThread();
    leader = thisThread; // 成为leader
    try {
    available.awaitNanos(delay); // 释放锁等待直到过期
    } finally {
    if (leader == thisThread) // 释放leader,
    leader = null;
    }
    }
    }
    }
    } finally {
    if (leader == null && q.peek() != null)
    available.signal(); // 唤醒其他线程
    lock.unlock();
    }
    }

    发现take方法与以前有很大不同,明显多了很多逻辑。主要是因为要判断是否到过期时间,到了则直接出列,没到则等待。

    1. 等待分两种情况
      • 如果当前leader为空则自己成为leader,并等待delay时长,注意这也是可以被唤醒的
      • 如果已经有leader,则说明当前leader都还在等待,那当前线程也需等待,减少不必要的竞争,这么理解呢,就是leader已经获取了头结点,过期时间还没到,那么其他线程也就无需竞争了
    2. 细心的同学会发现有个问题,如果所有其他线程都因为leader线程等待而等待,那么此时如果有 offer操作,进来一个更快过期的元素怎么办呢?
      这个时候我们就需要回头看了,offer方法里有available.signal();调用,会随机的唤醒一个线程

      思考

      为什么延时队列内部使用PriorityQueue而不像PriorityBlockingQueue那样自己实现更高效的队列呢,个人觉得是因为这两个队列的使用场景不一样,DelayQueue更多的用在定时任务之类,对于入队操作并不是很频繁,所以只需使用已有数据结构即可。它的重点在于出列,所以使用了leader这个结构来实现优化,减少不必要的竞争。