教程:https://www.bilibili.com/video/BV16J411h7Rd
CAS 与 volatileUnsafe创建 1000 个线程修改余额。
public class InitTest {
public static void main(String[] args) throws InterruptedException {
Account account = new AccountUnsafe(10000);
Account.demo(account);
}
}
class AccountUnsafe implements Account {
private Integer balance;
public AccountUnsafe(Integer balance) {
this.balance = balance;
}
@Override
public Integer getBalance() {
synchronized (this) {
return this.balance;
}
}
@Override
public void withdraw(Integer amount) {
synchronized (this) {
this.balance -= amount;
}
}
}
interface Account {
/**
* @return 查看余额
*/
Integer getBalance();
/**
* @param amount 取款
*/
void withdraw(Integer amount);
/**
* @param account 账户
* 创建 1000 个线程取款
*/
static void demo(Account account) {
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
account.withdraw(10);
}));
}
long start = System.nanoTime();
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
System.out.println("最终余额: " + account.getBalance() + " cost: " + TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - start)) + "ms");
}
}

public class InitTest {
public static void main(String[] args) throws InterruptedException {
Account account = new AccountCAS(10000);
Account.demo(account);
}
}
class AccountCAS implements Account {
private AtomicInteger balance;
public AccountCAS(Integer balance) {
this.balance = new AtomicInteger(balance);
}
@Override
public Integer getBalance() {
return balance.get();
}
@Override
public void withdraw(Integer amount) {
while (true) {
// 获取余额的最新值
int latest = balance.get();
// 修改余额
int next = latest - amount;
// 同步余额
if (balance.compareAndSet(latest, next)) {
break;
}
}
}
}
interface Account {
/**
* @return 查看余额
*/
Integer getBalance();
/**
* @param amount 取款
*/
void withdraw(Integer amount);
/**
* @param account 账户
* 创建 1000 个线程取款
*/
static void demo(Account account) {
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
account.withdraw(10);
}));
}
long start = System.nanoTime();
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
System.out.println("最终余额: " + account.getBalance() + " cost: " + TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - start)) + "ms");
}
}

稍微快一丢丢。
AtomicInteger 的解决方案中没有使用锁来保护共享变量的线程安全,那么它是怎么实现的线程安全?
public void withdraw(Integer amount) {
while (true) {
int latest = balance.get();
int next = latest - amount;
if (balance.compareAndSet(latest, next)) {
break;
}
}
}
关键在于 compareAndSet,简称 CAS(Compare And Swap),它必须是原子操作。
AtomicInteger 源码中使用 volatile 关键字修饰变量保证变量可见性:

Note:
CAS 的原子性是由操作系统保证的,操作系统中有 CAS 原语
synchronized 会让线程在没有获得锁的时候发生上下文切换,进入阻塞。Note:
打个比喻:无锁状态下相当于线程都在高速上,一旦发生上下文切换,就好比赛车要减速,等待唤醒又要重新打火,启动加速,代价比较大。
结合 CAS 和 volatile 可以实现无锁并发,适用于线程数较少、多核 CPU 的情景下。
CAS 是基于乐观锁的思想:不怕别的线程修改共享变量,若其他线程进行修改,当前线程会再重新尝试修改一下synchronized 是基于悲观锁的思想:防止别的线程来修改共享变量,当前线程上了锁,其他线程无法进入CAS 体现的是无锁并发、无阻塞并发:

AtomicInteger i = new AtomicInteger(0); // 默认就是 0
i.compareAndSet(0, 1); // 先看是不是 0, 若是 0, 则修改为 1
log.debug("compareAndSet(0, 1) 之后的值: {}", i);
log.debug("自增并获取值: {}", i.incrementAndGet()); // ++i
log.debug("自减并获取值: {}", i.decrementAndGet()); // --i
log.debug("先获取后自增: {}", i.getAndIncrement()); // i++
log.debug("先获取后自减: {}", i.getAndDecrement()); // i--
log.debug("先加再取值: {}", i.addAndGet(5));
log.debug("先取值再加: {}", i.getAndAdd(5));

public void withdraw(Integer amount) {
/*
while (true) {
int latest = balance.get();
int next = latest - amount;
if (balance.compareAndSet(latest, next)) {
break;
}
}
*/
balance.getAndAdd(-1 * amount);
}
AtomicInteger i = new AtomicInteger(0); // 默认就是 0
log.debug("自定义先修改, 然后返回: {}", i.updateAndGet(x -> x - 50));
log.debug("自定义先返回, 然后修改: {}", i.getAndUpdate(x -> x * 20));
log.debug("查看当前值: {}", i);

仿照着实现 updateAndGet 功能:
public class InitTest {
public static void main(String[] args) throws InterruptedException {
AtomicInteger i = new AtomicInteger(5); // 默认就是 0
// log.debug("自定义先修改, 然后返回: {}", i.updateAndGet(x -> x * 20));
log.debug("自定义先修改, 然后返回: {}", updateAndGet(i, x -> x * 20));
log.debug("当前值: {}", i);
}
public static int updateAndGet(AtomicInteger i, IntUnaryOperator operator) {
int prev, next;
do {
prev = i.get();
next = operator.applyAsInt(prev);
} while (!i.compareAndSet(prev, next));
return next;
}
}

public class InitTest {
public static void main(String[] args) throws InterruptedException {
Account account = new AccountCAS(new BigDecimal("10000.0"));
Account.demo(account);
}
}
class AccountCAS implements Account {
private final AtomicReference<BigDecimal> balance;
public AccountCAS(BigDecimal balance) {
this.balance = new AtomicReference<>(balance);
}
@Override
public BigDecimal getBalance() {
return balance.get();
}
@Override
public void withdraw(BigDecimal amount) {
balance.updateAndGet(x -> x.subtract(amount));
}
}
interface Account {
BigDecimal getBalance();
void withdraw(BigDecimal amount);
static void demo(Account account) {
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
account.withdraw(BigDecimal.TEN);
}));
}
long start = System.nanoTime();
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
System.out.println("最终余额: " + account.getBalance() + " cost: " + TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - start)) + "ms");
}
}

如果共享变量经过修改后又恢复到原来的值,当前线程不能发现别的线程对该变量的修改过程:
String prev = ref.get();
new Thread(() -> {
log.debug("change A -> B: {} ?", ref.compareAndSet("A", "B"));
log.debug("change B -> A: {} ?", ref.compareAndSet("B", "A"));
}, "t").start();
TimeUnit.SECONDS.sleep(1);
log.debug("change A -> C: {} ?", ref.compareAndSet(prev, "C"));

AtomicStampedReference 可以解决上述问题,当前线程如果发现别的线程对该变量进行修改,则自己的 CAS 操作失败。
通过设置一个版本号,可以追踪原子引用被修改了几次:
@Slf4j(topic = "c.InitTest")
public class InitTest {
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
public static void main(String[] args) throws InterruptedException {
String prev = ref.getReference();
int stamp = ref.getStamp();
log.debug("当前版本号: {}", stamp);
new Thread(() -> {
int version = ref.getStamp();
// 四个参数分别是,旧值,新值,旧版本号,新版本号
log.debug("change A -> B: {} ?", ref.compareAndSet("A", "B", version, version + 1));
log.debug("change B -> A: {} ?", ref.compareAndSet("B", "A", version + 1, version + 2));
}, "t").start();
TimeUnit.SECONDS.sleep(1);
log.debug("当前版本号: {}", ref.getStamp());
log.debug("change A -> C: {} ?", ref.compareAndSet(prev, "C", stamp, stamp + 1));
}
}

有时候我们只关心变量是否被修改,并不关心被修改了多少次,就可以使用 AtomicMarkableReference 来解决。
@Slf4j(topic = "c.InitTest")
public class InitTest {
static AtomicMarkableReference<String> ref = new AtomicMarkableReference<>("A", true);
public static void main(String[] args) throws InterruptedException {
String prev = ref.getReference();
new Thread(() -> {
// 四个参数分别是,旧值,新值,旧标记,新标记
log.debug("change A -> B ?: {}", ref.compareAndSet("A", "B", true, false));
log.debug("change B -> A ?: {}", ref.compareAndSet("B", "A", false, false));
}, "t").start();
TimeUnit.SECONDS.sleep(1);
log.debug("change A -> C ?: {}", ref.compareAndSet(prev, "C", true, false));
}
}


@Slf4j(topic = "c.InitTest")
public class InitTest {
public static void main(String[] args) throws InterruptedException {
demo(
() -> new int[10],
array -> array.length,
(array, index) -> array[index]++,
array -> System.out.println(Arrays.toString(array))
);
demo(
() -> new AtomicIntegerArray(10),
array -> array.length(),
(array, index) -> array.getAndIncrement(index),
array -> System.out.println(array)
);
}
/**
* @param arraySupplier 无中生有, 没有参数 -> 结果
* @param lengthFun 一个参数, 一个结果, x -> 结果 ---- PS: BiFunction: 两个参数, 一个结果 (x, y) -> 结果
* @param putConsumer 一个参数没结果 void
* @param printConsumer 两个参数没结果 void
* @param 泛型
*/
private static <T> void demo(Supplier<T> arraySupplier,
Function<T, Integer> lengthFun,
BiConsumer<T, Integer> putConsumer,
Consumer<T> printConsumer) {
List<Thread> ts = new ArrayList<>();
T array = arraySupplier.get();
Integer length = lengthFun.apply(array);
// 新建 length 个线程, 每个线程执行任务: 每个下标位置递增 10000 次
for (int i = 0; i < length; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 10000; j++) {
putConsumer.accept(array, j % length);
}
}));
}
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// 打印线程
printConsumer.accept(array);
}
}


可以用来保护对象中属性赋值的原子性。
要求原子更新操作的属性必须声明为 volatile 保证可见性,否则会抛出异常 IllegalArgumentException: Must be volatile type:
@Slf4j(topic = "c.InitTest")
public class InitTest {
public static void main(String[] args) throws InterruptedException {
Student student = new Student();
// 参数: 类,属性类型,属性名称
AtomicReferenceFieldUpdater<Student, String> updater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
log.debug(student.toString());
log.debug("{}", updater.compareAndSet(student, null, "张三"));
log.debug(student.toString());
}
}
class Student {
volatile String name;
@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
'}';
}
}

为什么要使用原子累加器?原子类不已经有了 incrementAndGet 方法了吗?
LongAdder 累加器在有竞争的时候设置多个累加单元,Thread-0 累加 Cell[0],Thread-1 累加 Cell[1]…最后将结果汇总。累加时操作不同的 Cell 变量,减少了 CAS 重试失败,从而提高性能。
@Slf4j(topic = "c.InitTest")
public class InitTest {
public static void main(String[] args) throws InterruptedException {
demo(
() -> new AtomicLong(0),
adder -> adder.getAndIncrement()
);
demo(
() -> new LongAdder(),
adder -> adder.increment()
);
}
private static <T> void demo (Supplier<T> adderSupplier, Consumer<T> action) {
T adder = adderSupplier.get();
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 4; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 50_0000; j++) {
action.accept(adder);
}
}));
}
long start = System.nanoTime();
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
System.out.println("总和: " + adder + " 花费时间: " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) + " ms");
}
}

/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells; // 累加单元数组,懒初始化
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base; // 没有竞争时,用 CAS 累加
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
transient volatile int cellsBusy; // 在 cells 创建或者扩容时置为 1,表示加锁
@Slf4j(topic = "c.InitTest")
public class InitTest {
public static void main(String[] args) throws InterruptedException {
LockCas lockCas = new LockCas();
new Thread(() -> {
log.debug("begin...");
lockCas.lock();
try {
log.debug("lock...");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lockCas.unlock();
log.debug("unlock...");
}
}).start();
new Thread(() -> {
log.debug("begin...");
lockCas.lock();
try {
log.debug("lock...");
} finally {
lockCas.unlock();
log.debug("unlock...");
}
}).start();
}
}
class LockCas {
private AtomicInteger state = new AtomicInteger(0);
public void lock() {
while (true) {
if (state.compareAndSet(0, 1)) {
break;
}
}
}
public void unlock() {
state.set(0);
}
}

前情提要:
64Byte伪共享:因为 Cell 是数组形式的,在内存中是连续存储的,一个 Cell 为 24 字节(16 Byte 对象头 + 8 Byte value 属性),因此缓存行中可以存储 2 个 Cell 对象,这样就可能导致两个核心更新自己的 Cell 的时候让另外一个核心的缓存失效,需要重新读取内存。

@jdk.internal.vm.annotation.Contended 注解的作用:

Unsafe 对象提供了底层操作内存和线程的方法。
Note:
了解更多参考:
https://tech.meituan.com/2019/02/14/talk-about-java-magic-class-unsafe.html
https://www.cnblogs.com/throwable/p/9139947.html
Unsafe 不能通过 new 或者 Unsafe.getUnsafe 获取对象,只允许启动类加载器加载的类才可以使用。
但是可以通过反射获取,使用 CAS 方法修改属性值:
@Slf4j(topic = "c.InitTest")
public class InitTest {
public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
// 0. 通过反射获取 unsafe 对象
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
Unsafe unsafe = (Unsafe) theUnsafe.get(null);
// 1. 获取域的偏移地址
long idOffset = unsafe.objectFieldOffset(Student.class.getDeclaredField("id"));
long nameOffset = unsafe.objectFieldOffset(Student.class.getDeclaredField("name"));
Student student = new Student();
// 2. 执行 CAS 操作
unsafe.compareAndSwapInt(student, idOffset, 0, 100);
unsafe.compareAndSwapObject(student, nameOffset, null, "王二麻子");
log.debug(student.toString());
}
}
@Data
class Student {
volatile int id;
volatile String name;
}
参照 Atomic 源码实现,并测试:
public class InitTest {
public static void main(String[] args) throws InterruptedException {
Account account = new AccountCAS(10000);
Account.demo(account);
}
}
class AccountCAS implements Account {
private MyAtomicInteger balance;
public AccountCAS(Integer balance) {
this.balance = new MyAtomicInteger(balance);
}
@Override
public Integer getBalance() {
return balance.get();
}
@Override
public void withdraw(Integer amount) {
balance.addAndGet(-amount);
}
}
interface Account {
/**
* @return 查看余额
*/
Integer getBalance();
/**
* @param amount 取款
*/
void withdraw(Integer amount);
/**
* @param account 账户
* 创建 1000 个线程取款
*/
static void demo(Account account) {
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
account.withdraw(10);
}));
}
long start = System.nanoTime();
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
System.out.println("最终余额: " + account.getBalance() + " cost: " + TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - start)) + "ms");
}
}
/**
* 模拟 AtomicInteger 的实现
*/
class MyAtomicInteger {
private volatile int value;
private static final long valueOffset;
private static final Unsafe UNSAFE;
static {
UNSAFE = UnsafeAccessor.getUnsafe();
try {
valueOffset = UNSAFE.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value"));
} catch (NoSuchFieldException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
public MyAtomicInteger(int value) {
this.value = value;
}
public final int get() {
return value;
}
public final int addAndGet(int amount) {
int prev, next;
do {
// Unsafe 底层就是这么写的, 可以通过类对象和偏移量获取属性值
// prev = UNSAFE.getIntVolatile(this, valueOffset);
prev = value;
next = prev + amount;
} while (!UNSAFE.compareAndSwapInt(this, valueOffset, prev, next));
return next;
// return UNSAFE.getAndAddInt(this, valueOffset, amount) + amount;
}
}
/**
* 反射获取 Unsafe 对象工具类
*/
final class UnsafeAccessor {
@SneakyThrows
public static Unsafe getUnsafe() {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
;
theUnsafe.setAccessible(true);
return (Unsafe) theUnsafe.get(null);
}
}
