• AQS核心原理分析《下》


    前言:大家好,我是小威,24届毕业生,在一家满意的公司实习。本篇文章是关于并发编程中AQS的独占模式和共享模式。
    本篇文章记录的基础知识,适合在学Java的小白,也适合复习中,面试中的大佬🤩🤩。
    如果文章有什么需要改进的地方还请大佬不吝赐教👏👏。
    小威在此先感谢各位大佬啦~~🤞🤞
    在这里插入图片描述

    🏠个人主页:小威要向诸佬学习呀
    🧑个人简介:大家好,我是小威,一个想要与大家共同进步的男人😉😉
    目前状况🎉:24届毕业生,在一家满意的公司实习👏👏

    🎁如果大佬在准备面试,可以使用我找实习前用的刷题神器哦刷题神器点这里哟
    💕欢迎大家:这里是CSDN,我总结知识的地方,欢迎来到我的博客,我亲爱的大佬😘

    以下正文开始
    在这里插入图片描述

    独占锁模式

    前面说到,ReentrantLock锁就是基于独占锁实现的,独占锁的加锁和解锁操作都是通过互斥方式实现的
    在这里插入图片描述

    加锁流程

    在AQS中,是通过acquire()方法来加锁的,源码如下:

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    首先映入眼帘的是tryAcquire(arg)方法,翻译过来是尝试获取锁的意思,由于是用&&连接,只有tryAcquire(arg)方法失败时才会往下执行,当tryAcquire(arg)方法成功返回true时表示获取资源成功,对于tryAcquire(arg)方法,里面没有实现具体的东西,只是抛出了一个异常,具体逻辑是由AQS的子类实现的:

        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    
    • 1
    • 2
    • 3

    当tryAcquire(arg)返回为false时,会往后执行addWaiter(Node.EXCLUSIVE)方法,将当前线程封装成独占模式并添加到AQS的等待队列尾部,如果当tryAcquire(arg)返回true,则会直接让当前的线程继续执行,不需要添加到等待队列尾部,点入addWaiter(Node mode)方法,会看到如下源码:

    private Node addWaiter(Node mode) {
           
            /* 这个可以参考上面Node的构造方法
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
            */
            
            //构造新的等待线程节点
            Node node = new Node(Thread.currentThread(), mode);
           
            //新建临时节点pred指向尾节点
            Node pred = tail;
            //队列不为空的话,通过CAS机制将node放到队列尾部
            if (pred != null) {
                //将node的prev域指向尾节点
                node.prev = pred;
                //通过CAS机制将node放到队列尾部
                if (compareAndSetTail(pred, node)) {
                    //将原来尾节点的next域指向当前node节点,node现在为尾节点
                    pred.next = node;//形成双向链表
                    return node;
                }
            }
            //如果队列为空的话
            enq(node);
            return node;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    在多线程并发情况下,如果有多个线程同时争夺尾节点的位置,会调用enq(node)方法,使用CAS自旋机制挂到双向链表的尾部,下面是源码:

        private Node enq(final Node node) {
            //死循环(自旋)
            for (;;) {
                Node t = tail;
                //尾节点为null,说明头结点也为null,可能是还没有创建队列的时候
                if (t == null) { 
                    //多线程并发情况下,利用CAS机制创建头结点和尾节点,CAS保证此时只有一个头节点被创建,下次自旋时,就会满足队列不为空的条件
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    //如果存在尾节点,将当前节点的prev域指向尾节点
                    node.prev = t;
                    //利用CAS机制完成双向链表的绑定,让之前尾节点指向当前node节点
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    接下来看一看compareAndSetTail方法使用CAS乐观锁机制的方法源码:

        private final boolean compareAndSetTail(Node expect, Node update) {
            return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
        }
    
    • 1
    • 2
    • 3

    上述代码讨论到,使用tryAcquire(int arg)方法返回false时,再使用addWaiter(Node mode)方法,将当前线程放入到队列的尾部。acquireQueued(final Node node, int arg)方法,等待休息直到其他线程唤醒

        final boolean acquireQueued(final Node node, int arg) {
            //拿到资源失败情况置为true,表示没有拿到资源
            boolean failed = true;
            try {
                //用于判断线程是否中断过,默认没有中断过
                boolean interrupted = false;
                for (;;) {
                    //获取node的前置节点
                    final Node p = node.predecessor();
                    //如果当前节点的前驱节点是头结点,并且当前节点尝试成功获取了资源
                    if (p == head && tryAcquire(arg)) {
                        //就将当前节点设为头结点
                        setHead(node);
                        //释放之前的头结点,利于垃圾回收
                        p.next = null; // help GC
                        //表示获取资源成功
                        failed = false;
                        //返回线程是否被中断过
                        return interrupted;
                    }
                    //获取资源失败后线程等待,并检查是否被中断过
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        //线程是否被中断过
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    如果当前的前驱节点表示头结点,并且获取资源成功,那么直接将当前线程设为头结点,释放之前头结点与后继节点的链接,帮助垃圾回收(GC),如果前面当前节点的前驱不为头结点或者没有获取到资源,那么会调用shouldParkAfterFailedAcquire(Node pred, Node node)方法来判断当前线程是否能够进入waiting状态,如果可以进入,并且进入到了阻塞状态,那会阻塞,直到调用了LockSupport中的unpark()方法唤醒线程。

        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            //保存前驱节点的状态
            /*
            提示:
            当waitState>0时,表示该线程处于取消状态(线程中断或者等待锁超时),需要移除线程;
            当waitState=0时,默认值,表示初始化状态,表示线程还未完成初始化操作;
            当waitState<0,表示有效状态,线程处于可唤醒状态。
            */
            int ws = pred.waitStatus;
            //等待唤醒后置节点,SIGNAL为-1
            if (ws == Node.SIGNAL)
                return true;
            //如果前置节点不是正常的等待状态(CANCELLED结束状态),那么从当前节点开始往前寻找正常的等待状态
            if (ws > 0) {
                do {
                    //后面的节点断开与前驱节点的链接
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                //双向连接
                pred.next = node;
            } else { //小于0时,可能为共享锁
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    如果前驱节点的SIGNAL值为-1,会返回true。

    compareAndSetWaitStatus(pred, ws, Node.SIGNAL)方法内部也使用了CAS锁机制,源码:

        private static final boolean compareAndSetWaitStatus(Node node,
                                                             int expect,
                                                             int update) {
            return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                            expect, update);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    如果shouldParkAfterFailedAcquire(Node pred, Node node)方法返回true,则会调用parkAndCheckInterrupt()方法阻塞当前线程,线程等待,如果线程被中断过则返回true:

    private final boolean parkAndCheckInterrupt() {
        // 调用park让线程进入wait状态
        LockSupport.park(this);
        // 检查线程是否中断过。
        return Thread.interrupted();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    如果线程在等待的过程中被中断过,那么获取到资源后会通知线程中断:

        /**
         * Convenience method to interrupt current thread.
         */
        static void selfInterrupt() {
            Thread.currentThread().interrupt();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    acquire()竞争获取锁资源流程时,首先会调用tryAcquire()方法去尝试获取资源,如果成功获取到资源,则直接进入临界区执行代码;如果没有获取到资源,则将此线程封装成一个结点放入队列尾部分,调用park()方法让线程等待,并且标记为独占模式。如果线程被唤醒(unPark)时,会尝试获取锁资源,如果在等待过程中,线程被中断过则返回true,没有被中断过返回false。如果线程在等待过程中被中断过,它是不会响应,只是在获取到资源后直接调用自我中断方法selfInterrupt(),将中断线程。

    在这里插入图片描述

    释放独占锁

    在独占锁中,释放锁的入口是release()方法,源码如下:

        public final boolean release(long arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    //唤醒后继节点
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    点入tryRelease(arg)方法尝试释放锁,可以看出,其和tryAcquire()方法一样,也没有具体的实现,只是抛出了UnsupportedOperationException()异常,具体的逻辑由AQS的子类实现:

        protected boolean tryRelease(long arg) {
            throw new UnsupportedOperationException();
        }
    
    • 1
    • 2
    • 3

    如果tryRelease(long arg)方法返回true,会判断头结点是否为空,并且waitStatus是否为0(为0则代码初始化阶段),如果不为0,那么下面会调用unparkSuccessor()方法,唤醒后继节点,我们来查看unparkSuccessor()方法的源码:

        private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            //如果当前状态为有效状态
            if (ws < 0)
                //CAS操作将waitState置为0
                compareAndSetWaitStatus(node, ws, 0);
            Node s = node.next;
            //下一个节点(线程)为空或已取消
            if (s == null || s.waitStatus > 0) {
                s = null;
                //从后往前遍历,寻找有效的节点
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                //找到则唤醒后继节点
                LockSupport.unpark(s.thread);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    unparkSuccessor(Node node)方法实现的是唤醒后继节点,当当前节点的waitStatus状态小于0时,表示该状态为有效状态,会使用CAS机制将当前线程设为初始化状态0,之后找到下一个需要唤醒的节点,如果下一个需要唤醒的节点为空或者为取消状态则将当前线程置为null,之后从尾节点往前遍历,寻找有效的节点,找到了且不为null的话,就唤醒该节点(线程)。
    在这里插入图片描述

    共享模式加锁

    在共享模式下加锁的方法入口为acquireShared(long arg)方法,其源码如下:

        public final void acquireShared(long arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
    • 1
    • 2
    • 3
    • 4

    进入到tryAcquireShared(arg)方法,此方法为尝试获取资源,得到如下源码,与独占模式获取锁一样,tryAcquireShared(long arg)没有实现具体的逻辑,由AQS的子类实现:

        protected long tryAcquireShared(long arg) {
            throw new UnsupportedOperationException();
        }
    
    • 1
    • 2
    • 3

    tryAcquireShared(arg)会有三种返回值:当返回值为负数时,表示获取资源失败;当返回值为0时,表示获取资源成功,没有剩余资源;当返回值为正数时,表示当前线程获取到了资源,仍然有资源剩余。

    当tryAcquireShared(arg)方法返回为false时,表示获取资源失败,会往下进行doAcquireShared(arg)方法,此方法会将线程放入到等待队列尾部休息,点入 doAcquireShared(arg)方法的源码得:

        private void doAcquireShared(long arg) {
            //将节点加入到队列尾部
            final Node node = addWaiter(Node.SHARED);
            //获取资源成功的标志,初始为获取不到
            boolean failed = true;
            try {
                //中断的标志
                boolean interrupted = false;
                //自旋
                for (;;) {
                    //获取当前节点的前驱节点
                    final Node p = node.predecessor();
                    //如果前驱节点是头节点的话
                    if (p == head) {
                        //返回还剩下多少资源
                        long r = tryAcquireShared(arg);
                        if (r >= 0) {
                            //如果资源还足够的话,将头结点指向自己,如果还有剩余资源,可以唤醒后面的节点
                            setHeadAndPropagate(node, r);
                            //断开之前的头结点,便于垃圾回收
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    在共享模式下,当线程被唤醒拿到资源时,如果还有剩余资源,会继续唤醒后继的线程。如果被唤醒的线程发现资源不够用时会再次进入休眠。这个情况下,就算排在首位线程后面的线程需要更少的资源,也会因为前面资源不够而等待,不会先执行后面的线程。

    对于上面代码中出现的setHeadAndPropagate()方法,点入查看得到以下源码:

        private void setHeadAndPropagate(Node node, long propagate) {
            //将头结点赋值为h
            Node h = head; // Record old head for check below
            //将当前节点设置为头结点
            setHead(node);
            //如果还有剩余资源的话,会唤醒后面的节点(线程)
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    //唤醒后继节点
                    doReleaseShared();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在这里插入图片描述

    释放共享资源

    释放共享资源方法为doReleaseShared()

       private void doReleaseShared() {
            //自旋
            for (;;) {
                Node h = head;
                //头结点不为空并且头结点不等于尾节点
                if (h != null && h != tail) {
                    //拿到头结点的等待状态
                    int ws = h.waitStatus;
                    //如果当前节点为等待唤醒的节点
                    if (ws == Node.SIGNAL) {
                        //将当前节点等待状态初始化,来唤醒线程
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;
                                    // loop to recheck cases
                        unparkSuccessor(h);//唤醒后继线程
                    }
                    //如果线程处于初始化状态,并且还有剩余资源
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                //如果没有后继节点,退出自旋
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    在 doReleaseShared()方法中,通过自旋的方式获取头节点,当头节点不为空,且队列不为空时,判断头节点的waitStatus状态的值是否为SIGNAL(-1)。当满足条件时,会通过CAS将头节点的waitStatus状态值设置为0,如果CAS操作设置失败,则继续自旋。如果CAS操作设置成功,则唤醒队列中的后继节点。

    如果头节点的waitStatus状态值为0,并且在通过CAS操作将头节点的waitStatus状态设置为PROPAGATE(-3)时失败,则继续自旋逻辑。

    如果在自旋的过程中发现没有后继节点了,则退出自旋逻辑。

    本篇文章就分享到这里了,后续将会分享各种其他关于并发编程的知识,感谢大佬认真读完支持咯 ~
    在这里插入图片描述

    文章到这里就结束了,如果有什么疑问的地方请指出,诸佬们一起讨论🍻
    希望能和诸佬们一起努力,今后进入到心仪的公司
    再次感谢各位小伙伴儿们的支持🤞

    在这里插入图片描述

  • 相关阅读:
    国产化框架PaddleClas结合Swanlab进行杂草分类
    WebDAV之葫芦儿·派盘+飞傲音乐
    k8s部署elk8 直接通过logstash获取日志文件方式
    JavaEE开发之Spring框架整合1
    9.19作业
    npm彻底清理缓存
    高压放大器在3D打印中的应用
    如何向瑞芯微平台添加驱动
    我也差点“跑路”
    linux常用命令及解释大全(一)
  • 原文地址:https://blog.csdn.net/qq_53847859/article/details/127628686