ThreadPoolExecutor 类

前面我们提到了,线程池实现类 ThreadPoolExecutorExecutor 框架最核心的类,对于Executor接口和ExecutorService接口我们都已经分析过了,在这里只贴一下接口内容方便回顾,详细请看上篇

image-20260212115724180
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public interface ExecutorService extends Executor, AutoCloseable {

void shutdown();

boolean isShutdown();

boolean isTerminated();

boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;

@Override
default void close() {....}
}

ThreadPoolExecutor参数分析

ThreadPoolExecutor 类中提供了的四个构造方法。

来看最长的构造方法,他能指定ThreadPoolExecutor 中所有可以自定义的参数

image-20260214085834683
  • corePoolSize:任务队列未达到队列容量时,最大可以同时运行的线程数量。
  • maximumPoolSize : 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue: 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

ThreadPoolExecutor其他常见参数 :

  • keepAliveTime:线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁。
  • unitkeepAliveTime 参数的时间单位。
  • threadFactoryexecutor 创建新线程的时候会用到的线程工厂
  • handler :拒绝策略
线程池各个参数的关系

关于handler 拒绝策略:如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolExecutor 定义一些策略:

  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。这个是默认策略

    image-20260214090320557
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝了的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能,因为可能影响主线程。如果应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。

  • ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉。

  • ThreadPoolExecutor.DiscardOldestPolicy:此策略将丢弃最早的未处理的任务请求

Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 拒绝策略来配置线程池的时候,默认使用的是 AbortPolicy。在这种拒绝策略下,如果队列满了,ThreadPoolExecutor 将抛出 RejectedExecutionException 异常来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。如果不想丢弃任务的话,可以使用CallerRunsPolicyCallerRunsPolicy 和其他的几个策略不同,它既不会抛弃任务,也不会抛出异常,而是将任务回退给调用者,使用调用者的线程来执行任务

image-20260214090555733
image-20260214090611302

ThreadPoolExecutor常用的阻塞队列

ThreadPoolExecutor中有一个参数就是设置workQueue,它用于设置线程池中的等待队列,那么它可以使用哪些队列来使用呢?

因为当所有核心线程都在忙,且线程数未达最大值时,新任务会先放入 workQueue 等待。所以说,有界队列可防止内存溢出,无界队列可能导致任务无限堆积。

不同业务和功能的线程池会选用不同的阻塞队列,我们可以结合内置线程池来分析。

  • LinkedBlockingQueue :链表阻塞队列,默认无界,基于链表实现,FIFO,默认容量为 Integer.MAX_VALUE,但是实际上它也可通过构造函数指定容量,变成有界队列。
    • 对于Executors工厂类,FixedThreadPoolSingleThreadExector 默认使用了这个队列。FixedThreadPool最多只能创建核心线程数的线程(核心线程数和最大线程数相等),SingleThreadExector只能创建一个线程(核心线程数和最大线程数都是 1),二者的任务队列永远不会被放满。
  • ArrayBlockingQueue:数组阻塞队列,基于数组实现,FIFO,必须在构造时指定容量,它内部使用 ReentrantLock 保证线程安全,对于需要严格控制内存使用的系统用它是很好的,而且它能希望在队列满时触发扩容或拒绝策略
  • SynchronousQueue:同步移交队列,容量为0,所以它不存储任何元素,每次 put() 必须等待另一个线程 take(),反之亦然,本质上是一个直接传递通道,适用于短生命周期、高并发突发任务的情况,它是CachedThreadPool的默认队列。所以说,使用这个队列的线程池的行为如下,提交任务时,若无空闲线程,则立即尝试创建新线程,只要未达 maximumPoolSize,它无法缓冲任务,压力直接传导到线程创建,所以说常配合大 maxPoolSize 使用。也就是说,CachedThreadPool 的最大线程数是 Integer.MAX_VALUE ,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致 OOM。
  • DelayedWorkQueue:延迟阻塞队列,DelayedWorkQueue 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的,谁先能被执行谁在前面。DelayedWorkQueue 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。它通常配合 ScheduledThreadPoolExecutor 使用
    • 对于Executors工厂类,ScheduledThreadPoolSingleThreadScheduledExecutor 默认使用了这个队列
  • PriorityBlockingQueue :优先级阻塞队列,无界,它按任务优先级排序,任务类必须实现 Comparable<Runnable> 或提供 Comparator,任务有优先级差异,同优先级任务的执行顺序不保证 FIFO,因无界仍存在 OOM 风险

对ThreadPoolExecutor的原理分析

首先,ThreadPoolExecutor 的核心目标是:

  1. 复用线程:避免为每个任务都创建新线程的巨大开销。
  2. 控制资源:通过限制线程数量和队列大小,防止系统因资源耗尽(如内存溢出 OOM)而崩溃。
  3. 提供灵活的策略:允许用户自定义线程工厂、任务队列、拒绝策略等,以适应不同场景。

其工作模式可以概括为一个三层漏斗模型

  1. 核心线程池 (corePoolSize):优先使用这部分常驻线程处理任务。
  2. 任务队列 (workQueue):当核心线程忙时,新任务进入队列等待。
  3. 最大线程池 (maximumPoolSize):当队列也满了,才会创建额外的非核心线程来处理任务。

如果连最大线程数都达到了,且队列已满,则触发拒绝策略

核心数据结构与字段

ThreadPoolExecutor有一个原子状态控制 ctl,这是整个线程池最精妙的设计之一。它用一个 AtomicInteger 同时存储了两个关键信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29位用于计数
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // 0x1FFFFFFF

// 高3位表示运行状态 (runState)
private static final int RUNNING = -1 << COUNT_BITS; // 111...
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000...
private static final int STOP = 1 << COUNT_BITS; // 001...
private static final int TIDYING = 2 << COUNT_BITS; // 010...
private static final int TERMINATED = 3 << COUNT_BITS; // 011...

// 打包/解包方法
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • wc:是workerCount,低29位,表示当前活跃的工作线程数
  • rs:是runState,高3位,表示线程池的生命周期状态
1
2
3
4
5
6
7
8
9
10
11
RUNNING -> SHUTDOWN (调用 shutdown())
|
V
(SHUTDOWN or RUNNING) -> STOP (调用 shutdownNow())
|
V
SHUTDOWN -> TIDYING (队列和线程池都空了)
STOP -> TIDYING (线程池空了)
|
V
TIDYING -> TERMINATED (terminated() 钩子执行完毕)

这种设计的好处是,对状态和线程数的原子操作可以在一次 CAS 中完成,极大地提高了并发性能。

对于工作队列workQueue,这就是我们之前讨论的任务缓冲队列。所有提交但尚未被线程处理的任务都存放在这里。

image-20260214092805730
  • 可看到这个是个阻塞队列

对于工作者集合 workers

1
2
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<>();
  • workers:一个 HashSet,存储了所有活着的工作线程(封装在 Worker 对象中)。
  • mainLock:一个全局重入锁,保护对 workers 集合、largestPoolSizecompletedTaskCount 等共享变量的访问。虽然牺牲了一点并发度,但简化了实现,并能有效防止中断风暴。

其他关键的配置参数均为 volatile,保证了多线程下的可见性,使得动态调整(如 setCorePoolSize)能够生效

1
2
3
4
5
6
private volatile ThreadFactory threadFactory;        // 创建新线程的工厂
private volatile RejectedExecutionHandler handler; // 拒绝策略处理器
private volatile long keepAliveTime; // 空闲线程存活时间
private volatile boolean allowCoreThreadTimeOut; // 核心线程是否也能超时
private volatile int corePoolSize; // 核心线程数
private volatile int maximumPoolSize; // 最大线程数

核心内部类Worker

WorkerThreadPoolExecutor 的灵魂内部类。它既是工作单元,也是一个互斥锁

对于核心变量,如下

image-20260214093101188

其中,对于构造方法

image-20260214093155050
  • 其中的 run() 方法委托给外部方法执行主循环runWorker

    image-20260214093408107

对于 worker 如何原子性的执行任务,有两个核心的 AQS 方法

image-20260214093236257

为什么需要 AQS 锁?

  • 在任务执行期间 (task.run()),需要锁定这个 Worker
  • 这样,在调用 shutdown()时,interruptIdleWorkers()方法可以通过 w.tryLock()来判断线程是否正在执行任务。
    • 如果能获取到锁,说明线程是空闲的,可以安全中断。
    • 如果不能获取到锁,说明线程正在忙碌,则不中断(除非是 shutdownNow())。
  • 这个设计巧妙地实现了只中断空闲线程的语义。

任务提交与执行流程:execute(Runnable command)

这是线程池的入口方法,逻辑非常清晰,分三步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int workerCountOf(int c) {
return c & CAPACITY;
}
//任务队列
private final BlockingQueue<Runnable> workQueue;

public void execute(Runnable command) {
// 如果任务为null,则抛出异常。
if (command == null)
throw new NullPointerException();
// ctl 中保存的线程池当前的一些状态信息
int c = ctl.get();

// 下面会涉及到 3 步 操作
// 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
// 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里,表明创建新的线程失败。
// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果当前工作线程数量为0,新创建一个线程并执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
// 传入 false 代表增加线程时判断当前线程数是否少于 maxPoolSize
//如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
else if (!addWorker(command, false))
reject(command);
}
  • addWorker 方法是创建新线程的核心,它会进行复杂的 CAS 操作来更新 workerCount,并确保在正确的线程池状态下创建线程。

工作者主循环:runWorker(Worker w)

这是每个工作线程的执行能力来源,是一个无限循环,不断从队列中获取任务并执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // 先处理构造时传入的第一个任务
w.firstTask = null;
w.unlock(); // 允许中断

boolean completedAbruptly = true; // 标记是否异常退出
try {
// 循环:要么有 firstTask,要么从 getTask() 获取新任务
while (task != null || (task = getTask()) != null) {
w.lock(); // 执行任务前加锁,标记为忙碌

// 处理中断:如果线程池已停止,确保线程被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();

try {
beforeExecute(wt, task); // 钩子方法
try {
task.run(); // 执行任务!
afterExecute(task, null); // 钩子方法
} catch (Throwable ex) {
afterExecute(task, ex); // 钩子方法
throw ex; // 抛出异常,导致线程退出
}
} finally {
task = null;
w.completedTasks++;
w.unlock(); // 任务执行完毕,解锁
}
}
completedAbruptly = false; // 正常退出循环(getTask返回null)
} finally {
// 清理工作:移除worker,统计已完成任务数,并尝试终止线程池
processWorkerExit(w, completedAbruptly);
}
}
  • 对于getTask() 的作用
    • 它是线程阻塞等待任务的地方。
    • 如果线程是非核心线程或者允许核心线程超时,并且已经空闲超过了 keepAliveTime,那么 getTask() 会返回 null,从而导致 runWorker 退出循环,线程自然死亡。

关闭线程池

首先,我们必须牢记线程池的五种状态及其流转关系,因为所有的关闭操作都是围绕状态变更展开的:

  1. RUNNING (-1): 接受新任务,处理队列任务。
  2. SHUTDOWN (0): 不接受新任务,但继续处理队列中的任务。 (优雅关闭的起点)
  3. STOP (1): 不接受新任务,不处理队列任务,中断所有工作线程。 (强制关闭的起点)
  4. TIDYING (2): 所有任务都已终止,工作线程数为0。准备执行 terminated() 钩子。
  5. TERMINATED (3): terminated() 钩子执行完毕。线程池完全终结。

对于shutdown()方法,这是优雅关闭的标准入口。它的目标是平滑过渡,确保所有已提交的任务都能被执行。

  • 将状态设为 SHUTDOWN
  • 不接受新任务,但会继续处理队列中已有的任务
  • 只中断空闲的工作线程,让正在工作的线程完成手头任务。
image-20260214093827692
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 1. 安全检查:确保调用者有权限关闭线程池
checkShutdownAccess();

// 2. 线程池状态从 RUNNING 变更为 SHUTDOWN
advanceRunState(SHUTDOWN);

// 3. 中断所有空闲的工作线程
interruptIdleWorkers();

// 4. 钩子方法,供子类(如 ScheduledThreadPoolExecutor)扩展
onShutdown();
} finally {
mainLock.unlock();
}

// 5. 尝试进入终止流程(如果条件满足)
tryTerminate();
}
  • 其中,对于 interruptIdleWorkers方法,最后会执行到如下方法,它会只中断空闲的工作线程,让正在工作的线程完成手头任务。

    image-20260214093953184

    二编

    interruptIdleWorkers()shutdown() 不会中断正在执行任务的线程的关键,它会遍历 workers 集合中的每一个 Worker。对每个 Worker,它会尝试调用 w.tryLock()

    • 如果 tryLock() 成功:说明该 Worker 当前没有在执行任务(处于 workQueue.take()workQueue.poll() 的阻塞等待状态),是一个空闲线程。此时可以安全地调用 t.interrupt() 来中断它,使其从阻塞状态醒来。
    • 如果 tryLock() 失败:说明该 Worker 正在 runWorker 的主循环中执行 task.run(),此时不会中断它,让它把当前任务执行完。
  • 对于advanceRunState(SHUTDOWN):这是一个 CAS 循环,确保只有在当前状态是 RUNNING 时,才能将其安全地更新为 SHUTDOWN。一旦状态变为 SHUTDOWN,后续任何调用 execute() 的尝试都会因为 isRunning(c) 返回 false 而直接进入拒绝策略。

  • 对于tryTerminate():这个方法会检查当前状态是否已经是 SHUTDOWNSTOP,并且 workers 集合为空、workQueue 也为空。如果满足条件,它会将状态推进到 TIDYING,并最终调用 terminated() 钩子,完成整个生命周期。

    image-20260214094854260

对于shutdownNow(),这是强制关闭的方法,用于需要立即释放资源的紧急情况。它会牺牲任务的完整性来换取资源释放速度。

  • 将状态设为 STOP
  • 不接受新任务,也不处理队列中的任务
  • 中断所有工作线程(无论是否空闲)。
  • 返回一个包含未执行任务的列表。
image-20260214094231795
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();

// 1. 将线程池状态直接变更为 STOP
advanceRunState(STOP);

// 2. 中断所有工作线程(无论是否空闲!)
interruptWorkers();

// 3. 清空工作队列,并返回所有未执行的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
  • 其中,interruptWorkers();interruptIdleWorkers() 不同,这个方法无差别地遍历所有 Worker 并调用 interrupt()

    image-20260214094309336

    对每个线程都中断,无论线程是在空闲等待还是在执行任务,都会收到中断信号,但是!中断是否真的能停止一个任务,完全取决于任务代码本身是否能够响应中断。如果任务是一个死循环且不检查中断状态,那么 shutdownNow() 对它也是无效的 。

对于 awaitTermination(long timeout, TimeUnit unit),无论是 shutdown() 还是 shutdownNow(),它们都只是发起关闭请求,并立即返回,并不会等待线程池真正关闭。awaitTermination() 就是用来同步等待关闭完成的。它通过条件变量等待,为调用者提供了同步感知线程池最终状态的能力。

  • 阻塞当前线程,直到线程池完全终止TERMINATED 状态)或超时。
  • 通常在 shutdown()shutdownNow() 之后调用,以实现优雅停机。
image-20260214095139123
  • 对于termination,这是一个 Condition 对象
  • 触发 signalAll() 的地方就在 tryTerminate() 方法的最后。当线程池状态成功变为 TERMINATED 时,会调用 termination.signalAll(),唤醒所有在 awaitTermination() 中等待的线程。

它提供了一种机制,让主线程可以阻塞,直到后台的线程池真正完成了所有清理工作。这对于应用的优雅停机至关重要。

单独使用任何一个方法都无法实现完美的优雅关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 1. 首先发起有序关闭
threadPool.shutdown();

try {
// 2. 等待一段时间(例如60秒),让存量任务有机会完成
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
// 3. 如果超时了,说明还有顽固任务没完成,此时进行强制干预
threadPool.shutdownNow();

// 4. 再次等待一小段时间,看强制关闭是否生效
if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
System.err.println("线程池无法完全关闭!");
}
}
} catch (InterruptedException e) {
// 5. 如果等待过程被中断,也要进行兜底处理
threadPool.shutdownNow();
// 重新设置中断状态
Thread.currentThread().interrupt();
}

一般情况下,先给任务一个体面完成的机会,但是不能无限期等待,必须有超时,超时后,不惜代价强制清理,防止应用无法退出。

ScheduledThreadPoolExecutor 类

线程池原理分析

https://javaguide.cn/java/concurrent/java-thread-pool-summary.html#%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%8E%9F%E7%90%86%E5%88%86%E6%9E%90-%E9%87%8D%E8%A6%81

我们上面讲解了 Executor框架以及 ThreadPoolExecutor 类的原理,包括各种线程池的使用内容,下面让我们实战一下,来通过写一个 ThreadPoolExecutor 的小 Demo 来回顾相关的原理内容

线程池示例代码

首先创建一个 Runnable 接口的实现类(当然也可以是 Callable 接口)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.Date;

/**
* 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。
* @author shuang.kou
*/
public class MyRunnable implements Runnable {

private String command;

public MyRunnable(String s) {
this.command = s;
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
processCommand();
System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
}

private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public String toString() {
return this.command;
}
}

编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {

private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {

//使用阿里巴巴推荐的创建线程池的方式
//通过ThreadPoolExecutor构造函数自定义参数创建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());

for (int i = 0; i < 10; i++) {
//创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
Runnable worker = new MyRunnable("" + i);
//执行Runnable
executor.execute(worker);
}
//终止线程池
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
}

可以看到我们上面的代码指定了:

  • corePoolSize: 核心线程数为 5。
  • maximumPoolSize:最大线程数 10
  • keepAliveTime : 等待时间为 1L。
  • unit: 等待时间的单位为 TimeUnit.SECONDS。
  • workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;
  • handler:拒绝策略为 CallerRunsPolicy

来看输出

image-20260214100043132

我们通过前面的代码输出结果可以看出:线程池首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。

详细的说说发生了什么?

1
2
3
4
5
pool-1-thread-3 Start. Time = Sat Feb 14 09:57:02 CST 2026
pool-1-thread-5 Start. Time = Sat Feb 14 09:57:02 CST 2026
pool-1-thread-2 Start. Time = Sat Feb 14 09:57:02 CST 2026
pool-1-thread-4 Start. Time = Sat Feb 14 09:57:02 CST 2026
pool-1-thread-1 Start. Time = Sat Feb 14 09:57:02 CST 2026
  • 当第一个任务被提交时,线程池发现当前没有线程(workerCount=0),小于 corePoolSize(5),于是立即创建一个新线程来执行它。
  • 同样的逻辑,对于前 5 个任务,线程池都因为 workerCount < corePoolSize 而直接创建了新的核心线程。
  • 结果:5个核心线程 (thread-1thread-5) 在同一时刻(09:57:02)开始执行前5个任务。

为什么是5个? 因为 corePoolSize 就是5,这是线程池的“常驻部队”。

对于后续任务入队与复用(5-10秒)

1
2
3
4
5
6
7
8
// ... (5秒后,第一批任务结束)
pool-1-thread-4 End. Time = Sat Feb 14 09:57:07 CST 2026
pool-1-thread-5 End. Time = Sat Feb 14 09:57:07 CST 2026
...
// 紧接着,同一线程立刻开始执行新任务!
pool-1-thread-4 Start. Time = Sat Feb 14 09:57:07 CST 2026
pool-1-thread-5 Start. Time = Sat Feb 14 09:57:07 CST 2026
...
  • 在前5个任务执行的这5秒钟里,主程序已经把剩下的 5个任务(第6到第10个) 全部提交给了线程池。
  • 但是,此时5个核心线程都在忙碌中 (workerCount == corePoolSize)。
  • 根据 ThreadPoolExecutor 的规则,当 workerCount >= corePoolSize 时,新任务不会立即创建新线程,而是尝试加入 workQueue
  • 由于你的队列 (ArrayBlockingQueue) 容量是100,远大于待提交的5个任务,所以这5个任务全部成功进入了队列等待
  • 5秒后,第一批5个任务执行完毕。每个工作线程在 runWorker 主循环中会再次调用 getTask() 从队列中获取新任务。
  • 因为队列里正好有5个等待的任务,所以这5个刚空闲下来的线程立刻各自领取了一个新任务,并马上开始执行(09:57:07)。

这里没有创建任何新的非核心线程!因为队列 (workQueue) 足够大,能够容纳所有超额任务。只有当队列满了之后,才会去创建超过 corePoolSize 的线程。

然后,任务全部完成,线程池关闭

1
2
3
4
5
// ... (又过了5秒,第二批任务结束)
pool-1-thread-3 End. Time = Sat Feb 14 09:57:12 CST 2026
...
pool-1-thread-1 End. Time = Sat Feb 14 09:57:12 CST 2026
Finished all threads
  • 第二批5个任务在5秒后(09:57:12)全部执行完毕。
  • 所有工作线程再次调用 getTask(),但此时队列已空。
  • 由于 allowCoreThreadTimeOut 默认是 false核心线程即使空闲也不会被销毁。它们会一直阻塞在 workQueue.take() 上,等待新任务。
  • 但是,主程序紧接着调用了 executor.shutdown()
    • 这将线程池状态置为 SHUTDOWN
    • shutdown() 方法会中断所有空闲的工作线程
    • 这些阻塞在 take() 上的核心线程收到中断信号后,会从 getTask() 返回 null,从而退出 runWorker 主循环。
    • 线程自然死亡,workers 集合变为空。
  • 主线程通过 while (!executor.isTerminated()) {} 轮询等待,直到所有线程都退出,最后打印 "Finished all threads"

但是,如果你把 QUEUE_CAPACITY 改成 1,再运行这个程序,会发生什么?

  • 前5个任务依然由5个核心线程执行。
  • 第6个任务提交时,核心线程忙,队列空(容量1),所以第6个任务入队成功。
  • 第7个任务提交时,核心线程忙,队列已满(1个任务在里面)。此时,线程池会检查 workerCount(5) < MAX_POOL_SIZE(10),于是创建第6个工作线程来执行第7个任务。
  • 后续的第8、9、10个任务也会因为同样的原因,分别创建第7、8、9、10个工作线程来执行。
  • 最终你会看到最多 10 个线程 同时在工作!

线程池工作流程的详细分析

线程池的任务执行

我们需要首先分析一下 execute方法。 在示例代码中,我们使用 executor.execute(worker)来提交一个任务到线程池中去。

image-20260214100155253

这个方法非常重要。可以去上面看看源码,反正总之就是

  • 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。
  • 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。
  • 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。
  • 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,拒绝策略会调用RejectedExecutionHandler.rejectedExecution()方法。
图解线程池实现原理

execute 方法中,多次调用 addWorker 方法。addWorker 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/**
* 添加一个新的工作线程(Worker)到线程池中。
*
* @param firstTask 新线程启动后要执行的第一个任务(可为 null)
* @param core true 表示受 corePoolSize 限制;false 表示受 maximumPoolSize 限制
* @return 成功添加并启动线程返回 true,否则返回 false
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 外层循环:检查线程池状态是否允许添加新线程
for (int c = ctl.get(); ; ) {
// 如果线程池已关闭(SHUTDOWN/STOP等),且满足以下任一条件,则拒绝创建:
// - 已进入 STOP 状态(不再处理任何任务)
// - 要执行一个新任务(firstTask != null),但线程池已 SHUTDOWN(SHUTDOWN 不接受新任务)
// - 队列为空(说明没有遗留任务需要处理)
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;

// 内层循环:尝试通过 CAS 增加工作线程计数(workerCount)
for (;;) {
// 检查当前线程数是否已达到上限(core ? corePoolSize : maximumPoolSize)
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;

// 尝试原子性地将 workerCount +1
if (compareAndIncrementWorkerCount(c))
break retry; // 成功则跳出双重循环,准备创建线程

// CAS 失败,重新读取 ctl(可能被其他线程修改了)
c = ctl.get();
// 如果在此期间线程池状态变为 SHUTDOWN 或更高,回到外层 retry
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// 否则只是 workerCount 变化,继续内层循环重试 CAS
}
}

// ========== 以下为真正创建和启动线程的阶段 ==========

boolean workerStarted = false; // 标记线程是否成功启动
boolean workerAdded = false; // 标记 Worker 是否成功加入 workers 集合
Worker w = null;

try {
// 创建 Worker 对象(内部包含一个 Thread,该 Thread 的 target 是 Worker 自身)
w = new Worker(firstTask);
final Thread t = w.thread;

if (t != null) {
// 加全局锁,保护 workers 集合等共享资源
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 再次检查线程池状态(防止在创建 Worker 期间被 shutdown)
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 确保线程是 NEW 状态(未启动过)
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
// 将新 Worker 加入 workers 集合
workers.add(w);
workerAdded = true;
// 更新历史最大线程数(用于监控)
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}

// 如果成功加入集合,则启动线程
if (workerAdded) {
container.start(t); // 实际就是 t.start()
workerStarted = true;
}
}
} finally {
// 如果线程启动失败(例如在 start() 中抛异常),进行回滚清理
if (!workerStarted)
addWorkerFailed(w);
}

return workerStarted;
}

更多关于线程池源码分析的内容推荐这篇文章:硬核干货:4W 字从源码上分析 JUC 线程池 ThreadPoolExecutor 的实现原理

execute() vs submit()

execute()submit()是两种提交任务到线程池的方法,有一些区别:

  • 返回值execute() 方法用于提交不需要返回值的任务。通常用于执行 Runnable 任务,无法判断任务是否被线程池成功执行。submit() 方法用于提交需要返回值的任务。可以提交 RunnableCallable 任务。submit() 方法返回一个 Future 对象,通过这个 Future 对象可以判断任务是否执行成功,并获取任务的返回值(get()方法会阻塞当前线程直到任务完成, get(long timeout,TimeUnit unit)多了一个超时时间,如果在 timeout 时间内任务还没有执行完,就会抛出 java.util.concurrent.TimeoutException)。
  • 异常处理:在使用 submit() 方法时,它的返回值有可以异步获取结果和取消任务等Future接口具有的能力,可以通过 Future 对象处理任务执行过程中抛出的异常;而在使用 execute() 方法时,异常处理需要通过自定义的 ThreadFactory (在线程工厂创建线程的时候设置UncaughtExceptionHandler对象来 处理异常)或 ThreadPoolExecutorafterExecute() 方法来处理

execute()源码上面有,来简单看看submit()的,submit()的源码在AbstractExecutorService这个线程池的抽象模板中,然后接口定义在ExecuteService中,有三种重载方式

1
2
3
4
5
6
7
8
// 1. 提交一个 Runnable,返回一个 Future,其 get() 方法返回 null
Future<?> submit(Runnable task);

// 2. 提交一个 Runnable,并指定一个“假”的返回值
<T> Future<T> submit(Runnable task, T result);

// 3. 提交一个 Callable,可以获取真实的计算结果
<T> Future<T> submit(Callable<T> task);

随便看一种

image-20260214101256273
  • 它将Runnable的任务包装成 FutureTask 后,调用 execute(),当然,它也能接受RunnableCallable<T>

  • newTaskFor() 做了什么?

    1
    2
    3
    4
    5
    6
    7
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
    }

    返回了 FutureTask,传入的Runnable被包装,有 RunnableFuture的双重能力

反正,所有 submit() 的调用,最终都会转化为对 execute() 的调用。差别仅仅在于,submit() 在调用 execute() 之前,先用 FutureTask 把你的原始任务(Runnable/Callable包装了一下。

它这个 ececute方法就交给对应的线程池对Exceutor这个接口中方法的实现了

image-20260214101318600

可以看到execute是直接接受并且执行 Runnable的,这是线程池最原始、最底层的任务提交方式,只接受 Runnable,因为 Runnablerun() 方法没有返回值

image-20260214101142122

isTerminated() VS isShutdown()

简单来说,

  • isShutDown 当调用 shutdown() 方法后返回为 true。
  • isTerminated 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true

isTerminated是如何判断所有提交的任务完成后返回为 true 的

首先,isTerminated会进行这样的方法调用

image-20260214102652340

其中,runStateAtLeast(c, s) 是一个工具方法,用于判断当前状态是否 大于等于 指定状态 s

所以说,TERMINATED 是 3, 满足 >= SHUTDOWN,判断是TERMINATED的状态在SHUTDOWN之上

image-20260214102620598

那么同理了

image-20260214102732001

几种常见的内置线程池

FixedThreadPool

介绍

FixedThreadPool 被称为可重用固定线程数的线程池。通过 Executors 类中的相关源代码来看一下相关实现:

1
2
3
4
5
6
7
8
9
/**
* 创建一个可重用固定数量线程的线程池
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

另外还有一个 FixedThreadPool 的实现方法,和上面的类似,所以这里不多做阐述:

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

从上面源代码可以看出新创建的 FixedThreadPoolcorePoolSizemaximumPoolSize 都被设置为 nThreads,这个 nThreads 参数是我们使用的时候自己传递的。

即使 maximumPoolSize 的值比 corePoolSize 大,也至多只会创建 corePoolSize 个线程。这是因为FixedThreadPool 使用的是容量为 Integer.MAX_VALUELinkedBlockingQueue(无界队列),队列永远不会被放满

执行任务过程介绍

FixedThreadPoolexecute() 方法运行示意图

FixedThreadPool的execute(../../../../PersonalLearn/JavaLearn/重说JavaSE基础/多线程/详解JavaSE之并发编程 part8—详解线程池背后的原理/FixedThreadPool-C0OTNJpn.png)方法运行示意图

上图说明:

  1. 如果当前运行的线程数小于 corePoolSize, 如果再来新任务的话,就创建新的线程来执行任务;
  2. 当前运行的线程数等于 corePoolSize 后, 如果再来新任务的话,会将任务加入 LinkedBlockingQueue
  3. 线程池中的线程执行完 手头的任务后,会在循环中反复从 LinkedBlockingQueue 中获取任务来执行

为什么不推荐使用FixedThreadPool

FixedThreadPool 使用无界队列 LinkedBlockingQueue(队列的容量为 Integer.MAX_VALUE)作为线程池的工作队列会对线程池带来如下影响:

  1. 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize
  2. 由于使用无界队列时 maximumPoolSize 将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 FixedThreadPool的源码可以看出创建的 FixedThreadPoolcorePoolSizemaximumPoolSize 被设置为同一个值。
  3. 由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数;
  4. 运行中的 FixedThreadPool(未执行 shutdown()shutdownNow())不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)。

SingleThreadExecutor

介绍

SingleThreadExecutor 是只有一个线程的线程池。下面看看SingleThreadExecutor 的实现:

1
2
3
4
5
6
7
8
9
10
/**
*返回只有一个线程的线程池
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

从上面源代码可以看出新创建的 SingleThreadExecutorcorePoolSizemaximumPoolSize 都被设置为 1,其他参数和 FixedThreadPool 相同

执行任务过程介绍

SingleThreadExecutor 的运行示意图(该图片来源:《Java 并发编程的艺术》)

SingleThreadExecutor的运行示意图

上图说明 :

  1. 如果当前运行的线程数少于 corePoolSize,则创建一个新的线程执行任务;
  2. 当前线程池中有一个运行的线程后,将任务加入 LinkedBlockingQueue
  3. 线程执行完当前的任务后,会在循环中反复从LinkedBlockingQueue 中获取任务来执行;

为什么不推荐使用SingleThreadExecutor

SingleThreadExecutorFixedThreadPool 一样,使用的都是容量为 Integer.MAX_VALUELinkedBlockingQueue(无界队列)作为线程池的工作队列。SingleThreadExecutor 使用无界队列作为线程池的工作队列会对线程池带来的影响与 FixedThreadPool 相同。说简单点,就是可能会导致 OOM

CachedThreadPool

介绍

CachedThreadPool 是一个会根据需要创建新线程的线程池。下面通过源码来看看 CachedThreadPool 的实现:

1
2
3
4
5
6
7
8
9
/**
* 创建一个线程池,根据需要创建新线程,但会在先前构建的线程可用时重用它。
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

CachedThreadPoolcorePoolSize 被设置为空(0),maximumPoolSize被设置为 Integer.MAX_VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源

执行任务过程介绍

CachedThreadPoolexecute() 方法的执行示意图

CachedThreadPool的execute(../../../../PersonalLearn/JavaLearn/重说JavaSE基础/多线程/详解JavaSE之并发编程 part8—详解线程池背后的原理/CachedThreadPool-execute-CmSVV1Ww.png)方法的执行示意图

上图说明:

  1. 首先执行 SynchronousQueue.offer(Runnable task) 提交任务到任务队列。如果当前 maximumPool 中有闲线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则执行下面的步骤 2;
  2. 当初始 maximumPool 为空,或者 maximumPool 中没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤 1 将失败,此时 CachedThreadPool 会创建新线程执行任务,execute 方法执行完成;

为什么不推荐使用CachedThreadPool

CachedThreadPool 使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM

ScheduledThreadPool

介绍

ScheduledThreadPool 用来在给定的延迟后运行任务或者定期执行任务。这个在实际项目中基本不会被用到,也不推荐使用,大家只需要简单了解一下即可。

1
2
3
4
5
6
7
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

ScheduledThreadPool 是通过 ScheduledThreadPoolExecutor 创建的,使用的DelayedWorkQueue(延迟阻塞队列)作为线程池的任务队列。

DelayedWorkQueue 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。DelayedWorkQueue 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。

ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,所以创建 ScheduledThreadExecutor 本质也是创建一个 ThreadPoolExecutor 线程池,只是传入的参数不相同。

1
2
3
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService

ScheduledThreadPoolExecutor 和 Timer 对比

  • Timer 对系统时钟的变化敏感,ScheduledThreadPoolExecutor不是;
  • Timer 只有一个执行线程,因此长时间运行的任务可以延迟其他任务。 ScheduledThreadPoolExecutor 可以配置任意数量的线程。 此外,如果你想(通过提供 ThreadFactory),你可以完全控制创建的线程;
  • TimerTask 中抛出的运行时异常会杀死一个线程,从而导致 Timer 死机即计划任务将不再运行。ScheduledThreadExecutor 不仅捕获运行时异常,还允许您在需要时处理它们(通过重写 afterExecute 方法ThreadPoolExecutor)。抛出异常的任务将被取消,但其他任务将继续运行。