结构
DelayQueue
// 说明队列元素必须实现Delay接口 ReentrantLock lock = new ReentrantLock();
PriorityQueue
q = new PriorityQueue (); Thread leader = null;
final Condition available = lock.newCondition();
延迟并发队列很简单,共4个主要属性,lock和available我们很熟悉,就是锁和同步变量用来控制并发。稍有不同的是q和leader。我们往下看它们的作用方法
入队
offer
1
2
3
4
5
6
7
8
9
10
11
12
13
14public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e); // 内部使用了PriorityQueue
if (q.peek() == e) { // 说明是第一个即将过期的
leader = null;
available.signal(); // 唤醒等待的线程(主要是避免队列为空,或者leader还在阻塞,但新加入的元素优先级跟高,其更快到期)
}
return true;
} finally {
lock.unlock();
}
}出列操作
poll
1
2
3
4
5
6
7
8
9
10
11
12
13public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null; // 如果为空,或者第一个还没过期,则不出列
else
return q.poll();
} finally {
lock.unlock();
}
}也不难理解,主要是first.getDelay(NANOSECONDS) > 0 这个判断,需要判断是否过期,到 时间了才处理,没有则继续延时。
take
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 响应中断
try {
for (;;) {
E first = q.peek(); // 找到头结点
if (first == null)
available.await(); // 如果队列为空就等待
else {
long delay = first.getDelay(NANOSECONDS); // 获取过期时间
if (delay <= 0)
return q.poll(); // 如果过期了,则直接出列
first = null; // don't retain ref while waiting
// 走到这了,代表元素还没过期
if (leader != null) // leader不为空则等待,说明第一个元素都还没到期
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread; // 成为leader
try {
available.awaitNanos(delay); // 释放锁等待直到过期
} finally {
if (leader == thisThread) // 释放leader,
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal(); // 唤醒其他线程
lock.unlock();
}
}发现take方法与以前有很大不同,明显多了很多逻辑。主要是因为要判断是否到过期时间,到了则直接出列,没到则等待。
- 等待分两种情况
- 如果当前leader为空则自己成为leader,并等待delay时长,注意这也是可以被唤醒的
- 如果已经有leader,则说明当前leader都还在等待,那当前线程也需等待,减少不必要的竞争,这么理解呢,就是leader已经获取了头结点,过期时间还没到,那么其他线程也就无需竞争了
- 细心的同学会发现有个问题,如果所有其他线程都因为leader线程等待而等待,那么此时如果有 offer操作,进来一个更快过期的元素怎么办呢?
这个时候我们就需要回头看了,offer方法里有available.signal();调用,会随机的唤醒一个线程思考
为什么延时队列内部使用PriorityQueue而不像PriorityBlockingQueue那样自己实现更高效的队列呢,个人觉得是因为这两个队列的使用场景不一样,DelayQueue更多的用在定时任务之类,对于入队操作并不是很频繁,所以只需使用已有数据结构即可。它的重点在于出列,所以使用了leader这个结构来实现优化,减少不必要的竞争。
- 等待分两种情况