• 线程池ThreadPoolExecutor


    线程池的生命周期

     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

     ThreadPoolExecutor使用一个ctl变量代表两个信息,线程池的运行状态 (runState) 和 线程池内有效线程的数量 (workerCount),高三位表示状态。

    workerCount:线程池内有效线程的数量,工作线程允许开始,不允许停止,使用ctl的低29位表示

    runState:线程的生命周期,是有序的随时间递增,也就是状态不可逆的。

    1. private static final int COUNT_BITS = Integer.SIZE - 3;
    2. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
    3. // runState is stored in the high-order bits
    4. private static final int RUNNING = -1 << COUNT_BITS;
    5. private static final int SHUTDOWN = 0 << COUNT_BITS;
    6. private static final int STOP = 1 << COUNT_BITS;
    7. private static final int TIDYING = 2 << COUNT_BITS;
    8. private static final int TERMINATED = 3 << COUNT_BITS;
    9. // Packing and unpacking ctl
    10. private static int runStateOf(int c) { return c & ~CAPACITY; }
    11. private static int workerCountOf(int c) { return c & CAPACITY; }
    12. private static int ctlOf(int rs, int wc) { return rs | wc; }
    状态描述

       状态位

    (高三位)

    RUNNING运行状态,允许提交新的任务,正常处理阻塞队列中的任务111
    SHUTDOWN关闭状态,不允许提交新的任务,但可以正常处理队列中的任务000
    STOP停止状态,不允许提交新的任务,也不处理队列中的任务,也会停止正在运行的状态001
    TIDYING整理状态,所有的任务都停止,workerCount清零,将会调用terminated()010
    TERMINATED终止状态,terminated()方法调用完成后的状态011

     

     任务调度

    execute

    1. public void execute(Runnable command) {
    2. //判断任务是否为空
    3. if (command == null)
    4. throw new NullPointerException();
    5. //获取当前工作线程数量和线程状态
    6. int c = ctl.get();
    7. //如果工作线程数小于核心线程数
    8. if (workerCountOf(c) < corePoolSize) {
    9. //增加一个核心线程去执行任务
    10. //增加成功结束
    11. if (addWorker(command, true))
    12. return;
    13. //增加失败,再次获取当前线程池的工作线程数量和线程状态
    14. c = ctl.get();
    15. }
    16. //如果当前线程池正在运行
    17. //将任务放入阻塞队列
    18. if (isRunning(c) && workQueue.offer(command)) {
    19. //插入成功★
    20. //再次获取当前线程池的工作线程数量和线程状态
    21. int recheck = ctl.get();
    22. //此时(并发情况)线程池非Running状态
    23. //将任务移除阻塞队列
    24. if (! isRunning(recheck) && remove(command))
    25. //移除成功,执行抛弃策略
    26. reject(command);
    27. //移除失败,则判断当前的工作线程数是否为0
    28. else if (workerCountOf(recheck) == 0)
    29. //为0则增加一个空工作线程,(去执行★之前插入成功的任务)
    30. addWorker(null, false);
    31. }
    32. //创建一个非核心线程去执行任务
    33. else if (!addWorker(command, false))
    34. //创建失败,执行拒绝策略
    35. reject(command);
    36. }

     submit

    submit传入任务以及其返回值类型,如果任务是Runnable的话上会将任务包装成Callable任务也就是RunnableFuture,然后再丢给execute去执行。

    executesubmit
    是否有返回值有,需要阻塞去获取
    外部对的异常处理无法处理,使用execute任务出现异常,外部无法感知可以在Future.get()中捕获异常,进行处理
    执行过程直接放到工作线程如果参数是Runnable则需要包装成Callable然后执行execute去执行
    接受任务类型RunnableRunnable、Callable

    阻塞队列

    线程池使用阻塞队列将要执行的任务与线程分开,用生产者和消费者模型实现。

    当任务数量超过核心线程数,后续任务就会放入阻塞队列中进行排队。

    因为实现了BlockingQueue所以都是线程安全的。

    Array

    Blocking

    Queue

    Linked

    Blocking

    Queue

    Priority

    Blocking

    Queue

    Delay

    Queue

    Synchronous

    Queue

    Linked

    Transfer

    Queue

    Linked

    Blocking

    Deque

    名称数组有界阻塞队列链表有界阻塞队列优先级无界阻塞队列使用优先级队列实现的无界阻塞队列不存储元素的阻塞队列链表无界阻塞队列链表双向阻塞队列
    实现的方式数组链表自定义优先级使用

    Priority

    Blocking

    Queue

    链表、tryTransfer双向链表
    队列长度自定义Integer.MAX_VALUE无界无界无界无界无界
    特点支持公平和非公平锁可以自定义compareTo()方法进行排序,不能保证同优先级元素的排序可以指定所有才能从队列中获取当前元素,延迟期满了之后才能从队列种获取元素每put操作必须等待take,否则不能put成功。支持公平和非公平锁。

    LinkedBlockingQueue多了tryTransfer,transfer方法

    头部和尾部都可以插入元素,多线程并发时们可以将所得竞争降低到一半

    任务拒绝

    当线程池的缓存队列已满,线程数也达到了maximumPoolSize 时,再新增任务就会触发拒绝策略。

    拒绝策略收到三个参数影响:corePoolSize 、maximumPoolSize 、workQueue 。

    当任务提交后,如果没达到核心线程数,则会直接创建核心线程运行,如果达到了核心线程数则会放入队列中等待,当任务数大于workQueue.size+maximumPoolSize则触发拒绝策略。

    名称AbortPolicy CallerRunsPolicy DiscardPolicy DiscardOldestPolicy 
    策略抛出异常,终止任务使用调用线程执行任务,当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务直接丢弃丢弃workQueue队列最老任务,添加新任务
    场景需要捕获异常进行处理适合并发小,性能要求不高且不允许失败的情况。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大允许任务不执行允许任务不执行,且希望执行最新的任务
    JDK是否默认使用

    常用线程池

    newCachedThreadPool

    1. public static ExecutorService newCachedThreadPool() {
    2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    3. 60L, TimeUnit.SECONDS,
    4. new SynchronousQueue());

    核心参数

    核心线程数:0

    最大线程数:Integer.MAX_VALUE

    线程存活时间:60s

    阻塞队列:SynchronousQueue(同步队列)

    分析:因为没有核心线程,所以任务进来都会放入同步队列中,然后创建工作线程去执行,由于没有核心线程,最大线程数又是一个很大的值,基本意味着线程数量无限大,线程池中只要没有空闲线程就会创建新的去执行任务。线程池的线程一旦超过60s就会被销毁。所以当没有任务的时候,几乎不会占用什么资源。

    场景:适合任务量大但是时间很短的场景

    newFixedThreadPool

    1. public static ExecutorService newFixedThreadPool(int nThreads) {
    2. return new ThreadPoolExecutor(nThreads, nThreads,
    3. 0L, TimeUnit.MILLISECONDS,
    4. new LinkedBlockingQueue());

    核心参数

    核心线程数=最大线程数

    最大线程数=核心线程数

    线程存活时间:0

    阻塞队列:LinkedBlockingQueue(链表阻塞队列)

    分析:由于核心线程数=最大线程数,所以线程池无空闲核心线程则会进入阻塞队列,并且线程池中的线程都是核心线程也不需要销毁。同时使用了LinkedBlockingQueue,队列长度Integer.MAX_VALUE。

    场景:适合任务量比较固定但耗时长的任务

    newSingleThreadPool

    1. public static ExecutorService newSingleThreadExecutor() {
    2. return new FinalizableDelegatedExecutorService
    3. (new ThreadPoolExecutor(1, 1,
    4. 0L, TimeUnit.MILLISECONDS,
    5. new LinkedBlockingQueue()));
    6. }

    核心参数

    核心线程数=1

    最大线程数=1

    线程存活时间:0

    阻塞队列:LinkedBlockingQueue(链表阻塞队列)

    分析:只有一个线程的线程池,当唯一的线程发生异常会重启一个线程代替执行,也就是约等于以单线程的形式进行工作。

    场景:适合需要顺序执行任务的场景

    newScheduledThreadPool

    1. public ScheduledThreadPoolExecutor(int corePoolSize) {
    2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    3. new DelayedWorkQueue());
    4. }

    核心参数

    核心线程数=可指定

    最大线程数=Integer.MAX_VALUE

    线程存活时间:0

    阻塞队列:DelayedWorkQueue(优先级工作队列)

    分析:核心线程数是固定的,但非核心是基本没有限制的,因为空闲时间为0,所以一旦非核心线程执行完就会被回收。由于使用的是DelayedWorkQueue,任务会按照顺序执行。

    场景:适合执行定时任务和固定周期的重复任务

    newWorkStealingPool

    1. public static ExecutorService newWorkStealingPool() {
    2. return new ForkJoinPool
    3. (Runtime.getRuntime().availableProcessors(),
    4. ForkJoinPool.defaultForkJoinWorkerThreadFactory,
    5. null, true);
    6. }

    核心参数:无

    分析:Java8新增的创建线程方式创建时不设置任何参数的话,就会以当前机器的cpu个数为线程个数,此线程池会并行处理任务,不能保证执行顺序。可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”分发到不同的cpu核心上执行,执行完后再把结果收集到一起返回。

    场景:需要合理使用cpu资源的场景,且任务耗时很大且无需顺序执行的场景。

    ThreadPoolExecutor

    ThreadPoolExecutor的核心池线程的数量该如何设置?

    CPU密集型:cpu core+1

    IO密集型:2cpu core

    cpu core:Runtime.getRuntime().availableProcessors()

  • 相关阅读:
    2023-简单点-开启防火墙后,ping显示请求超时;windows共享盘挂在不上
    【LeetCode】二叉树OJ
    适合在虚拟化环境中部署 Kubernetes 的三个场景
    【Python脚本进阶】2.1、端口扫描器(下):NMAP端口扫描
    【Android高级UI】PorterDuffMode颜色混合公式
    在服务中无法正常启动Nacos
    贪心算法之——阶乘之和(nyoj91)
    Java编程实践:使用面向对象编程(OOP)概念构建简单的国际象棋游戏
    600. 不含连续1的非负整数 动态规划
    Golang开发--select
  • 原文地址:https://blog.csdn.net/fox_233/article/details/128113876