最近研究了一下java线程池ThreadPoolExecutor的源码,打算把我能理解的都总结一下,如有错误,欢迎指出!

在解析线程池源码之前,先来回顾一下有关线程池的一些概念和执行流程,方便后续对源码的理解。

回顾线程池:

public ThreadPoolExecutor(int corePoolSize,    //核心线程的数量
                          int maximumPoolSize,    //最大线程数量
                          long keepAliveTime,    //超出核心线程数量以外的线程空余存活时间
                          TimeUnit unit,    //存活时间的单位
                          BlockingQueue<Runnable> workQueue,    //保存待执行任务的队列
                          ThreadFactory threadFactory,    //创建新线程使用的工厂
                          RejectedExecutionHandler handler // 当任务无法执行时的处理器
                          ) {...}

 

线程池的工作流程:

  1. 默认情况下,创建完线程池后并不会立即创建线程, 而是等到有任务提交时才会创建线程来进行处理。(除非调用prestartCoreThread或prestartAllCoreThreads方法) 
  2. 当线程数小于核心线程数时,每提交一个任务就创建一个线程来执行,即使当前有线程处于空闲状态,直到当前线程数达到核心线程数。  
  3. 当前线程数达到核心线程数时,如果这个时候还提交任务,这些任务会被放到队列里,等到线程处理完了手头的任务后,会来队列中取任务处理。  
  4. 当前线程数达到核心线程数并且队列也满了,如果这个时候还提交任务,则会继续创建线程来处理,直到线程数达到最大线程数。
  5. 当前线程数达到最大线程数并且队列也满了,如果这个时候还提交任务,则会触发饱和策略。 
  6. 如果某个线程的控线时间超过了keepAliveTime,那么将被标记为可回收的,并且当前线程池的当前大小超过了核心线程数

下图即可描述这一过程:

线程池源码解析-LMLPHP

源码解析:

  • 为了更好的阅读源码,我提前写了一段简单的代码,作为程序的入口点,如下:
package com.tongtong;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class ThreadPoolExextorsTest {

    public static void main(String args[]){

        final ExecutorService executorService = Executors.newCachedThreadPool();

        executorService.submit(()->log.info("submit:{}",executorService));

    }
}

在上述代码中打断点,一步一步的进入ThreadPoolExecutor的源码中,如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

corePoolSize:核心线程数量

maxinumPoolSize:最大线程数量

TimeUnit:一般与KeepAliveTime一起使用,让线程池判断是否满足关闭线程的条件

BlockingQueue<Runnable>:缓冲队列

ThreadFactory:线程工厂,DefaultFactory是其实现类

RejectedExcutionHandler:拒绝处理任务

 

ThreadExecutor的execute方法

  public void execute(Runnable command) {
        if (command == null)
            //如果没传入Runnable任务,则抛出异常
            throw new NullPointerException();

        int c = ctl.get();

        //1、当前池中线程数小于核心线程数,新建一个线程执行任务
        if (workerCountOf(c) < corePoolSize) {
         //直接开启新的线程,并将Runnable传入作为第一个要执行的任务,成功返回true,失败返回false
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        //2、线程池是Running状态且核心池已满,但任务队列未满,添加到队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //核心池和队列均已满,尝试创建一个新线程
        else if (!addWorker(command, false))
            reject(command); //若创建失败,说明线程池被关闭或者线程池满了,拒绝任务
    }

流程总结:

  1. 如果线程池中的线程数量少于corePoolSize(核心线程数量),那么会直接开启一个新的核心线程来执行任务,即使此时有空闲线程存在.
  2. 如果线程池中线程数量大于等于corePoolSize(核心线程数量),那么任务会被插入到任务队列中排队,等待被执行.此时并不添加新的线程.
  3. 如果在步骤2中由于任务队列已满导致无法将新任务进行排队,这个时候有两种情况:
  4. 线程数量 [未] 达到maximumPoolSize(线程池最大线程数) , 立刻启动一个非核心线程来执行任务.
  5. 线程数量 [已] 达到maximumPoolSize(线程池最大线程数) , 拒绝执行此任务.ThreadPoolExecutor会通过RejectedExecutionHandler,抛出RejectExecutionException异常.

 

线程池数量及线程池状态

在上述分析中ct1.get()频繁出现,你那么它到底代表什么呢?查看源码

//创建AtomicInteger对象
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; //32-3 = 29
//最大线程容量
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; //将1的二进制向右位移29位,再减1

//运行状态保存在int值的高3位 (所有数值左移29位)
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

//运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//线程数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
//是否正在运行
private static boolean isRunning(int c) { return c < SHUTDOWN;}

整理如下:

  • clt是一个AtomicInteger对象,(提供原子操作进行Integer的使用,适用于高并发场景.该AtomicInteger的value可以自动刷新,确保在高并发环境下的唯一性.),而ctl.get()获取的就是该value值.
  • 线程池用一个AtomicInteger来保存 [线程数量] 和 [线程池状态] ,一个int数值一共有32位,高3位用于保存运行状态,低29位用于保存线程数量
  • 系统默认的线程容量就是(2^29)-1 , 大约5亿条线程。

由此可见,频繁的调用c = ctl.get();是为了获取该AtomicInteger的最新值,进而通过位运算获取线程池的最新运行状态,线程数量.

线程池状态:

  • RUNNING: 接收新任务,并执行队列中的任务
  • SHUTDOWN: 不接收新任务,但是执行队列中的任务
  • STOP: 不接收新任务,不执行队列中的任务,中断正在执行中的任务
  • TIDYING: 所有的任务都已结束,线程数量为0,处于该状态的线程池即将调用terminated()方法
  • TERMINATED: terminated()方法执行完成

创建新线程:

/**
 * 往线程池中添加Worker对象
 * @param  firstTask 线程中第一个要执行的任务
 * @param  core      是否为核心线程
 * @return           添加是否成功
 */
 private boolean addWorker(Runnable firstTask, boolean core) {
    //这里有两层[死循环],外循环:不停的判断线程池的状态
    retry: for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //一系列判断条件:线程池关闭,Runnable为空,队列为空,则直接return false,代表Runnable添加失败
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())){
                return false;
            }

            //内循环:不停的检查线程容量
            for (;;) {
                int wc = workerCountOf(c);
                //超过线程数限制,则return false
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)){
                    return false;
                }
                //★ 添加线程成功,则直接跳出两层循环,继续往下执行.
                //注意:这里只是把线程数成功添加到了AtomicInteger记录的线程池数量中,真正的Runnable添加,在下面的代码中进行
                if (compareAndIncrementWorkerCount(c)){
                    break retry;
                }
                //再次判断线程池最新状态,如果状态改变了(内循环和外循环记录的状态不符),则重新开始外层死循环
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs){
                    continue retry;
                }
            }
        }

    //结束循环之后,开始真正的创建线程.
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //创建一个Worker对象,并将Runnable当做参数传入
        w = new Worker(firstTask);
        //从worker对象中取出线程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            //拿到锁
            mainLock.lock();
            try {
                //再次检查线程池最新状态
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    //检查准备执行Runnable的Thread的状态,如果该Thread已处于启动状态,则抛出状态异常(因为目前还没启动呢)
                    if (t.isAlive()){
                        throw new IllegalThreadStateException();
                    }
                    //将新创建的worker,添加到worker集合
                    workers.add(w);
                    ...
                    workerAdded = true;
                }
            } finally {
                //释放锁
                mainLock.unlock();
            }
            if (workerAdded) {
                //★Thread开始启动
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //添加worker失败
        if (! workerStarted){
            addWorkerFailed(w);
        }
    }
    return workerStarted;
}

 

总结:

  • 先判断线程池状态和线程池中线程的容量,如果满足线程添加的条件,则先把AtomicInteger中记录的线程数量+1.然后再进行线程添加的工作.
  • 创建worker对象,并将Runnable作为参数传递进去,并从worker中取出Thread对象,进行一系列条件判断后.开启Thread的start()方法,线程开始运行.所以worker对象中必然包含了一个Thread和一个要被执行的Runnable.

 

//ThreadPoolExecutor的内部finial类
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{

    //当前worker要执行任务所用的线程(如果创建失败,则可能是null)
    final Thread thread;
    //第一个要执行的任务(可能是null)
    Runnable firstTask;
    //当前线程执行完的任务总数
    volatile long completedTasks;

    //通过构造传入Runnable任务
    Worker(Runnable firstTask) {
        ...
        this.firstTask = firstTask;
        //通过ThreadFactory()创建新线程
        this.thread = getThreadFactory().newThread(this);
    }

    //调用外部类runWorker()方法
    public void run() {
        runWorker(this);
    }
    ...
}

总结:

  • 每个worker,都是一条线程,同时里面包含了一个firstTask,即初始化时要被首先执行的任务.
  • 最终执行任务的,是runWorker()方法

线程的复用

//ThreadPoolExecutor的final类,该方法由内部类Worker的run()方法调用
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    //取出Worker对象中的Runnable任务
    Runnable task = w.firstTask;
    boolean completedAbruptly = true;
    ...
    try {
        //★注意这个while循环,在这里实现了 [线程复用]
        while (task != null || (task = getTask()) != null) {
            //上锁
            w.lock();
            //检查Thread状态的代码
            ...
            try {
                ...
                try {
                    //执行Worker中的Runnable任务
                    task.run();
                } catch (...) {
                   ...catch各种异常
                }
            } finally {
                //置空任务(这样下次循环开始时,task依然为null,需要再通过getTask()取) + 记录该
                  Worker完成任务数量 + 解锁
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        //该线程已经从队列中取不到任务了,改变标记
         completedAbruptly = false;
    } finally {
        //线程移除
        processWorkerExit(w, completedAbruptly);
    }
}

通过上面的源码,发现通过一个while循环,不断的getTask()取任务出来执行,以这种方式实现了线程的复用.

线程复用逻辑:

  • 如果task不为空,则开始执行task
  • 如果task为空,则通过getTask()再去取任务,并赋值给task,如果取到的Runnable不为空,则执行该任务
  • 执行完毕后,通过while循环继续getTask()取任务
  • 如果getTask()取到的任务依然是空,那么整个runWorker()方法执行完毕

 

private Runnable getTask() {
    ...
    for (;;) {
        ...
        // 如果线程池已关闭 或 任务队列为空,则AtomicInteger中记录的线程数量-1,并return null,
           结束本方法
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        //获取当前线程池中的总线程数
        int wc = workerCountOf(c);
        //allowCoreThreadTimeOut参数是使用者自行设置的(默认false),用来设置:是否允许核心线程有
          超时策略
        //条件1:核心线程超时 条件2:当前线程数 > 核心线程数,满足任何一个条件则timed标记为true
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //超过最大线程数 或 超时 或 任务队列为空...  线程数量-1 + return null
        ...
        try {
            //根据timed标记,使用不同的方式(限时等待 or 阻塞)从BlockingQueue<Runnable>
              workQueue 队列中取任务
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            if (r != null){
                //如果取到了,就将Runnable返回
                return r;
            }
            //如果没取到,则重新for循环
            ...
        }
    }
}

信息整理如下:

  • 线程池使用BlockingQueue来管理整个线程池中的Runnable任务,变量workQueue存放的都是待执行的任务
  • BlockingQueue是个阻塞队列,BlockingQueue.take()方法如果得到的是空,则进入等待状态,直到BlockingQueue有新的对象被加入时,才可以正常将Runnable取出并返回,线程开始正常运转,正常执行Runnable任务。
/**
 * 先进先出的阻塞队列
 */
public interface BlockingQueue<E> extends Queue<E> {
    /**
     * 检索并移除队列的顶部元素,如果该元素不可用则等待,直至元素可用
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    E take()
    ...
}

整理上面这部分的逻辑:

  • execute()方法执行之后,进行一系列的逻辑判断来控制线程池中的线程数量,并通过addWorker()方法创建新线程
  • 一旦Worker里的Thread开始start()之后,执行的其实是Worker里的run()方法,run()方法调用runWorker(Worker w)方法.
  • 在runWorker()方法里面通过getTask()方法不停的取workQueue队列中的任务来执行,如果取到了就执行,如果没取到就等待.

结论:

  • 一旦一个线程开启之后,会一直执行下去,直至任务队列中的任务执行完毕,达成了线程的复用
  • 以Runnable队列为目标的worker虽然是串行操作,但是由于可以通过addWorker()添加多个worker,并且多个worker取的是同一个BlockingQueue中的Runnable,所以就实现了并行处理.

线程的移除

在runWorker()方法中有如下代码:

final void runWorker(Worker w) {
    boolean completedAbruptly = true;
    ...
    try {
        while (getTask()...) {
            ...
            处理任务
        }
        //该线程已经从队列中取不到任务了,改变标记,该标记表示:该线程是否因用户因素导致的异常而终止
         completedAbruptly = false;
    } finally {
        //线程移除
        processWorkerExit(w, completedAbruptly);
    }
}

进入processWorkerExit()方法中:

/**
 * @param w the worker 线程
 * @param completedAbruptly 该线程是否因用户因素导致的异常而终止
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    ...
    try {
        //记录该线程完成任务的总数
        completedTaskCount += w.completedTasks;
        //从worker集合中移除本worker(线程)
        workers.remove(w);
    }
    ...
    //如果在runWoker()中正常执行任务完毕,这里completedAbruptly传入的就是false
    if (!completedAbruptly) {
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        //如果线程池里最少线程数为0,但是此时任务队列里依然还有任务
        if (min == 0 && ! workQueue.isEmpty()){
            //那么必须保留一条线程,所以将最小值设置为1
            min = 1;
        }
        //如果当前线程数>= 最小线程数,则直接return
        if (workerCountOf(c) >= min){
            return;
        }
    }
    //否则添加一条新线程,来替代当前线程,继续去执行队列中的任务.
    addWorker(null, false);
}

移除步骤:

  • 先移除传入的Worker(线程)
  • 判断线程池里的最少线程数,如果最少线程数为0条,但是队列里依然有任务未执行完毕.那么必须确保线程池中至少有1条线程.(将最小线程数置为1)
  • 如果当前线程数 > 最小线程数,本方法结束,不再往下执行
  • 否则添加一条新线程,来替代当前线程,继续去执行队列中的任务.

 

先写到这,再续.......

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

10-03 21:13