线程池
Categories:
什么是线程池
线程池(ThreadPool) 是一种基于池化思想管理线程的工具
线程池解决的问题是, 在任意时刻下, 确定系统应该投入多少资源, 解决多少任务
不将线程统一管理可能会出现以下问题
- 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
- 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
- 系统无法合理管理内部的资源分布,会降低系统的稳定性。
为解决资源分配这个问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。
线程池内部维护了许多线程, 当有新任务时就会分配一个空闲线程执行 当所有线程都有任务时, 新的任务要么放到阻塞队列里面要么增加线程
使用线程池
用法:
// 创建固定大小的线程池:
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任务:
executor.submit(task1);
executor.submit(task2);
executor.submit(task3);
executor.submit(task4);
executor.submit(task5);
上述代码创建了大小为3的线程池, 并提交了5个任务
ExecutorService
该接口要求实现工厂方法, 返回如下类型的线程池
- FixedThreadPool:线程数固定的线程池;
- CachedThreadPool:线程数根据任务动态调整的线程池;
- ScheduledThreadPool: 定时执行线程池
- SingleThreadExecutor:仅单线程执行的线程池。
线程池原理解析
Java的线程池核心实现为 ThreadPoolExecutor
顶级接口 Excutor
提供了一种思想, 将任务提交与任务执行解耦
用户只需提供 Runnable
对像, 将任务提交到执行器中, 剩余的线程调配和任务执行由执行器完成
ExecutorService接口增加了一些能力: (1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。
线程池构造类 ThreadPoolExecutor
通过传入参数实现线程池的构造
如下是成员变量,
![[content/en/java/Basic/Concurrent/Pasted image 20250203123133.png]]
参数解释
corePoolSize
:线程池中用来工作的核心线程数量。maximumPoolSize
:最大线程数,线程池允许创建的最大线程数, 可以理解为非核心线程 + 核心线程数workQueue
:任务队列,是一个阻塞队列,当线程数达到核心线程数后,会将任务存储在阻塞队列中。threadFactory
:线程池内部创建线程所用的工厂。handler
:拒绝策略;当队列已满并且线程数量达到最大线程数量时,会调用该方法处理任务
线程池使用前记得初始化
线程池的默认策略是构造完成后不创建线程, 也就是说刚创建好的线程池是没有线程的, 只有传入的 BlockingQueue
也就是[[阻塞队列]] 和其他参数
![[content/en/java/Basic/Concurrent/Pasted image 20250203123400.png]]
使用线程池之后
当调用任务执行方法 execute()
时启动执行流程
线程执行任务流程如下
- 核心线程数未满:直接创建新线程执行任务(不会进队列)。
- 核心线程数已满,队列未满:任务进入阻塞队列等待空闲线程执行。
- 核心线程数已满,队列已满:
- 如果当前线程池线程数未达到最大线程数,创建非核心线程执行任务。
- 否则,执行拒绝策略。
队列未满, 创建核心线程
![[content/en/java/Basic/Concurrent/Pasted image 20250203123742.png]] 队列已满, 创建非核心线程![[content/en/java/Basic/Concurrent/Pasted image 20250203124516.png]]
所以,就算队列中已经塞满了任务,新创建的线程还是会优先处理提交的任务,而不是从队列中获取已有的任务执行,从这可以看出,先提交的任务不一定先执行。
当上述流程无法完成时, 会使用拒绝策略抛出异常
JDK 自带的 RejectedExecutionHandler
实现有 4 种
- AbortPolicy:丢弃任务,抛出运行时异常
- CallerRunsPolicy:由提交任务的线程来执行任务
- DiscardPolicy:丢弃这个任务,但是不抛异常
- DiscardOldestPolicy:从队列中剔除最先进入队列的任务,然后再次提交任务
任务执行方法 excute()
源码如下
public void execute(Runnable command) {
// 首先检查提交的任务是否为null,是的话则抛出NullPointerException。
if (command == null)
throw new NullPointerException();
// 获取线程池的当前状态(ctl是一个AtomicInteger,其中包含了线程池状态和工作线程数)
int c = ctl.get();
// 1. 检查当前运行的工作线程数是否少于核心线程数(corePoolSize)
if (workerCountOf(c) < corePoolSize) {
// 如果少于核心线程数,尝试添加一个新的工作线程来执行提交的任务
// addWorker方法会检查线程池状态和工作线程数,并决定是否真的添加新线程
if (addWorker(command, true))
return;
// 重新获取线程池的状态,因为在尝试添加线程的过程中线程池的状态可能已经发生变化
c = ctl.get();
}
// 2. 尝试将任务添加到任务队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 双重检查线程池的状态
if (! isRunning(recheck) && remove(command)) // 如果线程池已经停止,从队列中移除任务
reject(command);
// 如果线程池正在运行,但是工作线程数为0,尝试添加一个新的工作线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 如果任务队列满了,尝试添加一个新的非核心工作线程来执行任务
else if (!addWorker(command, false))
// 如果无法添加新的工作线程(可能因为线程池已经停止或者达到最大线程数限制),则拒绝任务
reject(command);
}