Java 线程池


12. 线程池 Thread Pool

12.1 线程池简介

线程池 Thread Pool 是一种基于池化思想管理线程的工具,经常出现在多线程服务器中。

  • 如数据库连接池、httpClient连接池。
  • Java 中的线程池是 ThreadPoolExecutor 类。

12.2 线程池相关概念

  • 核心线程 CorePool : 当有新任务提交时,如果核心线程都在工作,并且数量已经达到最大核心线程数,则不会新建核心线程,而是把任务放入等待队列;
  • 阻塞队列 workQueue : 等待队列是一个线程安全的阻塞队列。当核心线程都在忙时,阻塞队列用于存放新增的任务。核心线程完成当前任务会去等待队列中拉取新出的任务。
  • 非核心线程 : 当等待队列满时,若当前总线程数没有超过最大线程数,则会创建新的非核心线程。
    • 核心线程与非核心线程没有区别,只会在比较线程池中线程数目时,区分核心线程与非核心线程
  • 线程活动保持时间 keepAliveTime : 当一个线程空闲下来之后,其保持继续存活的时间。超过该时间则线程销毁。默认情况下,核心线程数量会一直保持,即使它空闲下来了;当设置 threadPoolExecutor.allowCoreThreadTimeOut(true) 时,则对核心线程也执行销毁。
  • 饱和策略 rejectedExecutionHandler : 当等待队列满,且总线程数达到最大线程数时,会执行饱和策略。默认饱和策略是抛弃新的任务请求。

线程池处理策略:

线程池运作流程图

ThreadPoolExecutor(int corePoolSize, // 核心线程数
                   int maximumPoolSize, // 最大线程数
                   long keepAliveTime, // 空闲线程的存活时间
                   TimeUnit unit, // 时间单位
                   BlockingQueue<Runnable> workQueue, // 阻塞队列
                   ThreadFactory threadFactory,
                   RejectedExecutionHandler handler); // 饱和策略

12.3 为什么需要线程池

  1. 降低资源消耗:通过池化技术复用已创建的线程,降低线程创建和销毁造成的损耗。
  2. 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  3. 提高线程的可管理性:使用线程池可以进行统一的分配、调优和监控,避免线程被无限制地创建。
  4. 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池 ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

12.4 Java 线程池 ThreadPoolExecutor

ThreadPoolExecutor 是 Java 线程池的核心实现类。

12.4.1 继承关系

线程池在内部实际上构建了一个生产者-消费者模型,利用阻塞队列,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。

线程池的运行主要分成两部分:任务管理线程管理

  • 任务管理部分充当生产者,当任务提交后,线程池会判断该任务后续的流转:
    1. 直接申请线程执行该任务;
    2. 缓冲到队列中等待线程执行;
    3. 拒绝该任务。
  • 线程管理部分是消费者,线程被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

12.4.2 线程池状态 及其转化

线程池的 5 种状态:

  1. RUNNING 运行状态 -1:
  2. SHUTDOWN 停工状态 0:不再接收新任务,已接收的任务会继续执行完
  3. STOP 停止状态 1:不再接收新任务,已接收的任务也会中断
  4. TIDYING 清空状态 2:所有任务都已停止,目前没有正在工作的线程,会触发 terminated() 方法
  5. TERMINATED 终止状态 3:线程池已被销毁,即调用完 terminated() 之后的状态

线程池生命周期状态转化:

12.4.3 线程池任务执行机制

线程池的任务调度 execute()

当用户提交了一个任务,所有任务的调度都是由 execute() 方法完成的。包括:检查现在线程池的运行状态、运行线程数、运行策略,并决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。具体的执行过程如下:

  1. 首先检测线程池运行状态 runState,如果不是 RUNNING,则直接拒绝,线程池要保证在 RUNNING 的状态下执行任务。
  2. 如果 workerCount < corePoolSize,即核心线程还没创建满,则创建并启动一个核心线程来执行新提交的任务。
  3. 如果 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个非核心线程来执行新提交的任务。
  5. 如果 workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

线程池的任务执行流程

线程池处理流程源码分析

任务缓冲与阻塞队列 BlockingQueue

线程池维护了一个 BlockingQueue<Runnable> 阻塞队列,用于存放等待执行的线程,队列中的所有线程都处于 Runnable 状态。队列的维护相当于一个 producer-consumer 模型,producer 把线程放入阻塞队列,consumer 从阻塞队列中拿出线程来执行。

  1. 当阻塞队列满时,生产者线程会等待队列可用;
  2. 当阻塞队列空时,消费者线程会等待队列变成非空。

一些阻塞队列

为什么用阻塞队列不用普通队列?

阻塞队列是为了实现生产者-消费者模型。当消费者从空的队列中取元素时,线程会被阻塞直到队列非空,然后消费者会被自动唤醒。

如果使用普通队列的话,需要我们自己去实现这样的同步和互斥机制,以及一些线程阻塞与唤醒的策略。

任务拒绝 RejectedExecutionHandler

当线程数达到了线程池的最大线程数,并且阻塞队列已满时,会执行拒绝策略。

  • 拒绝策略是通过 RejectedExecutionHandler 接口的实现类来实现的。

ThreadPoolExecutor 中的任务拒绝处理器:

private volatile RejectedExecutionHandler handler;

// 执行 reject() 来进行任务拒绝,根据具体的实现类来确定拒绝策略
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

// RejectedExecutionHandler 接口,它的实现类要实现 rejectedExecution() 方法
public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

RejectedExecutionHandler 接口有 4 个实现类,实现不同的拒绝策略:

12.4.4 线程池源码分析

重要属性
// 等待队列
private final BlockingQueue<Runnable> workQueue;
// 正在工作的线程集合,每个线程被包装成了 worker,存放对应的线程和将要执行的任务
// worker 创建时会利用 threadFactory 来创建新的线程
private final HashSet<Worker> workers = new HashSet<Worker>();

// 核心线程数
private volatile int corePoolSize;
// 最大线程数
private volatile int maximumPoolSize;
// 线程空闲时的持续存活时间
private volatile long keepAliveTime;
// 执行线程饱和策略的拒绝处理器
private volatile RejectedExecutionHandler handler;
ctl 无锁并发线程控制

变量 ctl 通过 AtomicInteger 来实现无锁并发。组合了 runStateworkerCount

  • 3 位来标识线程池状态 runState
  • 29 位来标识线程池中的线程数 workCount

ctl 用一个变量同时处理两个值,避免了用锁去维护二者的一致性。

    // 用原子类来把 runState 和 workerCount 组合成一个 ctl 变量,并支持 CAS 原子操作
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 表示线程池线程数的 bit 数,默认为 ctl 的前 3 位
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 最大的线程数量
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 线程池的 5 种状态,对应 ctl 的高三位
    // 1110 0000 0000 0000 0000 0000 0000 0000
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 0000 0000 0000 0000 0000 0000 0000 0000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 0010 0000 0000 0000 0000 0000 0000 0000
    private static final int STOP       =  1 << COUNT_BITS;
    // 0100 0000 0000 0000 0000 0000 0000 0000
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 0110 0000 0000 0000 0000 0000 0000 0000
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 在 ctl 中获取线程池的状态 runState
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 在 ctl 中获取线程的数量 workerCount
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 把 runState 和 workerCount 组装成 ctl,作为一个统一的变量
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    // 判断状态 c 是否小于 状态 s
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    // 判断状态 c 是否不小于 状态 s
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
    // 判断线程是否在运行
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

12.5 ExecutorService 线程池的实现类

Executors 提供了 4 种线程池

1. 单线程化线程池 newSingleThreadExecutor

Executors.newSingleThreadExecutor();

被提交的任务是串行执行的,只有唯一的一个线程存在,按照线程被提交到队列中的顺序来依次执行。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。

2. 可控最大并发数线程池 newFixedThreadPool

Executors.newFixedThreadPool(nThreads);

创建一个定长的线程池,可以控制最大并发数,超出的线程在队列中等待。

3. 可回收缓存线程池 newCachedThreadPool

Executors.newCachedThreadPool();

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

4. 支持定时与周期性任务的线程池 newScheduledThreadPool

Executors.newScheduledThreadPool(nThreads);

创建一个定长线程池,支持定时及周期性任务执行。

Reference

美团技术团队-Java线程池

redspider-线程池部分

线程池源码分析:

掘金-线程池源码分析

掘金-Java 线程池

segmentfault-线程池源码分析


文章作者: Yu Yang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Yu Yang !
 上一篇
4.线程间通信 4.线程间通信
4. 线程间通信4.1 锁与同步JAVA 中的锁都是对象锁,是基于对象的。 4.1.1 线程同步线程同步是指约束线程按照一定的顺序执行。 线程同步可以通过锁 synchronized 来实现: public class ObjectLock
下一篇 
Java 对象头 Java 对象头
Java 对象头1. Java 对象头的组成 Mark Word 指向类元信息的指针 Klass Pointer 数组的长度 1.1 Mark WordMark Word 在 32 位 JVM 中的长度是 32 bit,在 64 位 J
  目录