• 【异步与线程池】串讲&优化详情页加载


    💫你好,我是小航,一个正在变秃、变强的准大三党
    💫本文主要讲解异步与线程池,示例Demo采用Java语言演示
    💫欢迎大家的关注!

    概念

    1.初始化线程的4种方式

    1)、继承Thread
    2)、实现 Runnable接口
    3)、实现 Callable接口+FutureTask(可以拿到返回结果,可以处理异常)
    4)、线程池
    
    区别:
    	12不能得到返回值。3可以获取返回值
    	123都不能控制资源(无法控制线程数【高并发时线程数耗尽资源】)
    	4可以控制资源,性能稳定,不会一下子所有线程一起运行
    
    结论:
    	实际开发中,只用线程池【高并发状态开启了n个线程,会耗尽资源】
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    2.创建线程池的方式

    创建固定线程数的线程池ExecutorService

    固定线程数的线程池
    Executors.newFixedThreadPool(10);
    
    • 1
    • 2

    execute和submit区别

    作用:都是提交异步任务的
    
    execute:只能提交Runnable任务,没有返回值
    submit:可以提交Runnable、Callable,返回值是FutureTask
    
    • 1
    • 2
    • 3
    • 4

    创建原生线程池ThreadPoolExecutor

    new ThreadPoolExecutor(5,
            200,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(100000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());
    
    /**
             * 七大参数:
             * corePoolSize:核心线程数【一直存在,除非(allowCoreThreadTimeOut)】;线程池创建好以后就准备就绪的线程数量,就等待来接受异步任务去执行
             *         5个 Thread thread = new Thread(); thread.start();
             * maximumPoolSize:最大线程数量;控制资源
             * keepAliveTime:存活时间。如果当前的线程数量大于核心数量。释放空闲的线程(maximumPoolSize - corePoolSize)。只要线程空闲大于制定的keepAliveTime;
             * unit:时间单位
             * BlockingQueue workQueue:阻塞队列。如果任务有很多,就会将目前多的任务放在队列里面。只要有线程空闲,就会去队列里面取出新的任务
             * threadFactory:线程的创建工厂。
             * RejectedExecutionHandler handler:如果队列满了,按照我们指定的拒绝策略拒绝执行任务
             * 工作顺序:
             *      1.线程池创建,准备好core数量的核心线程,准备接受任务
             *      2.core满了,就将再进来的任务放入阻塞队列中。空闲的core就会自己去阻塞队列获取任务执行
             *      3.阻塞队列满了,就直接开新线程执行,最大能开到max指定的数量
             *      4.max满了就用RejectedExecutionHandler拒绝任务
             *      5.max都执行完成,有很多空闲,在指定的时间keepAliveTime之后,释放max-core这些线程
             *
             *  面试题:一个线程池:core-7,max-20,queue-50,  100并发进入怎么分配
             *  7个会立即得到,50个会进去队列,再开13个进行执行,剩余30个使用拒绝策略
             */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    拒绝策略

    DiscardOldestPolicy:丢弃最老的任务
    AbortPolicy:丢弃当前任务,抛出异常【默认策略】
    CallerRunsPolicy:同步执行run方法
    DiscardPolicy:丢弃当前任务,不抛出异常
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    阻塞队列

    new LinkedBlockingDeque<>();
    // 默认大小是Integer.Max会导致内存不足,所以要做压力测试给出适当的队列大小

    在这里插入图片描述

    线程池

    1.常见的4种默认线程池

    注意:
        回收线程 = maximumPoolSize - corePoolSize
    
    可缓冲线程池【CachedThreadPool】:corePoolSize=0, maximumPoolSize=Integer.MAX_VALUE
    定长线程池【FixedThreadPool】:corePoolSize=maximumPoolSize
    周期线程池【ScheduledThreadPool】:指定核心线程数,maximumPoolSize=Integer.MAX_VALUE,支持定时及周期性任务执行(一段时间之后再执行)
    单任务线程池【SingleThreadPool】:corePoolSize=maximumPoolSize=1,从队列中获取任务(一个核心线程)
      
    Executors.newCachedThreadPool();
    Executors.newFixedThreadPool(10);
    Executors.newScheduledThreadPool(10);
    Executors.newSingleThreadExecutor();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    2.为什么使用线程池?

    1. 降低资源的消耗【减少创建销毁操作】
      通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
      高并发状态下过多创建线程可能将资源耗尽
    2. 提高响应速度【控制线程个数】
      因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行(线程个数过多导致CPU调度慢)
    3. 提高线程的可管理性【例如系统中可以创建两个线程池,核心线程池、非核心线程池【例如发送短信】,显存告警时关闭非核心线程池释放内存资源】
      线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配

    异步编排CompletableFuture

    1.概念:

    • 多个任务可以交给线程池来异步执行,但是多个待执行的任务之间部分有逻辑先后的顺序,因此需要异步编排
    • CompletableFuture< T > 实现 Future< T > 接口,可以通过他完成异步编排,类似前端的Promise,可以一直then(…)
    • FutureJava 5 添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel 方法停止任务的执行。
    • 虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?
    • 很多语言,比如 Node.js,采用回调的方式实现异步编程。Java 的一些框架,比如 Netty,自己扩展了 Java 的 Future接口,提供了addListener等多个扩展方法;Google guava 也提供了通用的扩展 Future;Scala 也提供了简单易用且功能强大的 Future/Promise 异步编程模式。
      作为正统的 Java 类库,是不是应该做点什么,加强一下自身库的功能呢?
    • Java 8 中, 新增加了一个包含 50 个方法左右的类: CompletableFuture,提供了非常强大的Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。
    • CompletableFuture 类实现了 Future 接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。
      CompletableFuture 和 FutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果
    • 使用Callable < T > + FutureTask< T > 创建线程时,后者也是Future< T > 的子实现接口

    在这里插入图片描述

    2.使用:

    异步操作:

    • CompletableFuture 提供了四个静态方法来创建一个异步操作:
      在这里插入图片描述
    • runXXX都是没有返回结果的,supplyXXX可以获取返回结果
    • 可以传入自定义线程池,否则使用默认线程池

    示例Demo:

    // runAsync
    public static ExecutorService executor = Executors.newFixedThreadPool(10);
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    		System.out.println("main...start...");
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                System.out.println("当前线程:" + Thread.currentThread().getId());
                int i = 10 / 2;
                System.out.println("运行结果:" + i);
            }, executor);
            System.out.println("main...end...");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    1.whenComplete完成时回调

    可以在异步任务的下面写业务代码,也可以放到成功回调里面,因为 CompletableFuture.runAsync()和CompletableFuture.supplyAsync() 执行完之后还会返回completableFuture接着调方法
    在这里插入图片描述

    • whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。

    • whenComplete 和 whenCompleteAsync 的区别:
      – whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
      – whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
      – 方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

    举栗:

    在这里插入图片描述
    注意前面的只能感知异常,如果无异常不能对原结果在进行处理
    在这里插入图片描述

    线程串化

    thenApply线程串化方法

    概念

    • 简单理解就是将线程顺序串起来,一个线程可能依赖另外一个线程的结果

    • 当前线程执行完接着可以执行另外一个任务,当前线程的结果可被消费,消费之后结果可以接着向下传递
      在这里插入图片描述

    • thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

    • thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

    • thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行 thenRun 的后续操作。

    • 带有 Async 默认是异步执行的。同之前。

    以上都要前置任务成功完成。Function

    • T:上一个任务返回结果的类型
    • U:当前任务的返回值类型

    任务组合

    1.两任务组合-都要完成

    在这里插入图片描述
    在这里插入图片描述
    runAfterBothAsync

    // 5.6.1.二者都要完成,组合【不获取前两个任务返回值,且自己无返回值】
    CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务1执行");
        return 10 / 2;
    }, executor);
    CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务2执行");
        return "hello";
    }, executor);
    CompletableFuture<Void> future03 = future01.runAfterBothAsync(future02, () -> {
        System.out.println("任务3执行");
    }, executor);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    thenAcceptBothAsync

    // 5.6.2.二者都要完成,组合【获取前两个任务返回值,自己无返回值】
    CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务1执行");
        return 10 / 2;
    }, executor);
    CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务2执行");
        return "hello";
    }, executor);
    CompletableFuture<Void> future03 = future01.thenAcceptBothAsync(future02,
            (result1, result2) -> {
                System.out.println("任务3执行");
                System.out.println("任务1返回值:" + result1);
                System.out.println("任务2返回值:" + result2);
            }, executor);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    thenCombineAsync

    // 5.6.3.二者都要完成,组合【获取前两个任务返回值,自己有返回值】
    CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务1执行");
        return 10 / 2;
    }, executor);
    CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务2执行");
        return "hello";
    }, executor);
    CompletableFuture<String> future03 = future01.thenCombineAsync(future02,
            (result1, result2) -> {
                System.out.println("任务3执行");
                System.out.println("任务1返回值:" + result1);
                System.out.println("任务2返回值:" + result2);
                return "任务3返回值";
            }, executor);
    System.out.println(future03.get());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    2.两任务组合-任一完成

    在这里插入图片描述
    在这里插入图片描述
    runAfterEitherAsync

    // 不获取前任务返回值,且当前任务无返回值
    
    • 1

    acceptEitherAsync

    // 获取前任务返回值,但当前任务无返回值
    
    • 1

    applyToEitherAsync

    // 获取前任务返回值,当前任务有返回值
    
    • 1

    3.多任务组合

    allOf

    // 等待所有任务完成
    CompletableFuture<Void> allOf = CompletableFuture.allOf(future01, future02, future03);
    allOf.get();// 阻塞等待所有任务完成
    
    • 1
    • 2
    • 3

    anyOf

    CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future01, future02, future03);
    anyOf.get();// 阻塞等待任一任务完成,返回值是执行成功的任务返回值
    
    • 1
    • 2

    以上涉及到的示例代码:

    package com.example.demo.thread;
    
    import java.util.concurrent.*;
    
    /**
     * @author xh
     * @Date 2022/8/7
     */
    public class ThreadTest {
        public static ExecutorService executor = Executors.newFixedThreadPool(10);
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            System.out.println("main...start...");
    //        CompletableFuture future = CompletableFuture.runAsync(() -> {
    //            System.out.println("当前线程:" + Thread.currentThread().getId());
    //            int i = 10 / 2;
    //            System.out.println("运行结果:" + i);
    //        }, executor);
            /**
             * 方法完成后的感知
             */
    //        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    //            System.out.println("当前线程:" + Thread.currentThread().getId());
    //            int i = 10 / 2;
    //            System.out.println("运行结果:" + i);
    //            return i;
    //        }, executor).whenComplete((res, exception) -> {
    //            // 虽然能得到异常信息,但是没法修改返回数据。
    //            System.out.println("异步任务成功完成了...结果是:" + res + ";异常是:" + exception);
    //        }).exceptionally(throwable -> {
    //            // 可以感知异常,同时返回默认值
    //            return 10;
    //        });
    
            /**
             * 方法执行完成后的处理
             */
    //        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    //            System.out.println("当前线程:" + Thread.currentThread().getId());
    //            int i = 10 / 5;
    //            System.out.println("运行结果:" + i);
    //            return i;
    //        }, executor).handle((res, thr) -> {
    //            if(res != null) {
    //                return res * 2;
    //            }
    //            if(thr != null) {
    //                return 0;
    //            }
    //            return 0;
    //        });
    
            /**
             * 线程串行化
             * 1.thenRun:不能获取上一步的执行结果,无返回值
             * 2.thenAcceptAsync:能接受上一步结果,但是无返回值
             * 3.
             */
    //        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    //            System.out.println("当前线程:" + Thread.currentThread().getId());
    //            int i = 10 / 5;
    //            System.out.println("运行结果:" + i);
    //            return i;
    //        }, executor).thenRunAsync(() -> {
    //            System.out.println("任务二启动了...");
    //        }, executor);
    //        CompletableFuture.supplyAsync(() -> {
    //            System.out.println("当前线程:" + Thread.currentThread().getId());
    //            int i = 10 / 5;
    //            System.out.println("运行结果:" + i);
    //            return i;
    //        }, executor).thenAcceptAsync(res -> {
    //            System.out.println("任务二启动了..." + res);
    //        });
    //        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    //            System.out.println("当前线程:" + Thread.currentThread().getId());
    //            int i = 10 / 5;
    //            System.out.println("运行结果:" + i);
    //            return i;
    //        }, executor).thenApplyAsync((res) -> {
    //            System.out.println("任务二启动了..." + res);
    //            return "Hello" + res;
    //        }, executor);
    //        // R apply(T t)
    //        // future.get()是阻塞方法
    //        System.out.println("main...end..." + future.get());
    
            /**
             * 两个都完成
             */
    //        CompletableFuture future01 = CompletableFuture.supplyAsync(() -> {
    //            System.out.println("任务1-当前线程:" + Thread.currentThread().getId());
    //            int i = 10 / 5;
    //            System.out.println("任务1-运行结果:" + i);
    //            return i;
    //        }, executor);
    //        CompletableFuture future02 = CompletableFuture.supplyAsync(() -> {
    //            System.out.println("任务2-当前线程:" + Thread.currentThread().getId());
    //            int i = 10 / 5;
    //            try {
    //                Thread.sleep(3000);
    //                System.out.println("任务2-运行结果:" + i);
    //            } catch (InterruptedException e) {
    //                e.printStackTrace();
    //            }
    //            return "hello";
    //        }, executor);
    //        future01.runAfterBothAsync(future02, () -> {
    //            System.out.println("任务3开始");
    //        }, executor);
            // void accept(T t, U u);
    //        future01.thenAcceptBothAsync(future02, (f1, f2) -> {
    //            System.out.println("任务3开始...之前的结果:" + f1 + "--->" + f2);
    //        }, executor);
            // R apply(T t, U u);
    //        CompletableFuture future = future01.thenCombineAsync(future02, (f1, f2) -> {
    //            return f1 + ": " + f2 + "-> Haha";
    //        }, executor);
    //        System.out.println("main...end..." + future.get());
    
            /**
             * 两个任务只要有一个完成,我们就执行任务3
             * runAfterEitherAsync:不感知结果,自己也无返回值
             * acceptEitherAsync: 感知结果,没有返回值,注意类型要一致,这里我们把String,Integer都换成了Object
             */
    //        future01.runAfterEitherAsync(future02, () -> {
    //            System.out.println("任务3开始...");
    //        }, executor);
            // void accept(T t);
    //        future01.acceptEitherAsync(future02, (res) -> {
    //            System.out.println("任务3开始..., 结果是:" + res);
    //        }, executor);
            // R apply(T t);
    //        CompletableFuture future = future01.applyToEitherAsync(future02, res -> {
    //            System.out.println("任务3开始...之前的结果是:" + res);
    //            return res.toString() + "-> hello";
    //        }, executor);
    //        System.out.println("main...start..." + future.get());
    
            CompletableFuture<String> future01 = CompletableFuture.supplyAsync(() -> {
                System.out.println("查询商品的图片信息");
                return "hello.jpg";
            }, executor);
            CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(3000);
                    System.out.println("查询商品的属性信息");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "黑色.jpg";
            }, executor);
            CompletableFuture<String> future03 = CompletableFuture.supplyAsync(() -> {
                System.out.println("查询商品的介绍信息");
                return "华为";
            }, executor);
            CompletableFuture<Void> allOf = CompletableFuture.allOf(future01, future02, future03);
            allOf.get(); // 等待所有结果完成
            System.out.println("main...end..." + future01.get() + future02.get() + future03.get());
        }
    
    
        /**
         * JUC java.util.concurrent
         * 

    * 1.继承Thread * 2.实现Runnable接口 * 3.实现Callable接口 + FutureTask(可以拿到返回结果,可以处理异常) jdk1.5以后添加 * 4.线程池 给线程池直接提交任务。 * service.execute(new Runable01()); * 1、创建: * 1)Executors *

    * Future:可以获取到异步结果 *

    * 区别:1、2不能得到返回值 3可以获取返回值 * 1、2、3都不能控制资源 * 4可以控制资源,性能稳定。 */ // public static ExecutorService service = Executors.newFixedThreadPool(10); public void thread(String[] args) throws Exception { System.out.println("main...start..."); // // 1.继承Thread // Thread01 thread = new Thread01(); // thread.start(); // 2.实现Runnable接口 // Runable01 runable01 = new Runable01(); // new Thread(runable01).start(); // 3.实现Callable接口 + FutureTask // FutureTask futureTask = new FutureTask<>(new Callable01()); // new Thread(futureTask).start(); // // 阻塞等待整个线程执行完成,获取返回结果 // Integer integer = futureTask.get(); // System.out.println("main...end..." + integer); // 我们以后在业务代码里面,以上三种启动线程的方式都不用,应该将所有的多线程异步任务都交给线程池执行,达到资源控制 // 4.线程池 // 保证当前系统中池只有一两个,每个异步任务,提交给线程池让他自己去执行。 // ExecutorService service = Executors.newFixedThreadPool(10); // service.execute(new Runable01()); /** * 七大参数: * corePoolSize:核心线程数【一直存在,除非(allowCoreThreadTimeOut)】;线程池创建好以后就准备就绪的线程数量,就等待来接受异步任务去执行 * 5个 Thread thread = new Thread(); thread.start(); * maximumPoolSize:最大线程数量;控制资源 * keepAliveTime:存活时间。如果当前的线程数量大于核心数量。释放空闲的线程(maximumPoolSize - corePoolSize)。只要线程空闲大于制定的keepAliveTime; * unit:时间单位 * BlockingQueue workQueue:阻塞队列。如果任务有很多,就会将目前多的任务放在队列里面。只要有线程空闲,就会去队列里面取出新的任务 * threadFactory:线程的创建工厂。 * RejectedExecutionHandler handler:如果队列满了,按照我们指定的拒绝策略拒绝执行任务 * 工作顺序: * 1.线程池创建,准备好core数量的核心线程,准备接受任务 * 2.core满了,就将再进来的任务放入阻塞队列中。空闲的core就会自己去阻塞队列获取任务执行 * 3.阻塞队列满了,就直接开新线程执行,最大能开到max指定的数量 * 4.max满了就用RejectedExecutionHandler拒绝任务 * 5.max都执行完成,有很多空闲,在指定的时间keepAliveTime之后,释放max-core这些线程 * * 面试题:一个线程池:core-7,max-20,queue-50, 100并发进入怎么分配 * 7个会立即得到,50个会进去队列,再开13个进行执行,剩余30个使用拒绝策略 */ // ThreadPoolExecutor executor = new ThreadPoolExecutor(); System.out.println("main...end..."); } // 1.继承Thread public static class Thread01 extends Thread { @Override public void run() { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); } } // 2.实现Runnable接口 public static class Runable01 implements Runnable { @Override public void run() { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); } } // 3.实现Callable接口 + FutureTask(可以拿到返回结果,可以处理异常) jdk1.5以后添加 public static class Callable01 implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); return i; } } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257

    项目整合异步编排

    业务场景:
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    1.注入线程池

    gulimall:
      thread:
        core-size: 20
        max-size: 200
        keep-alive-time: 10
    
    • 1
    • 2
    • 3
    • 4
    • 5
    @ConfigurationProperties(prefix = "gulimall.thread")
    @Data
    public class ThreadPoolConfigProperties {
        private Integer coreSize;
        private Integer maxSize;
        private Integer keepAliveTime;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    @EnableConfigurationProperties(ThreadPoolConfigProperties.class)
    @Configuration
    public class MyThreadConfig {
        @Bean
        public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
            return new ThreadPoolExecutor(
                    pool.getCoreSize(),
                    pool.getMaxSize(),
                    pool.getKeepAliveTime(),
                    TimeUnit.SECONDS,
                    new LinkedBlockingDeque<>(100000),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy()
            );
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    2.实际业务使用异步编排

    @Service("skuInfoService")
    public class SkuInfoServiceImpl extends ServiceImpl<SkuInfoDao, SkuInfoEntity> implements SkuInfoService {
    
        @Autowired
        SkuImagesService skuImagesService;
        @Autowired
        SkuSaleAttrValueService skuSaleAttrValueService;
        @Autowired
        CouponAgentService couponAgentService;
        @Autowired
        SpuInfoDescService spuInfoDescService;
        @Autowired
        AttrGroupServiceImpl attrGroupService;
        @Autowired
        ThreadPoolExecutor executor;
    
        /**
         * 查询skuId商品信息,封装VO返回
         */
        @Override
        public SkuItemVO item(Long skuId) throws ExecutionException, InterruptedException {
            SkuItemVO result = new SkuItemVO();
    
            CompletableFuture<SkuInfoEntity> skuInfoFuture = CompletableFuture.supplyAsync(() -> {
                // 1.获取sku基本信息(pms_sku_info)【默认图片、标题、副标题、价格】
                SkuInfoEntity skuInfo = getById(skuId);
                result.setInfo(skuInfo);
                return skuInfo;
            }, executor);
    
            CompletableFuture<Void> imagesFuture = CompletableFuture.runAsync(() -> {
                // 2.获取sku图片信息(pms_sku_images)
                List<SkuImagesEntity> images = skuImagesService.getImagesBySkuId(skuId);
                result.setImages(images);
            }, executor);
    
            CompletableFuture<Void> saleAttrFuture = skuInfoFuture.thenAcceptAsync((skuInfo) -> {
                // 3.获取当前sku所属spu下的所有销售属性组合(pms_sku_info、pms_sku_sale_attr_value)
                List<SkuItemSaleAttrVO> saleAttr = skuSaleAttrValueService.getSaleAttrBySpuId(skuInfo.getSpuId());
                result.setSaleAttr(saleAttr);
            }, executor);
    
            CompletableFuture<Void> descFuture = skuInfoFuture.thenAcceptAsync((skuInfo) -> {
                // 4.获取spu商品介绍(pms_spu_info_desc)【描述图片】
                SpuInfoDescEntity desc = spuInfoDescService.getById(skuInfo.getSpuId());
                result.setDesc(desc);
            }, executor);
    
            CompletableFuture<Void> groupAttrsFuture = skuInfoFuture.thenAcceptAsync((skuInfo) -> {
                // 5.获取spu规格参数信息(pms_product_attr_value、pms_attr_attrgroup_relation、pms_attr_group)
                List<SpuItemAttrGroupVO> groupAttrs = attrGroupService.getAttrGroupWithAttrsBySpuId(skuInfo.getSpuId(), skuInfo.getCatalogId());
                result.setGroupAttrs(groupAttrs);
            }, executor);
    
            // 6.等待所有任务都完成
            CompletableFuture.allOf(imagesFuture, saleAttrFuture, descFuture, groupAttrsFuture).get();
    
            return result;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
  • 相关阅读:
    java计算机毕业设计智慧医疗医患交流系统设计MyBatis+系统+LW文档+源码+调试部署
    【MySQL】多表查询的分类2:自连接和非自连接
    Phoenix Digital网络模块——将新的PLC连接到传统远程I/O
    博客摘录「 MPLS/LDP原理介绍+报文分析+配置示例」2023年9月26日
    ES学习看这一篇文章就够了
    基于 Vagrant 手动部署多个 Redis Server
    【预测模型-SVM预测】基于粒子群算法结合支持向量机SVM实现Covid-19风险预测附matlab代码
    自己动手写编译器:实现命令行模块
    JavaScript系列:JS实现复制粘贴文字以及图片
    Redis(三) Redis的java客户端
  • 原文地址:https://blog.csdn.net/m0_51517236/article/details/126252820