12. 线程池 Thread Pool
12.1 线程池简介
线程池 Thread Pool 是一种基于池化思想管理线程的工具,经常出现在多线程服务器中。
- 如数据库连接池、httpClient连接池。
- Java 中的线程池是
ThreadPoolExecutor
类。
12.2 线程池相关概念
- 核心线程 CorePool : 当有新任务提交时,如果核心线程都在工作,并且数量已经达到最大核心线程数,则不会新建核心线程,而是把任务放入等待队列;
- 阻塞队列 workQueue : 等待队列是一个线程安全的阻塞队列。当核心线程都在忙时,阻塞队列用于存放新增的任务。核心线程完成当前任务会去等待队列中拉取新出的任务。
- 非核心线程 : 当等待队列满时,若当前总线程数没有超过最大线程数,则会创建新的非核心线程。
- 核心线程与非核心线程没有区别,只会在比较线程池中线程数目时,区分核心线程与非核心线程
- 线程活动保持时间 keepAliveTime : 当一个线程空闲下来之后,其保持继续存活的时间。超过该时间则线程销毁。默认情况下,核心线程数量会一直保持,即使它空闲下来了;当设置
threadPoolExecutor.allowCoreThreadTimeOut(true)
时,则对核心线程也执行销毁。 - 饱和策略 rejectedExecutionHandler : 当等待队列满,且总线程数达到最大线程数时,会执行饱和策略。默认饱和策略是抛弃新的任务请求。
线程池处理策略:
ThreadPoolExecutor(int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程的存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 阻塞队列
ThreadFactory threadFactory,
RejectedExecutionHandler handler); // 饱和策略
12.3 为什么需要线程池
- 降低资源消耗:通过池化技术复用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:使用线程池可以进行统一的分配、调优和监控,避免线程被无限制地创建。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池
ScheduledThreadPoolExecutor
,就允许任务延期执行或定期执行。
12.4 Java 线程池 ThreadPoolExecutor
ThreadPoolExecutor
是 Java 线程池的核心实现类。
12.4.1 继承关系
线程池在内部实际上构建了一个生产者-消费者模型,利用阻塞队列,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。
线程池的运行主要分成两部分:任务管理、线程管理。
- 任务管理部分充当生产者,当任务提交后,线程池会判断该任务后续的流转:
- 直接申请线程执行该任务;
- 缓冲到队列中等待线程执行;
- 拒绝该任务。
- 线程管理部分是消费者,线程被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。
12.4.2 线程池状态 及其转化
线程池的 5 种状态:
- RUNNING 运行状态 -1:
- SHUTDOWN 停工状态 0:不再接收新任务,已接收的任务会继续执行完
- STOP 停止状态 1:不再接收新任务,已接收的任务也会中断
- TIDYING 清空状态 2:所有任务都已停止,目前没有正在工作的线程,会触发 terminated() 方法
- TERMINATED 终止状态 3:线程池已被销毁,即调用完 terminated() 之后的状态
线程池生命周期状态转化:
12.4.3 线程池任务执行机制
线程池的任务调度 execute()
当用户提交了一个任务,所有任务的调度都是由 execute()
方法完成的。包括:检查现在线程池的运行状态、运行线程数、运行策略,并决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。具体的执行过程如下:
- 首先检测线程池运行状态
runState
,如果不是RUNNING
,则直接拒绝,线程池要保证在RUNNING
的状态下执行任务。 - 如果
workerCount < corePoolSize
,即核心线程还没创建满,则创建并启动一个核心线程来执行新提交的任务。 - 如果
workerCount >= corePoolSize
,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。 - 如果
workerCount >= corePoolSize && workerCount < maximumPoolSize
,且线程池内的阻塞队列已满,则创建并启动一个非核心线程来执行新提交的任务。 - 如果
workerCount >= maximumPoolSize
,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
任务缓冲与阻塞队列 BlockingQueue
线程池维护了一个 BlockingQueue<Runnable>
阻塞队列,用于存放等待执行的线程,队列中的所有线程都处于 Runnable
状态。队列的维护相当于一个 producer-consumer 模型,producer 把线程放入阻塞队列,consumer 从阻塞队列中拿出线程来执行。
- 当阻塞队列满时,生产者线程会等待队列可用;
- 当阻塞队列空时,消费者线程会等待队列变成非空。
为什么用阻塞队列不用普通队列?
阻塞队列是为了实现生产者-消费者模型。当消费者从空的队列中取元素时,线程会被阻塞直到队列非空,然后消费者会被自动唤醒。
如果使用普通队列的话,需要我们自己去实现这样的同步和互斥机制,以及一些线程阻塞与唤醒的策略。
任务拒绝 RejectedExecutionHandler
当线程数达到了线程池的最大线程数,并且阻塞队列已满时,会执行拒绝策略。
- 拒绝策略是通过
RejectedExecutionHandler
接口的实现类来实现的。
ThreadPoolExecutor
中的任务拒绝处理器:
private volatile RejectedExecutionHandler handler;
// 执行 reject() 来进行任务拒绝,根据具体的实现类来确定拒绝策略
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
// RejectedExecutionHandler 接口,它的实现类要实现 rejectedExecution() 方法
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
RejectedExecutionHandler
接口有 4 个实现类,实现不同的拒绝策略:
12.4.4 线程池源码分析
重要属性
// 等待队列
private final BlockingQueue<Runnable> workQueue;
// 正在工作的线程集合,每个线程被包装成了 worker,存放对应的线程和将要执行的任务
// worker 创建时会利用 threadFactory 来创建新的线程
private final HashSet<Worker> workers = new HashSet<Worker>();
// 核心线程数
private volatile int corePoolSize;
// 最大线程数
private volatile int maximumPoolSize;
// 线程空闲时的持续存活时间
private volatile long keepAliveTime;
// 执行线程饱和策略的拒绝处理器
private volatile RejectedExecutionHandler handler;
ctl 无锁并发线程控制
变量 ctl
通过 AtomicInteger
来实现无锁并发。组合了 runState
和 workerCount
。
- 高
3
位来标识线程池状态runState
- 低
29
位来标识线程池中的线程数workCount
ctl
用一个变量同时处理两个值,避免了用锁去维护二者的一致性。
// 用原子类来把 runState 和 workerCount 组合成一个 ctl 变量,并支持 CAS 原子操作
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 表示线程池线程数的 bit 数,默认为 ctl 的前 3 位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 最大的线程数量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池的 5 种状态,对应 ctl 的高三位
// 1110 0000 0000 0000 0000 0000 0000 0000
private static final int RUNNING = -1 << COUNT_BITS;
// 0000 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS;
// 0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS;
// 0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS;
// 在 ctl 中获取线程池的状态 runState
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 在 ctl 中获取线程的数量 workerCount
private static int workerCountOf(int c) { return c & CAPACITY; }
// 把 runState 和 workerCount 组装成 ctl,作为一个统一的变量
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 判断状态 c 是否小于 状态 s
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 判断状态 c 是否不小于 状态 s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 判断线程是否在运行
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
12.5 ExecutorService 线程池的实现类
Executors 提供了 4 种线程池
1. 单线程化线程池 newSingleThreadExecutor
Executors.newSingleThreadExecutor();
被提交的任务是串行执行的,只有唯一的一个线程存在,按照线程被提交到队列中的顺序来依次执行。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。
2. 可控最大并发数线程池 newFixedThreadPool
Executors.newFixedThreadPool(nThreads);
创建一个定长的线程池,可以控制最大并发数,超出的线程在队列中等待。
3. 可回收缓存线程池 newCachedThreadPool
Executors.newCachedThreadPool();
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
4. 支持定时与周期性任务的线程池 newScheduledThreadPool
Executors.newScheduledThreadPool(nThreads);
创建一个定长线程池,支持定时及周期性任务执行。