三、AtomicLong、LongAdder、LongAccumulator源码解读

目录

  1. 介绍
  2. AtomicLong
  3. LongAdder
  4. LongAccumulator

介绍

JUC是并发中非常重要的一个包,接下来针对这个包做一个详细的介绍。
结构如下:

  • atomic包
  • locks包
  • 其他类

可以看到结构非常简单,两个包加一些类。包里面的内容也没多少,这一章节主要讲一下atomic包。
atomic包下主要有Integer、Boolean、Long、Reference(array)等,其每一种又各包含不同功能的类
我们拿Long来介绍,其他不外乎这三种。

AtomicLong

AtomicLong属性

  • unsafe:这是一个Unsafe的静态实例,JUC包一系列原子类操作都是基于此
  • value:实际变量值,volatile Long类型
  • valueOffset: value的偏移地址
  • VM_SUPPORTS_LONG_CAS:JVM的支持判断(可以不用管)

原子类操作有一个比较通用的静态代码块,就是加载地址偏移,后面介绍的LongAdder、LongAccumulator也是一样。

1
2
3
4
5
6
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicLong.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

从上面可以看到,属性非常少,所以其原理也很简单,当然因为Long本来就简单,下面直接介绍它的一些方法:

AtomicLong方法

  1. 第一类是直接操作如:

    1
    2
    3
    4
    5
    6
    public final boolean compareAndSet(long expect, long update) {
    return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }
    public final boolean weakCompareAndSet(long expect, long update) {
    return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

    底层逻辑就是CAS

  2. 设置一个新值,并返回上一个值,有以下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public final long getAndSet(long newValue) {
    return unsafe.getAndSetLong(this, valueOffset, newValue);
    }
    public final long getAndIncrement() {
    return unsafe.getAndAddLong(this, valueOffset, 1L);
    }
    public final long getAndDecrement() {
    return unsafe.getAndAddLong(this, valueOffset, -1L);
    }
    public final long getAndAdd(long delta) {
    return unsafe.getAndAddLong(this, valueOffset, delta);
    }
    public final long incrementAndGet() {
    return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
    }
    public final long getAndUpdate(LongUnaryOperator updateFunction) {
    long prev, next;
    do {
    prev = get();
    next = updateFunction.applyAsLong(prev);
    } while (!compareAndSet(prev, next));
    return prev;
    }
    还有其他的方法,不一一列举

    上面的方法都可以归为一类,因为底层的实现都是unsafe的方法且操作都是一个while循环进行cas操作,成功了再返回。最后一个看起有地不一样,但如果有看过java8的函数式接口就应该不难理解。
    LongUnaryOperator就是一个函数式接口,定义一个函数,然后操作前一个值,得到目标值。因此getAndUpdate可以实现定制化的操作。

    AtomicLong实例

    计算多个数组中0的个数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    /**
    * <测试原子操作:参考《Java并发编程之美》
    */
    public class AtomicTest
    {
    private static AtomicLong atomicLong = new AtomicLong();
    private static Integer[] arrayOne=new Integer[]{0,1,2,3,0,5,6,0,56,0};
    private static Integer[] arrayTwo=new Integer[]{10,1,2,3,0,5,6,0,56,0};

    public static void main(String[] args) throws InterruptedException
    {
    // 线程one统计数组arrayone中0的个数
    Thread threadOne=new Thread(()->{
    int size=arrayOne.length;
    for (int i = 0; i < size; i++)
    {
    if(arrayOne[i].intValue()==0){
    atomicLong.incrementAndGet();
    }
    }
    });
    // 线程two统计数组arraytwo中0的个数
    Thread threadTwo=new Thread(()->{
    int size=arrayTwo.length;
    for (int i = 0; i < size; i++)
    {
    if(arrayTwo[i].intValue()==0){
    atomicLong.incrementAndGet();
    }
    }
    });
    threadOne.start();
    threadTwo.start();
    threadOne.join();
    threadTwo.join();
    System.out.println("count 0 : "+atomicLong.get());
    }
    }

    输出: count 0 : 7

LongAdder

到这里AtomicLong原子类操作已经比阻塞同步器来说已经好很多了,但是jdk开发组仍不满足,我们也看到,在第二种方法时我们进行了一个while循环来自旋操作,当多个线程竞争同一个变量时,会造成CPU资源浪费。

LongAdder就是被设计来解决多个线程竞争同一个变量的问题,它把一个变量拆分成了多个变量,使用Cell自动扩容数组来存储部分数据,让多个线程去竞争多个资源,这样就解决了性能瓶颈。
如图:
AtomicLong和LongAdder区别

LongAdder 在内部维护了一个Cell元素数组,来分担单个变量进行争夺的资源占用。

Striped64

LongAdder继承自Striped64,Striped64内部维护以下变量:

  • volatile Cell[] cells :LongAdder实际值为数组中所有元素的值+base
  • volatile long base : 每次操作首先进行base的CAS如果失败则分配一个Cell
  • volatile int cellsBusy :用来实现cells数组的自旋锁,修改的时候锁住,状态值只有0或者1,1表示不允许修改
  • static final long PROBE:线程中threadLocalRandomProbe的偏移
  • static final sun.misc.Unsafe UNSAFE:和AtomicLong一样
  • int NCPU : CPU的个数,Cell不会无限增长,只有个数小于等于CPU的个数才会有性能的提升
    除了Striped64维护的几个属性,LongAdder本身只增加了函数,没有增加属性
    再看内部类Cell:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @sun.misc.Contended    // 此注解使用字节填充防止伪共享
    static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
    try {
    UNSAFE = sun.misc.Unsafe.getUnsafe();
    Class<?> ak = Cell.class;
    valueOffset = UNSAFE.objectFieldOffset
    (ak.getDeclaredField("value"));
    } catch (Exception e) {
    throw new Error(e);
    }
    }
    }
    由上面可以看到内部类Cell非常简单,一个value值,一个CAS操作,保证value是原子操作
    继续看Striped64的其他几个方法:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }
    final boolean casCellsBusy() {
    return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }
    static final int getProbe() {
    return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
    final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended)//核心方法
    除了最后一个longAccumulate函数比较复杂我们后面再讲,其他都非常好理解,就是简单的CAS操作

LongAdder方法

LongAdder方法可以归为三类,一类是返回当前的值或重置,一类是数据加、减,还有一类则是序列化(此章节忽略)
第一类如下

  • public long sum() 这个函数比较简单,就是返回所有Cell的元素和base的和,注意这个计算不是一个近实时的值,因为在计算过程中每个Cell会改变
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
    for (int i = 0; i < as.length; ++i) {
    if ((a = as[i]) != null)
    sum += a.value;
    }
    }
    return sum;
    }
  • void reset() 将base和Cell全都置为0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public void reset() {
    Cell[] as = cells; Cell a;
    base = 0L;
    if (as != null) {
    for (int i = 0; i < as.length; ++i) {
    if ((a = as[i]) != null)
    a.value = 0L;
    }
    }
    }
  • long sumThenReset() 这个函数是上面两个函数的叠加
  • public String toString() {return Long.toString(sum());}
  • public long longValue() { return sum();}
  • public int intValue() {return (int)sum();}
  • 还有floatValue、doubleValue和以上一样

可以看出上面这一类非常容易理解,就是普通数组的操作

重点是第二类

  • public void add(long x)
  • public void increment() { add(1L);}
  • public void decrement() {add(-1L);}

核心函数add源码:

1
2
3
4
5
6
7
8
9
10
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

上面函数逻辑不难,就是各种条件判断看着有迷。这里有一张图:
LongAdder.add逻辑

要点:

  1. 如果cells还没创建,则尝试修改base的值,这部分跟AtomicLong操作一样,不过不同的是,AtomicLong会自旋不断尝试,而这里则会进入longAccumulate操作cells数据
  2. 如果cells不为空,则尝试对当前线程映射到的Cell元素进行赋值,如果成功则结束,如果不存在或者CAS失败则调用longAccumulate。
  3. uncontended为true 代表当前线程映射的Cell元素没有值,为false,代表当前线程映射的Cell元素已经有其他线程占用了

接着我们重点研究longAccumulate代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 初始化探针值threadLocalRandomProbe,在并发中很多地方用到这个属性,我的理解是这个值并没有实际意义,因为可能线程中多个地方会改变它(比如线程中同时使用了LongAdder、ThreadLocalRandom),它的意义在于遇到线程竞争的时候初始化或重置一个值。
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {// 熟悉的判断,cells存在且有内容
if ((a = as[(n - 1) & h]) == null) {// 如果当前线程映射到此的Cell不存在①
if (cellsBusy == 0) { // cells的锁能修改,则尝试创建一个Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break; // 如果创建成功则跳出循环
continue; // ②不成功代表cells正在被修改比如初始化、扩容、或者新增一个Cell
}
}
collide = false;
}
// 当前线程映射到此的Cell存在 ③
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 执行CAS fn.applyAsLong(v, x)这个函数后面会讲,成功就退出循环,失败继续往下
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 判断Cell的个数是否达到CPU的个数,如果达到,就不能再扩容,继续尝试CAS
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
// 如果没达到,则锁住cells进行复制扩容④
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h); // 进行到这,说明存在对应的Cell,但有冲突,因此进行扩容并重新生成threadLocalRandomProbe⑤
}
// 拿到cells锁并初始化cells⑥
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// 尝试CAS base。
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

这一整段代码下来还是挺复杂的,主要是比较多的分支,和casCellsBusy判断,下面总结一下要点:

  1. 如果当前线程映射到此的Cell存在,但如果有冲突,也就是别的线程也映射到此了,则进行扩容和修改线程的探针值(上限是CPU的数量),否则不断尝试CAS如③、④、⑤;
  2. 如果当前线程映射到此的Cell不存在,则尝试创建一个新的Cell并设置值如①
  3. 如果连cells都为空,则进行初始化如⑥
  4. 如果以上都失败,也就是操作被占用,则尝试对casBase

个人想法:

  1. 代码⑥处,个人感觉cells==as是否有点多余,因为后面拿到锁后还会判断
  2. 代码②处,在扩容、初始化时利用cellsBusy锁住整个cells能理解,但是创建一个新Cell插入进去,也锁住了整个cells是否能够继续优化?

    LongAdder实例

    和上面的使用方式差不多
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    public class LongAdderTest
    {
    private static LongAdder LongAdder = new LongAdder();
    private static Integer[] arrayOne=new Integer[]{0,1,2,3,0,5,6,0,56,0};
    private static Integer[] arrayTwo=new Integer[]{10,1,2,3,0,5,6,0,56,0};

    public static void main(String[] args) throws InterruptedException
    {
    // 线程one统计数组arrayone中0的个数
    Thread threadOne=new Thread(()->{
    int size=arrayOne.length;
    for (int i = 0; i < size; i++)
    {
    if(arrayOne[i].intValue()==0){
    LongAdder.increment();
    }
    }
    });
    // 线程two统计数组arraytwo中0的个数
    Thread threadTwo=new Thread(()->{
    int size=arrayTwo.length;
    for (int i = 0; i < size; i++)
    {
    if(arrayTwo[i].intValue()==0){
    LongAdder.increment();
    }
    }
    });
    threadOne.start();
    threadTwo.start();
    threadOne.join();
    threadTwo.join();
    System.out.println("count 0 : "+LongAdder.sum());
    }
    }

    LongAccumulator

    LongAccumulator其实和LongAdder很相似,可以说LongAdder只是LongAccumulator一个操作加减的特例。
    构造函数如下:
    1
    2
    3
    4
    5
    public LongAccumulator(LongBinaryOperator accumulatorFunction,
    long identity) {
    this.function = accumulatorFunction;
    base = this.identity = identity;
    }
    发现它是可以定制方法的而且是有初始值的,我们只需传入一个方法(函数式接口)原理如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public void accumulate(long x) {
    Cell[] as; long b, v, r; int m; Cell a;
    if ((as = cells) != null ||
    (r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
    boolean uncontended = true;
    if (as == null || (m = as.length - 1) < 0 ||
    (a = as[getProbe() & m]) == null ||
    !(uncontended =
    (r = function.applyAsLong(v = a.value, x)) == v ||
    a.cas(v, r)))
    longAccumulate(x, function, uncontended);
    }
    }
    与LongAdder的add()函数相比,它只有function不一样,LongAdder里是null,而这里使我们构造的时候传入的。
    再回到longAccumulate 代码中,
    1
    2
    else if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))
    break;
    真正更新的值不再是v+x 而是使用我们定义的函数进行计算。