ThreadPoolExecutor 类
前面我们提到了,线程池实现类 ThreadPoolExecutor 是
Executor
框架最核心的类,对于Executor接口和ExecutorService接口我们都已经分析过了,在这里只贴一下接口内容方便回顾,详细请看上篇
1 | public interface ExecutorService extends Executor, AutoCloseable { |
ThreadPoolExecutor参数分析
ThreadPoolExecutor 类中提供了的四个构造方法。
来看最长的构造方法,他能指定ThreadPoolExecutor
中所有可以自定义的参数
corePoolSize:任务队列未达到队列容量时,最大可以同时运行的线程数量。maximumPoolSize: 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。workQueue: 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
ThreadPoolExecutor其他常见参数 :
keepAliveTime:线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime才会被回收销毁。unit:keepAliveTime参数的时间单位。threadFactory:executor创建新线程的时候会用到的线程工厂handler:拒绝策略
关于handler
拒绝策略:如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolExecutor
定义一些策略:
ThreadPoolExecutor.AbortPolicy:抛出RejectedExecutionException来拒绝新任务的处理。这个是默认策略
ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝了的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能,因为可能影响主线程。如果应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉。ThreadPoolExecutor.DiscardOldestPolicy:此策略将丢弃最早的未处理的任务请求
Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过
ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定
RejectedExecutionHandler
拒绝策略来配置线程池的时候,默认使用的是
AbortPolicy。在这种拒绝策略下,如果队列满了,ThreadPoolExecutor
将抛出 RejectedExecutionException 异常来拒绝新来的任务
,这代表你将丢失对这个任务的处理。如果不想丢弃任务的话,可以使用CallerRunsPolicy。CallerRunsPolicy
和其他的几个策略不同,它既不会抛弃任务,也不会抛出异常,而是将任务回退给调用者,使用调用者的线程来执行任务
ThreadPoolExecutor常用的阻塞队列
ThreadPoolExecutor中有一个参数就是设置workQueue,它用于设置线程池中的等待队列,那么它可以使用哪些队列来使用呢?
因为当所有核心线程都在忙,且线程数未达最大值时,新任务会先放入
workQueue
等待。所以说,有界队列可防止内存溢出,无界队列可能导致任务无限堆积。
不同业务和功能的线程池会选用不同的阻塞队列,我们可以结合内置线程池来分析。
LinkedBlockingQueue:链表阻塞队列,默认无界,基于链表实现,FIFO,默认容量为Integer.MAX_VALUE,但是实际上它也可通过构造函数指定容量,变成有界队列。- 对于
Executors工厂类,FixedThreadPool和SingleThreadExector默认使用了这个队列。FixedThreadPool最多只能创建核心线程数的线程(核心线程数和最大线程数相等),SingleThreadExector只能创建一个线程(核心线程数和最大线程数都是 1),二者的任务队列永远不会被放满。
- 对于
ArrayBlockingQueue:数组阻塞队列,基于数组实现,FIFO,必须在构造时指定容量,它内部使用ReentrantLock保证线程安全,对于需要严格控制内存使用的系统用它是很好的,而且它能希望在队列满时触发扩容或拒绝策略SynchronousQueue:同步移交队列,容量为0,所以它不存储任何元素,每次put()必须等待另一个线程take(),反之亦然,本质上是一个直接传递通道,适用于短生命周期、高并发突发任务的情况,它是CachedThreadPool的默认队列。所以说,使用这个队列的线程池的行为如下,提交任务时,若无空闲线程,则立即尝试创建新线程,只要未达maximumPoolSize,它无法缓冲任务,压力直接传导到线程创建,所以说常配合大 maxPoolSize 使用。也就是说,CachedThreadPool的最大线程数是Integer.MAX_VALUE,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致 OOM。DelayedWorkQueue:延迟阻塞队列,DelayedWorkQueue的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的,谁先能被执行谁在前面。DelayedWorkQueue添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。它通常配合ScheduledThreadPoolExecutor使用- 对于
Executors工厂类,ScheduledThreadPool和SingleThreadScheduledExecutor默认使用了这个队列
- 对于
PriorityBlockingQueue:优先级阻塞队列,无界,它按任务优先级排序,任务类必须实现Comparable<Runnable>或提供Comparator,任务有优先级差异,同优先级任务的执行顺序不保证 FIFO,因无界仍存在 OOM 风险
对ThreadPoolExecutor的原理分析
首先,ThreadPoolExecutor 的核心目标是:
- 复用线程:避免为每个任务都创建新线程的巨大开销。
- 控制资源:通过限制线程数量和队列大小,防止系统因资源耗尽(如内存溢出 OOM)而崩溃。
- 提供灵活的策略:允许用户自定义线程工厂、任务队列、拒绝策略等,以适应不同场景。
其工作模式可以概括为一个三层漏斗模型:
- 核心线程池
(
corePoolSize):优先使用这部分常驻线程处理任务。 - 任务队列
(
workQueue):当核心线程忙时,新任务进入队列等待。 - 最大线程池
(
maximumPoolSize):当队列也满了,才会创建额外的非核心线程来处理任务。
如果连最大线程数都达到了,且队列已满,则触发拒绝策略。
核心数据结构与字段
ThreadPoolExecutor有一个原子状态控制
ctl,这是整个线程池最精妙的设计之一。它用一个
AtomicInteger 同时存储了两个关键信息:
1 | private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); |
wc:是workerCount,低29位,表示当前活跃的工作线程数rs:是runState,高3位,表示线程池的生命周期状态。
1 | RUNNING -> SHUTDOWN (调用 shutdown()) |
这种设计的好处是,对状态和线程数的原子操作可以在一次 CAS 中完成,极大地提高了并发性能。
对于工作队列workQueue,这就是我们之前讨论的任务缓冲队列。所有提交但尚未被线程处理的任务都存放在这里。
- 可看到这个是个阻塞队列
对于工作者集合 workers
1 | private final ReentrantLock mainLock = new ReentrantLock(); |
workers:一个HashSet,存储了所有活着的工作线程(封装在Worker对象中)。mainLock:一个全局重入锁,保护对workers集合、largestPoolSize、completedTaskCount等共享变量的访问。虽然牺牲了一点并发度,但简化了实现,并能有效防止中断风暴。
其他关键的配置参数均为
volatile,保证了多线程下的可见性,使得动态调整(如
setCorePoolSize)能够生效
1 | private volatile ThreadFactory threadFactory; // 创建新线程的工厂 |
核心内部类Worker
Worker 是 ThreadPoolExecutor
的灵魂内部类。它既是工作单元,也是一个互斥锁。
对于核心变量,如下
其中,对于构造方法
其中的
run()方法委托给外部方法执行主循环runWorker
对于 worker 如何原子性的执行任务,有两个核心的 AQS 方法
为什么需要 AQS 锁?
- 在任务执行期间
(
task.run()),需要锁定这个Worker。 - 这样,在调用
shutdown()时,interruptIdleWorkers()方法可以通过w.tryLock()来判断线程是否正在执行任务。- 如果能获取到锁,说明线程是空闲的,可以安全中断。
- 如果不能获取到锁,说明线程正在忙碌,则不中断(除非是
shutdownNow())。
- 这个设计巧妙地实现了只中断空闲线程的语义。
任务提交与执行流程:execute(Runnable command)
这是线程池的入口方法,逻辑非常清晰,分三步
1 | // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount) |
addWorker方法是创建新线程的核心,它会进行复杂的 CAS 操作来更新workerCount,并确保在正确的线程池状态下创建线程。
工作者主循环:runWorker(Worker w)
这是每个工作线程的执行能力来源,是一个无限循环,不断从队列中获取任务并执行。
1 | final void runWorker(Worker w) { |
- 对于
getTask()的作用:- 它是线程阻塞等待任务的地方。
- 如果线程是非核心线程或者允许核心线程超时,并且已经空闲超过了
keepAliveTime,那么getTask()会返回null,从而导致runWorker退出循环,线程自然死亡。
关闭线程池
首先,我们必须牢记线程池的五种状态及其流转关系,因为所有的关闭操作都是围绕状态变更展开的:
RUNNING(-1): 接受新任务,处理队列任务。SHUTDOWN(0): 不接受新任务,但继续处理队列中的任务。 (优雅关闭的起点)STOP(1): 不接受新任务,不处理队列任务,中断所有工作线程。 (强制关闭的起点)TIDYING(2): 所有任务都已终止,工作线程数为0。准备执行terminated()钩子。TERMINATED(3):terminated()钩子执行完毕。线程池完全终结。
对于shutdown()方法,这是优雅关闭的标准入口。它的目标是平滑过渡,确保所有已提交的任务都能被执行。
- 将状态设为
SHUTDOWN。 - 不接受新任务,但会继续处理队列中已有的任务。
- 只中断空闲的工作线程,让正在工作的线程完成手头任务。
1 | public void shutdown() { |
其中,对于
interruptIdleWorkers方法,最后会执行到如下方法,它会只中断空闲的工作线程,让正在工作的线程完成手头任务。
二编
interruptIdleWorkers()是shutdown()不会中断正在执行任务的线程的关键,它会遍历workers集合中的每一个Worker。对每个Worker,它会尝试调用w.tryLock()- 如果
tryLock()成功:说明该Worker当前没有在执行任务(处于workQueue.take()或workQueue.poll()的阻塞等待状态),是一个空闲线程。此时可以安全地调用t.interrupt()来中断它,使其从阻塞状态醒来。 - 如果
tryLock()失败:说明该Worker正在runWorker的主循环中执行task.run(),此时不会中断它,让它把当前任务执行完。
- 如果
对于
advanceRunState(SHUTDOWN):这是一个 CAS 循环,确保只有在当前状态是RUNNING时,才能将其安全地更新为SHUTDOWN。一旦状态变为SHUTDOWN,后续任何调用execute()的尝试都会因为isRunning(c)返回 false 而直接进入拒绝策略。对于
tryTerminate():这个方法会检查当前状态是否已经是SHUTDOWN或STOP,并且workers集合为空、workQueue也为空。如果满足条件,它会将状态推进到TIDYING,并最终调用terminated()钩子,完成整个生命周期。
对于shutdownNow(),这是强制关闭的方法,用于需要立即释放资源的紧急情况。它会牺牲任务的完整性来换取资源释放速度。
- 将状态设为
STOP。 - 不接受新任务,也不处理队列中的任务。
- 中断所有工作线程(无论是否空闲)。
- 返回一个包含未执行任务的列表。
1 | public List<Runnable> shutdownNow() { |
其中,
interruptWorkers();与interruptIdleWorkers()不同,这个方法无差别地遍历所有Worker并调用interrupt()
对每个线程都中断,无论线程是在空闲等待还是在执行任务,都会收到中断信号,但是!中断是否真的能停止一个任务,完全取决于任务代码本身是否能够响应中断。如果任务是一个死循环且不检查中断状态,那么
shutdownNow()对它也是无效的 。
对于
awaitTermination(long timeout, TimeUnit unit),无论是
shutdown() 还是
shutdownNow(),它们都只是发起关闭请求,并立即返回,并不会等待线程池真正关闭。awaitTermination()
就是用来同步等待关闭完成的。它通过条件变量等待,为调用者提供了同步感知线程池最终状态的能力。
- 阻塞当前线程,直到线程池完全终止(
TERMINATED状态)或超时。 - 通常在
shutdown()或shutdownNow()之后调用,以实现优雅停机。
- 对于
termination,这是一个Condition对象 - 触发
signalAll()的地方就在tryTerminate()方法的最后。当线程池状态成功变为TERMINATED时,会调用termination.signalAll(),唤醒所有在awaitTermination()中等待的线程。
它提供了一种机制,让主线程可以阻塞,直到后台的线程池真正完成了所有清理工作。这对于应用的优雅停机至关重要。
单独使用任何一个方法都无法实现完美的优雅关闭。
1 | // 1. 首先发起有序关闭 |
一般情况下,先给任务一个体面完成的机会,但是不能无限期等待,必须有超时,超时后,不惜代价强制清理,防止应用无法退出。
ScheduledThreadPoolExecutor 类
线程池原理分析
https://javaguide.cn/java/concurrent/java-thread-pool-summary.html#%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%8E%9F%E7%90%86%E5%88%86%E6%9E%90-%E9%87%8D%E8%A6%81
我们上面讲解了 Executor框架以及
ThreadPoolExecutor
类的原理,包括各种线程池的使用内容,下面让我们实战一下,来通过写一个
ThreadPoolExecutor 的小 Demo 来回顾相关的原理内容
线程池示例代码
首先创建一个 Runnable 接口的实现类(当然也可以是
Callable 接口)
1 | import java.util.Date; |
编写测试程序,我们这里以阿里巴巴推荐的使用
ThreadPoolExecutor
构造函数自定义参数的方式来创建线程池。
1 | import java.util.concurrent.ArrayBlockingQueue; |
可以看到我们上面的代码指定了:
corePoolSize: 核心线程数为 5。maximumPoolSize:最大线程数 10keepAliveTime: 等待时间为 1L。unit: 等待时间的单位为 TimeUnit.SECONDS。workQueue:任务队列为ArrayBlockingQueue,并且容量为 100;handler:拒绝策略为CallerRunsPolicy。
来看输出
我们通过前面的代码输出结果可以看出:线程池首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。
详细的说说发生了什么?
1 | pool-1-thread-3 Start. Time = Sat Feb 14 09:57:02 CST 2026 |
- 当第一个任务被提交时,线程池发现当前没有线程(
workerCount=0),小于corePoolSize(5),于是立即创建一个新线程来执行它。 - 同样的逻辑,对于前 5 个任务,线程池都因为
workerCount < corePoolSize而直接创建了新的核心线程。 - 结果:5个核心线程 (
thread-1到thread-5) 在同一时刻(09:57:02)开始执行前5个任务。
为什么是5个? 因为
corePoolSize就是5,这是线程池的“常驻部队”。
对于后续任务入队与复用(5-10秒)
1 | // ... (5秒后,第一批任务结束) |
- 在前5个任务执行的这5秒钟里,主程序已经把剩下的 5个任务(第6到第10个) 全部提交给了线程池。
- 但是,此时5个核心线程都在忙碌中
(
workerCount == corePoolSize)。 - 根据
ThreadPoolExecutor的规则,当workerCount >= corePoolSize时,新任务不会立即创建新线程,而是尝试加入workQueue。 - 由于你的队列 (
ArrayBlockingQueue) 容量是100,远大于待提交的5个任务,所以这5个任务全部成功进入了队列等待。 - 5秒后,第一批5个任务执行完毕。每个工作线程在
runWorker主循环中会再次调用getTask()从队列中获取新任务。 - 因为队列里正好有5个等待的任务,所以这5个刚空闲下来的线程立刻各自领取了一个新任务,并马上开始执行(09:57:07)。
这里没有创建任何新的非核心线程!因为队列 (
workQueue) 足够大,能够容纳所有超额任务。只有当队列满了之后,才会去创建超过corePoolSize的线程。
然后,任务全部完成,线程池关闭
1 | // ... (又过了5秒,第二批任务结束) |
- 第二批5个任务在5秒后(09:57:12)全部执行完毕。
- 所有工作线程再次调用
getTask(),但此时队列已空。 - 由于
allowCoreThreadTimeOut默认是false,核心线程即使空闲也不会被销毁。它们会一直阻塞在workQueue.take()上,等待新任务。 - 但是,主程序紧接着调用了
executor.shutdown()- 这将线程池状态置为
SHUTDOWN。 shutdown()方法会中断所有空闲的工作线程。- 这些阻塞在
take()上的核心线程收到中断信号后,会从getTask()返回null,从而退出runWorker主循环。 - 线程自然死亡,
workers集合变为空。
- 这将线程池状态置为
- 主线程通过
while (!executor.isTerminated()) {}轮询等待,直到所有线程都退出,最后打印"Finished all threads"。
但是,如果你把 QUEUE_CAPACITY 改成
1,再运行这个程序,会发生什么?
- 前5个任务依然由5个核心线程执行。
- 第6个任务提交时,核心线程忙,队列空(容量1),所以第6个任务入队成功。
- 第7个任务提交时,核心线程忙,队列已满(1个任务在里面)。此时,线程池会检查
workerCount(5) < MAX_POOL_SIZE(10),于是创建第6个工作线程来执行第7个任务。 - 后续的第8、9、10个任务也会因为同样的原因,分别创建第7、8、9、10个工作线程来执行。
- 最终你会看到最多 10 个线程 同时在工作!
线程池工作流程的详细分析
线程池的任务执行
我们需要首先分析一下 execute方法。
在示例代码中,我们使用
executor.execute(worker)来提交一个任务到线程池中去。
这个方法非常重要。可以去上面看看源码,反正总之就是
- 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。
- 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。
- 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。
- 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,拒绝策略会调用
RejectedExecutionHandler.rejectedExecution()方法。
在 execute 方法中,多次调用 addWorker
方法。addWorker 这个方法主要用来创建新的工作线程,如果返回
true 说明创建和启动工作线程成功,否则的话返回的就是 false。
1 | /** |
更多关于线程池源码分析的内容推荐这篇文章:硬核干货:4W 字从源码上分析 JUC 线程池 ThreadPoolExecutor 的实现原理。
execute() vs
submit()
execute() 和
submit()是两种提交任务到线程池的方法,有一些区别:
- 返回值:
execute()方法用于提交不需要返回值的任务。通常用于执行Runnable任务,无法判断任务是否被线程池成功执行。submit()方法用于提交需要返回值的任务。可以提交Runnable或Callable任务。submit()方法返回一个Future对象,通过这个Future对象可以判断任务是否执行成功,并获取任务的返回值(get()方法会阻塞当前线程直到任务完成,get(long timeout,TimeUnit unit)多了一个超时时间,如果在timeout时间内任务还没有执行完,就会抛出java.util.concurrent.TimeoutException)。 - 异常处理:在使用
submit()方法时,它的返回值有可以异步获取结果和取消任务等Future接口具有的能力,可以通过Future对象处理任务执行过程中抛出的异常;而在使用execute()方法时,异常处理需要通过自定义的ThreadFactory(在线程工厂创建线程的时候设置UncaughtExceptionHandler对象来 处理异常)或ThreadPoolExecutor的afterExecute()方法来处理
execute()源码上面有,来简单看看submit()的,submit()的源码在AbstractExecutorService这个线程池的抽象模板中,然后接口定义在ExecuteService中,有三种重载方式
1 | // 1. 提交一个 Runnable,返回一个 Future,其 get() 方法返回 null |
随便看一种
它将
Runnable的任务包装成FutureTask后,调用execute(),当然,它也能接受Runnable或Callable<T>newTaskFor()做了什么?1
2
3
4
5
6
7protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}返回了
FutureTask,传入的Runnable被包装,有Runnable和Future的双重能力
反正,所有 submit() 的调用,最终都会转化为对
execute()
的调用。差别仅仅在于,submit() 在调用
execute() 之前,先用 FutureTask
把你的原始任务(Runnable/Callable)包装了一下。
它这个
ececute方法就交给对应的线程池对Exceutor这个接口中方法的实现了
可以看到execute是直接接受并且执行
Runnable的,这是线程池最原始、最底层的任务提交方式,只接受
Runnable,因为 Runnable 的 run()
方法没有返回值
isTerminated() VS
isShutdown()
简单来说,
isShutDown当调用shutdown()方法后返回为 true。isTerminated当调用shutdown()方法后,并且所有提交的任务完成后返回为 true
isTerminated是如何判断所有提交的任务完成后返回为 true 的首先,
isTerminated会进行这样的方法调用![]()
其中,
runStateAtLeast(c, s)是一个工具方法,用于判断当前状态是否 大于等于 指定状态s。所以说,
TERMINATED是 3, 满足 >=SHUTDOWN,判断是TERMINATED的状态在SHUTDOWN之上![]()
那么同理了
![]()
几种常见的内置线程池
FixedThreadPool
介绍
FixedThreadPool 被称为可重用固定线程数的线程池。通过
Executors 类中的相关源代码来看一下相关实现:
1 | /** |
另外还有一个 FixedThreadPool
的实现方法,和上面的类似,所以这里不多做阐述:
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
从上面源代码可以看出新创建的 FixedThreadPool 的
corePoolSize 和 maximumPoolSize 都被设置为
nThreads,这个 nThreads
参数是我们使用的时候自己传递的。
即使 maximumPoolSize 的值比 corePoolSize
大,也至多只会创建 corePoolSize
个线程。这是因为FixedThreadPool 使用的是容量为
Integer.MAX_VALUE 的
LinkedBlockingQueue(无界队列),队列永远不会被放满
执行任务过程介绍
FixedThreadPool 的 execute()
方法运行示意图
上图说明:
- 如果当前运行的线程数小于
corePoolSize, 如果再来新任务的话,就创建新的线程来执行任务; - 当前运行的线程数等于
corePoolSize后, 如果再来新任务的话,会将任务加入LinkedBlockingQueue; - 线程池中的线程执行完 手头的任务后,会在循环中反复从
LinkedBlockingQueue中获取任务来执行
为什么不推荐使用FixedThreadPool?
FixedThreadPool 使用无界队列
LinkedBlockingQueue(队列的容量为
Integer.MAX_VALUE)作为线程池的工作队列会对线程池带来如下影响:
- 当线程池中的线程数达到
corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize; - 由于使用无界队列时
maximumPoolSize将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建FixedThreadPool的源码可以看出创建的FixedThreadPool的corePoolSize和maximumPoolSize被设置为同一个值。 - 由于 1 和 2,使用无界队列时
keepAliveTime将是一个无效参数; - 运行中的
FixedThreadPool(未执行shutdown()或shutdownNow())不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)。
SingleThreadExecutor
介绍
SingleThreadExecutor
是只有一个线程的线程池。下面看看SingleThreadExecutor
的实现:
1 | /** |
1 | public static ExecutorService newSingleThreadExecutor() { |
从上面源代码可以看出新创建的 SingleThreadExecutor 的
corePoolSize 和 maximumPoolSize 都被设置为
1,其他参数和 FixedThreadPool 相同
执行任务过程介绍
SingleThreadExecutor 的运行示意图(该图片来源:《Java
并发编程的艺术》)
上图说明 :
- 如果当前运行的线程数少于
corePoolSize,则创建一个新的线程执行任务; - 当前线程池中有一个运行的线程后,将任务加入
LinkedBlockingQueue - 线程执行完当前的任务后,会在循环中反复从
LinkedBlockingQueue中获取任务来执行;
为什么不推荐使用SingleThreadExecutor?
SingleThreadExecutor 和 FixedThreadPool
一样,使用的都是容量为 Integer.MAX_VALUE 的
LinkedBlockingQueue(无界队列)作为线程池的工作队列。SingleThreadExecutor
使用无界队列作为线程池的工作队列会对线程池带来的影响与
FixedThreadPool 相同。说简单点,就是可能会导致 OOM
CachedThreadPool
介绍
CachedThreadPool
是一个会根据需要创建新线程的线程池。下面通过源码来看看
CachedThreadPool 的实现:
1 | /** |
1 | public static ExecutorService newCachedThreadPool() { |
CachedThreadPool 的corePoolSize
被设置为空(0),maximumPoolSize被设置为
Integer.MAX_VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于
maximumPool
中线程处理任务的速度时,CachedThreadPool
会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源
执行任务过程介绍
CachedThreadPool 的 execute()
方法的执行示意图
上图说明:
- 首先执行
SynchronousQueue.offer(Runnable task)提交任务到任务队列。如果当前maximumPool中有闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行 offer 操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则执行下面的步骤 2; - 当初始
maximumPool为空,或者maximumPool中没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤 1 将失败,此时CachedThreadPool会创建新线程执行任务,execute 方法执行完成;
为什么不推荐使用CachedThreadPool?
CachedThreadPool 使用的是同步队列
SynchronousQueue, 允许创建的线程数量为
Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM
ScheduledThreadPool
介绍
ScheduledThreadPool
用来在给定的延迟后运行任务或者定期执行任务。这个在实际项目中基本不会被用到,也不推荐使用,大家只需要简单了解一下即可。
1 | public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { |
ScheduledThreadPool 是通过
ScheduledThreadPoolExecutor
创建的,使用的DelayedWorkQueue(延迟阻塞队列)作为线程池的任务队列。
DelayedWorkQueue
的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。DelayedWorkQueue
添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达
Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。
ScheduledThreadPoolExecutor 继承了
ThreadPoolExecutor,所以创建
ScheduledThreadExecutor 本质也是创建一个
ThreadPoolExecutor 线程池,只是传入的参数不相同。
1 | public class ScheduledThreadPoolExecutor |
ScheduledThreadPoolExecutor 和 Timer 对比
Timer对系统时钟的变化敏感,ScheduledThreadPoolExecutor不是;Timer只有一个执行线程,因此长时间运行的任务可以延迟其他任务。ScheduledThreadPoolExecutor可以配置任意数量的线程。 此外,如果你想(通过提供ThreadFactory),你可以完全控制创建的线程;- 在
TimerTask中抛出的运行时异常会杀死一个线程,从而导致Timer死机即计划任务将不再运行。ScheduledThreadExecutor不仅捕获运行时异常,还允许您在需要时处理它们(通过重写afterExecute方法ThreadPoolExecutor)。抛出异常的任务将被取消,但其他任务将继续运行。






