• ReentrantLock源码剖析


    0.Lock与Synchronized区别

    • 首先Lock下的ReentrantReadWriteLockReentrantLock大差不差,只是前者多了一个S锁和X锁的兼容性
    • Lock是JUC包下的
    SynchronizedLock
    内置的Java关键字Java类,包括如下三个常用的可重入锁:
    ReentrantLockReentrantReadWriteLock
    无法判断获取锁的状态可以判断是否获取到了锁
    reentrantllock.isHeldByCurrentThread
    会自动释放锁
    可重入
    lock 必须要手动释放锁,否则会死锁。
    且由于可重入机制,lock()的次数要等于unlock()
    遇到阻塞就一直阻塞可以使用tryLock(long timeout, TimeUnit unit)设置超时时间
    非公平锁可以设置是否公平,构造器ReentrantLock(boolean fair)
    适合锁少量的代码同步问题,如:单例模式检锁自由度高,适合锁大量代码
    普通方法对当前对象this监视
    static方法对唯一Class对象监视
    监视的是new出来的ReentrantLock对象

    0.1Synchronized的两个使用示例

    0.1.1同步代码块

       synchronized (this) {
       。。。 System.out.println(Thread.currentThread().getName()+"售票一张,余票:"+ --ticket);
       }
    
    • 1
    • 2
    • 3

    0.1.2同步方法

    	public synchronized void sale(){
    	。。。System.out.println(Thread.currentThread().getName()+"售票一张,余票:"+ --ticket);
    	}
    
    • 1
    • 2
    • 3

    0.2ReentrantLock使用示例

    • 构造器决定是否是公平锁Lock lock = new ReentrantLock( true );//公平锁
    • lock.lock()不重新尝试,lock.tryLock(10,TimeUnit.SECOND)10s内重试
    • 一般在finally{}中lock.unlock()

    0.3读写锁

    因为最下面两个方法是直接获取读、写锁,因此实际操作如下:

    创建读写锁:ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    加读锁:readWriteLock.readLock().lock();
    开读锁:readWriteLock.readLock().unlock();

    1.AQS下的两个类

    1.1Node节点(双端队列)

    • 这几个状态值在lock、await unlock、signal中都是不同的,起到一个状态对应的作用,也用于判断是否可以唤醒线程
    • 每个ReentrantLock中维护着一个Node对象(相应的维护了一个双端队列)
    • 每个Condition中也维护着一个Node对象(同上,一个condition可以对应await在很多地方),因此可以实现指定唤醒
    • Node中的属性Thread非常重要,由于底层的unpark方法唤醒的是指定线程,在源码中的逻辑是:unpark()唤醒同步队列中的头节点head.thread线程

    1.2ConditionObject

    主要是使用这两个方法,其内部也用到了上述双端队列,并且在大题执行逻辑上

    • signal()与lock()相似,底层都是调用 LockSupport.unpark(node.thread);
    • await()与unlock()被阻塞相似,底层都是LockSupport.park(this);
    • 正是由于使用Condition类,在阻塞、唤醒的操作上比传统多线程提高了很多灵活性
    • 在传统的多线程中,使用notify()notifyAll()wait()唤醒其他线程阻塞自己并进入等待队列,等待唤醒后进入同步队列,在JUC编程中使用await()替换了Object类中的wait,signal()替换Object中的notify

    1.2.1signal()指定唤醒等待队列头节点

    其中doSIgnal()底层调用的就是LockSupport.unpark(node.thread);

    1.2.2await()阻塞进入等待队列

    着重需要注意的是:调用condition.await()很容易写成condition.wait()从而报错
    这个方法与unlock调用的都是tryRelease(),底层也都是unpark()
    因为Condition类的存在,await()变得异常灵活,我们可以在不同的线程用同一个condition调用await(),其等待队列维护在这个condition对象中,可以被signal()给唤醒

    关于同步队列、等待队列的内容在第6号标题


    2.源码分析总结

    这里把结论提前放这里:

    • AQS是一个抽象类,包含了Node(用于维护同步队列)、ConditionObject(也用Node维护了等待队列,给ReentrantLock提供更高的自由度,可由reentrantLock.newCondition()创建)
    • CAS(乐观锁的一种实现,本身与自旋锁无关)是通过比较属性的offset偏移量来实现原子性操作,一般配合while()或者for(;;)实现自旋锁,如果只是想尝试一次获取锁,那么就不需要循环,只需要保证原子性即可
    • 自旋锁只是不停尝试,真正执行语句的原子性需要CAS保证,自旋锁也一般配合阻塞实现
    • 公平锁和非公平锁都用Node维护了2个双端队列,不过只有公平锁在争锁的时候会先判断是不是队列的头节点,一旦进入队列仍然是有序的
    • reentrantLock.lock()condition.await()底层都同时做了两件事:将节点放入队列、在for(;;)自旋锁中调用LockSupport.park()来阻塞线程,暂停自旋,防止消耗cpu资源,有且仅有同步队列的第二个节点在自旋
    • reentrantLock.unlock()condition.signal()底层都同时做了两件事:从自己维护的队列中取出头节点(即便是非公平锁也是取头节点)、底层调用LockSupport.park()来唤醒对应的线程
    • 由此可见,ReentrantLock的非公平锁并不是真正意义上的非公平,他只是在第一次获取锁的时候非公平,一旦进入同步队列,还是得乖乖排队
    • 源码中大量使用了if( && ) 的短路功能,来简化代码
    • 关于LockSupport.park(),底层就是阻塞线程,lock失败、awiat都用到了他。同理unpark在unlock和signal中被使用,用于唤醒指定线程(唤醒等待队列的头节点)
    • 关于同步队列与等待队列,其实lock和unlock操作的是同步队列(即使源码注释只提到了等待队列),而awati和signal分别是“将同步队列节点移到等待队列”,“将等待列队节点移到同步队列”————在第6点中细讲

    3.ReentrantLock源码

    • 已知 lock()在没有抢到锁的时候会导致线程阻塞,那么可以猜测相关的线程挂起逻辑是while(true) for(;;) 自旋 + park()阻塞,等待别人unpark()唤醒后继续自旋
    • Java中调用CAS(乐观锁,底层是原子操作)是在Usafe类下的native方法,而这个state的值在CAS锁机制下使用的参数是stateOffset偏移量,效果相同,例如unsafe.objectFieldOffset获取偏移量,然后usafe.compareAndSwapInt执行原子指令
    • 利用形如while( !unsafe.compareAndSwapInt(this, stateOffset, 0 ,1 ))来实现自旋锁,直到加到锁,其中原子性是由CAS(一种乐观锁实现)来保证的

    3.1CAS实现一个简单的自旋锁

    • 用while + cas 可以实现一个自旋锁(自旋的思想就是不停重试)
    • 但是一直while很消耗cpu资源
    • 因此,我们不能让所有等待线程都while,在源码中使用park进行阻塞自旋

    3.1.1 Unsafe类的使用demo

    public class AQSTest {
    
      public static void main(String[] args) {
    
        AQSTest aqsTest = new AQSTest();
        aqsTest.test();
    
      }
    
    
      public void test(){
        System.out.println(state);//0
        Unsafe unsafe = getUnsafe();
        boolean b     = unsafe.compareAndSwapInt(this, stateOffset, 0, 1);
        System.out.println(state);//1
      }
    
      
      private volatile int state = 0;//状态0则没加锁,volatile防止指令重排
      private static final Unsafe unsafe = getUnsafe();//import sun.misc.Unsafe;
      //偏移量,即在计算机中定位到state的位置,以便于原子操作
      private static Long stateOffset;
    
      //用静态代码块捕获异常,如果直接定义private static Long stateOffset =
      // unsafe.objectFieldOffset(AQSTest.class.getDeclaredField("state"));
      //那么则会在空参构造上抛出异常
      static {
        try {
          stateOffset = unsafe.objectFieldOffset(AQSTest.class.getDeclaredField("state"));
        } catch (NoSuchFieldException e) {
          e.printStackTrace();
        }
      }
    
      //获取unsafe对象
      private static Unsafe getUnsafe(){
        try {
          Field field = Unsafe.class.getDeclaredField("theUnsafe");
          field.setAccessible(true);
          return (Unsafe) field.get(null);
        } catch (NoSuchFieldException e) {
          e.printStackTrace();
        } catch (IllegalAccessException e) {
          e.printStackTrace();
        }
        return null;
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    3.1.2lock和unlock

    加锁解锁方法:

      public void lock(){
        Unsafe unsafe = getUnsafe();
        while ( !unsafe.compareAndSwapInt(this,stateOffset,0,1) ){
          System.out.println(Thread.currentThread().getName() + "尝试获取锁");
          try {
            TimeUnit.SECONDS.sleep(1);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
        System.out.println(Thread.currentThread().getName() + "获取锁成功");
      }
    
      public void unlock(){
        Unsafe unsafe = getUnsafe();
        boolean flag     = unsafe.compareAndSwapInt(this, stateOffset, 1, 0);
        if(flag){
          System.out.println("解锁成功");
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    3.1.3两个线程测试

    main中开启两个线程:

    AQSTest t = new AQSTest();
    
    
    new Thread(()->{
      System.out.println("线程1开始,上锁");
      t.lock();
      try {
        TimeUnit.SECONDS.sleep(3);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      //3秒后释放锁
      t.unlock();
    },"线程1").start();
    
    
    new Thread(()->{
      System.out.println("线程2开始");
      t.lock();
      t.unlock();
    },"线程2").start();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    输出:

    4.lock源码分析

    • lock有两个实现 公平和非公平
    • 两者的区别在于第一步tryAcquire,而第二步acquireQueued二者都是相同的(这也同时证明了非公平锁并不是真正意义上的公平
      二者都维护了等待队列,但在获取锁时只有公平锁使用的队列顺序
    • 非公平锁第一次加锁不考虑队列,会尝试两次锁(失败后进入等待队列)

    加锁失败后会打断自己

    4.1&&的左边条件tryAcquire

    • 在这个&&的前半块,在tryAcquire 中,公平锁要先判断是不是队列队头,而非公平锁是直接cas抢锁(当然因为没有循环,只会获取一次,如果没抢到就算了)

    • 关于排队hasQueuedPredecessors,详细内容如下(考虑了并发问题)

    4.2&&的右边条件acquireQueued

    在这里面公平锁与非公平锁都一样,都是使用了:
    自旋锁 + park阻塞 + unpark唤醒 + 同步队列FIFO 的策略

    4.2.1 addWaiter维护同步队列

    (这里源码中的等待队列其实本质是同步队列,相关内容在第6号标题中)

    那么非公平锁是否有维护这个队列呢?在&&的后半块:我们在addWaiter()的方法打个断点,分别测试公平锁和非公平锁,发现都会进来,就证明其实二者都维护了同步队列(只有公平锁加锁时用hasQueuedPredecessors判断是否是队列头)
    debug后发现,对于addWaiter同步队列,二者的流程一致。不得不说这个addWaiter设计十分精妙,enq(Node node)方法的自旋锁完全考虑了队列为空、创建队列时被插入、新增节点时可能遇到的所有并发问题
    自旋保证节点成功添加到队列中

    4.2.2acquireQueued获取同步队列

    刚刚分析到了加锁失败后,这里形参是刚刚生成的节点,这里最重要的是这个打框的地方,shouldParkAfterFailedAcquire(pre,node)是一个should开头的疑问句,作用是判断当前节点是不是下一个执行节点,如果是的话则执行parkAndCheckInterrupt()阻塞
    而在for(;;)中是自旋的,这里阻塞了可以减少循环消耗cpu资源,也能被上一个节点成功唤醒
    因为被parkAndCheckInterrupt()阻塞了,停止了循环,当上一个节点唤醒当前节点后解除阻塞,继续循环,尝试加锁(非公平锁仍有可能抢不过————存在虽然被唤醒但竞争失败的情况

    4.2.3 parkAndCheckInterrupt方法

    关于LockSupport.park方法,这里参考参考链接,park是一个native方法可以实现精准唤醒(配合队列可以指定唤醒某一个节点),其中公平锁非公平锁都用了相同逻辑的同步队列

    5.unlock源码分析

    5.1源码浅析

    unlock调的都是同一个release()方法
    这里调用unpark去唤醒下一个节点,下一个节点那边接触阻塞

    5.2使用示例

    这个案例主要是探究await()阻塞、

    6.同步队列、等待队列

    • 首先:同步队列的优先级高于等待队列,同步队列决定接下来执行哪个线程
    • 例如:有多个condition等待队列存在的情况下,需要先通过signal扔进同步队列才能确定线程的最终执行顺序

    6.1同步队列

    • 使用reentrantLock.lock()时,当前线程进入的是同步队列,如果阻塞也是阻塞在同步队列
    • 使用reentrantLock.unlock()会将当前节点(线程)从同步队列剔除,释放锁,并通知下一个节点(LockSupport.unpark()

    6.2等待队列

    • 使用condtion.await()时,将当前同步队列的头节点(当前获取锁的线程)扔到对应的condition中的队列尾,同时释放锁(与unlock逻辑相同)
    • 使用condition.signal()时,将condition等待队列的头节点扔到同步队列的队尾

    6.3执行流程

    线程的执行顺序由同步队列决定,等待队列仅仅起到一个保存节点的作用

    案例

    6.4demo

    public class AwaitTest {
    
      public static void main(String[] args) {
    
        final ReentrantLock lock = new ReentrantLock();
        final  Condition     condition = lock.newCondition();
    
        new Thread(()->{
          lock.lock();
          System.out.println("线程000000开始");
          System.out.println("线程000000await()阻塞,进入等待队列");
          try {
           condition.await();//线程0进入等待队列
          } catch (InterruptedException e) {
          }
          System.out.println("线程0被唤醒");
          lock.unlock();//线程0释放锁,锁交给同步队列的下一个节点
        }).start();
    
    
        new Thread(()->{
          lock.lock();
          System.out.println("线程111111开始");
          System.out.println("线程111111await()阻塞,进入等待队列,此时等待队列有线程0和1");
          try {
            //线程1从同步队列移除,进入condition等待队列,此时的condition等待队列有两个元素
            condition.await();
          } catch (InterruptedException e) {
          }
          System.out.println("线程1被唤醒");
          lock.unlock();//线程1释放锁,锁交给同步队列的下一个节点
        }).start();
    
        new Thread(()->{
          try {
          lock.lock();
          System.out.println("此时同步队列队首为线程2(线程2获取锁),线程222222开始");
            System.out.println("唤醒0线程————线程0从等待队列进入同步队列,当线程2 unlock后执行");
            condition.signal();//唤醒0线程
    
            System.out.println("唤醒1线程————线程1从等待队列进入同步队列,当线程0 unlock后执行");
            condition.signal();//唤醒1线程
    
            System.out.println("此时同步队列队首的线程2调用unlock释放锁,执行其他同步队列节点");
            lock.unlock();
          } catch (Exception e) {
            e.printStackTrace();
          }
        }).start();
        
      }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    6.4两个队列的本质

    • 本质其实就是这个Node节点的结构问题。Node节点是在AQS抽象类中的,且被AQS内部类创建,因此一个ReentrantLock可以有多个Node节点(1个同步队列,多个newCondition创建的等待队列)

    • 除此之外,我们用lock unlock操作的节点隶属于最外层AQS

    • 而await signal操作的节点是AQS的内部类ConditionObject中的

    • 因此形成了这种情况:
      lock unlock操作的是同步队列
      await signal操作的是等待队列

    7.生产者消费者

    7.1JUC:ReentrantLock实现

    关于生产者消费者问题,只需要满足以下结构即可:

    • reentrantLock对象是生产者消费者共有的
    • 如果涉及库存不足,需要指定唤醒生产者,就用reentrantLock.newCondition()的await()和signal()即可
  • 相关阅读:
    LeetCode每日一题——30. 串联所有单词的子串
    如何实现硬件和软件的统一?(从物理世界到电子电路再到计算机科学)
    【日常-bug】文件上传oss报错-跨域- ‘Access-Control-Allow-Origin
    STM32G0开发笔记-Platformio+libopencm3-按键和外部中断
    Web基础与HTTP协议
    网络地图服务(WMS)详解
    18.cuBLAS开发指南中文版--cuBLAS中的Level-2函数gbmv()
    基于ChatGPT的视频智能摘要实战
    R语言绘制时间序列的偏自相关函数图:使用pacf函数可视化时间序列数据的偏自相关系数图、分析是否存在自相关性以及显著相关的个数
    砥砺的前行|基于labview的机器视觉图像处理|NI Vision Assisant(五)——Grayscale(灰度图) 功能
  • 原文地址:https://blog.csdn.net/m0_56079407/article/details/126443778