• 一篇文章带你彻底搞懂wait/notify


    纵向学习

    概念

    多线程环境中,共享条件未满足当前线程的执行,这可能是暂时的,之后其他线程会更新共享条件从而使其成立,因此我们可以将当前线程暂停,直到条件满足,这个过程依赖于wait、notify;
    等待(Wait):一个线程因执行目标动作所需的条件未满足从而被暂停的过程被称为等待;
    通知(Notify):一个线程更新了共享变量的状态,使其他线程满足执行目标动作的条件,从而唤醒被暂停的线程,这个过程被成为通知。
    在java中,任何对象都可以实现等待和通知。

    使用

    wait()
    Object.wait()的作用使其执行的线程被暂停(生命状态变为WAITING),Object.wait()执行的线程被成为等待线程。
    我们先看Object.wait()的伪代码:

       //原子操作,在调用之前需要新获取内部锁
        synchronized (someObject){
            while(保护条件不成立){
                //调用someObject来暂停当前线程
                someObject.wait()
            }
            //其它线程执行了notify,使条件满足时,执行目标动作
            doAction();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    其中,保护条件时一个共享变量条件表达式,返回值为布尔类型,当保护条件不成立,会执行wait()方法。一个线程只有在持有一个对象的内部锁的情况下才能够调用该对象的wait()方法,因此someObject.wait()一定要放到临界区当中。上面的伪代码就是受保护的方法,其中包含了3个条件:保护条件(共享变量布尔表达式)、暂停当前线程和目标动作。

    由于一个对象的someObject.wait()方法可以被多个线程执行,因此一个对象可能存在多个等待线程。someObject上的等待线程可以通过其他线程执行someObject.notify()来唤醒。在线程执行someObject.wait()方法时会释放锁(目的是为了防止锁泄漏),但该方法并未执行结束。其它线程执行someObject.notify()时会随机唤醒一个线程,被唤醒的线程并不会立刻结束wait()方法,而是需要再次申请someObject对象的内部锁,当持有锁的时候才会执行someObject.wait()剩余的指令,直到方法返回(这里看不懂没关系,下面有它的原理讲解)。
    注意事项:
    1、一定要用synchronized来修饰:为了保证等待线程对保护条件的判断以及目标动作的执行是一个原子操作,因为在保护条件判断成立后、目标动作执行前可能有其他线程对共享变量更新,使保护条件重新不成立,导致执行结果未符合预期。
    2、一定要使用while循环:当其他线程更新保护条件并执行someObject.notify()时,等待线程被唤醒,在运行到持有该对象内部锁的这段时间内,又有其它线程更新了保护条件使其又不成立;所以,需要被等待线程再次判断保护条件是否真的成立。
    3、当前线程执行Object.wait()释放的锁只是该wait方法所属对象的内部锁,当前线程所持有其它对象的锁并不会被释放。

    notify()
    Object.notify()的作用是随机唤醒一个被暂停的线程,Object.notify()的执行线程被称为通知线程。
    Object.notify()的伪代码如下:

    
        synchronized (someObject){
            //更新保护条件
            updateSharedState();
            //唤醒其他线程
            someObject.notify();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    上面的代码被成为通知方法;它包含了2个要素:更新共享变量,唤醒其他线程。同wait方法,只有线程持有内部锁的情况下才能执行notify方法,因此,someObject.notify()总是放在临界区当中。也正是因为这样,所以线程执行完someObject.wait()方法后需要释放内部锁,否则通知线程无法进入临界区,无法执行notify。
    注意事项:
    1、唤醒线程执行完someObject.notify()方法之后并不会释放内部锁,因此,为了让等待线程可以快速的重新获取锁,要尽量将唤醒线程的someObject.notify()方法放到靠近临界区结束的位置。
    2、唤醒线程执行完someObject.notify()方法后,只会随机唤醒一个等待线程,有可能并不是我们期望唤醒的那个线程,所以可以使用someObject.notifyAll()来唤醒该对象上的所有等待线程。

    原理

    java虚拟机会为每个对象维护一个称为等待集(Wait Set)的队列,该队列用于存储该对象上的等待线程。当前线程执行Object.wait()方法时会将自己加入到该对象的等待集中,并释放当前对象的内部锁,当其它线程执行Object.nofity()方法时,会随机唤醒等待集中的一个线程,被唤醒的线程仍然会停留在当前对象的等待集中(Object.wait()方法尚未执行结束),直到被唤醒的线程再次获取到内部锁,jvm才会把当前线程从等待集中移除,之后Object.wait()方法才执行结束。
    Object.wait()的伪代码如下:

       public void wait() {
            //当前线程必须持有当前对象的内部锁
            if (!Thread.holdsLock(this)) {
                throw new IllegalMonitorStateException();
            }
            if (当前对象不在等待集中){
                //将当前线程加入到当前对象的等待集中
                addToWaitSet(Thread.currentThread());
            }
    
            //原子操作
            atomic{
                //释放当前对象的内部锁
                releaseLock(this);
                //暂停当前线程
                block(Thread.currentThread());//语句1
            }
    
            //再次申请当前对象的内部锁
            acquireLock(this);//语句2
            //将当前线程从当前对象等待集中移除
            removeFromWaitSet(Thread.currentThread());
    
            //这时候再返回
            return;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    实战

    Object.wait()/notify()

    现在用wait、notify来演示一个demo(可将代码粘贴到本地便于理解),场景如下:
    现在有两个系统,一个告警系统,一个告警接收系统;前者负责发送告警信息给后者。该代码主要有三个线程来实现的,一个线程用来与告警接收系统建立网络连接;一个线程是心跳线程,负责监听与告警接收系统的连接情况;还有一个线程负责发送告警信息给告警接收系统(执行sendAlarm方法)

    public class AlarmAgent {
    
        private final static AlarmAgent INSTANCE = new AlarmAgent();
    
        private Object lock = new Object();
    
        //是否可以连接到告警服务器
        private boolean connectedToServer = false;
        //心跳线程,用于检测告警代理与告警服务器的网络连接是否正常
        private final HeartBeatThread heartBeatThread = new HeartBeatThread();
    
        private AlarmAgent() {
    
        }
    
        public static AlarmAgent getInstance() {
            return INSTANCE;
        }
    
        @PostConstruct
        public void init() {
            connectToServer();
            heartBeatThread.setDaemon(true);
            heartBeatThread.start();
        }
    
    
        public void connectToServer() {
            //创建启动网络连接线程,该线程中与告警服务器建立连接
            new Thread(() -> {
                doConnect();
            }).start();
        }
    
        private void doConnect() {
            //模拟实际操作耗时
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (this) {
                connectedToServer = true;
                //连接建立完毕,通知以唤醒告警发送线程
                lock.notify();
            }
        }
    
        //被调用的方法
        public void sendAlarm(String message) throws InterruptedException {
            synchronized (this) {
                while (!connectedToServer) {
                    System.out.println("Alarm was not connected to server");
                    lock.wait();
                }
                //将告警消息上报到告警服务器
                doSendAlarm(message);
            }
        }
    
        private void doSendAlarm(String message) {
    
            System.out.printf("Alarm sent %s", message);
        }
    
    
        class HeartBeatThread extends Thread {
            @SneakyThrows
            @Override
            public void run() {
                Thread.sleep(1000);
                while (true) {
                    if (checkConection()) {
                        connectedToServer = true;
                    } else {
                        connectedToServer = false;
                        System.out.println("Alarm was disconnected from server");
                        //检测到连接中断,重新建立连接
                        connectToServer();
                    }
                    Thread.sleep(2000);
                }
            }
        }
    
        //检测告警与服务器的连接情况
        private boolean checkConection() {
            boolean isConnected = true;
            final Random random = new Random();
    
            //模拟随机性的网络连接
            int rand = random.nextInt(1000);
            if (rand < 500) {
                isConnected = false;
            }
            return isConnected;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99

    connectedToServer 变量表示告警和告警接收系统的连接状态,在发送告警(sendAlarm方法)时,要检测连接状态,如果connectedToServer为false,表示连接失败,则进行wait;当心跳线程判断连接失败的时候,会执行connectToServer()方法,修复成功后,进行nofity,这时发送告警的线程就可以继续执行doSendAlarm方法。

    Object.wait(long)/notify()

    Object.wait()方法会一直等待下去,直到被唤醒;Object.wait(long)方法可以指定一个超时时间,在这期间未被唤醒的话,则java虚拟机会自动唤醒该线程。但是要区分是等待超时而结束还是由其他线程主动唤醒而结束的,需要进行额外的处理。可以参考如下wait(long)方法的使用:

    public class TimedWaitNotify {
    
        private final static Object lock = new Object();
    
        private static boolean ready = false;
    
        protected final static Random random = new Random();
    
    
        public static void main(String[] args) throws InterruptedException {
            Thread thread = new Thread(() -> {
                for (; ; ) {
                    synchronized (lock) {
                        ready = random.nextInt(100) < 50 ? true : false;
                        if (ready) {
                            lock.notify();
                        }
                    }
                    try {
                        //模拟继续执行任务
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            });
            thread.setDaemon(true);
            thread.start();
            waiter(1000);
        }
    
        public static void waiter(final long timeOut) throws InterruptedException {
            if (timeOut < 0) {
                throw new IllegalArgumentException();
            }
    
            long start = System.currentTimeMillis();
            long waitTime;
            long now;
            synchronized (lock) {
                while (!ready) {
                    now = System.currentTimeMillis();
                    //计算剩余等待时间
                    waitTime = timeOut - (now - start);
                    if (waitTime < 0) {
                        //执行超时
                        break;
                    }
                    //非执行超时,继续等待
                    lock.wait(waitTime);
                }
                if (ready) {
                    //表明是被notify唤醒的,继续执行要做的内容
                } else {
                    //等待了 waitTime,但还是没有notify被唤醒,则证明是等待超时
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    在上述执行wait(long)方法之前,要先计算出该程序的执行时间,然后用等待的时间减去前者,得到的结果才是最终需要等待的时间。如果等待了这些时间还没有被唤醒,则说明是等待超时了。

    问题

    wait、notify的使用不当会带来如下问题:

    过早唤醒

    假设T1、T2两个线程使用不同的保护条件,但都是someObject对象上的线程,当T1、T2都进行someObject.wait()的时候,线程T3执行了T1的保护条件使其成立,但执行了someObject.notifyAll(),导致T2也被唤醒,当T2被唤醒之后,发现自己的条件并未满足,于是再次进行等待。这种等待线程因所需保护条件并未成立而被唤醒的现象叫做过早唤醒。过早唤醒会造成资源的浪费。
    解决方法可使用Condition(下文介绍),或者使用不同的对象锁。

    信号丢失

    如果等待线程没有进行保护条件的判断,直接执行了someObject.wait()方法,但通知线程在等待线程执行someObject.wait()方法之前已经执行过了someObject.notify(),对等待线程来说,丢失了一个被唤醒的信号,导致等待线程永远等待下去,因此被称为信号丢失。信号丢失还有一种体现就是多个等待线程共用同一个保护条件,但是通知线程只执行了notify,未执行notifyAll,因此只会随机唤醒一个等待线程,其他线程将继续等待。
    解决方法是等待线程的保护条件放在while循环当中进行判断。

    欺骗性唤醒

    等待线程有可能在没有任何线程执行notify或notifyAll方法的情况下被唤醒。这种现象被成为欺骗性唤醒。这种概率非常低,导致原因是java平台对操作系统的妥协。
    解决方法和信号丢失的解决方法一致,将保护条件放到while循环中。

    上下文切换

    等待线程在完整的执行完wait方法后,会进行至少两次锁的申请与释放,第一次是进入临界区和退出临界区,第二次是在执行wait方法时先释放锁,通知线程执行notify时,等待线程需要再次申请锁;一共两次。这时如果等待线程并没有申请到锁,就会再次阻塞,从而导致了上下文切换。其次如果遇到过早唤醒的现象,也会增加额外的上下文切换。
    通知线程要完整的执行notify方法也需要一次锁的申请与释放。
    减少上下文切换的方法之一是使用notify来代替notifyAll,减少过早唤醒的现象。另外把通知线程的notify和notifyAll方法放到退出临界区的最后代码部分,保证通知完成可以立即释放锁,让等待线程快速拿到锁。

    横向学习

    Thread.join()

    Thread.join()方法也可以实现线程的等待,并且可以通过Thread.join(long time) 来进行指定时间的等待,与wait、notify方法一样。但实际上,Thread.join(long time)是通过wait、notify来实现的。

     public final synchronized void join(long millis)
        throws InterruptedException {
            long base = System.currentTimeMillis();
            long now = 0;
    
            if (millis < 0) {
                throw new IllegalArgumentException("timeout value is negative");
            }
    
            if (millis == 0) {
                while (isAlive()) {
                    wait(0);
                }
            } else {
                while (isAlive()) {
                    long delay = millis - now;
                    if (delay <= 0) {
                        break;
                    }
                    //等待线程执行wait方法,目标线程run方法执行结束会执行notify方法
                    wait(delay);
                    now = System.currentTimeMillis() - base;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    条件变量

    概念

    由于wait、notify比较底层,操作不当容易出现过早唤醒等问题,因此,在工作中,我们尽量使用Condition可以替代wait、notify的使用。但经过上文对wait、notify的学习,可以有效帮助我们对Condition的理解,其使用和wait、nofity基本一致。
    Lock.newCondition()的返回值就是一个Condition实例,因此可以使用任意一个显示锁进行调用,并使用Condition.await/signal代替了上文的Object.wait/notify,Condition的实例就被成为条件变量

    使用

    每一个Condition实例内部都有一个存储等待线程的等待队列,设cond1和cond2是两个不同的Condition实例,cond1执行await方法时,会进入等待队列中,通知线程执行cond.signal时,只会唤醒cond1中的任意一个线程,并不会影响cond2中等待的线程,即使执行了cond1.singalAll也不会影响,从而解决了过早唤醒的问题。
    condition.await/singal的使用和wait/notify基本上是一致的,它的使用模板如下:

    public class ConditionDemo {
    
        private final Lock lock = new ReentrantLock();
    
        private Condition condition = lock.newCondition();
    
        public void testMethod() throws InterruptedException {
            lock.lock();
            try {
                while (保护条件不成立) {
                    condition.await();
                }
            } finally {
                lock.unlock();
            }
            //do something
        }
    
        public void notifyMethod(){
            lock.lock();
            try {
                //更新了条件变量
                changeConditon();
                condition.notify();
            } finally {
                lock.unlock();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    主要区别就是将wait/notify的内部锁换成了显示锁。
    上面说到Condition可以解决过早唤醒的问题,解决方法为创建不同的条件变量。除此之外,它的awaitUtil(Date)可以区分出等待线程的唤醒由于等待超时还是主动唤醒引起的。
    下面是它的使用demo:

    public class ConditionTimeWait {
        private final static Lock lock = new ReentrantLock();
        private static Condition condition = lock.newCondition();
        private static boolean ready = false;
        protected final static Random random = new Random();
    
        public static void main(String[] args) throws InterruptedException {
            Thread thread = new Thread(() -> {
                lock.lock();
                try {
                    ready = random.nextInt(100) < 50 ? true : false;
                    if (ready) {
                        condition.signal();
                    }
                } finally {
                    lock.unlock();
                }
                try {
                    //模拟继续执行任务
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            thread.setDaemon(true);
            thread.start();
            waiter(1000);
        }
    
        public static void waiter(final long timeOut) throws InterruptedException {
            if (timeOut < 0) {
                throw new IllegalArgumentException();
            }
    
            //最终的等待时间
            Date deadLineDate = new Date(System.currentTimeMillis() + timeOut);
            boolean continueToWait = true;
            lock.lock();
            try {
                while (!ready) {
                    if (!continueToWait) {
                        return;
                    }
                    //返回为true说明是被唤醒的,否则是超时等待。
                    continueToWait = condition.awaitUntil(deadLineDate);
                }
                doSomething();
            } finally {
                lock.unlock();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    condition.awaitUntil(Data)方法返回为true就是就是被唤醒的,如果返回为false表示是超时等待引起的。
    感谢您的观看,欢迎评论,一起探讨~

  • 相关阅读:
    数据化管理洞悉零售及电子商务运营——商品中的数据化管理
    图解拓扑排序+代码实现
    Matlab与.m脚本文件没有关联,怎么破?
    卫星物联网生态建设全面加速,如何抓住机遇?
    两周面试,遇到的那些奇事
    [论文阅读] Generative Adversarial Networks for Video-to-Video Domain Adaptation
    鹅长微服务发现与治理巨作PolarisMesh实践-上
    Oracle大表在线重新分区
    Python学习笔记--初识 Python 正则表达式
    01. 课程简介
  • 原文地址:https://blog.csdn.net/aaaPostcard/article/details/126335063