线程池(Executor)

什么是线程池?

Java5引入了新的称为Executor框架的并发API,以简化程序员的工作。它简化了多线程应用程序的设计和开发。它主要由ExecutorExecutorService接口和ThreadPoolExecutor类组成,ThreadPoolExecutor类同时实现ExecutorExecutorService接口。ThreadPoolExecutor类提供线程池的实现。我们将在教程的后面部分了解更多。

线程池继承关系图

为什么我们需要线程池?

当我们创建一个简单的多线程应用程序时,我们创建Runnable对象,并使用Runnable构造线程对象,我们需要创建、执行和管理线程。我们可能很难做到这一点。Executor框架为您做这件事。它负责创建、执行和管理线程,不仅如此,它还提高了应用程序的性能。

当您为每个任务创建一个新线程,然后如果系统高度过载,您将出现内存不足错误,系统将失败,甚至抛出oom异常。如果使用ThreadPoolExecutor,则不会为新任务创建线程。将任务分配给有限数量的线程只去执行Runnable,一旦线程完成一个任务,他将会去阻塞队列中获取Runnable去执行。

如何创建线程池?


public interface Executor {
 void execute(Runnable command);
}
 

还有另一个名为ExecutorService的接口,它扩展了Executor接口。它可以被称为Executor,它提供了可以控制终止的方法和可以生成未来跟踪一个或多个异步任务进度的方法。它有提交、关机、立即关机等方法。

ThreadPoolExecutor是ThreadPool的实际实现。它扩展了实现ExecutorService接口的AbstractThreadPoolExecutor。可以从Executor类的工厂方法创建ThreadPoolExecutor。建议使用一种方法获取ThreadPoolExecutor的实例。

  • 使用Executors工厂方法去创建线程池:

Executors类中有4个工厂方法可用于获取ThreadPoolExecutor的实例。我们正在使用Executors的newFixedThreadPool获取ThreadPoolExecutor的一个实例。

ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
  • 自定义ThreadPoolExecutor的创建线程池

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
TimeUnit unit,BlockingQueue workQueue ,ThreadFactory threadFactory,RejectedExecutionHandler handler) ;

ThreadPoolExecutor源码分析

  • 线程池内部状态

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 获取线程状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
     // 获取work线程数
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 制定状态&线程数 获取ctl值
    private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl变量利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:

  • RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
  • SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
  • STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
  • TIDYING : 2 << COUNT_BITS,即高3位为010, 所有的任务都已经终止;
  • TERMINATED: 3 << COUNT_BITS,即高3位为011, terminated

状态转换图

下面带大家分析下ThreadPoolExecutor内部几个核心方法:

  • 添加任务:execute(Runnable command)

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    //workerCountOf获取线程池的当前线程数;小于corePoolSize,执行addWorker创建新线程执行command任务
    if (workerCountOf(c) < corePoolSize) {  
       if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // double check: c, recheck
    // 线程池处于RUNNING状态,把提交的任务成功放入阻塞队列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //回滚到入队操作前,即倘若线程池shutdown状态,就remove(command)
        //如果线程池没有RUNNING,成功从阻塞队列中删除任务,执行reject方法处理任务
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //线程池处于running状态,但是没有线程,则创建线程去执行队列的任务。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 往线程池中创建新的线程失败,则reject任务
    else if (!addWorker(command, false))
        reject(command);
}

添加任务流程图

  • 添加工作队列 addWorker(Runnable firstTask, boolean core)

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();        //读取ctl的值
        int rs = runStateOf(c);   //获取线程池的运行状态
        
        /*判断当前线程池还是否需要执行任务
         *如果当前线程池的状态为RUNNING态则不会返回false
         *返回false的条件(大前提:当前线程池状态不是RUNNING态),在此基础下下面三个条件有任何一个不成立都会直接返回,而不新建工作线程:
         *         1.当前线程池的状态为SHUTDOWN态
         *         2.所提交任务为null
         *         3.阻塞队列非空
         */
        if (rs >= SHUTDOWN && 
                !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;
 
        for (;;) {
            //获取当前池中线程个数
            int wc = workerCountOf(c);
            /*
             *若当前池中线程个数 >= 2的29次方减1,则无法创建新线程。池中最大线程数量为2的29次方减1个
             *如果core为true则于核心先称数量进行比较,否则与最大线程数量进行比较
             */ 
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //将workerCount的值加1,并跳出外层循环
            if (compareAndIncrementWorkerCount(c))
                break retry;     
   
            //如果线程状态被修改,则再次执行外层循环
            c = ctl.get();  
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
 
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        /*
         *此处创建Worker实例,并将任务firstTask设置进去
         *注意Worker类中有两个特殊的字段:1. Runnable firstTask     2. final Thread thread
         *Worker类本身也继承了Runnable接口,实现了其run()方法
         */
        w = new Worker(firstTask);
        //这里的t是w本身表示的线程对象,而非firstTask。
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //获取当前线程池的运行状态rs
                int rs = runStateOf(ctl.get());
 
                /*
                 *rs < SHUTDOWN的状态只有RUNNING态
                 *能进去下面if的条件:
                 *    1. 当前线程池运行状态为RUNNING
                 *    2.当前线程池状态为SHUTDOWN而且firstTask为null
                 */    
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    //HashSet<Worker> workers线程池中利用HashSet保存的worker对象
                    workers.add(w);
                    int s = workers.size();
                    //largestPoolSize用来记录线程池中最大的线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //任务添加成功(线程创建成功)
                    workerAdded = true;
                }
            }finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //启动工作线程,这里调用的是Worker类中的run()方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
 
    return workerStarted;
}
  • 执行任务: runWorker(Worker w)

final void runWorker(Worker w) {
    //获取当前执行的线程对象
    Thread wt = Thread.currentThread();
    //获取第一个任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许中断
    boolean completedAbruptly = true;
    try {
        // task任务不为空 或者 getTask()获取任务不为空时候进入循环
        while (task != null || (task = getTask()) != null) {
            w.lock();  
            // 如果线程状态>STOP 或者当前线程被中断时候 这时候调用wt.interrupt()去中断worker线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
            try {
                //在ThreadPoolExecutor中该方法是一个空方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //执行任务。
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                //任务计数器加1
                w.completedTasks++;
                //释放锁
                w.unlock();
            }
        }
        //如果执行任务的过程中没有发生异常,则completedAbruptly会被赋值为false
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
  • 获取task任务: getTask()

private Runnable getTask() {
    boolean timedOut = false; 
 
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        /*
         *若当前线程池的工作状态为RUNNING则不会进入下面if。
         *1.若状态为STOP、TIDYING、TERMINATED则当前工作线程不能执行任务。
         *2.若状态为SHUTDOWN,且阻塞队列为空,则获取任务为null
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //workerCount的值减1
            decrementWorkerCount();
            return null;
        }
        //获取工作线程数量
        int wc = workerCountOf(c);
        
        //若allowCoreThreadTimeOut设置为true 或者 当前池中工作线程数量大于核心线程数量 则timed为true
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        //若当前工作线程数量已经超过最大线程数量,则也获取不到任务,会从该方法中返回null,进而结束该工作线程
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
 
        try {
            /*
             *若allowCoreThreadTimeOut设置为true 或者 当前池中工作线程数量大于核心线程数量
             *    则:在指定的时间内从阻塞队列中获取任务,若取不到则返回null
             *若allowCoreThreadTimeOut设置为false 而且 当前池中工作线程数量小于核心线程数量
             *    则:在指定的时间内从阻塞队列中获取任务,若取不到则一直阻塞
             */
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null)
                return r;
            //若r == null,则此处timedOut的值被设置为true
            timedOut = true;
        } catch (InterruptedException retry) {
            //如果阻塞等待过程中线程发生中断,则将timeOut设置为false,进入下一次循环
            timedOut = false;
        }
    }
  • 关闭线程: shutdown()

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //检测是否有关闭线程池的权限
        checkShutdownAccess();
        //将线程池状态设置为SHUTDOWN态
        advanceRunState(SHUTDOWN);
        //中断空闲线程(没有执行任务的线程)
        interruptIdleWorkers();
        //该方法在ThreadPoolExecutor中是一个空方法
        onShutdown(); 
    } finally {
        mainLock.unlock();
    }
    //尝试将线程池状态设置为TERMINATED状态。
    tryTerminate();
  • 立即关闭线程: shutdownNow()

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        // 加锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 直接设置STOP状态
            advanceRunState(STOP);
            interruptWorkers();
            // 丢弃未执行的task,返回
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

线程池使用注意事项

  • 使用ThreadLocal

  • 设置合理的线程数

  1. cpu密集型

    coreSize == cpu核心数+1

  2. Io密集型

    coreSize == 2*cpu核心数

结束

点赞是认可,在看是支持

阅读更多文章

06-11 05:00