• 大话CAS


    1. 无锁的概念

    在谈论无锁概念时,总会关联起乐观派与悲观派,对于乐观派而言,他们认为事情总会往好的方向发展,总是认为坏的情况发生的概率特别小,可以无所顾忌地做事,但对于悲观派而言,他们总会认为发展事态如果不及时控制,以后就无法挽回了,即使无法挽回的局面几乎不可能发生。这两种派系映射到并发编程中就如同加锁与无锁的策略,即加锁是一种悲观策略,无锁是一种乐观策略,因为对于加锁的并发程序来说,它们总是认为每次访问共享资源时总会发生冲突,因此必须对每一次数据操作实施加锁策略。而无锁则总是假设对共享资源的访问没有冲突,线程可以不停执行,无需加锁,无需等待,一旦发现冲突,无锁策略则采用一种称为CAS的技术来保证线程执行的安全性,这项CAS技术就是无锁策略实现的关键,下面我们进一步了解CAS技术的奇妙之处。

    2. 什么是CAS

    CAS(Compare and Swap),即比较并替换,是用于实现多线程同步的原子指令。

    执行函数:CAS(V,E,N)

    其包含3个参数

    • V表示要更新的变量
    • E表示预期值
    • N表示新值

    假定有两个操作A和B,如果从执行A的线程来看,当另一个线程执行B时,要么将B全部执行完,要么完全不执行B,那么A和B对彼此来说是原子的。

    实现原子操作可以使用锁,锁机制,满足基本的需求是没有问题的了,但是有的时候我们的需求并非这么简单,我们需要更有效,更加灵活的机制,synchronized关键字是基于阻塞的锁机制,也就是说当一个线程拥有锁的时候,访问同一资源的其它线程需要等待,直到该线程释放锁,

    这里会有些问题:首先,如果被阻塞的线程优先级很高很重要怎么办?其次,如果获得锁的线程一直不释放锁怎么办?(这种情况是非常糟糕的)。还有一种情况,如果有大量的线程来竞争资源,那CPU将会花费大量的时间和资源来处理这些竞争,同时,还有可能出现一些例如死锁之类的情况,最后,其实锁机制是一种比较粗糙,粒度比较大的机制,相对于像计数器这样的需求有点儿过于笨重。

    实现原子操作还可以使用当前的处理器基本都支持CAS的指令,只不过每个厂家所实现的算法并不一样,每一个CAS操作过程都包含三个运算符:一个内存地址V,一个期望的值A和一个新值B,操作的时候如果这个地址上存放的值等于这个期望的值A,则将地址上的值赋为新值B,否则不做任何操作。

    CAS的基本思路就是,如果这个地址上的值和期望的值相等,则给其赋予新值,否则不做任何事儿,但是要返回原值是多少。循环CAS就是在一个循环里不断的做cas操作,直到成功为止。

    CAS是怎么实现线程的安全呢?语言层面不做处理,我们将其交给硬件—CPU和内存,利用CPU的多处理能力,实现硬件层面的阻塞,再加上volatile变量的特性即可实现基于原子操作的线程安全。

    3. CPU指令对CAS的支持

    或许我们可能会有这样的疑问,假设存在多个线程执行CAS操作并且CAS的步骤很多,有没有可能在判断V和E相同后,正要赋值时,切换了线程,更改了值。造成了数据不一致呢?答案是否定的,因为CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。

    4. 悲观锁,乐观锁

    说到CAS,不得不提到两个专业词语:悲观锁,乐观锁。我们先来看看什么是悲观锁,什么是乐观锁。

    4.1 悲观锁

    顾名思义,就是比较悲观的锁,总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。Java中synchronizedReentrantLock等独占锁就是悲观锁思想的实现。

    4.2 乐观锁

    反之,总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号机制和CAS算法实现。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库提供的类似于write_condition机制,其实都是提供的乐观锁。我们今天讲的CAS就是乐观锁。

    由于CAS操作属于乐观派,它总认为自己可以成功完成操作,当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败,但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。基于这样的原理,CAS操作即使没有锁,同样知道其他线程对共享资源操作影响,并执行相应的处理措施。同时从这点也可以看出,由于无锁操作中没有锁的存在,因此不可能出现死锁的情况,也就是说无锁操作天生免疫死锁。

    5. CAS 的优点

    非阻塞的轻量级乐观锁, 通过CPU指令实现, 在资源竞争不激烈的情况下性能高, 相比synchronize重量级悲观锁, synchronize有复杂的加锁, 解锁和唤醒线程操作.。

    6. 三大问题

    6.1 ABA问题

    假设这样一种场景,当第一个线程执行CAS(V,E,U)操作,在获取到当前变量V,准备修改为新值U前,另外两个线程已连续修改了两次变量V的值,使得该值又恢复为旧值,这样的话,我们就无法正确判断这个变量是否已被修改过,如下图

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KFL27AKH-1667972338372)(images/image-20220301104233113.png)]

    因为CAS需要在操作值的时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。

    就像上图描述的一样,线程A原来的值是10,线程B修改为了20,但是线程C又将值修改为了10,这个时候线程A来读取了,与旧值做判断,发现还是10,没有修改过,就做了更新操作,但是我们知道,值有过变更。

    这就是典型的CAS的ABA问题,一般情况这种情况发生的概率比较小,可能发生了也不会造成什么问题,比如说我们对某个做加减法,不关心数字的过程,那么发生ABA问题也没啥关系。但是在某些情况下还是需要防止的,那么该如何解决呢?在Java中解决ABA问题,我们可以使用以下两个原子类。

    6.1.1 AtomicStampedReference

    AtomicStampedReference原子类是一个带有时间戳的对象引用,在每次修改后,AtomicStampedReference不仅会设置新值而且还会记录更改的时间。当AtomicStampedReference设置对象值时,对象值以及时间戳都必须满足期望值才能写入成功,这也就解决了反复读写时,无法预知值是否已被修改的窘境,测试demo如下

    public class ABADemo {
    
        static AtomicInteger atIn = new AtomicInteger(100);
    
        //初始化时需要传入一个初始值和初始时间
        static AtomicStampedReference<Integer> atomicStampedR =
                new AtomicStampedReference<Integer>(200,0);
    
    
        static Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                //更新为200
                atIn.compareAndSet(100, 200);
                //更新为100
                atIn.compareAndSet(200, 100);
            }
        });
    
    
        static Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                boolean flag=atIn.compareAndSet(100,500);
                System.out.println("flag:"+flag+",newValue:"+atIn);
            }
        });
    
    
        static Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                int time=atomicStampedR.getStamp();
                //更新为200
                atomicStampedR.compareAndSet(100, 200,time,time+1);
                //更新为100
                int time2=atomicStampedR.getStamp();
                atomicStampedR.compareAndSet(200, 100,time2,time2+1);
            }
        });
    
    
        static Thread t4 = new Thread(new Runnable() {
            @Override
            public void run() {
                int time = atomicStampedR.getStamp();
                System.out.println("sleep 前 t4 time:"+time);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                boolean flag=atomicStampedR.compareAndSet(100,500,time,time+1);
                System.out.println("flag:"+flag+",newValue:"+atomicStampedR.getReference());
            }
        });
    
        public static  void  main(String[] args) throws InterruptedException {
            t1.start();
            t2.start();
            t1.join();
            t2.join();
    
            t3.start();
            t4.start();
            /**
             * 输出结果:
             flag:true,newValue:500
             sleep 前 t4 time:0
             flag:false,newValue:200
             */
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    对比输出结果可知,AtomicStampedReference类确实解决了ABA的问题,下面我们简单看看其内部实现原理

    public class AtomicStampedReference<V> {
        //通过Pair内部类存储数据和时间戳
        private static class Pair<T> {
            final T reference;
            final int stamp;
            private Pair(T reference, int stamp) {
                this.reference = reference;
                this.stamp = stamp;
            }
            static <T> Pair<T> of(T reference, int stamp) {
                return new Pair<T>(reference, stamp);
            }
        }
        //存储数值和时间的内部类
        private volatile Pair<V> pair;
    
        //构造器,创建时需传入初始值和时间初始值
        public AtomicStampedReference(V initialRef, int initialStamp) {
            pair = Pair.of(initialRef, initialStamp);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    接着看看其compareAndSet方法的实现:

    public boolean compareAndSet(V   expectedReference,
                                     V   newReference,
                                     int expectedStamp,
                                     int newStamp) {
            Pair<V> current = pair;
            return
                expectedReference == current.reference &&
                expectedStamp == current.stamp &&
                ((newReference == current.reference &&
                  newStamp == current.stamp) ||
                 casPair(current, Pair.of(newReference, newStamp)));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    同时对当前数据和当前时间进行比较,只有两者都相等是才会执行casPair()方法,单从该方法的名称就可知是一个CAS方法,最终调用的还是Unsafe类中的compareAndSwapObject方法

    private boolean casPair(Pair<V> cmp, Pair<V> val) {
            return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
        }
    
    • 1
    • 2
    • 3

    到这我们就很清晰AtomicStampedReference的内部实现思想了,通过一个键值对Pair存储数据和时间戳,在更新时对数据和时间戳进行比较,只有两者都符合预期才会调用Unsafe的compareAndSwapObject方法执行数值和时间戳替换,也就避免了ABA的问题。

    6.1.2 AtomicMarkableReference

    AtomicMarkableReference与AtomicStampedReference不同的是,AtomicMarkableReference维护的是一个boolean值的标识,也就是说至于true和false两种切换状态,经过测试,这种方式并不能完全防止ABA问题的发生,只能减少ABA问题发生的概率。

    public class ABADemo {
        static AtomicMarkableReference<Integer> atMarkRef =
                  new AtomicMarkableReference<Integer>(100,false);
    
     static Thread t5 = new Thread(new Runnable() {
            @Override
            public void run() {
                boolean mark=atMarkRef.isMarked();
                System.out.println("mark:"+mark);
                //更新为200
                System.out.println("t5 result:"+atMarkRef.compareAndSet(atMarkRef.getReference(), 200,mark,!mark));
            }
        });
    
        static Thread t6 = new Thread(new Runnable() {
            @Override
            public void run() {
                boolean mark2=atMarkRef.isMarked();
                System.out.println("mark2:"+mark2);
                System.out.println("t6 result:"+atMarkRef.compareAndSet(atMarkRef.getReference(), 100,mark2,!mark2));
            }
        });
    
        static Thread t7 = new Thread(new Runnable() {
            @Override
            public void run() {
                boolean mark=atMarkRef.isMarked();
                System.out.println("sleep 前 t7 mark:"+mark);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                boolean flag=atMarkRef.compareAndSet(100,500,mark,!mark);
                System.out.println("flag:"+flag+",newValue:"+atMarkRef.getReference());
            }
        });
    
        public static  void  main(String[] args) throws InterruptedException {        
            t5.start();t5.join();
            t6.start();t6.join();
            t7.start();
    
            /**
             * 输出结果:
             mark:false
             t5 result:true
             mark2:true
             t6 result:true
             sleep 前 t5 mark:false
             flag:true,newValue:500 ---->成功了.....说明还是发生ABA问题
             */
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    AtomicMarkableReference的实现原理与AtomicStampedReference类似。到此,我们也明白了如果要完全杜绝ABA问题的发生,我们应该使用AtomicStampedReference原子类更新对象,而对于AtomicMarkableReference来说只能减少ABA问题的发生概率,并不能杜绝。

    6.2 循环时间长开销大

    自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。

    6.3 只能保证一个共享变量的原子操作

    当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁。

    还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如,有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java 1.5开始,JDK提供了AtomicReference类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行CAS操作。

    本文由传智教育博学谷教研团队发布。

    如果本文对您有帮助,欢迎关注点赞;如果您有任何建议也可留言评论私信,您的支持是我坚持创作的动力。

    转载请注明出处!

  • 相关阅读:
    外包公司干了不到3个月,我离职了...
    FFplay文档解读-34-视频过滤器九
    中国电竞20年:从小众娱乐到新兴体育产业
    虚拟机开机字体变大,进入系统后字体模糊
    Java系列 | 如何讲自己的JAR包上传至阿里云maven私有仓库【云效制品仓库】
    python循环时循环体一会多一会少,这个思路值得参考
    scala的schema函数(算子)
    java毕业生设计专利查询与发布系统设计与实现计算机源码+系统+mysql+调试部署+lw
    c++ SQLite 特别好用的库使用实例-更新(3)
    Android内存分析
  • 原文地址:https://blog.csdn.net/bxg_kyjgs/article/details/127768207