七、并发队列之ConcurrentLinkedQueue源码分析

类图结构

首先可以看到ConcurrentLinkedQueue继承自AbstractQueue,一个先进先出的数据接口。
ConcurrentLinkedQueue类继承关系

属性

  • volatile Node head // 头结点
  • volatile Node tail // 尾节点
  • private static final sun.misc.Unsafe UNSAFE;
  • private static final long headOffset; // head的地址偏移
  • private static final long tailOffset; // tail的地址偏移
    ConcurrentLinkedQueue 的结构非常简单就是头尾节点。
  • 构造方法
    1
    2
    3
    public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null); // 初始化头尾节点
    }

    Node内部类

    Node内部类也非常简单,只有值以及指向下一个节点的引用如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    private static class Node<E> {
    volatile E item;
    volatile Node<E> next;
    Node(E item) {
    UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
    return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
    try {
    UNSAFE = sun.misc.Unsafe.getUnsafe();
    Class<?> k = Node.class;
    itemOffset = UNSAFE.objectFieldOffset
    (k.getDeclaredField("item"));
    nextOffset = UNSAFE.objectFieldOffset
    (k.getDeclaredField("next"));
    } catch (Exception e) {
    throw new Error(e);
    }
    }
    }
    唯一需要注意的是UNSAFE.putOrderedObject(this, nextOffset, val);方法,之前没有讲过,这里说明一下:

    Version of {@link #putObjectVolatile }that does not guarantee immediate visibility of the store to other threads. This method is generally only useful if the underlying field is a Java volatile.
    意思是说这是putObjectVolatile的另一个版本,但是对其他线程并不是立即可见的。而且这个方法只对volatile变量有效。
    为什么不适立即可见呢?好处是什么呢?
    这里涉及导一下内存屏障的概念,一般volatile变量是store-load barrier也就是读写之前都直接读取内存,不走缓存,这样保证了操作的原子性。但使用putOrderedObject时使用的是store-store barrier也就是只对写做了屏障,写之前强制完成前面的操作,对于读并不要求。再来看这个函数的名字 put orderd Object,也就是保证写有序。好处是提高因为读屏障造成的性能消耗。

方法

既然是队列,那我们自然想到最主要的几个方法:offer入列、poll出列、peek获取头部元素。其他方法大体类似或者调用这几个方法。接下来我们就这几个方法如何实现并发安全进行源码分析

offer入列

对于队列,我们都是从尾部进入,从头部出,先进先出。offer就是从头部出列的一个操作。此操作是个自旋操作,成功才返回,不存在false的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) { // 使用两个Node变量t代表入函数的的尾节点,p代表更新后的尾节点。
Node<E> q = p.next; // q是尾节点的下一个节点 (1)
if (q == null) { // 到这里说明确实是队尾,没有发生其他线程竞争现象
// 则尝试cas设置新节点为tail的next节点。(2)
if (p.casNext(null, newNode)) {
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK. // 此时尾节点不是真正的尾节点,更新失败说明其他线程设置了tail(3)
return true; // 入队成功,才结束
}
}
else if (p == q)
p = (t != (t = tail)) ? t : head; // (4)更新尾节点,找到头结点,也就是尾节点失效了
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q; // 更新尾节点 (5)
}
}

由上我们看到循环里会出现三个条件分支,我们一个一个来分析。

  1. 首先是第一种正常入队,没有产生竞争现象,于是顺利的进入第一个条件分支将节点加入队列

正常入队

注意:此时没有更新尾节点
2. 倘若有另一个线程在我们cas的时候插入了一个节点newNode1,则我们会cas失败,那么重新进入loop循环如下图步骤(1),此时发现q!=null,且p!=q,那么会进入第三个分支(5)重新设置p为新的尾节点,接着在下一次循环的时候进行插入,如下图依次进入步骤(1)(3)

竞争锁失败
注意此时会更新尾节点,也就是我们说的尾节点更新的延后性。
另外步骤5中什么情况会p=t?,这是因为到这一个条件后的步骤(1)后我们发现还有线程又竞争插入了一个节点,那么此时q!=null,且p!=t,t!=tail,于是重新找tail。
3. 现在就差第二个分支p==q。也就是p.next=p 此条件是因为存在poll出队操作后可能会把head变为自引用,也就是head.next变为head,这个时候需要重新找到head,放弃现有的tail。(可以先看下文的poll)

重点注意这里的尾节点会出现滞后现象(省去自旋cas操作提高性能),但是对于队列来讲,我们总是去读头结点,所以并不影响。

poll出列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) { (1)移除当前节点
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p); // 更新头结点
return item;
}
else if ((q = p.next) == null) { // (2)将q指向p的下个节点当
// 到这表示前队列为空返回null
updateHead(h, p);
return null;
}
else if (p == q) // (3)如果被自引用了则重新寻找
continue restartFromHead;
else (4) 寻找下一个节点
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}

与offer相对应,这里有4个条件分支,也是一个自旋操作,直到移除成功。
注意上面入队操作后的结果图,我们发现无论何时head都是一个null节点,也就是哨兵节点,所以我们推测,poll操作是把第一个有效节点置空再使哨兵出队,并设置新的头结点。我们根据流程图分析这四个分支如何走到。
LinkedQ出队操作

  1. 第一种情况,正常操作,且队列依然有节点,此时访问的是哨兵节点,且有下个元素,则走到条件四,我们拿到真正的数据节点,那么会进入步骤一,释放哨兵节点并将此数据节点置为哨兵节点。
  2. 第二种情况,访问哨兵节点,此时队列没有元素了,则进入第二个条件,也就是直接返回空。
  3. 第三种情况,cas失败(有线程出队,且没有重置哨兵节点),此时p=head==null,q=p.next还是为空,看上图中的步骤一,于是出现了p==q。则执行下一个循环。

至此四种情况啊都走完了。这里总结一下重点:

  • 队列的头部是一个哨兵节点,这个有点类似AQS的head
  • 队列的出队操作是将第一个数据节点置位哨兵节点,释放旧的哨兵节点
  • 如果出队的过程发生并发,则重新循环执行。

peek操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

从代码看,peek操作与poll操作非常类似,只是减少了移除的操作。

总结

综上ConcurrentLinkedQueue类的实现非常简单,都是通过自旋+cas操作头尾节点来实现并发安全,另外ConcurrentLinkedQueue只保证了出队和入队的原子性,其contains、size方法并没有加锁,完全非阻塞,所以结果不精确。
然后我们看到在整个操作过程中我们没有用到锁或者park的操作,也就是说这是一个完全非阻塞的操作,这与之前讲到过的原子类很相似。
下一篇文章我们看看阻塞方法实现的队列