本章内容
- CAS 与 volatile
- 原子整数
- 原子引用
- 原子累加器
- Unsafe
1、问题的提出
有如下需求,保证accont.withdraw 取款方法的线程安全
public interface Account {
Integer getBalance();
void withdraw(Integer amount);
static void demo(Account account) {
List<Thread> threads = new ArrayList<>();
long start = System.nanoTime();
for (int i = 0; i < 1000; i++) {
threads.add(new Thread(() -> {
account.withdraw(10);
}));
}
threads.forEach(Thread::start);
threads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance() + " cost :" + (end - start) / 1000_000 + "ms");
}
}
原实现并不是线程安全的
public class AccountUnsafe implements Account{
private Integer balance;
public AccountUnsafe(Integer balance) {
this.balance = balance;
}
@Override
public Integer getBalance() {
return this.balance;
}
@Override
public void withdraw(Integer amount) {
balance -= amount;
}
}
测试
Account.demo(new AccountUnsafe(10000));
输出
90 cost :185ms
为什么是不安全的
withdraw 方法
public void withdraw(Integer amount) {
balance -= amount;
}
对应字节码
ALOAD 0 // <- this
ALOAD 0
GETFIELD cn/itcast/AccountUnsafe.balance : Ljava/lang/Integer; // <- this.balance
INVOKEVIRTUAL java/lang/Integer.intValue ()I // 拆箱
ALOAD 1 // <- amount
INVOKEVIRTUAL java/lang/Integer.intValue ()I // 拆箱
ISUB // 减法
INVOKESTATIC java/lang/Integer.valueOf (I)Ljava/lang/Integer; // 结果装箱
PUTFIELD cn/itcast/AccountUnsafe.balance : Ljava/lang/Integer; // -> this.balance
多线程执行流程
ALOAD 0 // thread-0 <- this
ALOAD 0
GETFIELD cn/itcast/AccountUnsafe.balance // thread-0 <- this.balance
INVOKEVIRTUAL java/lang/Integer.intValue // thread-0 拆箱
ALOAD 1 // thread-0 <- amount
INVOKEVIRTUAL java/lang/Integer.intValue // thread-0 拆箱
ISUB // thread-0 减法
INVOKESTATIC java/lang/Integer.valueOf // thread-0 结果装箱
PUTFIELD cn/itcast/AccountUnsafe.balance // thread-0 -> this.balance
ALOAD 0 // thread-1 <- this
ALOAD 0
GETFIELD cn/itcast/AccountUnsafe.balance // thread-1 <- this.balance
INVOKEVIRTUAL java/lang/Integer.intValue // thread-1 拆箱
ALOAD 1 // thread-1 <- amount
INVOKEVIRTUAL java/lang/Integer.intValue // thread-1 拆箱
ISUB // thread-1 减法
INVOKESTATIC java/lang/Integer.valueOf // thread-1 结果装箱
PUTFIELD cn/itcast/AccountUnsafe.balance // thread-1 -> this.balance
解决思路 - 锁
首先想到的是给Account对象加锁
public class AccountUnsafe implements Account{
private Integer balance;
public AccountUnsafe(Integer balance) {
this.balance = balance;
}
@Override
public synchronized Integer getBalance() {
return this.balance;
}
@Override
public synchronized void withdraw(Integer amount) {
balance -= amount;
}
}
输出
0 cost :231ms
解决思路 - 无锁
public class AccountSafe implements Account{
private AtomicInteger balance;
public AccountSafe(Integer balance) {
this.balance = new AtomicInteger(balance);
}
@Override
public Integer getBalance() {
return balance.get();
}
@Override
public void withdraw(Integer amount) {
while (true) {
int prev = balance.get();
int next = prev - amount;
if (balance.compareAndSet(prev, next))
break;
}
}
}
测试
Account.demo(new AccountSafe(10000));
输出
0 cost :177ms
2、CAS与volatile
前面看到的 AtomicInteger 的解决方法,内部并没有用到锁来保护共享变量的线程安全。那么它是如何实现的呢?
public void withdraw(Integer amount) {
while (true) {
int prev = balance.get();
int next = prev - amount;
if (balance.compareAndSet(prev, next))
break;
}
}
其中关键是compareAndSet, 它就是简称的CAS (也有Compare And Swap的说法),它必须是原子操作。
注意 其实CAS的低层是 lock cmpxchg 指令 (X86架构),在单核CPU和多核CPU下都能够保证【比较 - 交换】的原子性
- 在多核状态下,某个核执行到带lock的指令时,CPU会让总线锁住,当这个核把此指令执行完毕,再开启总线。这个过程中不会被线程的调度机制打断,保证了多个线程对内存操作的准确性,是原子的。
慢动作分析
@Slf4j(topic = "c.SlowMotion")
public class SlowMotion {
public static void main(String[] args) {
AtomicInteger balance = new AtomicInteger(10000);
int mainPrev = balance.get();
log.debug("try get : {}", mainPrev);
new Thread(() -> {
sleep(1000);
int prev = balance.get();
balance.compareAndSet(prev, 9000);
log.debug(balance.toString());
}, "t1").start();
sleep(2000);
log.debug("try set 8000");
boolean isSuccess = balance.compareAndSet(mainPrev, 8000);
log.debug("is Success? {}" , isSuccess);
if (!isSuccess) {
mainPrev = balance.get();
log.debug("try set 8000");
isSuccess = balance.compareAndSet(mainPrev, 8000);
log.debug("is Success? {}" , isSuccess);
}
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
输出
2022/03/09-02:14:37.774 [main] c.SlowMotion - try get : 10000
2022/03/09-02:14:38.842 [t1] c.SlowMotion - 9000
2022/03/09-02:14:39.843 [main] c.SlowMotion - try set 8000
2022/03/09-02:14:39.843 [main] c.SlowMotion - is Success? false
2022/03/09-02:14:39.843 [main] c.SlowMotion - try set 8000
2022/03/09-02:14:39.843 [main] c.SlowMotion - is Success? true
volatile
获取共享变量时,为了保证该变量的可见性,需要使用volatile修饰。
它可以用来修饰成员变量和静态成员变量,它可以避免线程从自己的工作缓存总查找变量的值,必须从主存中获取它的值,线程操作volatile变量都是直接操作主存。即一个线程对volatile变量的修改,对另一个线程可见。
注意 vloatile仅仅保证了共享变量的可见性,让其他线程能够看到最新值,但不能解决指令交错问题(不能保证原子性)
CAS必须借助volatile才能读取到共享变量的最新值来实现【比较并交换的效果】
为什么无锁效率高
- 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而synchronized会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻
- 线程就好像高速跑道上的赛车,高速运行,速度超快,一旦发生上下文切换,就好比火车要减速、熄火,等被唤醒又要重新打火、启动、加速,恢复到高速运行,代价比较大
- 但无锁情况下,因为线程要保持运行,需要额外CPU的支持,CPU在这里就好比高速跑道,没有额外的跑道,线程想要高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。
CAS的特点
结合CAS和volatile可以实现无所并发,适用于线程数少,多核CPU的场景下。
- CAS是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,及时改了也没关系,我吃点亏再重试呗。
- synchronized是基于悲观锁的思想:最悲观的估计,得防着其他线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会
- CAS体现的是无锁并发、无阻塞并发,请仔细思考这两句话的意思
- 因为没有使用synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
- 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受到影响
3、原子整数
JUC并发包提供了
- AtomicBoolean
- AtomicInteger
- AtomicLong
以AtomicInteger为例
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(0);
System.out.println(i.getAndIncrement());
System.out.println(i.incrementAndGet());
System.out.println(i.decrementAndGet());
System.out.println(i.getAndDecrement());
System.out.println(i.getAndAdd(5));
System.out.println(i.addAndGet(-5));
System.out.println(i.getAndUpdate(p -> p - 2));
System.out.println(i.updateAndGet(p -> p + 2));
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
}
4、原子引用
为什么要使用原子引用类型?
- AtomicReference
- AtomICMarkableReference
- AtomicStampedReference
有如下方法
public interface DecimalAccount {
BigDecimal getBalance();
void withdraw(BigDecimal amount);
static void demo(DecimalAccount account) {
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
threads.add(new Thread(() -> {
account.withdraw(BigDecimal.TEN);
}));
}
threads.forEach(Thread::start);
threads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(account.getBalance());
}
}
试着提供不同的DecimalAccount实现,实现安全的取款操作
不安全实现
class DecimalAccountUnsafe implements DecimalAccount {
BigDecimal balance;
public DecimalAccountUnsafe(BigDecimal balance) {
this.balance = balance;
}
@Override
public BigDecimal getBalance() {
return balance;
}
@Override
public void withdraw(BigDecimal amount) {
BigDecimal balance = this.getBalance();
this.balance = balance.subtract(amount);
}
}
安全实现 - 使用锁
class DecimalAccountSafeLock implements DecimalAccount {
private final Object lock = new Object();
BigDecimal balance;
public DecimalAccountSafeLock(BigDecimal balance) {
this.balance = balance;
}
@Override
public BigDecimal getBalance() {
return balance;
}
@Override
public void withdraw(BigDecimal amount) {
synchronized (lock) {
BigDecimal balance = this.getBalance();
this.balance = balance.subtract(amount);
}
}
}
安全实现 - 使用CAS
public class DecimalAccountSafeCas implements DecimalAccount {
AtomicReference<BigDecimal> balance;
public DecimalAccountSafeCas(BigDecimal balance) {
this.balance = new AtomicReference<>(balance);
}
@Override
public BigDecimal getBalance() {
return this.balance.get();
}
@Override
public void withdraw(BigDecimal amount) {
while (true) {
BigDecimal prev = this.balance.get();
BigDecimal next = prev.subtract(prev);
if (balance.compareAndSet(prev, next))
break;
}
}
}
测试
DecimalAccount.demo(new DecimalAccountUnsafe(BigDecimal.valueOf(10000)));
DecimalAccount.demo(new DecimalAccountSafeLock(BigDecimal.valueOf(10000)));
DecimalAccount.demo(new DecimalAccountSafeCas(BigDecimal.valueOf(10000)));
输出
340
0
0
ABA 问题及解决
static AtomicReference<String> ref = new AtomicReference<>("A");
public static void main(String[] args) throws InterruptedException {
log.debug("main start...");
String prev = ref.get();
other();
Thread.sleep(1000);
log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
}
private static void other() throws InterruptedException {
new Thread(() -> {
log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));
}, "t1").start();
Thread.sleep(500);
new Thread(() -> {
log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));
}, "t2").start();
}
输出
2022/03/09-23:27:35.350 [main] c.Test4 - main start...
2022/03/09-23:27:35.384 [t1] c.Test4 - change A->B true
2022/03/09-23:27:35.889 [t2] c.Test4 - change B->A true
2022/03/09-23:27:36.903 [main] c.Test4 - change A->C true
主线程仅能判断出共享变量的值和最初值A相同,不能感知到这种A改变为B又改回A的情况,如果主线程希望:
只要有其他线程【动过了】共享变量,那么自己的cas就算失败,这时,仅比较值是不够的,需要再加一个版本号
AtomicStampedReference
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
public static void main(String[] args) throws InterruptedException {
log.debug("main start...");
String prev = ref.getReference();
Integer stamp = ref.getStamp();
other();
Thread.sleep(1000);
log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
}
private static void other() throws InterruptedException {
new Thread(() -> {
log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", ref.getStamp(), ref.getStamp() + 1));
}, "t1").start();
Thread.sleep(500);
new Thread(() -> {
log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", ref.getStamp(), ref.getStamp() + 1));
}, "t2").start();
}
输出
2022/03/09-23:32:21.802 [main] c.Test4 - main start...
2022/03/09-23:32:21.842 [t1] c.Test4 - change A->B true
2022/03/09-23:32:22.345 [t2] c.Test4 - change B->A true
2022/03/09-23:32:23.348 [main] c.Test4 - change A->C false
AtomicStampedReference可以给原子引用加上版本号,追踪原子引用整个的变化过程,如:A -> B -> A -> C ,通过AtomicStampedReference,我们可以知道,引用变量中途被修改了几次。
但是有时候,并不关心变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference
AtomicMarkableReference
public class GarbageBag {
String desc;
public GarbageBag(String desc) {
this.desc = desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "GarbageBag{" +
"desc='" + desc + '\'' +
'}';
}
}
public static void main(String[] args) throws InterruptedException {
GarbageBag bag = new GarbageBag("装满垃圾");
AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
log.debug("主线程 start");
GarbageBag prev = ref.getReference();
log.debug(prev.toString());
new Thread(() -> {
log.debug("打扫卫生线程...");
bag.setDesc("倒空垃圾袋");
while (!ref.compareAndSet(bag, bag, true, false)) {
}
log.debug(bag.toString());
}).start();
Thread.sleep(1000);
log.debug("主线程想换一只新的垃圾袋");
boolean isSuccess = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
log.debug("换了么?{}", isSuccess);
log.debug(ref.getReference().toString());
}
输出
2022/03/09-23:47:54.873 [main] c.Test5 - 主线程 start
2022/03/09-23:47:54.874 [main] c.Test5 - GarbageBag{desc='装满垃圾'}
2022/03/09-23:47:54.916 [Thread-0] c.Test5 - 打扫卫生线程...
2022/03/09-23:47:54.916 [Thread-0] c.Test5 - GarbageBag{desc='倒空垃圾袋'}
2022/03/09-23:47:55.922 [main] c.Test5 - 主线程想换一只新的垃圾袋
2022/03/09-23:47:55.922 [main] c.Test5 - 换了么?false
2022/03/09-23:47:55.923 [main] c.Test5 - GarbageBag{desc='倒空垃圾袋'}
Process finished with exit code 0
5、原子数组
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
有如下方法
private static <T> void demo(Supplier<T> arraySupplier,
Function<T, Integer> lengthFun,
BiConsumer<T, Integer> putConsumer,
Consumer<T> printConsumer) {
List<Thread> threads = new ArrayList<>();
T array = arraySupplier.get();
int length = lengthFun.apply(array);
for (int i = 0; i < 1000; i++) {
threads.add(new Thread(() -> {
for (int j = 0; j < 10000; j++) {
putConsumer.accept(array, j % length);
}
}));
}
threads.forEach(Thread::start);
threads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
printConsumer.accept(array);
}
不安全的数组
demo(() -> new int[10],
array -> array.length,
(array, index) -> array[index]++,
array -> System.out.println(Arrays.toString(array)));
输出
[979287, 979029, 978970, 978921, 978919, 978832, 979217, 979376, 979381, 979636]
安全的数组
demo(() -> new AtomicIntegerArray(10),
array -> array.length(),
(array, index) -> array.getAndIncrement(index),
array -> System.out.println(array));
输出
[1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000]
6、字段更新器
- AtomicReferenceFieldUpdater //域,字段
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合volatile修饰的关键字使用,否则会出现异常
Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type
at java.util.concurrent.atomic.AtomicIntegerFieldUpdater$AtomicIntegerFieldUpdaterImpl.<init>(AtomicIntegerFieldUpdater.java:412)
at java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater(AtomicIntegerFieldUpdater.java:88)
at nolock.Test7.main(Test7.java:16)
private volatile int field;
public static void main(String[] args) {
AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Test7.class, "field");
Test7 test7 = new Test7();
fieldUpdater.compareAndSet(test7, 0, 10);
System.out.println(test7.field);
fieldUpdater.compareAndSet(test7, 10, 20);
System.out.println(test7.field);
fieldUpdater.compareAndSet(test7, 10, 30);
System.out.println(test7.field);
}
输出
10
20
20
|