1 概述
JUC包中提供了一系列基于非阻塞CAS算法的原子操作类,相比于用锁机制具有更好的性能,譬如AtomicLong,AtomicInteger,AtomicBoolean,LongAdder,LongAccumulator等,本文着重介绍一下AtomicLong, LongAdder, LongAccumulator,其类图如下所示
?2 AtomicLong
2.1 内部结构
public class AtomicLong extends Number implements java.io.Serializable {
private static final long serialVersionUID = 1927816293512124184L;
// 获得一个unsafe类
private static final Unsafe unsafe = Unsafe.getUnsafe();
//value的偏移量
private static final long valueOffset;
//判断JVM是否支持Long类型无锁CAS
static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
private static native boolean VMSupportsCS8();
//获取value变量的偏移量
static {
try {
valueOffset = unsafe.objectFieldOffset
(java.util.concurrent.atomic.AtomicLong.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
//真正的变量值
private volatile long value;
//有参构造器,可以指定初始value值
public AtomicLong(long initialValue) {
value = initialValue;
}
//无参构造器
public AtomicLong() {
}
......
}
2.2 递增与递减方法
incrementAndGet() 原子性递增value的值并返回递增之后的值
decrementAndGet() 原子性递减value的值并返回递减之后的值
getAndIncrement() 原子性递增value的值并返回递增之前的值
getAndDecrement() 原子性递减value的值并返回递减之前的值
以上四个方法底层都是通过调用unsafe的getAndAddLong()来实现,下面来看一下该方法的具体实现:
public final long getAndAddLong(Object var1, long var2, long var4) {
long var6;
do {
//获取原子类中value的值
var6 = this.getLongVolatile(var1, var2);
} while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));//查看原子类中var2偏移量(value)的值是否是var6,是则将其重新赋值为var6 + var4
return var6;
}
2.3 使用案例
以下是计算两个数之和的代码10000 + 100,使用原子操作类的结果是10100
public class Test {
public static AtomicLong atomicLong = new AtomicLong();
public static void main(String[] args) {
//统计两个数之和
Thread threadone = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
atomicLong.incrementAndGet();
}
});
Thread threadtwo = new Thread(() -> {
for (int i = 0; i < 100; i++) {
atomicLong.incrementAndGet();
}
});
threadone.start();
threadtwo.start();
try {
threadone.join();
threadtwo.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("数组中元素的个数为" + atomicLong.get());//10100
}
}
使用一般int变量的结果是9992
public class Test {
public static int atomicLong = 0;
public static void main(String[] args) {
//统计两个数之和
Thread threadone = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
atomicLong++;
}
});
Thread threadtwo = new Thread(() -> {
for (int i = 0; i < 100; i++) {
atomicLong++;
}
});
threadone.start();
threadtwo.start();
try {
threadone.join();
threadtwo.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("数组中元素的个数为" + atomicLong);//9992
}
}
3 LongAdder
LongAdder继承自Striped64在这个类中维护了一个cell数组和一个base,从前面对于AtomicLong的分析可以看到当多线程来对该原子类进行操作时,时多线程共同竞争一个原子操作类AtomicLong,这就势必导致大量竞争失败的线程在此书进行CAS自旋,这样会白白浪费很多CPU资源,故设计出LongAdder,当线程数较少时,共同竞争base变量,当线程数达到一定值时就会初始化Cell数组,并创建Cell(原子变量),这样多线程就又可以去竞争Cell变量,提高了性能,最终统计是是统计base与Cell数组中value的总和。
3.1 Cell类
//避免伪共享
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe机制,获取value的偏量
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Striped64.Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
3.2 add()方法
LongAdder的递增与递减主要就是依靠LongAdder类中的add()方法,接下来看看其实现逻辑
public void add(long x) {
Striped64.Cell[] as; long b, v; int m; Striped64.Cell a;
//查看cell数组是否已经被初始化,并尝试在base变量上进行递增,若cell数组已经被初始化且递增base失败(线程竞争base失败)则进入下面的逻辑
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
//再次对cell数组进行非空和是否有元素的判断(防止该过程中有其他线程对cell数组进行了操作)
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||//计算当前线程应该获得到cell数组中的哪个Cell
!(uncontended = a.cas(v = a.value, v + x)))//当前线程分配到Cell并进行CAS递增操作,若操作失败则执行Striped64中的longAccumulate方法
longAccumulate(x, null, uncontended);
}
}
接下来看longAccumulate操作,代码分析很长,建议从下往上看,分别是cells数组初始化,cells数组扩容以及添加操作
final void longAccumulate(long x, LongBinaryOperator fn,//传入x,null,false
boolean wasUncontended) {
int h;
//初始化当前线程的ThreadLocalRandomProbe变量(该变量在给线程分配Cell时用到)
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Striped64.Cell[] as; Striped64.Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {//cells数组不为空且有元素这时就可以尝试着区进行进行add操作
if ((a = as[(n - 1) & h]) == null) {//该线程分配的数组上的元素为null
if (cellsBusy == 0) { //若没有其他线程对cells数组进行操作就尝试给数组添加新元素
Striped64.Cell r = new Striped64.Cell(x); // 创建value值就是x的Cell
if (cellsBusy == 0 && casCellsBusy()) { //重新进行判断(因为前面的创建过程中可能有其他线程这时对cells数组进行操作了)并进行CAS操作设置cellsBusy的值
boolean created = false;
try { // Recheck under lock
Striped64.Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {//进行最初逻辑的重新判断
rs[j] = r;//给该线程分配的数组位置上添加原先创建的Cell,此时就是正儿八经的完成了添加x的操作了
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//下面两个判断逻辑是cells数组进行扩容的条件
else if (n >= NCPU || cells != as) //当前的cells数组数目小于cpu数目
collide = false; // At max size or stale
else if (!collide)//发生了线程争夺同一个cells数组中的Cell
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {//cells数组的扩容操作
try {
if (cells == as) { // Expand table unless stale
Striped64.Cell[] rs = new Striped64.Cell[n << 1];//数组按照两倍进行扩容
for (int i = 0; i < n; ++i)//进行数组中元素的拷贝
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
//cellsBusy是一个标识,用来表示当前cells数组是否正在初始化或者扩容,或者正在新建Cell元素(1)
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {//初始化cells数组并通过CAS操作设置cellsBusy变量的值
boolean init = false;
try { // Initialize table
if (cells == as) {
Striped64.Cell[] rs = new Striped64.Cell[2];//创建一个大小为2的Cell数组
rs[h & 1] = new Striped64.Cell(x);//给索引为h&1(h & (2 - 1))处添加Cell对象
cells = rs;//给cells数组赋值
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
4 LongAccumulator
LongAdder就是这个类的特例,我们来看看这个类的构造方法:
public LongAccumulator(LongBinaryOperator accumulatorFunction,
long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
identity就是LongAccumulator累加器的初始值,而LongBinaryOperator则是一个自定义的双目计算器,可以用来定义我们的LongAccumulator(这是一个函数式接口)要用来是怎样的“累加”(自定义累加规则,比如累乘,累除等等);其实我们在LongAdder中也看到过该双目计算器,就是LongAdder中的add()方法最后调用Striped64中的longAccumulate方法时,该方法就有双目计算器形参,不过当时我们传入的是null,也就是默认是累加器了。
双目计算器接口代码如下:
public interface LongBinaryOperator {
/**
* Applies this operator to the given operands.
*
* @param left the first operand
* @param right the second operand
* @return the operator result
*/
long applyAsLong(long left, long right);
}
来看一个案例看看如果使用这个累加器
public static void main(String[] args) {
LongAccumulator longAccumulator = new LongAccumulator((l, r) -> l * r, 1);
longAccumulator.accumulate(2);
longAccumulator.accumulate(3);
System.out.println(longAccumulator.get());//1 * 2 * 3 实现了累乘器
}
|