前言

  本文主要是结合源码去线程池执行任务的过程,基于JDK 11,整个过程基本与JDK 8相同。

  个人水平有限,文中若有表达有误的,欢迎大伙留言指出,谢谢了!

一、线程池简介

  1.1 使用线程池的优点

    1)通过复用已创建的线程,降低资源的消耗(线程的创建/销毁是要消耗资源的)、提高响应速度;

    2)管理线程的个数,线程的个数在初始化线程池的时候指定;

    3)统一管理线程,比如停止,stop()方法;

  1.2 线程池执行任务过程

    线程池执行任务的过程如下图所示,主要分为以下4步,其中参数的含义会在后面详细讲解:

    1)判断工作的线程是否小于核心线程数据(workerCountOf(c) < corePoolSize),若小于则会新建一个线程去执行任务,这一步仅仅的是根据线程个数决定;

    2)若核心线程池满了,就会判断线程池的状态,若是running状态,则尝试加入任务队列,若加入成功后还会做一些事情,后面详细说;

    3)若任务队列满了,则加入失败,此时会判断整个线程池线程是否满,若没有则创建非核心线程执行任务;

    4)若线程池满了,则根据拒绝测试处理无法执行的任务;

    整体过程如下图:

并发系列(一)——线程池源码(ThreadPoolExecutor类)简析-LMLPHP

二、ThreadPoolExecutor类解析

  2.1 ThreadPoolExecutor的构造函数

    ThreadPoolExecutor类一共提供了4个构造函数,涉及5~7个参数,下面就5个必备参数的构造函数进行说明:

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

    1)corePoolSize :初始化核心线程池中线程个数的大小;

    2)maxmumPoolSize:线程池中线程大小;

    3)keepAliveTime:非核心线程的超时时长;

      非核心线程空闲时常大于该值就会被终止。

    4)unit :keepAliveTime的单位,类型可以参见TimeUnit类;

    5)BlockingQueue workQueue:阻塞队列,维护等待执行的任务;

  2.2  私有类Worker

    在ThreadPoolExecutor类中有两个集合类型比较重要,一个是用于放置等待任务的workQueue,其类型是阻塞对列;一个是用于用于存放工作线程的works,其是Set类型,其中存放的类型是Worker。

    进一步简化线程池执行过程,可以理解为works中的工作线程不停的去阻塞对列中取任务,执行结束,线程重新加入大works中。

    为此,有必要简单了解一下Work类型的组成。

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
//工作线程,由线程的工厂类初始化
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
//不可重入的锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
} .......
}

    Worker类继承于队列同步器(AbstractQueueSynchronizer),队列同步器是采取锁或其他同步组件的基础框架,其主要结构是自旋获取锁的同步队列和等待唤醒的等待队列,其方法因此可以分为两类:对state改变的方法 和 入、出队列的方法,即获取获取锁的资格的变化(可能描述的不准确)。关于队列同步器后续博客会详细分析,此处不展开讨论。

    Work类中通过CAS设置状态失败后直接返回false,而不是判断当前线程是否已获取锁来实现不可重入的锁,源码注释中解释这样做的原因是因为避免work tash重新获取到控制线程池全局的方法,如setCorePoolSize。

  2.3  拒绝策略类

    ThreadPoolExecutor的拒绝策略类是以私有类的方式实现的,有四种策略:

    1)AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认拒绝处理策略)。

      2)DiscardPolicy:抛弃新来的任务,但是不抛出异常。

      3)DiscardOldestPolicy:抛弃等待队列头部(最旧的)的任务,然后重新尝试执行程序(失败则会重复此过程)。

      4)CallerRunsPolicy:由调用线程处理该任务。

    其代码相对简单,可以参考源码。

三、任务执行过程分析

  3.1 execute(Runnable)方法

    execute(Runnable)方法的整体过程如上文1.2所述,其实现方式如下:

public void execute(Runnable command) {
//执行的任务为空,直接抛出异常
if (command == null)
throw new NullPointerException();
//ctl是ThreadPoolExecutor中很关键的一个AtomicInteger,主线程池的控制状态
int c = ctl.get();
//1、判断是否小于核心线程池的大小,若是则直接尝试新建一个work线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2、大于核心线程池的大小或新建work失败(如创建thread失败),会先判断线程池是否是running状态,若是则加入阻塞对列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//重新验证线程池是否为running,若否,则尝试从对列中删除,成功后执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//若线程池的状态为shutdown则,尝试去执行完阻塞对列中的任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3、新建非核心线程去执行任务,若失败,则采取拒绝策略
else if (!addWorker(command, false))
reject(command);
}

  3.2 addWorker(Runnable,boole)方法

    execute(Runnable)方法中,新建(非)核心线程执行任务主要是通过addWorker方法实现的,其执行过程如下:

private boolean addWorker(Runnable firstTask, boolean core) {
//此处反复检查线程池的状态以及工作线程是否超过给定的值
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false; for (;;) {
//核心和非核心线程的区别
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
} boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
//通过工厂方法初始化,可能失败,即可能为null
final Thread t = w.thread;
if (t != null) {
//获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
//线程池处于running状态
//或shutdown状态但无需要执行的task,个人理解为用于去阻塞队列中取任务执行
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//执行任务,这里会执行thread的firstTask获取阻塞对列中取任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//开始失败,则会从workers中删除新建的work,work数量减1,尝试关闭线程池,这些过程会获取全局锁
addWorkerFailed(w);
}
return workerStarted;
}

  3.3  runWorker(this) 方法

    在3.2 中当新建的worker线程加入在workers中成功后,就会启动对应任务,其调用的是Worker类中的run()方法,即调用runWorker(this)方法,其过程如下:

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//while()循环中,前者是新建线程执行firstTask,对应线程个数小于核心线程和阻塞队列满的情况,
//getTask()则是从阻塞对列中取任务执行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//仅线程池状态为stop时,线程响应中断,这里也就解释了调用shutdown时,正在工作的线程会继续工作
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
//执行任务
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
//完成的个数+1
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//处理后续工作
processWorkerExit(w, completedAbruptly);
}
}

  3.4 processWorkerExit(Worker,boole)方法

    当任务执行结果后,在满足一定条件下会新增一个worker线程,代码如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
//对工作线程的增减需要加全局锁
workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate(); int c = ctl.get();
if (runStateLessThan(c, STOP)) {
//线程不是中断,会维持最小的个数
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//执行完任务后,线程重新加入workers中
addWorker(null, false);
}
}

  至此,线程池执行任务的过程分析结束,其他方法的实现过程可以参考源码。

Ref:

[1]http://concurrent.redspider.group/article/03/12.html

[2]《Java并发编程的艺术》

04-14 00:24