public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
the number of threads to keep in the pool, even if they are idle, unless is set
核心线程数大小:不管它们创建以后是不是空闲的。线程池需要保持 corePoolSize 数量的线程,除非设置了 allowCoreThreadTimeOut。
the maximum number of threads to allow in the pool
最大线程数:线程池中最多允许创建 maximumPoolSize 个线程。
when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
存活时间:如果经过 keepAliveTime 时间后,超过核心线程数的线程还没有接受到新的任务,那就回收。
the time unit for the keepAliveTime argument
keepAliveTime 的时间单位
the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.
存放待执行任务的队列:当提交的任务数超过核心线程数大小后,再提交的任务就存放在这里。
the factory to use when the executor creates a new thread
创建线程的工厂。创建线程或线程池时指定有意义的线程名称,方便出错时回溯。
public class UserThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger nextId = new AtomicInteger(1);
// 定义线程组名称,在利用 jstack 来排查问题时,非常有帮助
UserThreadFactory(String whatFeatureOfGroup) {
namePrefix = "From UserThreadFactory's " + whatFeatureOfGroup + "-Worker-";
}
@Override
public Thread newThread(Runnable task) {
String name = namePrefix + nextId.getAndIncrement();
Thread thread = new Thread(null, task, name, 0, false);
System.out.println(thread.getName());
return thread;
}
}
the handler to use when execution is blocked because the thread bounds and queue capacities are reached
拒绝策略:当队列里面放满了任务、最大线程数的线程都在工作时,这时继续提交的任务线程池就处理不了,应该执行怎么样的拒绝策略。
抛出 RejectedExecutionException 异常。
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
默默丢弃被拒绝的任务。
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
提交任务的线程去处理被拒绝的任务。除非 executor 已经关闭了,这种情况下,任务将被丢弃。
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
丢掉 workQueue 里排队最久还没被处理掉的任务,然后重试这个任务。除非,excutor 被 shut down 了,那这个任务就被丢弃了。
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
实现 RejectedExecutionHandler 重写 rejectedExecution 方法。
class MyHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//自定义处理方式,可以先打个日志 xxx 被我拒绝了 然后我把它放到了 redis 里 ...
//log("r rejected")
//save r in kafka、mysql、redis
}
}
// jdk 源码中常见操作,高 3 位表示线程池状态,低 29 位表示 worker 数量
// 111 00000000000000000000000000000 RUNNING
// 000 00000000000000000000000000000 SHUTDOWN
// 001 00000000000000000000000000000 STOP
// 010 00000000000000000000000000000 TYDYING
// 011 00000000000000000000000000000 TERMINATED
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//Integer.SIZE = 32,所以 COUNT_BITS 为 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池允许的最大线程数。1 左移 29 位,然后减 1,即低 29 位都是 1
// 00100000 00000000 00000000 00000000 - 1
// 00011111 11111111 11111111 11111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池有 5 种状态
// RUNNING:运行状态,接受新任务,持续处理任务队列里的任务。
private static final int RUNNING = -1 << COUNT_BITS;
// 调用 shutdown()方法会进入此状态,不再接受新任务,但要处理任务队列里的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 调用 shutdownNow()方法,不再接受新任务,不再处理任务队列里的任务,中断正在进行中的任务
private static final int STOP = 1 << COUNT_BITS;
// 表示线程池正在停止运作,中止所有任务,销毁所有工作线程。
private static final int TIDYING = 2 << COUNT_BITS;
// 表示线程池已停止运作,所有工作线程已被销毁,所有任务已被清空或执行完毕
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取线程池状态即 ctl 高 3 位(低 29 位都是 0)
// 111 00000000000000000000000000000 ctl 初始值
// 111 00000000000000000000000000000 ~CAPACITY(CAPACITY 取反)
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程池 worker 数量,即 ctl 低 29 位(高 3 位都是 0)
// 111 00000000000000000000000000000 ctl 初始值
// 000 11111111111111111111111111111 CAPACITY
private static int workerCountOf(int c) { return c & CAPACITY; }
//根据线程池状态和线程池 worker 数量,生成 ctl 值
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 线程池状态小于 xxx
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 线程池状态大于等于 xxx
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// 线程池中曾经创建的最大的线程数量
private int largestPoolSize;
// 已经完成的任务数量
private long completedTaskCount;
在运行期线程池使用方调用此方法设置 corePoolSize 之后,线程池会直接覆盖原来的 corePoolSize 值,并且基于当前值和原始值的比较结果采取不同的处理策略。
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
// 若 corePoolSize 小于当前工作线程数,则说明有多余的工作线程
if (workerCountOf(ctl.get()) > corePoolSize)
// 对空闲的线程发起中断请求以实现回收
interruptIdleWorkers();
// 若 corePoolSize 大于当前工作线程数且阻塞队列中有待执行任务
else if (delta > 0) {
int k = Math.min(delta, workQueue.size());
// 创建工作线程执行阻塞队列里的任务
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
// 若工作线程数大于最大线程数
if (workerCountOf(ctl.get()) > maximumPoolSize)
// 对空闲线程发起中断请求
interruptIdleWorkers();
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 工作线程少于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 创建核心线程并执行任务
if (addWorker(command, true))
return;
// 添加核心线程失败(线程池状态是 SHUTDOWN 或者 STOP),重新获取 ctl
c = ctl.get();
}
// 线程数不小于核心线程数且线程池是运行状态,添加任务到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// DCL 再次判断线程池状态
if (! isRunning(recheck) && remove(command))
reject(command);
// 若工作线程数为 0,则创建 command 为 null 的非核心线程去执行阻塞队列里的任务
// 核心线程空闲时间超过 keepAlived 被销毁或者将核心线程数设置为 0
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建非核心线程执行任务。
// 1. 线程池不是运行状态时,addWorker 内部会判断线程池状态
// 2. addWorker 第 2 个参数为 true 表示创建核心线程,为 false 表示创建非核心线程
// 3. addWorker 返回 false,则说明非核心线程添加失败,需要执行reject操作
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
// 将外层循环标记为 retry
retry:
// retry 循环
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 一:判断线程池状态
if (rs >= SHUTDOWN &&
// 线程池状态大于 SHUTDOWN 的只有 STOP、TIDYING、TERMINATED 这 3 种。当线程池是这 3 种状态时,就直接返回 false,不添加工作线程了。
// 当线程池状态为 SHUTDOWN 时,阻塞队列没有任务了或者 firstTask 不为空也直接返回 false。
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
// 二:判断工作线程个数
for (;;) {
int wc = workerCountOf(c);
// 工作线程数量大于最大线程数,直接返回 false
// core 是 addWorker 第二个参数,true 表示核心线程,false 表示非核心线程
// 若添加核心线程且工作线程数大于等于 corePoolSize,则返回 false
// 若添加非核心线程且工作线程大于等于 maximumPoolSize,则返回 false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 使用 CAS 的方式增加工作线程的数量。
// 若增加成功,则直接跳出外层循环进入到 三:添加工作线程
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS 失败,重写获取 ctl
c = ctl.get();
// 线程池状态发生变化,重新进入 retry 循环
if (runStateOf(c) != rs)
continue retry;
}
}
// 三:添加工作线程
// 工作线程是否已启动
boolean workerStarted = false;
// 工作线程是否已添加
boolean workerAdded = false;
// class Worker extends AbstractQueuedSynchronizer implements Runnable
Worker w = null;
try {
// 创建工作线程
w = new Worker(firstTask);
final Thread t = w.thread;
// 一般不为 null,除非自定义的线程工厂返回 null
if (t != null) {
// 获取锁资源
final ReentrantLock mainLock = this.mainLock;
// 加锁。启动工作线程时,避免线程池状态发生改变
// shutdown() 和 shutdownNow() 方法都需要获取 mainLock 锁
mainLock.lock();
try {
// 这儿需要重新检查线程池状态 DCL
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 判断工作线程中的 Thread 是否已启动,一般情况不会启动
// 除非在自定义线程工厂里把线程启动,则抛出异常。
// 线程的启动要交由线程池
if (t.isAlive())
throw new IllegalThreadStateException();
// HashSet workers = new HashSet();
// 将工作线程存储到 HashSet 中
workers.add(w);
// largestPoolSize 记录了历史最大工作线程数
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 工作线程添加成功
workerAdded = true;
}
} finally {
// 释放锁资源
mainLock.unlock();
}
// 若工作线程添加成功
if (workerAdded) {
// 启动工作线程
t.start();
// workerStarted 更新为 true,表明工作线程已启动
workerStarted = true;
}
}
} finally {
// 添加工作线程失败
if (! workerStarted)
// 移除 works 中的工作线程,并将 workerCount -1(回滚上面的操作)
addWorkerFailed(w);
}
return workerStarted;
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 调用 unlock() 是为了让外部可以中断
w.unlock();
boolean completedAbruptly = true;
try {
// 1. 若构造工作线程的时候,指定了任务即 firstTask 不为 null,则执行任务;
// 2. 如果 firstTask 为 null,则调用 getTask() 从阻塞队列获取任务。若 getTask() 返回 null,则跳出 while 循环。
// 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
while (task != null || (task = getTask()) != null) {
// 这儿对worker进行加锁,是为了达到下面的目的
// 1. 降低锁范围,提升性能
// 2. 保证每个worker执行的任务是串行的
w.lock();
// 如果线程池状态大于等于 STOP,则中断当前线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 执行任务,且在执行前后通过 beforeExecute() 和 afterExecute() 来扩展其功能。
// 这两个方法在当前类里面为空实现。
try {
// 执行任务前的钩子
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务后的钩子
afterExecute(task, thrown);
}
} finally {
// 帮助gc
task = null;
// 执行完任务数加一
w.completedTasks++;
w.unlock();
}
}
// 钩子函数抛异常不会走这里
completedAbruptly = false;
} finally {
// completedAbruptly 为 true 可能钩子函数抛异常了,while 循环没有正常结束
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 一:线程池状态判断
// rs >= SHUTDOWN && workQueue.isEmpty() 线程池状态为 SHUTDOWN,且阻塞队列为空
// rs >= STOP 线程池状态为 STOP
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 工作线程数减一
decrementWorkerCount();
return null;
}
// 二:工作线程数判断
// 获取工作线程数
int wc = workerCountOf(c);
// allowCoreThreadTimeOut 核心线程是否允许超时
// 工作线程数是否大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 工作线程数 -1
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 三:处理任务
try {
Runnable r = timed ?
// 非核心线程,使用 poll 方法拉取阻塞队列的任务
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 核心线程,使用 take 方法阻塞获取任务
workQueue.take();
if (r != null)
// 获取到任务直接返回
return r;
// poll 任务超时,没获取到任务
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// runWorker() 方法异常时,completedAbruptly 为 true
// 工作线程数减一
// 正常情况下在 getTask() 方法中会将工作线程数减一
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 线程池完成的任务数 = 线程池完成的任务数 + 当前工作线程完成的任务数
completedTaskCount += w.completedTasks;
// 移除当前工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 判断当前可以允许的最小核心线程数量
// allowCoreThreadTimeOut 为 true 的话,最小核心线程数可以为 0
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 若阻塞队列不为空,则最少需要一个工作线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 不需要添加工作线程
if (workerCountOf(c) >= min)
return;
}
// 添加一个非核心线程执行任务
addWorker(null, false);
}
}
上文我们介绍线程池的核心参数 workQueue 时讲到,线程池提交任务时,先交由空闲的核心线程处理,没有空闲的核心线程以后就放到阻塞队列里。阻塞队列也满了就创建非核心线程来处理。
但是有没有一种可能,没有空闲的核心线程以后,先创建非核心线程,直到核心线程数 + 非核心线程达到 maximumPoolSize 再将任务放到队列里。
答案是有的,在 Dubbo 源码中就有现成的方案。Github issue : Extension: Eager Thread Pool #1568

看最后一句,Reference: Tomcat’s thread pool design 其实他也是借鉴了 Tomcat 的设计。
先一起看下扩展之前,再复习下 execute() 方法的逻辑。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 工作线程少于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 创建核心线程并执行任务
if (addWorker(command, true))
return;
// 添加核心线程失败(线程池状态是 SHUTDOWN 或者 STOP),重新获取 ctl
c = ctl.get();
}
// 线程数不小于核心线程数且线程池是运行状态,添加任务到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// DCL 再次判断线程池状态
if (! isRunning(recheck) && remove(command))
reject(command);
// 若工作线程数为 0,则创建 command 为 null 的非核心线程去执行阻塞队列里的任务
// 核心线程空闲时间超过 keepAlived 被销毁或者将核心线程数设置为 0
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建非核心线程执行任务。
// 1. 线程池不是运行状态时,addWorker 内部会判断线程池状态
// 2. addWorker 第 2 个参数为 true 表示创建核心线程,为 false 表示创建非核心线程
// 3. addWorker 返回 false,则说明非核心线程添加失败,需要执行reject操作
else if (!addWorker(command, false))
reject(command);
}
package eagerthreadpool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = -2635853580887179627L;
// 自定义的线程池类,继承自ThreadPoolExecutor
private EagerThreadPoolExecutor executor;
public TaskQueue(int capacity) {
super(capacity);
}
public void setExecutor(EagerThreadPoolExecutor exec) {
executor = exec;
}
// offer方法的含义是:将任务提交到队列中,返回值为true/false,分别代表提交成功/提交失败
@Override
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
// 线程池的当前线程数
int currentPoolThreadSize = executor.getPoolSize();
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
// 已提交的任务数量小于当前线程数,意味着线程池中有空闲线程,直接扔进队列里,让线程去处理
return super.offer(runnable);
}
// return false to let executor create new worker.
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
// 重点: 当前线程数小于 最大线程数 ,返回false,暗含入队失败,让线程池去创建新的线程
return false;
}
// 重点: 代码运行到此处,说明当前线程数 >= 最大线程数,需要真正的提交到队列中
return super.offer(runnable);
}
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
return super.offer(o, timeout, unit);
}
}
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
/**
* 定义一个成员变量,用于记录当前线程池中已提交的任务数量,在队列的 offer() 方法中要用
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
public EagerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit, TaskQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
// ThreadPoolExecutor的勾子方法,在task执行完后需要将池中已提交的任务数 - 1
submittedTaskCount.decrementAndGet();
}
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
// 将池中已提交的任务数 + 1
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
throw t;
}
}
}
自定义的 EagerThreadPoolExecutor 依赖自定义的 TaskQueue 的 offer 返回值来决定是否创建更多的线程,达到先判断 maximumPoolSize 再判断队列的目的。