• 深入理解AQS--jdk层面管程实现【管程详解的补充】


    什么是AQS

      1.java.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这些行为的抽象就是基于AbstractQueuedSynchronizer(简称AQS)实现的,AQS是一个抽象同步框架,可以用来实现一个依赖状态的同步器

      2.JDK中提供的大多数的同步器如Lock, Latch, Barrier等,都是基于AQS框架来实现的

        【1】一般是通过一个内部类Sync继承 AQS

        【2】将同步器所有调用都映射到Sync对应的方法

     

    AQS具备的特性:

      1.阻塞等待队列  , 2.共享/独占  , 3.公平/非公平  , 4.可重入  , 5.允许中断 

     

    AQS定义两种资源共享方式

      1.Exclusive-独占,只有一个线程能执行,如ReentrantLock(详情可查看 深入理解ReentrantLock类锁

      2.Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch

     

    AQS定义两种队列

      1.同步等待队列【主要用于维护获取锁失败时入队的线程

        【1】AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先进先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。   

        【2】AQS 依赖CLH同步队列来完成同步状态的管理:

          1)当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程

          2)当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

          3)通过signal或signalAll将条件队列中的节点转移到同步队列。(由条件队列转化为同步队列)

        【3】图示:

               

      2.条件等待队列【调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁

        【1】AQS中条件队列是使用单向列表保存的,用nextWaiter来连接:

          1)调用await方法阻塞线程;

          2)当前线程存在于同步队列的头结点,调用await方法进行阻塞(从同步队列转化到条件队列)

     

      3.AQS 定义了5个队列中节点状态:

        1)值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。

        2)CANCELLED,值为1,表示当前的线程被取消;

        3)SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;

        4)CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;

        5)PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;

     

    源码详解(将源码拆分为三块,抽象同步器AbstractQueuedSynchronizer类,节点Node类,条件对象ConditionObject类

      AbstractQueuedSynchronizer类解析

        1.属性值解析

    复制代码
    //用链表来表示队列
    private transient volatile Node head;
    private transient volatile Node tail;
    
    private volatile int state;  //可以表示锁的加锁状态【独占锁只为1,共享锁可以大于1】,又可以表示锁的重入次数,0为没有加锁
    复制代码

        2.方法解析

    复制代码
    //定义了主体的大体逻辑,如入队,如尝试加锁
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    
    //模板方法的处理,如果子类没有实现,则子类中调用的话会报错
    //提供给子类去实现的公平与非公平的逻辑
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    //释放锁的逻辑
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    复制代码

     

      Node类详解

        1.代码展示

    复制代码
    static final class Node {
    
        static final Node SHARED = new Node();  // 共享模式标记
        static final Node EXCLUSIVE = null;     // 独占模式标记
    
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
    
        //值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
        //CANCELLED,值为1,表示当前的线程被取消;
        //SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
        //CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
        //PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
        volatile int waitStatus;
    
        
        volatile Node prev;//前驱结点
        volatile Node next;//后继结点
        volatile Thread thread; //与节点绑定的线程
        Node nextWaiter; // 存储condition队列中的后继节点
    
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
    
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    
        Node() {}
    
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
    
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
    复制代码

     

      Condition接口详解

        1.代码展示

    复制代码
    //Condition用来替代synchronized锁的监视器的功能,而且更加灵活
    //一个Condition实例需要与一个lock进行绑定
    public interface Condition {
        //调用此方法的线程将加入等待队列,阻塞直到被通知或者线程中断
        void await() throws InterruptedException;
    
        //调用此方法的线程将加入等待队列,阻塞直到被通知(线程中断忽略)
        void awaitUninterruptibly();
    
        //调用此方法的线程将加入等待队列,阻塞直到被通知或者线程中断或等待超时
        long awaitNanos(long nanosTimeout) throws InterruptedException;
    
        //调用此方法的线程将加入等待队列,阻塞直到被通知或者线程中断或等待超时
        boolean await(long time, TimeUnit unit) throws InterruptedException;
    
        //调用此方法的线程将加入等待队列,阻塞直到被通知或者线程中断或超出指定日期
        boolean awaitUntil(Date deadline) throws InterruptedException;
    
        //唤醒一个等待中的线程
        void signal();
    
        //唤醒所以等待中的线程
        void signalAll();
    }
    复制代码

        2.发现说明

          【1】在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通信方式,Condition都可以实现,这里注意,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。Condition的强大之处在于,对于一个锁,我们可以为多个线程间建立不同的Condition。如果采用Object类中的wait(), notify(), notifyAll()实现的话,当写入数据之后需要唤醒读线程时,不可能通过notify()或notifyAll()明确的指定唤醒读线程,而只能通过notifyAll唤醒所有线程,但是notifyAll无法区分唤醒的线程是读线程,还是写线程。所以,通过Condition能够更加精细的控制多线程的休眠与唤醒。

          【2】但,condition的使用必须依赖于lock对象,通过lock对象的newCondition()方法初始化一个condition对象。

     

      ConditionObject类详解【Condition接口的实现类】

        1.属性值解析

    //由头尾两个节点指针形成的链表来达到队列的效果
    private transient Node firstWaiter;
    private transient Node lastWaiter;

        2.方法解析

          【1】核心await方法

    复制代码
    public final void await() throws InterruptedException {
        //如果线程中断,直接抛出异常  
        if (Thread.interrupted())
            throw new InterruptedException();
    
        //进入等待队列中
        Node node = addConditionWaiter();
        //释放当前线程持有的锁,并获取当前同步器状态
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        //如果不在同步队列中,那么直接阻塞当前线程;直到被唤醒时,加入到同步队列中
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //此时已经被唤醒,那么尝试获取锁
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        //如果节点中断取消,那么清除节点
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    
    //addConditionWaiter将一个节点添加到condition队列中。在入队时,判断当前尾节点是不是CONDITION。如果不是则判断当前尾节点已经被取消,将当前节点出队。那么也就是说在队列中的节点状态,要么是CONDITION,要么是CANCELLED
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    
    //方法的作用是移除取消的节点。方法本身只有在持有锁的时候会被调用。方法会遍历当前condition队列,将所有非Condition状态的节点移除。
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }
    复制代码

     

          【2】核心signal方法与signalAll方法

    复制代码
    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
    
    public final void signalAll() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }
    
    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }
    
    final boolean transferForSignal(Node node) {
        //如果不能更改waitStatus,则表示该节点已被取消
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
    
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
    复制代码

     

  • 相关阅读:
    机器学习笔记 - 用于动作识别的网络TSN/TSM/SlowFast/R(2+1)D/3D MobileNetV2
    AQS源码解析 7.共享模式_CyclicBarrier重复屏障
    【AGC】如何快速部署Serverless Url缩短模板
    【GIT版本控制】--高级GIT配置
    利用bert4keras实现多任务学习
    记一次神奇的 pipe 错误
    SQL数据库的基本操作流程
    jacoco—增量代码覆盖率实现
    如何在 Spring Boot 中进行文件上传
    【软件架构文档】
  • 原文地址:https://www.cnblogs.com/chafry/p/16756929.html