线程池的使用

1、线程池的使用场景

2、线程池的关键参数说明

一般情况下我们是通过ThreadPoolExecutor来构造我们的线程池对象的。

* 阿里巴巴的开发规范文档是禁止直接使用Executors静态工厂类来创建线程池的,原因是

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {

  }

参数说明:

3、线程池的分类

3.1、newCachedThreadPool
public static ExecutorService newCachedThreadPool(){
    return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>());
}
3.2、newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads){
    return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
3.3、ScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
3.4、SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(){
    return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
3.5、总结

4、使用线程池容易出现的问题

5、线程池的监控

5.1、为什么要对线程池进行监控

5.2、监控的原理

5.3、实现的细节

定义线程池并启动监控

 /**
     * 定义线程池的队列的长度
     */
    private final Integer queueSize = 1000;

    /**
     * 定义一个定长的线程池
     */
    private ExecutorService executorService;

    @PostConstruct
    private void initExecutorService() {
        log.info(
                "executorService init with param: threadcount:{} ,queuesize:{}",
                systemConfig.getThreadCount(),
                systemConfig.getThreadQueueSize());
        executorService =
                new ThreadPoolExecutor(
                        systemConfig.getThreadCount(),
                        systemConfig.getThreadCount(),
                        0,
                        TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue(systemConfig.getThreadQueueSize()),
                        new BasicThreadFactory.Builder()
                                .namingPattern("async-sign-thread-%d")
                                .build(),
                        (r, executor) -> log.error("the async executor pool is full!!"));

        /** 启动线程池的监控 */
        ThreadPoolMonitoring threadPoolMonitoring = new ThreadPoolMonitoring();
        threadPoolMonitoring.init();
    }

线程池的监控

/**
     * 功能说明:线程池监控
     *
     * @params
     * @return <br>
     *     修改历史<br>
     *     [2019年06月14日 10:20:10 10:20] 创建方法by fengqingyang
     */
    public class ThreadPoolMonitoring {
        /** 用于周期性监控线程池的运行状态 */
        private final ScheduledExecutorService scheduledExecutorService =
                Executors.newSingleThreadScheduledExecutor(
                        new BasicThreadFactory.Builder()
                                .namingPattern("async thread executor monitor")
                                .build());

        /**
         * 功能说明:自动运行监控
         *
         * @return <br>
         *     修改历史<br>
         *     [2019年06月14日 10:26:51 10:26] 创建方法by fengqingyang
         * @params
         */
        public void init() {
            scheduledExecutorService.scheduleAtFixedRate(
                    () -> {
                        try {
                            ThreadPoolExecutor threadPoolExecutor =
                                    (ThreadPoolExecutor) executorService;
                            /** 线程池需要执行的任务数 */
                            long taskCount = threadPoolExecutor.getTaskCount();
                            /** 线程池在运行过程中已完成的任务数 */
                            long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
                            /** 曾经创建过的最大线程数 */
                            long largestPoolSize = threadPoolExecutor.getLargestPoolSize();
                            /** 线程池里的线程数量 */
                            long poolSize = threadPoolExecutor.getPoolSize();
                            /** 线程池里活跃的线程数量 */
                            long activeCount = threadPoolExecutor.getActiveCount();
                            /** 当前排队线程数 */
                            int queueSize = threadPoolExecutor.getQueue().size();
                            log.info(
                                    "async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{},queueSize:{}",
                                    taskCount,
                                    completedTaskCount,
                                    largestPoolSize,
                                    poolSize,
                                    activeCount,
                                    queueSize);

                            /** 超过阀值的80%报警 */
                            if (activeCount >= systemConfig.getThreadCount() * 0.8) {
                                log.error(
                                        "async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{},queueSize:{}",
                                        taskCount,
                                        completedTaskCount,
                                        largestPoolSize,
                                        poolSize,
                                        activeCount,
                                        queueSize);
                                ;
                            }
                        } catch (Exception ex) {
                            log.error("ThreadPoolMonitoring service error,{}", ex.getMessage());
                        }
                    },
                    0,
                    30,
                    TimeUnit.SECONDS);
        }
    }

6、需要注意的事项

7、后续

更多精彩,敬请关注, 程序员导航网 https://chenzhuofan.top

07-09 04:17