• Java 多线程共享模型之管程(下)


    共享模型之管程

    wait、notify

    wait、notify 原理

    • Owner 线程发现条件不满足,调用 wait 方法,即可进入 WaitSet 变为 WAITING 状态
    • BLOCKED 和 WAITING 的线程都处于阻塞状态,不占用 CPU 时间片
    • BLOCKED 线程会在 Owner 线程释放锁时唤醒
    • WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味者立刻获得锁,仍需进入EntryList 重新竞争

    API 介绍

    • obj.wait() 让进入 object 监视器的线程到 waitSet 等待
    • obj.notify() 在 object 上正在 waitSet 等待的线程中挑一个唤醒
    • obj.notifyAll() 让 object 上正在 waitSet 等待的线程全部唤醒

    它们都是线程之间进行协作的手段,都属于 Object 对象的方法。必须获得此对象的锁,才能调用这几个方法

    package WaNo;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j(topic = "c.demo2")
    public class demo2 {
    
        static final Object lock = new Object();
    
        public static void main(String[] args) throws InterruptedException {
            new Thread(() -> {
                synchronized (lock){
                    log.debug("执行");
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.debug("其他代码");
                }
            },"t1").start();
    
            new Thread(() -> {
                synchronized (lock){
                    log.debug("执行");
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.debug("其他代码");
                }
            },"t2").start();
    
            Thread.sleep(2000);
            log.debug("唤醒 lock 上其他线程");
            synchronized (lock){
                lock.notify();  //唤醒 lock 上的一个线程(随机)
                //lock.notifyAll();   //唤醒 lock 上的所有线程
            }
        }
    }
    
    • notify()

      20:20:58 [t1] c.demo2 - 执行
      20:20:58 [t2] c.demo2 - 执行
      20:21:00 [main] c.demo2 - 唤醒 lock 上其他线程
      20:21:00 [t1] c.demo2 - 其他代码
      
    • notifyAll()

      20:22:04 [t1] c.demo2 - 执行
      20:22:04 [t2] c.demo2 - 执行
      20:22:06 [main] c.demo2 - 唤醒 lock 上其他线程
      20:22:06 [t2] c.demo2 - 其他代码
      20:22:06 [t1] c.demo2 - 其他代码
      

    wait() 方法会释放对象的锁,进入 WaitSet 等待区,从而让其他线程就机会获取对象的锁。无限制等待,直到notify 为止

    wait(long n) 有时限的等待, 到 n 毫秒后结束等待,或是被 notify

    wait、notify 正确使用

    sleep vs. wait

    • sleep 是 Thread 方法,而 wait 是 Object 的方法
    • sleep 不需要强制和 synchronized 配合使用,但 wait 需要和 synchronized 一起用
    • sleep 在睡眠的同时,不会释放对象锁的,但 wait 在等待的时候会释放对象锁
    • 它们状态 TIMED_WAITING
    step 1

    思考下面的解决方案好不好,为什么?

    package WaNo;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j(topic = "c.demo4")
    public class demo4 {
        static final Object room = new Object();
        static boolean hasCigarette = false;
        static boolean hasTakeOut = false;
    
        public static void main(String[] args) throws InterruptedException {
            new Thread(() -> {
                synchronized (room){
                    log.debug("有烟没?[{}]",hasCigarette);
                    if(!hasCigarette){
                        log.debug("没烟,睡会!");
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    log.debug("有烟没?[{}]",hasCigarette);
                    if(hasCigarette){
                        log.debug("开始干活!");
                    }
                }
            },"小南").start();
    
            for(int i=0;i<5;i++){
                new Thread(() -> {
                    synchronized (room){
                        log.debug("开始干活!");
                    }
                },"其他人").start();
            }
    
            Thread.sleep(1000);
            new Thread(() -> {
                hasCigarette = true;
                log.debug("烟到了!");
            },"送烟的").start();
        }
    }
    

    输出:

    20:41:09 [小南] c.demo4 - 有烟没?[false]
    20:41:09 [小南] c.demo4 - 没烟,睡会!
    20:41:10 [送烟的] c.demo4 - 烟到了!
    20:41:11 [小南] c.demo4 - 有烟没?[true]
    20:41:11 [小南] c.demo4 - 开始干活!
    20:41:11 [其他人] c.demo4 - 开始干活!
    20:41:11 [其他人] c.demo4 - 开始干活!
    20:41:11 [其他人] c.demo4 - 开始干活!
    20:41:11 [其他人] c.demo4 - 开始干活!
    20:41:11 [其他人] c.demo4 - 开始干活!
    
    • 其它干活的线程,都要一直阻塞,效率太低
    • 小南线程必须睡足 2s 后才能醒来,就算烟提前送到,也无法立刻醒来
    • 加了 synchronized (room) 后,就好比小南在里面反锁了门睡觉,烟根本没法送进门,main 没加synchronized 就好像 main 线程是翻窗户进来的
    • 解决方法,使用 wait - notify 机制
    step 2

    思考下面的实现行吗,为什么?

    package WaNo.step;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j(topic = "c.demo4")
    public class step2 {
        static final Object room = new Object();
        static boolean hasCigarette = false;
        static boolean hasTakeOut = false;
    
        public static void main(String[] args) throws InterruptedException {
            new Thread(() -> {
                synchronized (room){
                    log.debug("有烟没?[{}]",hasCigarette);
                    if(!hasCigarette){
                        log.debug("没烟,睡会!");
                        try {
                            room.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    log.debug("有烟没?[{}]",hasCigarette);
                    if(hasCigarette){
                        log.debug("开始干活!");
                    }
                }
            },"小南").start();
    
            for(int i=0;i<5;i++){
                new Thread(() -> {
                    synchronized (room){
                        log.debug("开始干活!");
                    }
                },"其他人").start();
            }
    
            Thread.sleep(1000);
            new Thread(() -> {
                synchronized (room){
                    hasCigarette = true;
                    log.debug("烟到了!");
                    room.notify();
                }
            },"送烟的").start();
        }
    }
    

    输出:

    20:46:32 [小南] c.demo4 - 有烟没?[false]
    20:46:32 [小南] c.demo4 - 没烟,睡会!
    20:46:32 [其他人] c.demo4 - 开始干活!
    20:46:32 [其他人] c.demo4 - 开始干活!
    20:46:32 [其他人] c.demo4 - 开始干活!
    20:46:32 [其他人] c.demo4 - 开始干活!
    20:46:32 [其他人] c.demo4 - 开始干活!
    20:46:33 [送烟的] c.demo4 - 烟到了!
    20:46:33 [小南] c.demo4 - 有烟没?[true]
    20:46:33 [小南] c.demo4 - 开始干活!
    
    • 解决了其它干活的线程阻塞的问题
    • 但如果有其它线程也在等待条件呢?
    step 3
    package WaNo.step;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j(topic = "c.demo4")
    public class step3 {
        static final Object room = new Object();
        static boolean hasCigarette = false;
        static boolean hasTakeOut = false;
    
        public static void main(String[] args) throws InterruptedException {
            new Thread(() -> {
                synchronized (room){
                    log.debug("有烟没?[{}]",hasCigarette);
                    if(!hasCigarette){
                        log.debug("没烟,睡会!");
                        try {
                            room.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    log.debug("有烟没?[{}]",hasCigarette);
                    if(hasCigarette){
                        log.debug("开始干活!");
                    } else {
                        log.debug("没干成活...");
                    }
                }
            },"小南").start();
    
            new Thread(() -> {
                synchronized (room) {
                    Thread thread = Thread.currentThread();
                    log.debug("外卖送到没?[{}]", hasTakeOut);
                    if (!hasTakeOut) {
                        log.debug("没外卖,先歇会!");
                        try {
                            room.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    log.debug("外卖送到没?[{}]", hasTakeOut);
                    if (hasTakeOut) {
                        log.debug("可以开始干活了");
                    } else {
                        log.debug("没干成活...");
                    }
                }
            }, "小女").start();
    
            for(int i=0;i<5;i++){
                new Thread(() -> {
                    synchronized (room){
                        log.debug("开始干活!");
                    }
                },"其他人").start();
            }
    
            Thread.sleep(1000);
            new Thread(() -> {
                synchronized (room){
                    hasCigarette = true;
                    log.debug("烟到了!");
                    room.notify();
                }
            },"送烟的").start();
        }
    }
    

    输出:

    20:53:12.173 [小南] c.TestCorrectPosture - 有烟没?[false] 
    20:53:12.176 [小南] c.TestCorrectPosture - 没烟,先歇会!
    20:53:12.176 [小女] c.TestCorrectPosture - 外卖送到没?[false] 
    20:53:12.176 [小女] c.TestCorrectPosture - 没外卖,先歇会!
    20:53:13.174 [送外卖的] c.TestCorrectPosture - 外卖到了噢!
    20:53:13.174 [小南] c.TestCorrectPosture - 有烟没?[false] 
    20:53:13.174 [小南] c.TestCorrectPosture - 没干成活...
    

    notify 只能随机唤醒一个 WaitSet 中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线程,称之为【虚假唤醒】

    解决方法,改为 notifyAll

    step 4
    new Thread(() -> {
     	synchronized (room) {
     		hasTakeout = true;
     		log.debug("外卖到了噢!");
     		room.notifyAll();
     	}
    }, "送外卖的").start();
    

    输出:

    20:55:23.978 [小南] c.TestCorrectPosture - 有烟没?[false] 
    20:55:23.982 [小南] c.TestCorrectPosture - 没烟,先歇会!
    20:55:23.982 [小女] c.TestCorrectPosture - 外卖送到没?[false] 
    20:55:23.982 [小女] c.TestCorrectPosture - 没外卖,先歇会!
    20:55:24.979 [送外卖的] c.TestCorrectPosture - 外卖到了噢!
    20:55:24.979 [小女] c.TestCorrectPosture - 外卖送到没?[true] 
    20:55:24.980 [小女] c.TestCorrectPosture - 可以开始干活了
    20:55:24.980 [小南] c.TestCorrectPosture - 有烟没?[false] 
    20:55:24.980 [小南] c.TestCorrectPosture - 没干成活...
    

    用 notifyAll 仅解决某个线程的唤醒问题,但使用 if + wait 判断仅有一次机会,一旦条件不成立,就没有重新判断的机会了

    解决方法,用 while + wait,当条件不成立,再次 wait

    step 5

    将 if 改为 while

    while (!hasCigarette) {
     	log.debug("没烟,先歇会!");
     	try {
     		room.wait();
     		} catch (InterruptedException e) {
     			e.printStackTrace();
     		}
    }
    

    输出:

    20:58:34.322 [小南] c.TestCorrectPosture - 有烟没?[false] 
    20:58:34.326 [小南] c.TestCorrectPosture - 没烟,先歇会!
    20:58:34.326 [小女] c.TestCorrectPosture - 外卖送到没?[false] 
    20:58:34.326 [小女] c.TestCorrectPosture - 没外卖,先歇会!
    20:58:35.323 [送外卖的] c.TestCorrectPosture - 外卖到了噢!
    20:58:35.324 [小女] c.TestCorrectPosture - 外卖送到没?[true] 
    20:58:35.324 [小女] c.TestCorrectPosture - 可以开始干活了
    20:58:35.324 [小南] c.TestCorrectPosture - 没烟,先歇会!
    
    套路总结
    synchronized(lock) {
     	while(条件不成立) {
     		lock.wait();
     	}
     	// 干活
    }
    
    //另一个线程
    synchronized(lock) {
     	lock.notifyAll();
    }
    

    同步模式之保护性暂停

    定义

    即 Guarded Suspension,用在一个线程等待另一个线程的执行结果

    要点

    • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
    • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
    • JDK 中,join 的实现、Future 的实现,采用的就是此模式
    • 因为要等待另一方的结果,因此归类到同步模式

    实现
    package WaNo.step;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.ArrayList;
    import java.util.List;
    
    @Slf4j(topic = "c.demo4")
    public class demo4 {
        public static void main(String[] args) {
            //线程1 等待线程2 的下载结果
            GuardedObject guardedObject = new GuardedObject();
            new Thread(() -> {
                List<String> list = (List<String>) guardedObject.get();
                log.debug("结果的大小是:{}",list.size());
            },"t1").start();
    
            new Thread(() -> {
                log.debug("执行下载");
                try {
                    Thread.sleep(5000);
                    List<String> list = new ArrayList<>();
                    list.add("1");
                    guardedObject.complete(list);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },"t2").start();
        }
    }
    
    class GuardedObject {
        //结果
        private Object response;
    
        //获取结果
        public Object get() {
            synchronized (this){
                //还没有结果
                while (response == null){
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                return response;
            }
        }
    
        //产生结果
        public void complete(Object response){
            synchronized (this){
                //给结果成员变量赋值
                this.response = response;
                this.notifyAll();
            }
        }
    }
    

    输出:

    16:47:15 [t2] c.demo4 - 执行下载
    16:47:20 [t1] c.demo4 - 结果的大小是:1
    

    异步模式之生产者/消费者

    要点

    • 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
    • 消费队列可以用来平衡生产和消费的线程资源
    • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
    • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
    • JDK 中各种阻塞队列,采用的就是这种模式

    package WaNo;
    
    import lombok.AllArgsConstructor;
    import lombok.Setter;
    import lombok.ToString;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.LinkedList;
    
    @Slf4j(topic = "c.demo5")
    public class demo5 {
        public static void main(String[] args) {
            MessageQueue queue = new MessageQueue(2);
    
            for (int i = 0; i < 3; i++) {
                int id = i;
                new Thread(() -> {
                    queue.put(new Message(id,"值"+id));
                },"生产者" + i).start();
            }
    
            new Thread(() -> {
                while (true){
                    try {
                        Thread.sleep(1000);
                        Message message = queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"消费者").start();
        }
    }
    
    //消息队列类(线程间通信)
    @Slf4j(topic = "c.MessageQueue")
    class MessageQueue {
        //消息队列集合
        private LinkedList<Message> list = new LinkedList<>();
        //队列容量
        private int capcity;
    
        public MessageQueue(int capcity){
            this.capcity = capcity;
        }
    
        //获取消息
        public Message take(){
            //检查队列是否为空
            synchronized (list){
                while (list.isEmpty()){
                    try {
                        log.debug("队列为空,消费者线程等待");
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //从队列头部获取消息返回
                Message message = list.removeFirst();
                log.debug("已消费消息 {}",message);
                list.notifyAll();
                return message;
            }
        }
    
        //存入消息
        public void put(Message message){
            synchronized (list){
                //检查队列是否已满
                while (list.size() == capcity){
                    try {
                        log.debug("队列已满,生产者线程等待");
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //将消息加入队列的尾部
                list.addLast(message);
                log.debug("已生产消息 {}",message);
                list.notifyAll();
            }
        }
    }
    
    @Setter
    @AllArgsConstructor
    @ToString
    @Slf4j(topic = "c.Message")
    final class Message {
        private int id;
        private Object value;
    }
    

    输出:

    17:18:49 [生产者0] c.MessageQueue - 已生产消息 Message(id=0, value=值0)
    17:18:49 [生产者2] c.MessageQueue - 已生产消息 Message(id=2, value=值2)
    17:18:49 [生产者1] c.MessageQueue - 队列已满,生产者线程等待
    17:18:50 [消费者] c.MessageQueue - 已消费消息 Message(id=0, value=值0)
    17:18:50 [生产者1] c.MessageQueue - 已生产消息 Message(id=1, value=值1)
    17:18:51 [消费者] c.MessageQueue - 已消费消息 Message(id=2, value=值2)
    17:18:52 [消费者] c.MessageQueue - 已消费消息 Message(id=1, value=值1)
    17:18:53 [消费者] c.MessageQueue - 队列为空,消费者线程等待
    

    park、unpark

    基本使用

    它们是 LockSupport 类中的方法

    // 暂停当前线程
    LockSupport.park(); 
    
    // 恢复某个线程的运行
    LockSupport.unpark(暂停线程对象)
    

    先 park 再 unpark

    Thread t1 = new Thread(() -> {
     	log.debug("start...");
     	sleep(1);
     	log.debug("park...");
     	LockSupport.park();
     	log.debug("resume...");
    },"t1");
    t1.start();
    
    Thread.sleep(2);
    log.debug("unpark...");
    LockSupport.unpark(t1);
    

    输出:

    18:42:52.585 c.TestParkUnpark [t1] - start... 
    18:42:53.589 c.TestParkUnpark [t1] - park... 
    18:42:54.583 c.TestParkUnpark [main] - unpark... 
    18:42:54.583 c.TestParkUnpark [t1] - resume...
    

    先 unpark 再 park

    Thread t1 = new Thread(() -> {
     	log.debug("start...");
         sleep(2);
         log.debug("park...");
         LockSupport.park();
         log.debug("resume...");
    }, "t1");
    t1.start();
    
    sleep(1);
    log.debug("unpark...");
    LockSupport.unpark(t1);
    

    输出:

    18:43:50.765 c.TestParkUnpark [t1] - start... 
    18:43:51.764 c.TestParkUnpark [main] - unpark... 
    18:43:52.769 c.TestParkUnpark [t1] - park... 
    18:43:52.769 c.TestParkUnpark [t1] - resume...
    

    特点

    与 Object 的 wait & notify 相比

    • wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必
    • park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么【精确】
    • park & unpark 可以先 unpark,而 wait & notify 不能先 notify

    原理

    每个线程都有自己的一个 Parker 对象,由三部分组成 _counter , _cond 和 _mutex

    1. 当前线程调用 Unsafe.park() 方法
    2. 检查 _counter ,本情况为 0,这时,获得 _mutex 互斥锁
    3. 线程进入 _cond 条件变量阻塞
    4. 设置 _counter = 0

    1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
    2. 唤醒 _cond 条件变量中的 Thread_0
    3. Thread_0 恢复运行
    4. 设置 _counter 为 0

    1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
    2. 当前线程调用 Unsafe.park() 方法
    3. 检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行
    4. 设置 _counter 为 0

    重新理解六种状态

    假设有线程 Thread t

    情况一

    NEW --> RUNNABLE

    当调用 t.start() 方法时,由 NEW --> RUNNABLE

    情况二

    ​ RUNNABLE <--> WAITING

    t 线程用 synchronized(obj) 获取了对象锁后

    • 调用 obj.wait() 方法时,t 线程从 RUNNABLE --> WAITING
    • 调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
      • 竞争锁成功,t 线程从 WAITING --> RUNNABLE
      • 竞争锁失败,t 线程从 WAITING --> BLOCKED

    情况三

    RUNNABLE <--> WAITING

    • 当前线程调用 t.join() 方法时,当前线程从 RUNNABLE --> WAITING
      • 注意是当前线程t 线程对象的监视器上等待
    • t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 WAITING --> RUNNABLE

    情况四

    RUNNABLE <--> WAITING

    • 当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE --> WAITING
    • 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING --> RUNNABLE

    情况五

    ​ RUNNABLE <--> TIMED_WAITING

    t 线程用 synchronized(obj) 获取了对象锁后

    • 调用 obj.wait(long n) 方法时,t 线程从 RUNNABLE --> TIMED_WAITING
    • t 线程等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
      • 竞争锁成功,t 线程从 TIMED_WAITING --> RUNNABLE
      • 竞争锁失败,t 线程从 TIMED_WAITING --> BLOCKED

    情况六

    RUNNABLE <--> TIMED_WAITING

    • 当前线程调用 t.join(long n) 方法时,当前线程从 RUNNABLE --> TIMED_WAITING
      • 注意是当前线程t 线程对象的监视器上等待
    • 当前线程等待时间超过了 n 毫秒,或t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从TIMED_WAITING --> RUNNABLE

    情况七

    RUNNABLE <--> TIMED_WAITING

    • 当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE --> TIMED_WAITING
    • 当前线程等待时间超过了 n 毫秒,当前线程从 TIMED_WAITING --> RUNNABLE

    情况八

    RUNNABLE <--> TIMED_WAITING

    • 当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 时,当前线程从 RUNNABLE --> TIMED_WAITING
    • 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让目标线程从TIMED_WAITING--> RUNNABLE

    情况九

    RUNNABLE <--> BLOCKED

    • t 线程用 synchronized(obj) 获取了对象锁时如果竞争失败,从 RUNNABLE --> BLOCKED
    • 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争成功,从 BLOCKED --> RUNNABLE ,其它失败的线程仍然 BLOCKED

    情况十

    RUNNABLE <--> TERMINATED

    当前线程所有代码运行完毕,进入 TERMINATED

    多把锁

    package WaNo;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j(topic = "c.demo6")
    public class demo6 {
        public static void main(String[] args) {
            BigRoom bigRoom = new BigRoom();
            new Thread(() -> {
                bigRoom.study();
            },"r1").start();
    
            new Thread(() -> {
                bigRoom.sleep();
            },"r2").start();
        }
    }
    
    @Slf4j(topic = "c.BigRoom")
    class BigRoom {
        private final Object studyRoom = new Object();
        private final Object bedRoom = new Object();
    
        public void sleep(){
            synchronized (bedRoom){
                log.debug("sleep two hours");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public void study(){
            synchronized (studyRoom){
                log.debug("study one hour");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    输出:

    20:01:42 [r2] c.BigRoom - sleep two hours
    20:01:42 [r1] c.BigRoom - study one hour
    

    将锁的粒度细分

    • 好处,是可以增强并发度
    • 坏处,如果一个线程需要同时获得多把锁,就容易发生死锁

    活跃性

    死锁

    有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁

    t1 线程 获得 A对象 锁,接下来想获取 B对象 的锁 t2 线程 获得 B对象 锁,接下来想获取 A对象 的锁

    Object A = new Object();
    Object B = new Object();
    Thread t1 = new Thread(() -> {
     	synchronized (A) {
     		log.debug("lock A");
     		sleep(1);
    		synchronized (B) {
     			log.debug("lock B");
     			log.debug("操作...");
     		}
     	}
    }, "t1");
    
    Thread t2 = new Thread(() -> {
     	synchronized (B) {
     		log.debug("lock B");
     		sleep(0.5);
     		synchronized (A) {
     			log.debug("lock A");
      			log.debug("操作...");
     		}
     	}
    }, "t2");
    t1.start();
    t2.start();
    

    输出:

    12:22:06.962 [t2] c.TestDeadLock - lock B 
    12:22:06.962 [t1] c.TestDeadLock - lock A
    

    哲学家进餐问题

    有五位哲学家,围坐在圆桌旁。

    • 他们只做两件事,思考和吃饭,思考一会吃口饭,吃完饭后接着思考。
    • 吃饭时要用两根筷子吃,桌上共有 5 根筷子,每位哲学家左右手边各有一根筷子。
    • 如果筷子被身边的人拿着,自己就得等待

    这种线程没有按预期结束,执行不下去的情况,归类为【活跃性】问题,除了死锁以外,还有活锁和饥饿者两种情况

    活锁

    活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束

    public class TestLiveLock {
     	static volatile int count = 10;
     	static final Object lock = new Object();
     	public static void main(String[] args) {
     	new Thread(() -> {
     		// 期望减到 0 退出循环
     		while (count > 0) {
     			sleep(0.2);
     			count--;
     			log.debug("count: {}", count);
     		}
     	}, "t1").start();
     	
     	new Thread(() -> {
     		// 期望超过 20 退出循环
     		while (count < 20) {
     			sleep(0.2);
     			count++;
     			log.debug("count: {}", count);
     		}
     	}, "t2").start();
     }
    

    饥饿

    一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束

    ReentrantLock

    相对于 synchronized 它具备如下特点

    • 可中断
    • 可以设置超时时间
    • 可以设置为公平锁
    • 支持多个条件变量

    与 synchronized 一样,都支持可重入

    基本语法

    // 获取锁
    reentrantLock.lock();
    try {
     	// 临界区
    } finally {
     	// 释放锁
     	reentrantLock.unlock();
    }
    

    可重入

    可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住

    package ReentrantLockDemo;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j(topic = "c.demo1")
    public class demo1 {
        private static ReentrantLock lock = new ReentrantLock();
        public static void main(String[] args) {
    
            lock.lock();
            try {
                log.debug("enter main");
                m1();
            }finally {
                lock.unlock();
            }
        }
    
        public static void m1(){
            lock.lock();
            try {
                log.debug("enter m1");
                m2();
            }finally {
                lock.unlock();
            }
        }
    
        public static void m2(){
            lock.lock();
            try {
                log.debug("enter m2");
            }finally {
                lock.unlock();
            }
        }
    }
    

    输出:

    20:19:19 [main] c.demo1 - enter main
    20:19:19 [main] c.demo1 - enter m1
    20:19:19 [main] c.demo1 - enter m2
    

    可打断

    package ReentrantLockDemo;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j(topic = "c.demo2")
    public class demo2 {
        private static ReentrantLock lock = new ReentrantLock();
        public static void main(String[] args) throws InterruptedException {
            Thread t1 = new Thread(() -> {
                try {
                    //如果没有竞争,此方法会获取对象的锁
                    //如果有竞争,就进入阻塞队列,可以被其他线程用 interrupt 打断
                    log.debug("尝试获得锁");
                    lock.lockInterruptibly();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    log.debug("未获得锁,返回");
                    return;
                }
                try {
                    log.debug("获取到锁");
                }finally {
                    lock.unlock();
                }
            }, "t1");
    
            lock.lock();
            t1.start();
            Thread.sleep(1000);
            log.debug("打断t1");
            t1.interrupt();
        }
    }
    

    输出:

    20:26:05 [t1] c.demo2 - 尝试获得锁
    20:26:06 [main] c.demo2 - 打断t1
    20:26:06 [t1] c.demo2 - 未获得锁,返回
    java.lang.InterruptedException
    	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
    	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
    	at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
    	at ReentrantLockDemo.demo2.lambda$main$0(demo2.java:16)
    	at java.lang.Thread.run(Thread.java:748)
    
    Process finished with exit code 0
    

    注意如果是不可中断模式,那么即使使用了 interrupt 也不会让等待中断

    锁超时

    立刻失败

    package ReentrantLockDemo;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j(topic = "c.demo3")
    public class demo3 {
        private static ReentrantLock lock = new ReentrantLock();
        public static void main(String[] args) {
            Thread t1 = new Thread(() -> {
                log.debug("尝试获得锁");
                if(!lock.tryLock()){
                    log.debug("获取不到锁");
                    return;
                }
                try {
                    log.debug("获得到锁");
                }finally {
                    lock.unlock();
                }
            },"t1");
    
            lock.lock();
            log.debug("获得到锁");
            t1.start();
        }
    }
    

    输出:

    20:31:15 [main] c.demo3 - 获得到锁
    20:31:15 [t1] c.demo3 - 尝试获得锁
    20:31:15 [t1] c.demo3 - 获取不到锁
    

    超时失败

    package ReentrantLockDemo;
    
    import lombok.extern.slf4j.Slf4j;
    import sun.reflect.generics.tree.Tree;
    
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j(topic = "c.demo3")
    public class demo3 {
        private static ReentrantLock lock = new ReentrantLock();
        public static void main(String[] args) throws InterruptedException {
            Thread t1 = new Thread(() -> {
                log.debug("尝试获得锁");
                try {
                    if(!lock.tryLock(1, TimeUnit.SECONDS)){
                        log.debug("获取不到锁");
                        return;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    log.debug("获取不到锁");
                    return;
                }
                try {
                    log.debug("获得到锁");
                }finally {
                    lock.unlock();
                }
            },"t1");
    
            lock.lock();
            log.debug("获得到锁");
            Thread.sleep(1000);
            lock.unlock();
            t1.start();
        }
    }
    

    输出:

    20:34:03 [main] c.demo3 - 获得到锁
    20:34:04 [t1] c.demo3 - 尝试获得锁
    20:34:04 [t1] c.demo3 - 获得到锁
    

    公平锁

    ReentrantLock 默认是不公平的

    package ReentrantLockDemo;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j(topic = "c.demo4")
    public class demo4 {
        public static void main(String[] args) {
            ReentrantLock lock = new ReentrantLock(false);
            lock.lock();
            for (int i = 0; i < 500; i++) {
                new Thread(() -> {
                    lock.lock();
                    try {
                        System.out.println(Thread.currentThread().getName() + " running...");
                    } finally {
                        lock.unlock();
                    }
                }, "t" + i).start();
            }
    // 1s 之后去争抢锁
            Thread.sleep(1000);
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " start...");
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " running...");
                } finally {
                    lock.unlock();
                }
            }, "强行插入").start();
            lock.unlock();
        }
    }
    

    强行插入,有机会在中间输出

    注意该实验不一定总能复现

    t39 running... 
    t40 running... 
    t41 running... 
    t42 running... 
    t43 running... 
    强行插入 start... 
    强行插入 running... 
    t44 running... 
    t45 running... 
    t46 running... 
    t47 running... 
    t49 running...
    

    改为公平锁后

    ReentrantLock lock = new ReentrantLock(true);
    

    强行插入,总是在最后输出

    t465 running... 
    t464 running... 
    t477 running... 
    t442 running... 
    t468 running... 
    t493 running... 
    t482 running... 
    t485 running... 
    t481 running... 
    强行插入 running...
    

    公平锁一般没有必要,会降低并发度

    条件变量

    ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比

    • synchronized 是那些不满足条件的线程都在一间休息室等消息
    • 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒

    使用要点:

    • await 前需要获得锁
    • await 执行后,会释放锁,进入 conditionObject 等待
    • await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
    • 竞争 lock 锁成功后,从 await 后继续执行
    package ReentrantLockDemo;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j(topic = "c.demo4")
    public class demo4 {
        private static ReentrantLock lock = new ReentrantLock();
        public static void main(String[] args) {
            //创建一个新的条件变量(休息室)
            Condition condition1 = lock.newCondition();
            Condition condition2 = lock.newCondition();
    
            lock.lock();
            //进入休息室等待
            condition1.await();
            
            condition1.signal();
            //condition1.signalAll();
        }
    }
    

    同步模式之顺序控制

    固定运行顺序

    比如,必须先 2 后 1 打印

    wait notify版

    package ReentrantLockDemo;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j(topic = "c.demo4")
    public class demo4 {
        static final Object lock = new Object();
        //表示 t2 是否被运行过
        static boolean t2runned = false;
        public static void main(String[] args) {
            Thread t1 = new Thread(() -> {
                synchronized (lock){
                    while (!t2runned){
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    log.debug("1");
                }
            },"t1");
    
            Thread t2 = new Thread(() -> {
                synchronized (lock){
                    log.debug("2");
                    t2runned = true;
                    lock.notify();
                }
            },"t2");
    
            t1.start();
            t2.start();
        }
    }
    

    输出:

    20:49:28 [t2] c.demo4 - 2
    20:49:28 [t1] c.demo4 - 1
    

    park unpark版

    可以看到,实现上很麻烦:

    • 首先,需要保证先 wait 再 notify,否则 wait 线程永远得不到唤醒。因此使用了『运行标记』来判断该不该wait
    • 第二,如果有些干扰线程错误地 notify 了 wait 线程,条件不满足时还要重新等待,使用了 while 循环来解决此问题
    • 最后,唤醒对象上的 wait 线程需要使用 notifyAll,因为『同步对象』上的等待线程可能不止一个

    可以使用 LockSupport 类的 park 和 unpark 来简化上面的题目:

    package ReentrantLockDemo;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.locks.LockSupport;
    
    @Slf4j(topic = "demo5")
    public class demo5 {
        public static void main(String[] args) {
            Thread t1 = new Thread(() -> {
                LockSupport.park();
                log.debug("1");
            }, "t1");
    
            t1.start();
    
            new Thread(() -> {
                log.debug("2");
                LockSupport.unpark(t1);
            },"t2").start();
        }
    }
    

    交替输出

    线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现

    wait notify版

    package ReentrantLockDemo;
    
    import lombok.AllArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.locks.LockSupport;
    
    @Slf4j(topic = "demo5")
    public class demo5 {
        public static void main(String[] args) {
            WaitNotify wn = new WaitNotify(1,5);
            new Thread(() -> {
                wn.print("a",1,2);
            }).start();
            new Thread(() -> {
                wn.print("b",2,3);
            }).start();
            new Thread(() -> {
                wn.print("c",3,1);
            }).start();
        }
    }
    /*
        输出内容    等待标记    下一个标记
        a           1           2
        b           2           3
        c           3           1
     */
    @AllArgsConstructor
    class WaitNotify{
        //等待标记
        private int flag;
        //循环次数
        private int loopNumber;
    
        //打印
        public void print(String str,int waitFlag,int nextFlag){
            for (int i = 0; i < loopNumber; i++) {
                synchronized (this){
                    while (flag != waitFlag){
                        try {
                            this.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.print(str);
                    flag = nextFlag;
                    this.notifyAll();
                }
            }
        }
    }
    

    输出:

    abcabcabcabcabc
    

    ReentrantLock版

    package ReentrantLockDemo;
    
    import lombok.AllArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j(topic = "c.demo6")
    public class demo6 {
        public static void main(String[] args) throws InterruptedException {
            AwaitSignal awaitSignal = new AwaitSignal(5);
            Condition a = awaitSignal.newCondition();
            Condition b = awaitSignal.newCondition();
            Condition c = awaitSignal.newCondition();
            new Thread(() -> {
                awaitSignal.print("a", a, b);
            }).start();
            new Thread(() -> {
                awaitSignal.print("b", b, c);
            }).start();
            new Thread(() -> {
                awaitSignal.print("c", c, a);
            }).start();
    
            Thread.sleep(1000);
            awaitSignal.lock();
            try {
                System.out.println("开始。。。");
                a.signal();
            }finally {
                awaitSignal.unlock();
            }
        }
    }
    
    @AllArgsConstructor
    class AwaitSignal extends ReentrantLock {
        private int loopNumber;
        /**
         * @param str 打印内容
         * @param current   进入哪一间休息室
         * @param next  下一间休息室
         */
        public void print(String str,Condition current,Condition next){
            for (int i = 0; i < loopNumber; i++) {
                lock();
                try {
                    try {
                        current.await();
                        System.out.print(str);
                        next.signal();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }finally {
                    unlock();
                }
            }
        }
    }
    

    输出:

    开始。。。
    abcabcabcabcabc
    

    park unpark版

    package ReentrantLockDemo;
    
    import lombok.AllArgsConstructor;
    
    import java.util.concurrent.locks.LockSupport;
    
    public class demo7 {
    
        static  Thread t1;
        static  Thread t2;
        static  Thread t3;
    
        public static void main(String[] args) {
            ParkUnpark pu = new ParkUnpark(5);
            t1 = new Thread(() -> {
                pu.print("a", t2);
            },"t1");
            t2 = new Thread(() -> {
                pu.print("b", t3);
            },"t2");
            t3 = new Thread(() -> {
                pu.print("c", t1);
            },"t3");
    
            t1.start();
            t2.start();
            t3.start();
    
            LockSupport.unpark(t1);
        }
    }
    
    @AllArgsConstructor
    class ParkUnpark{
        private int loopNumber;
    
        public void print(String str,Thread next){
            for (int i = 0; i < loopNumber; i++) {
                LockSupport.park();
                System.out.print(str);
                LockSupport.unpark(next);
            }
        }
    }
    

    输出:

    abcabcabcabcabc
    
  • 相关阅读:
    select的测试demo
    webpack:详解entry和output一些重要API的使用
    软件架构设计(四) 基于服务的架构(SOA)
    springboot基于Java Web的华家医疗器械商城设计与实现毕业设计源码261620
    springboot项目面试题
    php使用sqlServer
    工业互联网安全备受关注,防御体系该如何建设?
    leetcode:6248. 统计中位数为 K 的子数组【问题转化 + 排序二分】
    接口测试基础
    软件测试面试大全(2023版,答案+文档)
  • 原文地址:https://www.cnblogs.com/lcha-coding/p/16365735.html