类图结构
首先可以看到ConcurrentLinkedQueue继承自AbstractQueue,一个先进先出的数据接口。
属性
- volatile Node
head // 头结点 - volatile Node
tail // 尾节点 - private static final sun.misc.Unsafe UNSAFE;
- private static final long headOffset; // head的地址偏移
- private static final long tailOffset; // tail的地址偏移
ConcurrentLinkedQueue 的结构非常简单就是头尾节点。 - 构造方法
1
2
3public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null); // 初始化头尾节点
}Node内部类
Node内部类也非常简单,只有值以及指向下一个节点的引用如下:唯一需要注意的是1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38private static class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}UNSAFE.putOrderedObject(this, nextOffset, val);
方法,之前没有讲过,这里说明一下:Version of {@link #putObjectVolatile }that does not guarantee immediate visibility of the store to other threads. This method is generally only useful if the underlying field is a Java volatile.
意思是说这是putObjectVolatile的另一个版本,但是对其他线程并不是立即可见的。而且这个方法只对volatile变量有效。
为什么不适立即可见呢?好处是什么呢?
这里涉及导一下内存屏障的概念,一般volatile变量是store-load barrier也就是读写之前都直接读取内存,不走缓存,这样保证了操作的原子性。但使用putOrderedObject时使用的是store-store barrier也就是只对写做了屏障,写之前强制完成前面的操作,对于读并不要求。再来看这个函数的名字 put orderd Object,也就是保证写有序。好处是提高因为读屏障造成的性能消耗。
方法
既然是队列,那我们自然想到最主要的几个方法:offer入列、poll出列、peek获取头部元素。其他方法大体类似或者调用这几个方法。接下来我们就这几个方法如何实现并发安全进行源码分析
offer入列
对于队列,我们都是从尾部进入,从头部出,先进先出。offer就是从头部出列的一个操作。此操作是个自旋操作,成功才返回,不存在false的情况
1 | public boolean offer(E e) { |
由上我们看到循环里会出现三个条件分支,我们一个一个来分析。
- 首先是第一种正常入队,没有产生竞争现象,于是顺利的进入第一个条件分支将节点加入队列
注意:此时没有更新尾节点
2. 倘若有另一个线程在我们cas的时候插入了一个节点newNode1,则我们会cas失败,那么重新进入loop循环如下图步骤(1),此时发现q!=null,且p!=q,那么会进入第三个分支(5)重新设置p为新的尾节点,接着在下一次循环的时候进行插入,如下图依次进入步骤(1)(3)
注意此时会更新尾节点,也就是我们说的尾节点更新的延后性。
另外步骤5中什么情况会p=t?,这是因为到这一个条件后的步骤(1)后我们发现还有线程又竞争插入了一个节点,那么此时q!=null,且p!=t,t!=tail,于是重新找tail。
3. 现在就差第二个分支p==q。也就是p.next=p 此条件是因为存在poll出队操作后可能会把head变为自引用,也就是head.next变为head,这个时候需要重新找到head,放弃现有的tail。(可以先看下文的poll)
重点注意这里的尾节点会出现滞后现象(省去自旋cas操作提高性能),但是对于队列来讲,我们总是去读头结点,所以并不影响。
poll出列
1 | public E poll() { |
与offer相对应,这里有4个条件分支,也是一个自旋操作,直到移除成功。
注意上面入队操作后的结果图,我们发现无论何时head都是一个null节点,也就是哨兵节点,所以我们推测,poll操作是把第一个有效节点置空再使哨兵出队,并设置新的头结点。我们根据流程图分析这四个分支如何走到。
- 第一种情况,正常操作,且队列依然有节点,此时访问的是哨兵节点,且有下个元素,则走到条件四,我们拿到真正的数据节点,那么会进入步骤一,释放哨兵节点并将此数据节点置为哨兵节点。
- 第二种情况,访问哨兵节点,此时队列没有元素了,则进入第二个条件,也就是直接返回空。
- 第三种情况,cas失败(有线程出队,且没有重置哨兵节点),此时p=head==null,q=p.next还是为空,看上图中的步骤一,于是出现了p==q。则执行下一个循环。
至此四种情况啊都走完了。这里总结一下重点:
- 队列的头部是一个哨兵节点,这个有点类似AQS的head
- 队列的出队操作是将第一个数据节点置位哨兵节点,释放旧的哨兵节点
- 如果出队的过程发生并发,则重新循环执行。
peek操作
1 | public E peek() { |
从代码看,peek操作与poll操作非常类似,只是减少了移除的操作。
总结
综上ConcurrentLinkedQueue类的实现非常简单,都是通过自旋+cas操作头尾节点来实现并发安全,另外ConcurrentLinkedQueue只保证了出队和入队的原子性,其contains、size方法并没有加锁,完全非阻塞,所以结果不精确。
然后我们看到在整个操作过程中我们没有用到锁或者park的操作,也就是说这是一个完全非阻塞的操作,这与之前讲到过的原子类很相似。
下一篇文章我们看看阻塞方法实现的队列