• 并发编程day06


    线程这块的一些工具类,基本都会以原理为主,希望大家能有一个这样的意识,
    通过分析别人代码的设计和实现,给自己提供积累一些方法和工具。

    Condition

    在前面讲过synchronized的时候,有讲到wait/notify的基本使用,结合synchronized可以实现对线程的通信。那么这个时候我就在思考了,既然J.U.C里面提供了锁的实现机制,那么J.U.C里面有没有提供类似的线程通信的工具呢?于是找一找,发现一个Condition工具类。
    Condition是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件,只有满足条件,线程才会被唤醒
    Condition的基本使用

    ConditionWait

    		public class ConditionDemoWait implements Runnable{
    		private Lock lock;
    		private Condition condition;
    		public ConditionDemoWait(Lock lock,Condition condition){
    		this.lock=lock;
    		this.condition=condition;}
    		@Override
    		public voidrun() {
    		System.out.println("begin -ConditionDemoWait");
    		try{
    		lock.lock();
    		condition.await();
    		System.out.println("end -ConditionDemoWait");
    		}catch(InterruptedException e) {
    		e.printStackTrace();
    		}finally{
    		lock.unlock();
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    ConditionSignal

    				public class ConditionDemoSignal implementsRunnable{
    				private Lock lock;
    				private Condition condition;
    				public ConditionDemoSignal(Lock lock,Condition condition)
    				{
    				this.lock=lock;
    				this.condition=condition;
    				}
    				@Override
    				public void run() {
    				System.out.println("begin -ConditionDemoSignal");
    				try{
    				lock.lock();
    				condition.signal();
    				System.out.println("end -ConditionDemoSignal");
    				}finally{
    				lock.unlock();
    		}
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    通过这个方案简单实现了wait和notify的功能,当调用await方法后,当前线程会释放锁并等待,而其他线程调用condition对象的signal或者signalall方法通知并被阻塞的线程,然后自己执行unlock释放锁,被唤醒的线程获取之前的锁继续执行,最后释放锁。
    所以,condition中两个重要的方法,一个await,一个是signal方法
    await:把当前线程阻塞挂起
    signal:唤醒阻塞的线程

    Condition源码分析

    调用Condition,需要获得Lock锁,所以意味着会存在一个AQS同步队列,在上面那个案例中,假如两个线程同时运行的话,那么AQS的队列可能是下面这种情况
    在这里插入图片描述
    那么这个时候ThreadA调用了condition.await方法,它做了什么事情呢?

    condition.await

    调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,
    同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁
    public final void await()throws InterruptedException {
    if(Thread.interrupted())
    //表示await允许被中断
    throw new InterruptedException();
    Node node = addConditionWaiter();
    //创建一个新的节点,节点状态为condition,采用的数据结构仍然是链表
    int savedState = fullyRelease(node);
    //释放当前的锁,得到锁的状态,并唤醒AQS队列中的一个线程
    int interruptMode = 0;
    //如果当前节点没有在同步队列上,即还没有被signal,则将当前线程阻塞
    while(!isOnSyncQueue(node)){
    //判断这个节点是否在AQS队列上,第一次判断的是false,因为前面已经释放锁了
    LockSupport.park(this);//通过park挂起当前线程
    if((interruptMode=checkInterruptWhileWaiting(node))!= 0)
    break;
    }
    //当这个线程醒来,会尝试拿锁,当acquireQueued返回false就是拿到锁了
    .// interruptMode != THROW_IE ->表示这个线程没有成功将node入队,但signal执行了enq方法让其入队了.
    //将这个变量设置成REINTERRUPT.
    if(acquireQueued(node,savedState)&&interruptMode !=THROW_IE)
    interruptMode =REINTERRUPT;
    //如果node的下一个等待者不是null,则进行清理,清理Condition队列上的节点.
    //如果是null ,就没有什么好清理的了
    if(node.nextWaiter!=null)
    // clean up if cancelled
    unlinkCancelledWaiters();
    //如果线程被中断了,需要抛出异常.或者什么都不做
    if(interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
    }
    addConditionWaiter
    这个方法的主要作用是把当前线程封装成Node,添加到等待队列。
    这里的队列不再是双向链表,而是单向链表
    private Node addConditionWaiter() {
    Node t =lastWaiter;
    //
    如 果lastWaiter不 等 于 空 并 且waitStatus不等于CONDITION时,把冲好这个节点从链表中移除
    if(t !=null&& t.waitStatus!=Node.CONDITION){
    unlinkCancelledWaiters();
    t =lastWaiter;
    }
    //构建一个Node,waitStatus=CONDITION。这里的链表是一个单向的,所以相比AQS来说会简单很多

    Node node=newNode(Thread.currentThread(),Node.CONDITION);
    if(t ==null)firstWaiter= node;
    else
    t.nextWaiter= node;
    lastWaiter= node;
    return node;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    图解分析执行完addConditionWaiter这个方法之后,就会产生一个这样的condition队列
    在这里插入图片描述

    fullyRelease

    fullRelease,就是彻底的释放锁,什么叫彻底呢,就是如果当前锁存在多次重入,那么在这个方法中只需要释放一次就会把所有的重入次数归零。
    final int fullyRelease(Node node) {
    boolean failed =true;
    try
    {
    int savedState = getState();
    //获得重入的次数
    if(release(savedState))
    {//释放锁并且唤醒下一个同步队列中的线程
    failed =false;
    return savedState;
    }else{
    throw new IllegalMonitorStateException();
    }
    }finally{
    if(failed)
    node.waitStatus
    =Node.CANCELLED;
    }
    }
    图解分析
    此时,同步队列会触发锁的释放和重新竞争。ThreadB获得了锁。
    在这里插入图片描述
    isOnSyncQueue
    判断当前节点是否在同步队列中,返回false表示不在,返回true表示在
    如果不在AQS同步队列,说明当前节点没有唤醒去争抢同步锁,所以需要把当前线程阻塞起来,
    直到其他的线程调用signal唤醒如果在AQS同步队列,意味着它需要去竞争同步锁去获得
    执行程序执行权限为什么要做这个判断呢?原因是在condition队列中的节点会重新加入到AQS
    队列去竞争锁。也就是当调用signal的时候,会把当前节点从condition队列转移到AQS队列➢
    大家思考一下,基于现在的逻辑结构。如何去判断ThreadA这个节点是否存在于AQS
    队列中呢?
    1.如果ThreadA的waitStatus的状态为CONDITION,说明它存在于condition队列中,不在AQS队列。
    因为AQS队列的状态一定不可能有CONDITION
    2.如果node.prev为空,说明也不存在于AQS队列,原因是prev=null在AQS队列中只有一种可能性,就是它是head节点,head节点意味着它是获得锁的节点。
    3.如果node.next不等于空,说明一定存在于AQS队列中,因为只有AQS队列才会存在next和prev的关系
    4.findNodeFromTail,表示从tail节点往前扫描AQS队列,一旦发现AQS队列的节点和当前节点相等,说明节点一定存在于AQS队列中
    final boolean isOnSyncQueue(Nodenode) {
    if(node.waitStatusNode.CONDITION||node.prevnull)
    return false;
    if(node.next!=null)// If has successor, it must be on queue
    return true;
    return findNodeFromTail(node);
    }
    Condition.signal
    await方法会阻塞ThreadA,然后ThreadB抢占到了锁获得了执行权限,这个时候在ThreadB中调用了
    Condition的signal()方法,将会唤醒在等待队列中节点
    public final void signal() {
    if(!isHeldExclusively())//先判断当前线程是否获得了锁,这个判断比较简单,直接用获得锁的线程和当前线程相比即可
    throw new IllegalMonitorStateException();
    Node first =firstWaiter;
    //拿到Condition队列上第一个节点
    if(first !=null)
    doSignal(first);
    }
    Condition.doSignal
    对condition队列中从首部开始的第一个condition状态的节点,执行transferForSignal操作,将node
    从condition队列中转换到AQS队列中,同时修改AQS队列中原先尾节点的状态
    private void doSignal(Node first) {
    do{//从Condition队列中删除first节点
    if((firstWaiter= first.nextWaiter)==null)
    lastWaiter=null;
    //将next节点设置成null
    first.nextWaiter=null;
    }
    while(!transferForSignal(first) &&(first =firstWaiter)!=null);
    }
    AQS.transferForSignal
    该方法先是CAS修改了节点状态,如果成功,就将这个节点放到AQS队列中,然后唤醒这个节点上的线程。此时,那个节点就会在await方法中苏醒

    final boolean transferForSignal(Node node){
    if(!compareAndSetWaitStatus(node,Node.CONDITION,0))
    //更新节点的状态为0,如果更新失败,只有一种可能就是节点被CANCELLED了
    return false;
    Node p = enq(node);
    //调用enq,把当前节点添加到AQS队列。并且返回返回按当前节点的上一个节点,也就是原tail节点
    int ws = p.waitStatus;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    //
    如果上一个节点的状态被取消了,或者尝试设置上一个节点的状态为SIGNAL失败了(SIGNAL
    表示:他的next节点需要停止阻塞),
    if(ws > 0||!compareAndSetWaitStatus(p, ws,Node.SIGNAL))
    LockSupport.unpark(node.thread);
    //唤醒节点上的线程
    .
    return true;
    //
    如果node的prev节点已经是signal状态,那么被阻塞的ThreadA的唤醒工作由AQS队列来完成
    }
    图解分析执行完doSignal以后,会把condition队列中的节点转移到aqs队列上,逻辑结构图如下
    这个时候会判断ThreadA的prev节点也就是head节点的waitStatus,如果大于0或者设置SIGNAL失败,
    表示节点被设置成了CANCELLED状态。这个时候会唤醒ThreadA这个线程。否则就基于AQS队列的机制来唤醒,也就是等到ThreadB释放锁之后来唤醒ThreadA
    在这里插入图片描述
    被阻塞的线程唤醒后的逻辑
    前面在分析await方法时,线程会被阻塞。而通过signal被唤醒之后又继续回到上次执行的逻辑中标注为红色部分的代码checkInterruptWhileWaiting这个方法是干嘛呢?其实从名字就可以看出来,就是
    ThreadA在condition队列被阻塞的过程中,有没有被其他线程触发过中断请求
    public final void await()throws InterruptedException {
    if(Thread.interrupted())throw new
    InterruptedException();
    Node node =addConditionWaiter();
    int savedState =fullyRelease(node);
    int interruptMode = 0;
    while(!isOnSyncQueue(node)){
    LockSupport.park(this);
    if((interruptMode =checkInterruptWhileWaiting(node))!= 0)
    break;
    }
    if(acquireQueued(node,savedState) && interruptMode !=THROW_IE)
    interruptMode =REINTERRUPT;
    if(node.nextWaiter!=null)
    //clean up if cancelled
    unlinkCancelledWaiters();
    if(interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
    }
    checkInterruptWhileWaiting;
    如果当前线程被中断,则调用transferAfterCancelledWait方法判断后续的处理应该是抛出
    InterruptedException还是重新中断。这里需要注意的地方是,如果第一次
    CAS失败了,则不能判断当前线程是先进行了中断还是先进行了signal方法的调用,可能是先执行了
    signal然后中断,也可能是先执行了中断,后执行了signal,当然,这两个操作肯定是发生在
    CAS之前。这时需要做的就是等待当前线程的node被添加到AQS队列后,也就是enq方法返回后,
    返回false告诉checkInterruptWhileWaiting方法返回REINTERRUPT(1),后续进行重新中断。
    简单来说,该方法的返回值代表当前线程是否在park的时候被中断唤醒,如果为true表示中断在
    signal调用之前,signal还未执行,那么这个时候会根据await的语义,在await时遇到中断需要抛出
    interruptedException,返回true就是告诉checkInterruptWhileWaiting返回THROW_IE(-1)。
    如果返回false,否则表示signal已经执行过了,只需要重新响应中断即可
    private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted()?(transferAfterCancelledWait(node) ?
    THROW_IE:REINTERRUPT):0;
    }
    final boolean
    transferAfterCancelledWait(Node
    node) {
    //使用cas修改节点状态,如果还能修改成功,说明线程被中断时,signal还没有被调用。
    //这里有一个知识点,就是线程被唤醒,并不一定是在java层面执行了locksupport.unpark,也可能是调用了线程的interrupt()方法,这个方法会更新一个中断标识,并且会唤醒处于阻塞状态下的线程。

    if(compareAndSetWaitStatus(node,Node.CONDITION,0)){
    enq(node);
    //如果cas成功,则把node添加到AQS队列
    return true
    ;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    //如果cas失败,则判断当前node是否已经在AQS队列上,如果不在,则让给其他线程执行
    //当node被触发了signal方法时,node就会被加到aqs队列上
    while(!isOnSyncQueue(node))
    //循环检测node是否已经成功添加到AQS队列中。如果没有,则通过yield,
    Thread.yield();
    return false;
    }
    acquireQueued
    这个方法在讲aqs的时候说过,是的当前被唤醒的节点ThreadA去抢占同步锁。并且要恢复到原本的重入次数状态。调用完这个方法之后,AQS队列的状态如下将head节点的waitStatus设置为-1,Signal状态。
    在这里插入图片描述

  • 相关阅读:
    c++TCP socket实时文件传输
    JavaWeb基础9——Filter&Listener&Ajax
    【3D视觉原理】3-3D数据表示与转换
    tiup mirror clone
    安卓开发-基础知识补习12
    毛球修剪器方案开发的工作原理和构成
    js进阶笔记之构造函数
    文件内容的操作
    大数据在电力行业的应用案例100讲(十六)-Full GC对Hbase影响及相关解决方法实践
    React: 组件介绍 Components
  • 原文地址:https://blog.csdn.net/weixin_49349744/article/details/125521945