网易一面:如何设计线程池?请手写一个简单线程池?
▌说在前面:
在40岁老架构师 尼恩的读者社区(50+)中,最近有小伙伴拿到了一线互联网企业如极兔、有赞、希音、百度、网易的面试资格,遇到了几个很重要的面试题:
如何设计线程池?
与之类似的、其他小伙伴遇到过的问题还有:
请手写一个简单线程池?
线程池的知识,既是面试的核心知识,又是开发的核心知识。
所以,这里尼恩给大家做一下系统化、体系化的线程池梳理,使得大家可以充分展示一下大家雄厚的 “技术肌肉”,让面试官爱到 “不能自已、口水直流”。
也一并把这个题目以及参考答案,收入咱们的 《尼恩Java面试宝典》V62版本,供后面的小伙伴参考,提升大家的 3高 架构、设计、开发水平。
注:本文以 PDF 持续更新,最新尼恩 架构笔记、面试题 的PDF文件,请从公众号 【技术自由圈】获取。
▌为什么要使用线程池?
多线程编程是在开发过程中非常基础且非常重要的一个环节,基本上任何一家软件公司或者项目中都会使用多线程。主要有三个原因:
- 降低资源的消耗。降低线程创建和销毁的资源消耗。
- 提高响应速度:线程的创建时间为T1,执行时间T2,销毁时间T3,免去T1和T3的时间
- 提高线程的可管理性
总之:线程池是一种常用的并发编程工具,它可以帮助我们更好地管理和复用线程资源,提高程序的性能和稳定性。
线程池也是 3高架构的基础技术。
▌JAVA中的线程池组件:
在Java中,我们可以使用 java.util.concurrent 包中提供的ThreadPoolExecutor类来创建和使用线程池。
ThreadPoolExecutor 是非常高频,非常常用的组件。
对于 ThreadPoolExecutor 的底层原理和源码,大家要做到非常深入的掌握。大家一定要深入看ThreadPoolExecutor线程池源码,了解其执行过程。
另外,看懂线程池执行流程和源码设计有助于提升我们多线程编程技术和解决工作中遇到的问题。
▌手写线程池的重要性:
很多小伙伴给尼恩反馈, 说线程池的源码太难,看不懂。
怎么办呢?
大家可以先易后难。
可以手撸一个简单版的线程池加强一下对执行流程的理解。然后再深入源码去历险记。
或者说,如果我们想要更好地理解线程池的工作原理,那么自己动手实现一个简单的线程池是一个很好的选择。
接下来,我将带领大家一步一步地实现一个简单的线程池。
我们将从最基本的功能开始,逐步添加更多的功能和优化,最终实现一个完整的线程池。
▌线程池的实现原理:
线程池是一个典型的生产者-消费者模型。下图所示为线程池的实现原理:
- 调用方不断向线程池中提交任务; (生产者)
- 线程池中有一组线程,不断地从队列中取任务,(消费者)
- 线程池管理一个任务队列,对 异步任务进行缓冲 (缓冲区)
要实现一个线程池,有几个问题需要考虑:
- 队列设置多长?如果是无界的,调用方不断往队列中方任务,可能导致内存耗尽。如果是有界的,当队列满了之后,调用方如何处理?
- 线程池中的线程个数是固定的,还是动态变化的?
- 每次提交新任务,是放入队列?还是开新线程
- 当没有任务的时候,线程是睡眠一小段时间?还是进入阻塞?如果进入阻塞,如何唤醒?
针对问题4,有3种做法:
- 不使用阻塞队列,只使用一般的线程安全的队列,也无阻塞/唤醒机制。当队列为空时,线程池中的线程只能睡眠一会儿,然后醒来去看队列中有没有新任务到来,如此不断轮询。
- 不使用阻塞队列,但在队列外部,线程池内部实现了阻塞/唤醒机制
- 使用阻塞队列
很显然,做法3最完善,既避免了线程池内部自己实现阻塞/唤醒机制的麻烦,也避免了做法1的睡眠/轮询带来的资源消耗和延迟。
现在来带大家手写一个简单的线程池,让大家更加理解线程池的工作原理
▌手写一个简单线程池:
▌第一步:定义线程池接口
首先,我们需要定义一个线程池接口,用来表示线程池应该具备哪些功能。
一个简单的线程池应该至少具备以下几个功能:
- 添加任务并执行
- 关闭线程池
- 强制关闭线程池
因此,我们可以定义一个ThreadPool接口,它包含三个方法:execute、shutdown和shutdownNow。
import java.util.List;
// 线程池接口
public interface ThreadPool {
// 提交任务到线程池
void execute(Runnable task);
// 优雅关闭
void shutdown();
//立即关闭
List<Runnable> shutdownNow();
}
其中:
- execute方法用来添加任务并执行;
- shutdown方法用来关闭线程池,它会等待已经提交到线程池中的任务都执行完毕后再关闭;
- shutdownNow方法用来强制关闭线程池,它会立即停止所有正在执行或等待执行的任务,并返回未执行的任务列表。
▌第二步:实现线程的池化管理
接下来,我们需要实现一个简单的线程池类,它实现了ThreadPool接口,并提供了基本的功能。
为了简化代码,先实现一个v1版本,这是一个基础版本,一个简单的实现示例。
在v1版本中,我们先不考虑拒绝策略、自动调节线程资源等高级功能。
下面是一个简单的实现示例:
首先定义一个工作线程类:
// 定义一个工作线程类
public class WorkerThread extends Thread {
// 用于从任务队列中取出并执行任务
private BlockingQueue<Runnable> taskQueue;
// 构造方法,传入任务队列
public WorkerThread(BlockingQueue<Runnable> taskQueue) {
this.taskQueue = taskQueue;
}
// 重写run方法
@Override
public void run() {
// 循环执行,直到线程被中断
while (!Thread.currentThread().isInterrupted() && !taskQueue.isEmpty()) {
try {
// 从任务队列中取出一个任务,如果队列为空,则阻塞等待
Runnable task = taskQueue.take();
// 执行任务
task.run();
} catch (Exception e) {
e.printStackTrace();
// 如果线程被中断,则退出循环
break;
}
}
}
}
然后, 基于一个线程池接口,实现一个简单的线程池,
// 简单的线程池实现
public class SimpleThreadPool implements ThreadPool {
// 线程池初始化时的线程数量
private int initialSize;
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 用于存放和管理工作线程的集合
private List<WorkerThread> threads;
// 是否已经被shutdown标志
private volatile boolean isShutdown = false;
public SimpleThreadPool(int initialSize) {
this.initialSize = initialSize;
taskQueue = new LinkedBlockingQueue<>();
threads = new ArrayList<>(initialSize);
// 初始化方法,创建一定数量的工作线程,并启动它们
for (int i = 0; i < initialSize; i++) {
WorkerThread workerThread = new WorkerThread(taskQueue);
workerThread.start();
threads.add(workerThread);
}
}
// 实现execute方法,用于将任务加入到任务队列,并通知工作线程来执行
@Override
public void execute(Runnable task) {
if (isShutdown) {
throw new IllegalStateException("ThreadPool is shutdown");
}
taskQueue.offer(task);
}
// 关闭线程池, 等待所有线程执行完毕
@Override
public void shutdown() {
// 修改状态
isShutdown = true;
for (WorkerThread thread : threads) {
// 中断线程
thread.interrupt();
}
}
@Override
public List<Runnable> shutdownNow() {
// 修改状态
isShutdown = true;
// 清空队列
List<Runnable> remainingTasks = new ArrayList<>();
taskQueue.drainTo(remainingTasks);
// 中断所有线程
for (WorkerThread thread : threads) {
thread.interrupt();
}
// 返回未执行任务集合
return remainingTasks;
}
}
这个版本的线程池实现了基本的添加任务并执行、关闭线程池和强制关闭线程池等功能。
它在构造方法中接收一个初始化线程池大小参数,用于初始化任务队列和工作线程集合,并创建一定数量的工作线程。
▌第三步:自定义线程池的基本参数
在上一步中,我们实现了一个简单的线程池,它具备了基本的功能。
但是,它存在一个问题:任务队列没有指定容量大小,是个无界队列,其次只指定了初始的线程池大小,应该要提供根据不同的应用场景来调整线程池的大小参数,以提高性能和资源利用率。
因此线程池实现类需要实现自定义初始大小、最大大小以及核心大小的功能。
- 初始大小是指线程池初始化时创建的工作线程数量
- 最大大小是指线程池能够容纳的最多的工作线程数量
- 核心大小是指线程池在没有任务时保持存活的工作线程数量。
这三个参数需要在基本的线程池实现类中定义为成员变量,并在构造方法中传入并赋值。
同时,还需要在execute方法中根据这三个参数来动态地调整工作线程的数量,例如:
- 当活跃的工作线程数量小于核心大小时,尝试创建并启动一个新的工作线程来执行任务;
- 当活跃的工作线程数量大于等于核心大小时,将任务加入到任务队列,等待空闲的工作线程来执行;
- 当任务队列已满时,尝试创建并启动一个新的工作线程来执行任务,
- 当活跃的工作线程数量达到最大大小时,无法再创建新的工作线程。
我们还需要在构造方法里提供一个参数queueSize,用于限制队列大小。
下面我们就对类进行改造:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class SimpleThreadPool implements ThreadPool {
// 线程池初始化时的线程数量
private int initialSize;
// 线程池最大线程数量
private int maxSize;
// 线程池核心线程数量
private int coreSize;
// 队列大小
private int queueSize;
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 用于存放和管理工作线程的集合
private List<WorkerThread> threads;
// 是否已经被shutdown标志
private volatile boolean isShutdown = false;
public SimpleThreadPool(int initialSize, int maxSize, int coreSiz, int queueSizee) {
// 初始化参数
this.initialSize = initialSize;
this.maxSize = maxSize;
this.coreSize = coreSize;
taskQueue = new LinkedBlockingQueue<>(queueSize);
threads = new ArrayList<>(initialSize);
// 初始化方法,创建一定数量的工作线程,并启动它们
for (int i = 0; i < initialSize; i++) {
WorkerThread workerThread = new WorkerThread();
workerThread.start(taskQueue);
threads.add(workerThread);
}
}
@Override
public void execute(Runnable task) {
if (isShutdown) {
throw new IllegalStateException("ThreadPool is shutdown");
}
// 当线程数量小于核心线程数时,创建新的线程
if (threads.size() < coreSize) {
addWorkerThread(task);
} else if (!taskQueue.offer(task)) {
// 当队列已满时,且线程数量小于最大线程数量时,创建新的线程
if (threads.size() < maxSize) {
addWorkerThread(task);
} else {
throw new IllegalStateException("执行任务失败");
}
}
}
// 创建新的线程,并执行任务
private void addWorkerThread(Runnable task) {
WorkerThread workerThread = new WorkerThread();
workerThread.start(taskQueue);
threads.add(workerThread);
// 任务放入队列
taskQueue.offer(task);
}
////省略其它代码
}
这一步,我们在 SimpleThreadPool里新增了initialSize,maxSize, coreSize 三个变量,在构造方法里传入对应三个参数,同时在execute方法里,当有任务进入时,先判断当前线程池数量是否满足不同条件,进而执行不同的处理逻辑。
▌第四步:设计饱和拒绝策略
这个功能是为了处理当任务队列已满且无法再创建新的工作线程时,也是就线程池的工作量饱和时,如何处理被拒绝的任务。
不同的场景可能需要不同的拒绝策略,例如
- 直接抛出异常
- 忽略任务
- 阻塞当前线程
- 等等
为了让用户可以自定义拒绝策略,需要
- 定义一个拒绝策略接口,声明一个方法,用于处理被拒绝的任务。
- 然后需要在基本的线程池实现类中定义一个拒绝策略成员变量,并在构造方法中传入并赋值。
- 最后,在execute方法中,在无法创建新的工作线程时,调用拒绝策略来处理该任务。
下面是一个简单的实现示例,
我们首先定义了一个RejectedExecutionHandler接口,用来表示拒绝策略。用户可以根据需要实现这个接口,并在构造线程池时传入自己的拒绝策略。
public interface RejectedExecutionHandler {
// 参数:r 代表被拒绝的任务,executor 代表线程池对象
void rejectedExecution(Runnable r, ThreadPool executor);
}