• JAVA多线程进阶篇-探索线程池ThreadPoolExecutor源码


    1.概述

    JAVA线程池是一种基于池化思想的线程管理工具,它能够控制线程运行的数量。在线程运行中,线程过多会带来额外的开销,其中包括创建销毁线程的开销,操作系统调度线程的开销等,若频繁创建大量线程、销毁线程,会极大地降低计算机地整体性能。线程池技术能够维护指定数量线程,且根据线程数量安排执行数量。这种做法的优势是降低创建和销毁线程的代价,另一方面避免同一时间创建大量线程导致计算机资源消耗,程序运行异常。本文将分析JAVA线程池的使用及底层实现原理,帮助大家更好地理解该技术。

    2.线程池使用及原理

    2.1 线程池使用

    2.1.1 为什么要使用线程池

    若一个程序中并发的线程数量较大,并且每个线程都是执行一个较短的任务就结束了,频繁创建线程会严重影响系统的效率,因为频繁创建和销毁线程需要时间,此时就需要线程池来提高效率。

    2.1.2 线程池的概念

    核心原理其实就是一个容纳多个线程的容器,其中的线程可以反复使用,省去了频繁创建线程对象的操作,无需反复创建线程而消耗过多资源。

    2.1.3 线程池的优势

    1.降低资源消耗。减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务;
    2.提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行;
    3.提高线程的可管理性。可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。

    2.1.4 线程池的使用

    Java里面线程池的顶级接口是 Executor,不过真正的线程池接口是 ExecutorService, ExecutorService 的默认实现是 ThreadPoolExecutor;普通类 Executors 里面调用的就是 ThreadPoolExecutor。具体类图如下所示:
    在这里插入图片描述
    上述类图说明:

    接口或类解释
    Executor接口定义了一个接收Runnable对象的方法executor
    ExecutorService接口继承了Executor接口,增加了获取返回结果的submit方法
    AbstractExecutorService抽象类AbstractExecutorService类实现了ExecutorService接口,默认实现了一些方法
    ScheduledExecutorService接口一个可定时调度任务的接口
    ScheduledThreadPoolExecutor类一个可定时调度任务的线程池
    ThreadPoolExecutor类用于创建线程池的类
    ForkJoinPool类是一种拆分任务,合并计算结果的线程池

    2.1.5 Executors 提供四种线程池

    1.newCachedThreadPool
    newCachedThreadPool 是一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute() 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。注意,可以使用ThreadPoolExecutor 构造方法创建具有类似属性但细节不同(例如超时参数)的线程池。
    创建方式如下:

     ExecutorService executorService = Executors.newCachedThreadPool();
    
    • 1

    2.newSingleThreadExecutor
    创建一个单线程池,也就是该线程池只有一个线程在工作,所有的任务是串行执行的,如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它,此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
    创建方式如下:

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    
    • 1

    3.newFixedThreadPool
    newFixedThreadPool 创建固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小,线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
    创建方式如下:

    ExecutorService executorService = Executors.newFixedThreadPool(3);
    
    • 1

    4.newScheduledThreadPool
    newScheduledThreadPool 创建一个队列长度为Integer.MAX_VALUE大小的线程池,此线程池支持定时以及周期性执行任务的需求。
    创建方式如下:

    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
    
    • 1

    2.2 ThreadPoolExecutor使用及原理

    在《阿里巴巴开发者手册》中,关于线程池技术的使用有如下约束:
    在这里插入图片描述
    其实通过Executors创建的四种线程池,底层都是调用ThreadPoolExecutor类来进行创建的,源码如下:

    public class Executors {
    
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
        public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }
    
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
        public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
        }
    
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
        public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
    
        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    因此本文主要探索线程池创建类ThreadPoolExecutor的使用及原理。

    2.2.1 ThreadPoolExecutor构造方法

    	public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    
    	public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 threadFactory, defaultHandler);
        }
    
    	public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }
    
    	public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handle
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    由源码可知,ThreadPoolExecutor共有四个构造方法,共有7个核心参数,具体核心参数解释如下:

    参数解释
    corePoolSize核心线程数,线程池中常驻线程数,线程池初始化时默认是没有线程的,当任务来临时才开始创建线程
    maximumPoolSize最大线程数,核心线程满的基础上增加额外非核心参数,需要注意的是只有workQueue队列满时才会创建最多(maximumPoolSize-corePoolSize)个非核心线程
    keepAliveTime非核心线程的空闲时间超过keepAliveTime就会被自动终止回收掉,当corePoolSize=maxPoolSize时此参数就不起作用了,此时不存在非核心线程
    unitkeepAliveTime的时间单位,单位包括:TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分钟)、TimeUnit.MILLISECONDS(微秒)等
    workQueue阻塞队列,当线程池中的任务数大于corePoolSize时,新进来的任务就会被放到队列中,常用的三种队列包括:ArrayBlockingQueue(基于数组的先进先出队列,此队列创建时必须指定大小) 、LinkedBlockingQueue(基于链表的先进先出队列,如果创建时没有指定队列大小,则使用默认值Integer.MAX_VALUE) 、SynchronousQueue(此队列不会保存提交的任务,而是直接新建一个线程执行新来的任务)
    threadFactory线程工厂,用来创建线程,默认使用Executors.defaultThreadFactory(),也可以使用guava库的ThreadFactoryBuilder来创建
    handler拒绝策略,当线程任务大于最大线程数时,会采用拒绝策略。默认拒绝策略有如下四种:AbortPolicy(丢弃任务并抛出RejectedExecutionException异常)、DiscardPolicy(丢弃任务,但是不抛出异常) 、DiscardOldestPolicy(丢弃队列最前面的任务,然后重新尝试执行任务,不断重复此过程)、CallerRunsPolicy(由调用线程处理该任务)

    2.2.2 线程池创建线程流程图

    在这里插入图片描述
    若有一个线程池,核心线程corePoolSize=10,maximumPoolSize=50,workQueue=100,当任务过来会先创建10个核心线程,若任务不断增多,超过核心线程最大值时,会放入阻塞队列workQueue中,若任务量还在增多,超过阻塞队列最大值(100),就会创建非核心线程来执行任务(最多40个),此时如果还有任务来就会按照拒绝策略拒绝任务。

    2.2.3 ThreadPoolExecutor核心属性

        //ctl是一个线程安全的int类型数值,它有两个意思:1.声明当前线程池的状态 2.声明线程池中的线程数 
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        //COUNT_BITS的值固定为29,因为Integer类型占4个字节,共32位,Integer.SIZE就为32
        private static final int COUNT_BITS = Integer.SIZE - 3;
        //将1往前位移29位,得到值00100000 00000000 00000000 00000000,再减去1,得到00011111 11111111 11111111 11111111,因此COUNT_MASK值为3777777777
        private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
    
        //-1左移29位,得到11100000 00000000 00000000 00000000,也就是111表示线程为Running状态,表示线程池正常接收任务
        private static final int RUNNING    = -1 << COUNT_BITS;
        //0左移29位,得到00000000 00000000 00000000 00000000,也就是000表示线程池为SHUTDOWN状态,不接收新任务,但是内部还会处理阻塞队列中的任务,正在进行的任务也正常处理
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        //1左移29位,得到00100000 00000000 00000000 00000000,也就是001表示线程池状态为STOP,不接受新任务,也不去处理阻塞团队中的任务,同时会中断正在执行的任务
        private static final int STOP       =  1 << COUNT_BITS;
        //2左移29位,得到01000000 00000000 00000000 00000000,也就是010表示线程池状态为TIDYING,代表当前线程池即将终止
        private static final int TIDYING    =  2 << COUNT_BITS;
        //3左移29位,得到01100000 00000000 00000000 00000000,也就是011表示线程池状态为TERMINATED,代表线程池为TERMINATED,
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        //得到线程池的状态
        private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
        //得到当前线程池的线程数量(正在工作的线程)
        private static int workerCountOf(int c)  { return c & COUNT_MASK; }
        //把运行状态和线程数量打包成一个整数
        private static int ctlOf(int rs, int wc) { return rs | wc; }
        //任务缓存队列,用来存放等待执行的任务
    	private final BlockingQueue<Runnable> workQueue;
    	//锁,线程池的状态改变(线程池大小、运行状态等)的改变都需要使用这个锁
    	private final ReentrantLock mainLock = new ReentrantLock();
    	//用来存放工作集
        private final HashSet<Worker> workers = new HashSet<Worker>();
        //用来记录线程池中曾经出现过的最大线程数 
        private int largestPoolSize;
        //用来记录已经执行完毕的任务个数 
        private long completedTaskCount;
        //线程工厂,用来创建线程
        private volatile ThreadFactory threadFactory;
        //任务拒绝策略
        private volatile RejectedExecutionHandler handler;
        //线程存活时间
        private volatile long keepAliveTime;
        //是否允许为核心线程设置存活时间
        private volatile boolean allowCoreThreadTimeOut;
        //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
        private volatile int corePoolSize;
        //线程池最大能容忍的线程数
        private volatile int maximumPoolSize;
        //线程池中当前的线程数  
        private volatile int poolSize;
       
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    2.2.4 线程池状态变化

    在这里插入图片描述
    1.RUNNING:当创建线程池后,初始化完成后,线程池处于RUNNING状态;
    2.SHUTDOWN:调用shutdown()方法,线程池会处于SHUTDOWN状态,此时线程池不能够接收新任务,可以处理队列中的任务,等待所有任务执行完成;
    3.STOP:调用shutdownNow()方法,线程池处于停止状态,此时线程池不能接收新的任务,同时尝试中断正在工作的线程;
    4.TIDYING:过渡状态,此时线程池中所有任务已终止,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态;
    5.TERMINATED:当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

    2.2.5 核心方法分析

    1.execute()

     public void execute(Runnable command) {
     		//若Runnable为空,直接抛出异常
            if (command == null)
                throw new NullPointerException();
            //获取32位的ctl的值,高三位是线程状态
            int c = ctl.get();
            //判断工作线程数是否小于核心线程数
            if (workerCountOf(c) < corePoolSize) {
                //是否创建核心线程
                if (addWorker(command, true))
                //核心线程创建成功并添加任务成功
                    return;
                //执行此语句,表明创建核心线程数失败,重新获取ctl的值
                c = ctl.get();
            }
            //此处是因为创建核心线程失败
            //判断线程池是running状态,将任务添加到阻塞队列中,
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //再次判断是否是Running状态,如果不是Running,移除任务
                if (! isRunning(recheck) && remove(command))
                	//拒绝策略
                    reject(command);
                 //如果线程池工作线程数量为0,则新建一个空任务的线程
                else if (workerCountOf(recheck) == 0)
                    //如果线程池非running状态,无法添加任务
                    //阻塞队列有任务,但是没有工作现场,添加一个任务为空的工作线程处理阻塞队列中的任务
                    addWorker(null, false);
            }
            //线程池不是Running状态或入队失败,尝试扩容maxPoolSize后再次
            else if (!addWorker(command, false))
                reject(command);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    2.addWorker(Runnable firstTask, boolean core)

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:  //循环跳出点
            for (;;) {
                //获取ctl
                int c = ctl.get();
                //获取线程池状态
                int rs = runStateOf(c);
    
                // 除了RUNNING状态,其它状态均有可能
                //rs == SHUTDOWN,如果不是SHUTDOWN ,就代表是STOP或者更高的状态,这时不需要添加任务处理
                //如果任务为空,并且线程池状态不是Running,不需要处理
                //如果阻塞队列为空,则返回false,外侧!再次取反,该条件为true,不做处理
                if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    //获取工作线程个数
                    int wc = workerCountOf(c);
                    //如果工作线程数大于线程池最大容量,不再创建
                    if (wc >= CAPACITY ||
                    //判断wc是否超过核心线程或者最大线程
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        //构建工作线程失败
                        return false;
                        //采用CAS将工作线程数+1
                    if (compareAndIncrementWorkerCount(c))
                    //成功,跳出外侧循环
                        break retry;
                    //并发操作导致失败,重新获取ctl
                    c = ctl.get();  // Re-read ctl
                    //重新判断线程池状态
                    if (runStateOf(c) != rs)
                    //如果线程池状态发生变化,开启下一次循环,如果状态没变化,重新执行内存循环for(;;)
                        continue retry;
                }
            }
    
            //worker开始为false
            boolean workerStarted = false;
            //worker添加为false
            boolean workerAdded = false;
            //Worker就是工作线程
            Worker w = null;
            try {
                //创建worker,传入任务
                w = new Worker(firstTask);
                //从worker中获取线程t
                final Thread t = w.thread;
                //线程t不为null
                if (t != null) {
                //获取线程池的全局锁,避免当前线程添加任务时,其它线程清除了线程池,清除线程池需要先获取这个锁
                    final ReentrantLock mainLock = this.mainLock;
                    //加锁
                    mainLock.lock();
                    try {
                        //获取线程池运行状态
                        int rs = runStateOf(ctl.get());
    					//如果线程池是Running状态,
                        if (rs < SHUTDOWN ||
                        //如果是SHUTDOWN状态,创建空任务工作线程,处理阻塞队列中的任务
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // workerStarted 为false状态,线程已经运行,抛出异常
                                throw new IllegalThreadStateException();
                            //将工作线程添加到集合中
                            workers.add(w);
                            //获取工作线程个数
                            int s = workers.size();
                            //如果现在工作线程数大于之前记录的最大工作线程数,修改largestPoolSize的值
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock(); //释放锁
                    }
                    if (workerAdded) {
                        t.start(); //启动工作线程
                        workerStarted = true; //启动工作线程成功
                    }
                }
            } finally {
                if (! workerStarted) 
                    addWorkerFailed(w);
            }
            return workerStarted; //返回工作是否启动
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    3.Worker类

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            //执行任务的线程
            final Thread thread;
            //初始化Worker时传进来的任务,可能为null
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
                runWorker(this);
            }
    
            // Lock methods
            //
            // The value 0 represents the unlocked state.
            // The value 1 represents the locked state.
    
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    4.runWorker(Worker w)

    //循环从队列获取任务并执行
    final void runWorker(Worker w) {
            //获取当前执行线程
            Thread wt = Thread.currentThread();
            //获取当前传进线程池的任务
            Runnable task = w.firstTask;
            //将Worker.firstTask置为空
            w.firstTask = null;
            //此过程允许发生线程中断
            w.unlock(); // allow interrupts
            //执行被中断标志,记录是否因异常而跳出循环
            boolean completedAbruptly = true;
            try {
                //如果线程池外部传递了任务则直接执行外部传递的任务
                //如果没有获取到外部传递进来的任务则调用getTask()从队列中获取任务并执行,如果在任务队列中获取到了任务则直接执行已经获取的任务,如果任务队列为空则空转,直至当前线程死亡
                while (task != null || (task = getTask()) != null) {
                //加锁,禁止线程中断(防止线程在执行过程中中断导致不可恢复的错误)
                    w.lock();
                    //再次确认线程池以及当前工作线程状态,如果线程停止,确保当前线程被中断
                    //如果线程池为停止,确保当前线程被中断
                    //          
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted()  &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                    	//钩子方法
                        beforeExecute(wt, task);
                        try {
                        //调用任务的run方法,因为worker本身就是一个Runnable接口的实现类
                            task.run();
                            //钩子方法
                            afterExecute(task, null);
                        } catch (Throwable ex) {
                            afterExecute(task, ex);
                            throw ex;
                        }
                    } finally {
                    //执行完成后将获取的任务置空
                        task = null;
                        //执行完成自增当前工作线程的任务数量
                        w.completedTasks++;
                        //释放锁
                        w.unlock();
                    }
                }
                //执行到此,表示任务正常执行完成,将 异常中断标志 置为false
                completedAbruptly = false;
            } finally {
                //执行回收工作线程的逻辑
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    上述代码的实现逻辑主要如下:通过一个死循环让当前线程一直处于运行状态,防止操作系统将当前工作线程回收。是否执行死循环的条件在于判断task是否为空,在调用方法执行任务时会先获取外部传递进来的任务,如果没有外部任务再利用getTask()方法从队列中获取任务并执行。
    5.getTask()

    private Runnable getTask() {
    		//是否超时标志位
            boolean timedOut = false;
    		//死循环
            for (;;) {
                int c = ctl.get();
    
                //如果线程池的状态为SHUTDOWN 或者阻塞队列为空
                if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                    decrementWorkerCount(); //CAS自旋递减workerCount直到成功
                    return null;
                }
    			//获取工作线程数
                int wc = workerCountOf(c);
    
                // allowCoreThreadTimeOut默认是false,核心线程不需要进行超时控制
                //当工作线程数量大于核心线程,需要进行超时控制
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    			//
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    本段代码的执行逻辑如下:先验证线程池的状态,正常时开启任务获取的逻辑,如果线程队列中任务为空,当前线程会阻塞等待,直到任务队列中有新的任务时才会获取并执行,同时如果线程池设置了存活时间,当前线程阻塞至存活时间阈值,超出存活时间则返回null,若返回null,runWorker会执行processWorkerExit方法来进行线程资源的回收。
    6.processWorkerExit

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
            // 如果completedAbruptly为true,表明任务执行过程中发生异常,需要对WorkerCount进行递减
            // 如果completedAbruptly=false,说明是由getTask返回null导致的,WorkerCount递减的操作已经执行
            if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                decrementWorkerCount();
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
                //从workers中删除当前worker
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
    		//根据线程池状态判断是否结束线程池
            tryTerminate();
    		//如果是异常导致线程结束(completedAbruptly=true),需要重新调用addWorker()方法增加一个线程,保持线程数量
    		//如果是由getTask()返回null导致的线程结束,需要进行如下判断:
    		//1.如果allowCoreThreadTimeOut=true且队列不为空,那么需要保证至少有一个线程
    		//2.如果allowCoreThreadTimeOut=false,那么需要保证线程数大于等于corePoolSize
            int c = ctl.get();
            if (runStateLessThan(c, STOP)) {
                if (!completedAbruptly) {
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                addWorker(null, false);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    processWorkerExit方法会先判断当前线程是因为什么而发生回收(执行异常或超出存活时间),如果是超出存活时间,需要先判断线程池状态再移除当前线程。如果是由于异常导致,需要先对线程池工作线程进行自减,然后再移出工作集中的工作线程,最后调用addWorker方法添加一个工作线程保证线程池内工作线程的数量。

    7.tryTerminate()

    //根据线程池状态判断是否结束线程
    final void tryTerminate() {
            //死循环
            for (;;) {
            //获取ctl
                int c = ctl.get();
                //如果线程池为Running状态,或者大于等于TIDYING,或者运行状态为SHUTDOWN且队列不为空,直接return;
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
                // 如果工作线程数不为0,则中断一个空闲线程并return
                if (workerCountOf(c) != 0) { // Eligible to terminate
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
    
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                //利用CAS设置线程池状态为TIDYING
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    //设置成功执行terminated()禁止中断操作
                        try {
                            terminated();
                        } finally {
                            ctl.set(ctlOf(TERMINATED, 0));
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                // else retry on failed CAS
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    上述代码逻辑简单,若线程池不处于STOP或者TERMINATED状态则直接返回,否则执行terminated()函数终止线程池。

    2.3 注意事项

    线程池大小如何合理配置?主要分为CPU密集型和IO密集型两种情况。

    2.3.1 CPU密集型

    CPU密集型任务计算量比较大,需要耗费大量CPU计算能力,阻塞情况出现概率较小。计算公式如下:

    CPU核数+1

    CPU密集型任务只有在真正的多核CPU上才能提高效率,若配置较多线程数,反而降低线程效率。

    2.3.2 IO密集型

    IO密集型需要频繁在磁盘读取数据,CPU需要等到数据的读取,CPU可能会出现一定空闲,计算公式如下:

    (1)CPU核数*2
    (2)CPU核数 / 1 - 阻塞系数(cpu密集型任务阻塞系数为0,IO密集型一般在0.8-0.9之间)
    阻塞系数计算公式:执行该任务所需的时间与(阻塞时间+计算时间)的比值,即w/(w+c)

    3.小结

    1.线程池能够提升创建线程的效率,减少资源损耗;
    2.ThreadPoolExecutor有7大核心参数,推荐使用ThreadPoolExecutor来创建线程池,避免使用Executors来创建线程池;
    3.针对CPU密集型和IO密集型,线程池内线程大小需要根据情况区分。

    4.参考文献

    1.《JAVA并发编程的艺术》-方腾飞著
    2.https://www.bilibili.com/video/BV1244y1n7bz
    3.https://www.bilibili.com/video/BV1mV4y1J7qc

  • 相关阅读:
    天气很热,用Python告诉你奶茶哪家最好喝性价比最高?
    Qt pro文件中 CONFIG += debug 作用
    136. 只出现一次的数字
    人脸识别技术演进:从几何算法到深度学习的深度剖析
    解决uni.getLocation用户端首次拒绝后,点第二次不会再调用的问题
    Redis专题(六):Redis主从复制、哨兵搭建以及原理
    实操 | 制造一个OOM,生成jvm的dump文件,并通过jvisualvm工具解析
    引爆用户参与:消息重弹,让您的推送不再被忽略
    【js基础】js中常用的操作数组的方法
    俄罗斯方块
  • 原文地址:https://blog.csdn.net/qq_33479841/article/details/126178872