• (多线程)并发编程的三大基础应用——阻塞队列、定时器、线程池【手搓源码】


    9.2 阻塞式队列

    在这里插入图片描述
    在这里插入图片描述在这里插入图片描述

    BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
    
    • 1
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // 入队列
    queue.put("abc");
    // 出队列. 如果没有 put 直接 take, 就会阻塞. 
    String elem = queue.take();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
        Thread customer = new Thread(() -> {
            while (true) {
                try {
                    int value = blockingQueue.take();
                    System.out.println("消费元素: " + value);
               } catch (InterruptedException e) {
                    e.printStackTrace();
               }
           }
       }, "消费者");
        customer.start();
        Thread producer = new Thread(() -> {
            Random random = new Random();
            while (true) {
                try {
                    int num = random.nextInt(1000);
                    System.out.println("生产元素: " + num);
                    blockingQueue.put(num);
                    Thread.sleep(1000);
               } catch (InterruptedException e) {
                    e.printStackTrace();
               }
           }
       }, "生产者");
        producer.start();
        customer.join();
        producer.join();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    阻塞队列实现
    // 名字还是不要和标准库的混淆
    class MyBlockingQueue {
        private int[] items = new int[1000];
        private volatile int head = 0;
        private volatile int tail = 0;
        private volatile int size = 0;
    
        // 入队列
        public void put(int elem) throws InterruptedException {
            synchronized (this) {
                // 判定队列是否满了, 满了则不能插入.
                while (size >= items.length) {
                    this.wait();
                }
                // 进行插入操作, 把 elem 放到 items 里, 放到 tail 指向的位置.
                items[tail] = elem;
                tail++;
                if (tail >= items.length) {
                    tail = 0;
                }
                size++;
                this.notify();
            }
        }
    
        // 出队列, 返回删除的元素内容
        public Integer take() throws InterruptedException {
            synchronized (this) {
                // 判定队列是否空, 如果空了, 则不能出队列
                while (size == 0) {
                    this.wait();
                }
                // 进行取元素操作.
                int ret = items[head];
                head++;
                if (head >= items.length) {
                    head = 0;
                }
                size--;
                this.notify();
                return ret;
            }
        }
    }
    
    public class TestMyBlockingQueue {
        public static void main(String[] args) {
            MyBlockingQueue queue = new MyBlockingQueue();
    
            Thread producer = new Thread(() -> {
                int n = 1;
                while (true) {
                    try {
                        queue.put(n);
                        System.out.println("生产元素 " + n);
                        n++;
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            Thread customer = new Thread(() -> {
                while (true) {
                    try {
                        int n = queue.take();
                        System.out.println("消费元素 " + n);
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            producer.start();
            customer.start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    9.3 定时器

    定时器的使用样例
    public class ThreadTimer {
        public static void main(String[] args) {
            // 标准库的定时器.
            Timer timer = new Timer();
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    System.out.println("时间到, 快起床!");
                }
            }, 3000);
    
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    System.out.println("时间到2!");
                }
            }, 4000);
    
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    System.out.println("时间到3!");
                }
            }, 5000);
    
            System.out.println("开始计时!");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    Timer timer = new Timer();
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            System.out.println("hello");
       }
    }, 3000);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    实现定时器
    定时器的构成:
    • 一个带优先级的阻塞队列

    为啥要带优先级呢?
    因为阻塞队列中的任务都有各自的执行时刻 (delay). 最先执行的任务一定是 delay 最小的. 使用带优先级的队列就可以高效的把这个 delay 最小的任务找出来.

    • 队列中的每个元素是一个 Task 对象.
    • Task 中带有一个时间属性, 队首元素就是即将
    • 同时有一个 worker 线程一直扫描队首元素, 看队首元素是否需要执行
    定时器完整代码
    package threading;
    
    import java.util.ArrayDeque;
    import java.util.PriorityQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.PriorityBlockingQueue;
    
    // 这个类表示一个任务
    class MyTask implements Comparable<MyTask> {
        // 要执行的任务
        private Runnable runnable;
        // 什么时间来执行任务. (是一个时间戳)
        private long time;
    
        public MyTask(Runnable runnable, long delay) {
            this.runnable = runnable;
            this.time = System.currentTimeMillis() + delay;
        }
    
        public Runnable getRunnable() {
            return runnable;
        }
    
        public long getTime() {
            return time;
        }
    
        @Override
        public int compareTo(MyTask o) {
            return (int) (this.time - o.time);
        }
    }
    
    class MyTimer {
        private BlockingQueue<MyTask> queue = new PriorityBlockingQueue<>();	
    
        private Object locker = new Object();
    
        public MyTimer() {
            // 创建一个扫描线程.
            Thread t = new Thread(() -> {
                while (true) {
                    try {
                        synchronized (locker) {
                            // 取出队首元素
                            MyTask task = queue.take();
                            // 假设当前时间是 2:30, 任务设定的时间是 2:30, 显然就要执行任务了.
                            // 假设当前时间是 2:30, 任务设定的时间是 2:29, 也是到点了, 也要执行任务.
                            long curTime = System.currentTimeMillis();
                            if (curTime >= task.getTime()) {
                                // 到点了, 改执行任务了!!
                                task.getRunnable().run();
                            } else {
                                // 还没到点
                                queue.put(task);
                                // 没到点, 就等待
                                locker.wait(task.getTime() - curTime);
                            }
                        }	
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            t.start();
        }
    
        public void schedule(Runnable runnable, long after) throws InterruptedException {
            synchronized (locker) {
                MyTask myTask = new MyTask(runnable, after);
                queue.put(myTask);
                locker.notify();
            }
        }
    
    }
    
    public class ThreadMyTimer {
        public static void main(String[] args) throws InterruptedException {
            MyTimer timer = new MyTimer();
            timer.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println("时间到1!");
                }
            }, 3000);
            timer.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println("时间到2!");
                }
            }, 4000);
            timer.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println("时间到3!");
                }
            }, 5000);
            System.out.println("开始计时");
    
            ArrayDeque<String> a = new ArrayDeque<>();
            a.peekLast();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104

    9.4 线程池

    线程池是什么
    虽然创建线程 / 销毁线程 的开销

    想象这么一个场景:
    在学校附近新开了一家快递店,老板很精明,想到一个与众不同的办法来经营。店里没有雇人,
    而是每次有业务来了,就现场找一名同学过来把快递送了,然后解雇同学。这个类比我们平时来
    一个任务,起一个线程进行处理的模式。
    很快老板发现问题来了,每次招聘 + 解雇同学的成本还是非常高的。老板还是很善于变通的,知道了为什么大家都要雇人了,所以指定了一个指标,公司业务人员会扩张到 3 个人,但还是随着业务逐步雇人。于是再有业务来了,老板就看,如果现在公司还没 3 个人,就雇一个人去送快递,否则只是把业务放到一个本本上,等着 3 个快递人员空闲的时候去处理。这个就是我们要带出的线程池的模式。

    线程池最大的好处就是减少每次启动、销毁线程的损耗

    标准库中的线程池

    • 使用 Executors.newFixedThreadPool(10) 能创建出固定包含 10 个线程的线程池.
    • 返回值类型为 ExecutorService
    • 通过 ExecutorService.submit 可以注册一个任务到线程池中
    ExecutorService pool = Executors.newFixedThreadPool(10);
    pool.submit(new Runnable() {
        @Override
        public void run() {
            System.out.println("hello");
       }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    Executors 创建线程池的几种方式

    • newFixedThreadPool: 创建固定线程数的线程池
    • newCachedThreadPool: 创建线程数目动态增长的线程池.
    • newSingleThreadExecutor: 创建只包含单个线程的线程池.
    • newScheduledThreadPool: 设定 延迟时间后执行命令,或者定期执行命令. 是进阶版的 Timer.

    Executors 本质上是 ThreadPoolExecutor 类的封装.
    ThreadPoolExecutor 提供了更多的可选参数, 可以进一步细化线程池行为的设定. (后面再介绍)

    实现线程池

    • 核心操作为 submit, 将任务加入线程池中
    • 使用 Worker 类描述一个工作线程. 使用 Runnable 描述一个任务.
    • 使用一个 BlockingQueue 组织所有的任务
    • 每个 worker 线程要做的事情: 不停的从 BlockingQueue 中取任务并执行.
    • 指定一下线程池中的最大线程数 maxWorkerCount; 当当前线程数超过这个最大值时, 就不再新增线程了.
    线程池的实现
    package threading;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    class MyThreadPool {
        private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    
        public void submit(Runnable runnable) throws InterruptedException {
            queue.put(runnable);
        }
    
        public MyThreadPool(int m) {
            // 在构造方法中, 创建出 M 个线程. 负责完成工作.
            for (int i = 0; i < m; i++) {
                Thread t = new Thread(() -> {
                    while (true) {
                        try {
                            Runnable runnable = queue.take();
                            runnable.run();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
                t.start();
            }
        }
    }
    
    public class Demo28 {
        public static void main(String[] args) throws InterruptedException {
            MyThreadPool pool = new MyThreadPool(10);
            for (int i = 0; i < 1000; i++) {
                int taskId = i;
                pool.submit(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("执行当前任务: " + taskId + " 当前线程: " + Thread.currentThread().getName());
                    }
                });
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    线程池的使用
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Demo26 {
        public static void main(String[] args) {
            ExecutorService pool = Executors.newCachedThreadPool();
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("这是任务");
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    ★ 编写代码, 基于 Callable 实现 1+2+3+…+1000
    
    public class Thread_2547 {
        public static void main(String[] args) {
            // Thread实现
            methods_01();
            // 线程池实现
            methods_02();
        }
    
        // methods_02 用线程池的方式实现
        // 把1000拆成10组,分别用10个线程去执行,最后汇总结果
        private static void methods_02() {
            // 定义一个Callable描述任务
            class MyCallable implements Callable<Integer> {
                // 每组数据的开始值
                private int start;
                // 每组数据的结束值
                private int end;
    
                // 构造方法,传入开始与结束值,明确累加区间
                public MyCallable(int start, int end) {
                    this.start = start;
                    this.end = end;
                }
    
                // 实现call方法
                @Override
                public Integer call() throws Exception {
                    // 完成累加操作并返回结果
                    int sum = 0;
                    for (int i = start; i <= end; i++) {
                        sum += i;
                    }
                    return sum;
                }
            }
            // 将0 ~ 1000拆成果10组数据分别用10个线程去执行,最后统计结果
            int count = 10;
            // 创建线程池
            ExecutorService pool = Executors.newFixedThreadPool(5);
            // 定义一个Future数组
            Future<Integer>[] futures = new Future[count];
            // 拆分大数,并创建任务,提交到线程池
            for (int i = 0; i < count; i++) {
                int start = i * 100 + 1;
                int end = (i + 1) * 100;
                // 提交到线程池,并保存返回的Futuer
                Future<Integer> future = pool.submit(new MyCallable(start, end));
                futures[i] = future;
            }
    
            // 先定义返回的结果,初始为0
            int result = 0;
            for (int i = 0; i < futures.length; i++) {
                // 获取Future
                Future<Integer> future = futures[i];
                try {
                    // 从Future中获取每条线程执行的结果
                    int sum = future.get();
                    // 累加
                    result += sum;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
            // 停止线程池
            pool.shutdown();
            System.out.println("methods_02结果为:" + result);
    
    
        }
    
        // 使用一条线程去执行任务
        public static void methods_01() {
            // 定义callable描述任务
            Callable<Integer> callable = new Callable<Integer>() {
                int sum = 0;
    
                @Override
                public Integer call() throws Exception {
                    for (int i = 1; i <= 1000; i++) {
                        sum += i;
                    }
                    return sum;
                }
            };
    
            // 创建FutureTask并绑定Callable任务
            FutureTask<Integer> futureTask = new FutureTask<>(callable);
            // FutureTask也实现了Runnable可以做为入参
            Thread t = new Thread(futureTask);
            // 启动线程
            t.start();
    
    
            try {
                // 从Future中获取结果并打印
                Integer result = futureTask.get();
                System.out.println("methods_01结果为:" + result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    ★ 编写代码, 基于 AtomicInteger 实现多线程自增同一个变量
    
    public class Thread_2548 {
        // 定义一个原子类,默认值为0
        private static volatile AtomicInteger counter = new AtomicInteger();
    
        public static void main(String[] args) throws InterruptedException {
            // 定义两个线程,分别对counter执行5W次自增
            Thread t1 = new Thread(() -> {
                for (int i = 0; i < 50000; i++) {
                    // 调用获取并自增的方法,类似于counter++操作
                    // 原子类里要通过这个方法来完成
                    counter.getAndIncrement();
                }
            });
            Thread t2 = new Thread(() -> {
                for (int i = 0; i < 50000; i++) {
                    // 调用获取并自增的方法,类似于counter++操作
                    // 原子类里要通过这个方法来完成
                    counter.getAndIncrement();
                }
            });
    
            // 启动线程
            t1.start();
            t2.start();
            // 等待线程完成
            t1.join();
            t2.join();
            // 打印结果
            System.out.println("结果 = " + counter.get());
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    ★ 编写代码, 基于 CountDownLatch 模拟跑步比赛的过程
    
    
    public class Thread_2549 {
        // 定义参赛选手数量
        private static int playerCount = 5;
        // 定义选手准备的CountDownLatch
        private static CountDownLatch standby   = new CountDownLatch(playerCount);
        // 定义起跑的CountDownLatch
        private static CountDownLatch start   = new CountDownLatch(1);
        // 定义选手到达终点的CountDownLatch
        private static CountDownLatch finished  = new CountDownLatch(playerCount);
    
        static class Runner implements Runnable {
            @Override
            public void run() {
                // 准备就绪,计数减1
                System.out.println(Thread.currentThread().getName() + " 准备就绪.");
                standby.countDown();
                try {
                    // 等待开始比赛
                    start.await();
                    System.out.println(Thread.currentThread().getName() + "出发");
                    // 模拟比赛用时
                    Random random = new Random();
                    int time = random.nextInt(10) + 10;
                    Thread.sleep(time);
                    System.out.println(Thread.currentThread().getName() + "到在终点,用时 " + time + "秒.");
                    // 计数减1
                    finished.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            // 定义选手线程并启动
            for (int i = 0; i < playerCount; i++) {
                new Thread(new Runner(), "player " + (i + 1)).start();
            }
            // 等待选手准备
            standby.await();
            // 等待参赛选手准备就绪
            System.out.println("所有参赛选手准备就绪,比赛开始...");
            // 发令枪响
            start.countDown();
            // 比赛中,等待选手到达终点
            finished.await();
            System.out.println("比赛结束.");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
  • 相关阅读:
    Vue Chrome浏览器手动调节模拟网速
    golang性能分析pprof使用
    自动驾驶寻找「商业闭环」
    强化学习问题(一)--- 输入conda activate base无法激活虚拟环境
    C++ Vector的概念和原理和构造
    利用宝塔实现百度自动推送
    【Docker】从零开始:2.Docker三要素
    基于jsp+mysql+ssm进销存管理系统-计算机毕业设计
    【五】sql 语言 -- 概览
    Zabbix邮箱报警
  • 原文地址:https://blog.csdn.net/m0_63571404/article/details/134043565