目录
介绍
JUC是并发中非常重要的一个包,接下来针对这个包做一个详细的介绍。
结构如下:
- atomic包
- locks包
- 其他类
可以看到结构非常简单,两个包加一些类。包里面的内容也没多少,这一章节主要讲一下atomic包。
atomic包下主要有Integer、Boolean、Long、Reference(array)等,其每一种又各包含不同功能的类
我们拿Long来介绍,其他不外乎这三种。
AtomicLong
AtomicLong属性
- unsafe:这是一个Unsafe的静态实例,JUC包一系列原子类操作都是基于此
- value:实际变量值,volatile Long类型
- valueOffset: value的偏移地址
- VM_SUPPORTS_LONG_CAS:JVM的支持判断(可以不用管)
原子类操作有一个比较通用的静态代码块,就是加载地址偏移,后面介绍的LongAdder、LongAccumulator也是一样。
1 | static { |
从上面可以看到,属性非常少,所以其原理也很简单,当然因为Long本来就简单,下面直接介绍它的一些方法:
AtomicLong方法
第一类是直接操作如:
1
2
3
4
5
6public final boolean compareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}
public final boolean weakCompareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}底层逻辑就是CAS
设置一个新值,并返回上一个值,有以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public final long getAndSet(long newValue) {
return unsafe.getAndSetLong(this, valueOffset, newValue);
}
public final long getAndIncrement() {
return unsafe.getAndAddLong(this, valueOffset, 1L);
}
public final long getAndDecrement() {
return unsafe.getAndAddLong(this, valueOffset, -1L);
}
public final long getAndAdd(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta);
}
public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
public final long getAndUpdate(LongUnaryOperator updateFunction) {
long prev, next;
do {
prev = get();
next = updateFunction.applyAsLong(prev);
} while (!compareAndSet(prev, next));
return prev;
}
还有其他的方法,不一一列举上面的方法都可以归为一类,因为底层的实现都是unsafe的方法且操作都是一个while循环进行cas操作,成功了再返回。最后一个看起有地不一样,但如果有看过java8的函数式接口就应该不难理解。
LongUnaryOperator就是一个函数式接口,定义一个函数,然后操作前一个值,得到目标值。因此getAndUpdate可以实现定制化的操作。AtomicLong实例
计算多个数组中0的个数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38/**
* <测试原子操作:参考《Java并发编程之美》
*/
public class AtomicTest
{
private static AtomicLong atomicLong = new AtomicLong();
private static Integer[] arrayOne=new Integer[]{0,1,2,3,0,5,6,0,56,0};
private static Integer[] arrayTwo=new Integer[]{10,1,2,3,0,5,6,0,56,0};
public static void main(String[] args) throws InterruptedException
{
// 线程one统计数组arrayone中0的个数
Thread threadOne=new Thread(()->{
int size=arrayOne.length;
for (int i = 0; i < size; i++)
{
if(arrayOne[i].intValue()==0){
atomicLong.incrementAndGet();
}
}
});
// 线程two统计数组arraytwo中0的个数
Thread threadTwo=new Thread(()->{
int size=arrayTwo.length;
for (int i = 0; i < size; i++)
{
if(arrayTwo[i].intValue()==0){
atomicLong.incrementAndGet();
}
}
});
threadOne.start();
threadTwo.start();
threadOne.join();
threadTwo.join();
System.out.println("count 0 : "+atomicLong.get());
}
}输出: count 0 : 7
LongAdder
到这里AtomicLong原子类操作已经比阻塞同步器来说已经好很多了,但是jdk开发组仍不满足,我们也看到,在第二种方法时我们进行了一个while循环来自旋操作,当多个线程竞争同一个变量时,会造成CPU资源浪费。
LongAdder就是被设计来解决多个线程竞争同一个变量的问题,它把一个变量拆分成了多个变量,使用Cell自动扩容数组来存储部分数据,让多个线程去竞争多个资源,这样就解决了性能瓶颈。
如图:
LongAdder 在内部维护了一个Cell元素数组,来分担单个变量进行争夺的资源占用。
Striped64
LongAdder继承自Striped64,Striped64内部维护以下变量:
- volatile Cell[] cells :LongAdder实际值为数组中所有元素的值+base
- volatile long base : 每次操作首先进行base的CAS如果失败则分配一个Cell
- volatile int cellsBusy :用来实现cells数组的自旋锁,修改的时候锁住,状态值只有0或者1,1表示不允许修改
- static final long PROBE:线程中threadLocalRandomProbe的偏移
- static final sun.misc.Unsafe UNSAFE:和AtomicLong一样
- int NCPU : CPU的个数,Cell不会无限增长,只有个数小于等于CPU的个数才会有性能的提升
除了Striped64维护的几个属性,LongAdder本身只增加了函数,没有增加属性
再看内部类Cell:由上面可以看到内部类Cell非常简单,一个value值,一个CAS操作,保证value是原子操作1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22@sun.misc.Contended // 此注解使用字节填充防止伪共享
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
继续看Striped64的其他几个方法:除了最后一个longAccumulate函数比较复杂我们后面再讲,其他都非常好理解,就是简单的CAS操作1
2
3
4
5
6
7
8
9
10final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended)//核心方法
LongAdder方法
LongAdder方法可以归为三类,一类是返回当前的值或重置,一类是数据加、减,还有一类则是序列化(此章节忽略)
第一类如下:
- public long sum() 这个函数比较简单,就是返回所有Cell的元素和base的和,注意这个计算不是一个近实时的值,因为在计算过程中每个Cell会改变
1
2
3
4
5
6
7
8
9
10
11long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
} - void reset() 将base和Cell全都置为0
1
2
3
4
5
6
7
8
9
10public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
} - long sumThenReset() 这个函数是上面两个函数的叠加
- public String toString() {return Long.toString(sum());}
- public long longValue() { return sum();}
- public int intValue() {return (int)sum();}
- 还有floatValue、doubleValue和以上一样
可以看出上面这一类非常容易理解,就是普通数组的操作
重点是第二类:
- public void add(long x)
- public void increment() { add(1L);}
- public void decrement() {add(-1L);}
核心函数add源码:
1 | public void add(long x) { |
上面函数逻辑不难,就是各种条件判断看着有迷。这里有一张图:
要点:
- 如果cells还没创建,则尝试修改base的值,这部分跟AtomicLong操作一样,不过不同的是,AtomicLong会自旋不断尝试,而这里则会进入longAccumulate操作cells数据
- 如果cells不为空,则尝试对当前线程映射到的Cell元素进行赋值,如果成功则结束,如果不存在或者CAS失败则调用longAccumulate。
- uncontended为true 代表当前线程映射的Cell元素没有值,为false,代表当前线程映射的Cell元素已经有其他线程占用了
接着我们重点研究longAccumulate代码:
1 | final void longAccumulate(long x, LongBinaryOperator fn, |
这一整段代码下来还是挺复杂的,主要是比较多的分支,和casCellsBusy判断,下面总结一下要点:
- 如果当前线程映射到此的Cell存在,但如果有冲突,也就是别的线程也映射到此了,则进行扩容和修改线程的探针值(上限是CPU的数量),否则不断尝试CAS如③、④、⑤;
- 如果当前线程映射到此的Cell不存在,则尝试创建一个新的Cell并设置值如①
- 如果连cells都为空,则进行初始化如⑥
- 如果以上都失败,也就是操作被占用,则尝试对casBase
个人想法:
- 代码⑥处,个人感觉cells==as是否有点多余,因为后面拿到锁后还会判断
- 代码②处,在扩容、初始化时利用cellsBusy锁住整个cells能理解,但是创建一个新Cell插入进去,也锁住了整个cells是否能够继续优化?
LongAdder实例
和上面的使用方式差不多1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35public class LongAdderTest
{
private static LongAdder LongAdder = new LongAdder();
private static Integer[] arrayOne=new Integer[]{0,1,2,3,0,5,6,0,56,0};
private static Integer[] arrayTwo=new Integer[]{10,1,2,3,0,5,6,0,56,0};
public static void main(String[] args) throws InterruptedException
{
// 线程one统计数组arrayone中0的个数
Thread threadOne=new Thread(()->{
int size=arrayOne.length;
for (int i = 0; i < size; i++)
{
if(arrayOne[i].intValue()==0){
LongAdder.increment();
}
}
});
// 线程two统计数组arraytwo中0的个数
Thread threadTwo=new Thread(()->{
int size=arrayTwo.length;
for (int i = 0; i < size; i++)
{
if(arrayTwo[i].intValue()==0){
LongAdder.increment();
}
}
});
threadOne.start();
threadTwo.start();
threadOne.join();
threadTwo.join();
System.out.println("count 0 : "+LongAdder.sum());
}
}LongAccumulator
LongAccumulator其实和LongAdder很相似,可以说LongAdder只是LongAccumulator一个操作加减的特例。
构造函数如下:发现它是可以定制方法的而且是有初始值的,我们只需传入一个方法(函数式接口)原理如下:1
2
3
4
5public LongAccumulator(LongBinaryOperator accumulatorFunction,
long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}与LongAdder的add()函数相比,它只有function不一样,LongAdder里是null,而这里使我们构造的时候传入的。1
2
3
4
5
6
7
8
9
10
11
12
13public void accumulate(long x) {
Cell[] as; long b, v, r; int m; Cell a;
if ((as = cells) != null ||
(r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended =
(r = function.applyAsLong(v = a.value, x)) == v ||
a.cas(v, r)))
longAccumulate(x, function, uncontended);
}
}
再回到longAccumulate 代码中,真正更新的值不再是v+x 而是使用我们定义的函数进行计算。1
2else if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))
break;