JAVA线程池是一种基于池化思想的线程管理工具,它能够控制线程运行的数量。在线程运行中,线程过多会带来额外的开销,其中包括创建销毁线程的开销,操作系统调度线程的开销等,若频繁创建大量线程、销毁线程,会极大地降低计算机地整体性能。线程池技术能够维护指定数量线程,且根据线程数量安排执行数量。这种做法的优势是降低创建和销毁线程的代价,另一方面避免同一时间创建大量线程导致计算机资源消耗,程序运行异常。本文将分析JAVA线程池的使用及底层实现原理,帮助大家更好地理解该技术。
若一个程序中并发的线程数量较大,并且每个线程都是执行一个较短的任务就结束了,频繁创建线程会严重影响系统的效率,因为频繁创建和销毁线程需要时间,此时就需要线程池来提高效率。
核心原理其实就是一个容纳多个线程的容器,其中的线程可以反复使用,省去了频繁创建线程对象的操作,无需反复创建线程而消耗过多资源。
1.降低资源消耗。减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务;
2.提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行;
3.提高线程的可管理性。可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。
Java里面线程池的顶级接口是 Executor,不过真正的线程池接口是 ExecutorService, ExecutorService 的默认实现是 ThreadPoolExecutor;普通类 Executors 里面调用的就是 ThreadPoolExecutor。具体类图如下所示:

上述类图说明:
| 接口或类 | 解释 |
|---|---|
| Executor接口 | 定义了一个接收Runnable对象的方法executor |
| ExecutorService接口 | 继承了Executor接口,增加了获取返回结果的submit方法 |
| AbstractExecutorService抽象类 | AbstractExecutorService类实现了ExecutorService接口,默认实现了一些方法 |
| ScheduledExecutorService接口 | 一个可定时调度任务的接口 |
| ScheduledThreadPoolExecutor类 | 一个可定时调度任务的线程池 |
| ThreadPoolExecutor类 | 用于创建线程池的类 |
| ForkJoinPool类 | 是一种拆分任务,合并计算结果的线程池 |
1.newCachedThreadPool
newCachedThreadPool 是一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute() 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。注意,可以使用ThreadPoolExecutor 构造方法创建具有类似属性但细节不同(例如超时参数)的线程池。
创建方式如下:
ExecutorService executorService = Executors.newCachedThreadPool();
2.newSingleThreadExecutor
创建一个单线程池,也就是该线程池只有一个线程在工作,所有的任务是串行执行的,如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它,此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
创建方式如下:
ExecutorService executorService = Executors.newSingleThreadExecutor();
3.newFixedThreadPool
newFixedThreadPool 创建固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小,线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
创建方式如下:
ExecutorService executorService = Executors.newFixedThreadPool(3);
4.newScheduledThreadPool
newScheduledThreadPool 创建一个队列长度为Integer.MAX_VALUE大小的线程池,此线程池支持定时以及周期性执行任务的需求。
创建方式如下:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
在《阿里巴巴开发者手册》中,关于线程池技术的使用有如下约束:

其实通过Executors创建的四种线程池,底层都是调用ThreadPoolExecutor类来进行创建的,源码如下:
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
因此本文主要探索线程池创建类ThreadPoolExecutor的使用及原理。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handle
由源码可知,ThreadPoolExecutor共有四个构造方法,共有7个核心参数,具体核心参数解释如下:
| 参数 | 解释 |
|---|---|
| corePoolSize | 核心线程数,线程池中常驻线程数,线程池初始化时默认是没有线程的,当任务来临时才开始创建线程 |
| maximumPoolSize | 最大线程数,核心线程满的基础上增加额外非核心参数,需要注意的是只有workQueue队列满时才会创建最多(maximumPoolSize-corePoolSize)个非核心线程 |
| keepAliveTime | 非核心线程的空闲时间超过keepAliveTime就会被自动终止回收掉,当corePoolSize=maxPoolSize时此参数就不起作用了,此时不存在非核心线程 |
| unit | keepAliveTime的时间单位,单位包括:TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分钟)、TimeUnit.MILLISECONDS(微秒)等 |
| workQueue | 阻塞队列,当线程池中的任务数大于corePoolSize时,新进来的任务就会被放到队列中,常用的三种队列包括:ArrayBlockingQueue(基于数组的先进先出队列,此队列创建时必须指定大小) 、LinkedBlockingQueue(基于链表的先进先出队列,如果创建时没有指定队列大小,则使用默认值Integer.MAX_VALUE) 、SynchronousQueue(此队列不会保存提交的任务,而是直接新建一个线程执行新来的任务) |
| threadFactory | 线程工厂,用来创建线程,默认使用Executors.defaultThreadFactory(),也可以使用guava库的ThreadFactoryBuilder来创建 |
| handler | 拒绝策略,当线程任务大于最大线程数时,会采用拒绝策略。默认拒绝策略有如下四种:AbortPolicy(丢弃任务并抛出RejectedExecutionException异常)、DiscardPolicy(丢弃任务,但是不抛出异常) 、DiscardOldestPolicy(丢弃队列最前面的任务,然后重新尝试执行任务,不断重复此过程)、CallerRunsPolicy(由调用线程处理该任务) |

若有一个线程池,核心线程corePoolSize=10,maximumPoolSize=50,workQueue=100,当任务过来会先创建10个核心线程,若任务不断增多,超过核心线程最大值时,会放入阻塞队列workQueue中,若任务量还在增多,超过阻塞队列最大值(100),就会创建非核心线程来执行任务(最多40个),此时如果还有任务来就会按照拒绝策略拒绝任务。
//ctl是一个线程安全的int类型数值,它有两个意思:1.声明当前线程池的状态 2.声明线程池中的线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//COUNT_BITS的值固定为29,因为Integer类型占4个字节,共32位,Integer.SIZE就为32
private static final int COUNT_BITS = Integer.SIZE - 3;
//将1往前位移29位,得到值00100000 00000000 00000000 00000000,再减去1,得到00011111 11111111 11111111 11111111,因此COUNT_MASK值为3777777777
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
//-1左移29位,得到11100000 00000000 00000000 00000000,也就是111表示线程为Running状态,表示线程池正常接收任务
private static final int RUNNING = -1 << COUNT_BITS;
//0左移29位,得到00000000 00000000 00000000 00000000,也就是000表示线程池为SHUTDOWN状态,不接收新任务,但是内部还会处理阻塞队列中的任务,正在进行的任务也正常处理
private static final int SHUTDOWN = 0 << COUNT_BITS;
//1左移29位,得到00100000 00000000 00000000 00000000,也就是001表示线程池状态为STOP,不接受新任务,也不去处理阻塞团队中的任务,同时会中断正在执行的任务
private static final int STOP = 1 << COUNT_BITS;
//2左移29位,得到01000000 00000000 00000000 00000000,也就是010表示线程池状态为TIDYING,代表当前线程池即将终止
private static final int TIDYING = 2 << COUNT_BITS;
//3左移29位,得到01100000 00000000 00000000 00000000,也就是011表示线程池状态为TERMINATED,代表线程池为TERMINATED,
private static final int TERMINATED = 3 << COUNT_BITS;
//得到线程池的状态
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
//得到当前线程池的线程数量(正在工作的线程)
private static int workerCountOf(int c) { return c & COUNT_MASK; }
//把运行状态和线程数量打包成一个整数
private static int ctlOf(int rs, int wc) { return rs | wc; }
//任务缓存队列,用来存放等待执行的任务
private final BlockingQueue<Runnable> workQueue;
//锁,线程池的状态改变(线程池大小、运行状态等)的改变都需要使用这个锁
private final ReentrantLock mainLock = new ReentrantLock();
//用来存放工作集
private final HashSet<Worker> workers = new HashSet<Worker>();
//用来记录线程池中曾经出现过的最大线程数
private int largestPoolSize;
//用来记录已经执行完毕的任务个数
private long completedTaskCount;
//线程工厂,用来创建线程
private volatile ThreadFactory threadFactory;
//任务拒绝策略
private volatile RejectedExecutionHandler handler;
//线程存活时间
private volatile long keepAliveTime;
//是否允许为核心线程设置存活时间
private volatile boolean allowCoreThreadTimeOut;
//核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int corePoolSize;
//线程池最大能容忍的线程数
private volatile int maximumPoolSize;
//线程池中当前的线程数
private volatile int poolSize;

1.RUNNING:当创建线程池后,初始化完成后,线程池处于RUNNING状态;
2.SHUTDOWN:调用shutdown()方法,线程池会处于SHUTDOWN状态,此时线程池不能够接收新任务,可以处理队列中的任务,等待所有任务执行完成;
3.STOP:调用shutdownNow()方法,线程池处于停止状态,此时线程池不能接收新的任务,同时尝试中断正在工作的线程;
4.TIDYING:过渡状态,此时线程池中所有任务已终止,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态;
5.TERMINATED:当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
1.execute()
public void execute(Runnable command) {
//若Runnable为空,直接抛出异常
if (command == null)
throw new NullPointerException();
//获取32位的ctl的值,高三位是线程状态
int c = ctl.get();
//判断工作线程数是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//是否创建核心线程
if (addWorker(command, true))
//核心线程创建成功并添加任务成功
return;
//执行此语句,表明创建核心线程数失败,重新获取ctl的值
c = ctl.get();
}
//此处是因为创建核心线程失败
//判断线程池是running状态,将任务添加到阻塞队列中,
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次判断是否是Running状态,如果不是Running,移除任务
if (! isRunning(recheck) && remove(command))
//拒绝策略
reject(command);
//如果线程池工作线程数量为0,则新建一个空任务的线程
else if (workerCountOf(recheck) == 0)
//如果线程池非running状态,无法添加任务
//阻塞队列有任务,但是没有工作现场,添加一个任务为空的工作线程处理阻塞队列中的任务
addWorker(null, false);
}
//线程池不是Running状态或入队失败,尝试扩容maxPoolSize后再次
else if (!addWorker(command, false))
reject(command);
}
2.addWorker(Runnable firstTask, boolean core)
private boolean addWorker(Runnable firstTask, boolean core) {
retry: //循环跳出点
for (;;) {
//获取ctl
int c = ctl.get();
//获取线程池状态
int rs = runStateOf(c);
// 除了RUNNING状态,其它状态均有可能
//rs == SHUTDOWN,如果不是SHUTDOWN ,就代表是STOP或者更高的状态,这时不需要添加任务处理
//如果任务为空,并且线程池状态不是Running,不需要处理
//如果阻塞队列为空,则返回false,外侧!再次取反,该条件为true,不做处理
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
for (;;) {
//获取工作线程个数
int wc = workerCountOf(c);
//如果工作线程数大于线程池最大容量,不再创建
if (wc >= CAPACITY ||
//判断wc是否超过核心线程或者最大线程
wc >= (core ? corePoolSize : maximumPoolSize))
//构建工作线程失败
return false;
//采用CAS将工作线程数+1
if (compareAndIncrementWorkerCount(c))
//成功,跳出外侧循环
break retry;
//并发操作导致失败,重新获取ctl
c = ctl.get(); // Re-read ctl
//重新判断线程池状态
if (runStateOf(c) != rs)
//如果线程池状态发生变化,开启下一次循环,如果状态没变化,重新执行内存循环for(;;)
continue retry;
}
}
//worker开始为false
boolean workerStarted = false;
//worker添加为false
boolean workerAdded = false;
//Worker就是工作线程
Worker w = null;
try {
//创建worker,传入任务
w = new Worker(firstTask);
//从worker中获取线程t
final Thread t = w.thread;
//线程t不为null
if (t != null) {
//获取线程池的全局锁,避免当前线程添加任务时,其它线程清除了线程池,清除线程池需要先获取这个锁
final ReentrantLock mainLock = this.mainLock;
//加锁
mainLock.lock();
try {
//获取线程池运行状态
int rs = runStateOf(ctl.get());
//如果线程池是Running状态,
if (rs < SHUTDOWN ||
//如果是SHUTDOWN状态,创建空任务工作线程,处理阻塞队列中的任务
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // workerStarted 为false状态,线程已经运行,抛出异常
throw new IllegalThreadStateException();
//将工作线程添加到集合中
workers.add(w);
//获取工作线程个数
int s = workers.size();
//如果现在工作线程数大于之前记录的最大工作线程数,修改largestPoolSize的值
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock(); //释放锁
}
if (workerAdded) {
t.start(); //启动工作线程
workerStarted = true; //启动工作线程成功
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted; //返回工作是否启动
}
3.Worker类
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
//执行任务的线程
final Thread thread;
//初始化Worker时传进来的任务,可能为null
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
4.runWorker(Worker w)
//循环从队列获取任务并执行
final void runWorker(Worker w) {
//获取当前执行线程
Thread wt = Thread.currentThread();
//获取当前传进线程池的任务
Runnable task = w.firstTask;
//将Worker.firstTask置为空
w.firstTask = null;
//此过程允许发生线程中断
w.unlock(); // allow interrupts
//执行被中断标志,记录是否因异常而跳出循环
boolean completedAbruptly = true;
try {
//如果线程池外部传递了任务则直接执行外部传递的任务
//如果没有获取到外部传递进来的任务则调用getTask()从队列中获取任务并执行,如果在任务队列中获取到了任务则直接执行已经获取的任务,如果任务队列为空则空转,直至当前线程死亡
while (task != null || (task = getTask()) != null) {
//加锁,禁止线程中断(防止线程在执行过程中中断导致不可恢复的错误)
w.lock();
//再次确认线程池以及当前工作线程状态,如果线程停止,确保当前线程被中断
//如果线程池为停止,确保当前线程被中断
//
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//钩子方法
beforeExecute(wt, task);
try {
//调用任务的run方法,因为worker本身就是一个Runnable接口的实现类
task.run();
//钩子方法
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
//执行完成后将获取的任务置空
task = null;
//执行完成自增当前工作线程的任务数量
w.completedTasks++;
//释放锁
w.unlock();
}
}
//执行到此,表示任务正常执行完成,将 异常中断标志 置为false
completedAbruptly = false;
} finally {
//执行回收工作线程的逻辑
processWorkerExit(w, completedAbruptly);
}
}
上述代码的实现逻辑主要如下:通过一个死循环让当前线程一直处于运行状态,防止操作系统将当前工作线程回收。是否执行死循环的条件在于判断task是否为空,在调用方法执行任务时会先获取外部传递进来的任务,如果没有外部任务再利用getTask()方法从队列中获取任务并执行。
5.getTask()
private Runnable getTask() {
//是否超时标志位
boolean timedOut = false;
//死循环
for (;;) {
int c = ctl.get();
//如果线程池的状态为SHUTDOWN 或者阻塞队列为空
if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount(); //CAS自旋递减workerCount直到成功
return null;
}
//获取工作线程数
int wc = workerCountOf(c);
// allowCoreThreadTimeOut默认是false,核心线程不需要进行超时控制
//当工作线程数量大于核心线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
本段代码的执行逻辑如下:先验证线程池的状态,正常时开启任务获取的逻辑,如果线程队列中任务为空,当前线程会阻塞等待,直到任务队列中有新的任务时才会获取并执行,同时如果线程池设置了存活时间,当前线程阻塞至存活时间阈值,超出存活时间则返回null,若返回null,runWorker会执行processWorkerExit方法来进行线程资源的回收。
6.processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly为true,表明任务执行过程中发生异常,需要对WorkerCount进行递减
// 如果completedAbruptly=false,说明是由getTask返回null导致的,WorkerCount递减的操作已经执行
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
//从workers中删除当前worker
workers.remove(w);
} finally {
mainLock.unlock();
}
//根据线程池状态判断是否结束线程池
tryTerminate();
//如果是异常导致线程结束(completedAbruptly=true),需要重新调用addWorker()方法增加一个线程,保持线程数量
//如果是由getTask()返回null导致的线程结束,需要进行如下判断:
//1.如果allowCoreThreadTimeOut=true且队列不为空,那么需要保证至少有一个线程
//2.如果allowCoreThreadTimeOut=false,那么需要保证线程数大于等于corePoolSize
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
processWorkerExit方法会先判断当前线程是因为什么而发生回收(执行异常或超出存活时间),如果是超出存活时间,需要先判断线程池状态再移除当前线程。如果是由于异常导致,需要先对线程池工作线程进行自减,然后再移出工作集中的工作线程,最后调用addWorker方法添加一个工作线程保证线程池内工作线程的数量。
7.tryTerminate()
//根据线程池状态判断是否结束线程
final void tryTerminate() {
//死循环
for (;;) {
//获取ctl
int c = ctl.get();
//如果线程池为Running状态,或者大于等于TIDYING,或者运行状态为SHUTDOWN且队列不为空,直接return;
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果工作线程数不为0,则中断一个空闲线程并return
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//利用CAS设置线程池状态为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
//设置成功执行terminated()禁止中断操作
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
上述代码逻辑简单,若线程池不处于STOP或者TERMINATED状态则直接返回,否则执行terminated()函数终止线程池。
线程池大小如何合理配置?主要分为CPU密集型和IO密集型两种情况。
CPU密集型任务计算量比较大,需要耗费大量CPU计算能力,阻塞情况出现概率较小。计算公式如下:
CPU核数+1
CPU密集型任务只有在真正的多核CPU上才能提高效率,若配置较多线程数,反而降低线程效率。
IO密集型需要频繁在磁盘读取数据,CPU需要等到数据的读取,CPU可能会出现一定空闲,计算公式如下:
(1)CPU核数*2
(2)CPU核数 / 1 - 阻塞系数(cpu密集型任务阻塞系数为0,IO密集型一般在0.8-0.9之间)
阻塞系数计算公式:执行该任务所需的时间与(阻塞时间+计算时间)的比值,即w/(w+c)
1.线程池能够提升创建线程的效率,减少资源损耗;
2.ThreadPoolExecutor有7大核心参数,推荐使用ThreadPoolExecutor来创建线程池,避免使用Executors来创建线程池;
3.针对CPU密集型和IO密集型,线程池内线程大小需要根据情况区分。
1.《JAVA并发编程的艺术》-方腾飞著
2.https://www.bilibili.com/video/BV1244y1n7bz
3.https://www.bilibili.com/video/BV1mV4y1J7qc