• 多线程之ThreadPoolExecutor相关


    根据ThreadPoolExecutor的构造方法,JDK提供了很多工厂方法来创建各种用途的线程池.

    1 newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
     return new ThreadPoolExecutor(nThreads, nThreads,
     0L, TimeUnit.MILLISECONDS,
     new LinkedBlockingQueue<Runnable>());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    说明:

    • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
    • 阻塞队列是无界的,可以放任意数量的任务(最大为Integer.MAX_VALUE)

    适用于 任务量一已知,相对耗时的任务

    2 newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
     60L, TimeUnit.SECONDS,
     new SynchronousQueue<Runnable>());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    说明:

    • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s
      • 全部都是救急线程(60s 后可以回收)
      • 救急线程可以无限创建(最大是Integer.MAX_VALUE)
    • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交 货)

    如下案例:

    SynchronousQueue<Integer> integers = new SynchronousQueue<>();
    new Thread(() -> {
         try {
             log.debug("putting {} ", 1);
             integers.put(1);
             log.debug("{} putted...", 1);
             log.debug("putting...{} ", 2);
             integers.put(2);
             log.debug("{} putted...", 2);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
    },"t1").start();
    sleep(1);
    new Thread(() -> {
         try {
             log.debug("taking {}", 1);
             integers.take();
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
    },"t2").start();
    sleep(1);
    new Thread(() -> {
         try {
             log.debug("taking {}", 2);
             integers.take();
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
    },"t3").start();
    /*
    运行结果:
    11:48:15.500 c.TestSynchronousQueue [t1] - putting 1 
    11:48:16.500 c.TestSynchronousQueue [t2] - taking 1 
    11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted... 
    11:48:16.500 c.TestSynchronousQueue [t1] - putting...2 
    11:48:17.502 c.TestSynchronousQueue [t3] - taking 2 
    11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted... 
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。

    适用于 任务数比较密集,但每个任务执行时间较短的情况

    3 newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
     return new FinalizableDelegatedExecutorService
     (new ThreadPoolExecutor(1, 1,
     0L, TimeUnit.MILLISECONDS,
     new LinkedBlockingQueue<Runnable>()));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放.

    与其他线程区别:

    • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一 个线程,保证池的正常工作
    • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
      • FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法.
    • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改
      • 对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

    4 提交任务

    // 执行任务
    void execute(Runnable command);
    
    // 提交任务 task,用返回值 Future 获得任务执行结果
    <T> Future<T> submit(Callable<T> task);
    
    // 提交 tasks 中所有任务
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
     throws InterruptedException;
    
    // 提交 tasks 中所有任务,带超时时间
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
     long timeout, TimeUnit unit)
     throws InterruptedException;
    
    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
     throws InterruptedException, ExecutionException;
    
    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
     long timeout, TimeUnit unit)
     throws InterruptedException, ExecutionException, TimeoutException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    上述都是提供的提交任务的方法,根据不同的业务场景需求,选择对应的提交方法.

    5 关闭线程池

    shutdown

    /*
    线程池状态变为 SHUTDOWN
    - 不会接收新任务
    - 但已提交任务会执行完
    - 此方法不会阻塞调用线程的执行
    */
    void shutdown();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    public void shutdown() {
         final ReentrantLock mainLock = this.mainLock;
         mainLock.lock();
         try {
         checkShutdownAccess();
         // 修改线程池状态
         advanceRunState(SHUTDOWN);
         // 仅会打断空闲线程
         interruptIdleWorkers();
         onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
         } finally {
         mainLock.unlock();
         }
         // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
         tryTerminate();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    shutdownNow

    /*
    线程池状态变为 STOP
    - 不会接收新任务
    - 会将队列中的任务返回
    - 并用 interrupt 的方式中断正在执行的任务
    */
    List<Runnable> shutdownNow();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    public List<Runnable> shutdownNow() {
         List<Runnable> tasks;
         final ReentrantLock mainLock = this.mainLock;
         mainLock.lock();
         try {
         checkShutdownAccess();
         // 修改线程池状态
         advanceRunState(STOP);
         // 打断所有线程
         interruptWorkers();
         // 获取队列中剩余任务
         tasks = drainQueue();
         } finally {
         mainLock.unlock();
         }
         // 尝试终结
         tryTerminate();
         return tasks;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    其他打断方法

    // 不在 RUNNING 状态的线程池,此方法就返回 true
    boolean isShutdown();
    // 线程池状态是否是 TERMINATED
    boolean isTerminated();
    // 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事
    情,可以利用此方法等待
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  • 相关阅读:
    vue3.0 如何自定义指令
    【Unity俯视角射击】我们来做一个《元气骑士》的完整Demo1
    活字格性能优化技巧(1)——如何利用数据库主键提升访问性能
    Go构建模式:GOPATH、vendor、Go Module
    平行哲学与智能技术:平行产业与智慧社会的对偶方程与测试基础
    AI+Science系列(二):国内首个基于AI框架的CFD工具组件!赛桨v1.0 Beta API介绍以及典型案例分享!
    动态树的第2大值
    【Mybatis】基于Mybatis插件+注解,实现敏感数据自动加解密
    springboot shiro vue使用websocket案例
    iOS开发:对于动态库共享缓存(dyld)的了解
  • 原文地址:https://blog.csdn.net/ABestRookie/article/details/126292778