并发编程-线程池核心原理

一、线程池的业务场景应用

1.1、异步处理

举个栗子,比如发送邮件,需要找smtp服务器,发送短信,需要找四大运营商。将这种允许延迟看到效果,甚至即便失败的也ok的任务,搞成异步的。

一般在项目完成这种操作的时候,咱们不会自己写线程池。直接SpringBoot的@Async就ok了。

这种SpringBoot的@Async的本质,还是将任务投递给线程池处理,只不过任务用的线程池你没关注。

是可以给异步处理的任务追加一个合理的线程池,从而提升效率

@Component
public class TestAsync {

    static AtomicInteger atomicInteger = new AtomicInteger(1);

    @Bean
    public Executor taskPool(){
        ExecutorService pool = Executors.newFixedThreadPool(100, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("task-" + atomicInteger.getAndIncrement());
                return t;
            }
        });
        return pool;
    }


    @Async(value = "taskPool")
    public void task() throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + ",发送邮件");
    }

}

1.2、并行IO处理

项目非常常见的一个情况,比如一个业务处理,需要走三个服务拿到数据,最后整合的情况,可以合理的实现线程池做多个IO的并行处理,提升效率。

一般都是CountDownLatch + ThreadPoolExecutor去实现。

static ThreadPoolExecutor executor = new ThreadPoolExecutor(
        10,
        10,
        10,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(10)
);

public Object test1() {
    CountDownLatch countDownLatch = new CountDownLatch(3);
    // 时间  = MAX(商品,库存,策略)  = 0.8s
    // 调用商品服务 - 拿点数据     0.5s
    executor.execute(() -> {
        Object itemData = itemFeignClient.getData();
        countDownLatch.countDown();
    });

    // 调用库存服务 - 拿点数据     0.8s
    executor.execute(() -> {
        Object stockData = stockFeignClient.getData();
        countDownLatch.countDown();
    });
    // 调用策略服务 - 拿点数据     0.7s
    executor.execute(() -> {
        Object strategyData = strategyFeignClient.getData();
        countDownLatch.countDown();
    });


    try {
        countDownLatch.await(1500, TimeUnit.MILLISECONDS);
        // 业务线程在这汇总    0.2s
        Object result = itemData + stockData + strategyData;

        return result;
    } catch (InterruptedException e) {
        e.printStackTrace();
        // 超时
        throw new RuntimeException("超时啦~");
    }

}

1.3、 其他框架底层的线程池

RabbitMQ,消费者的问题。RabbitMQ默认情况下,消费者是单线程消费!

一种方式,是基于RabbitMQ的配置,prefetch以及maxconcurrent去指定每次拉取消息的数量以及最大并行消费的线程数。

这里是不是也要基于消费者的业务逻辑去考虑线程池的配置。

OpenFeign,底层也需要你去指定线程池的信息,不然OpenFeign慢的要死(新版本我不太清楚)

Tomcat,默认200个核心线程。

二、线程池核心属性&参数扫盲

AtomicInteger ctl

  • 高3位:线程池的状态
    • RUNNING:一切正常,干活!
    • SHUTDOWN:接的活都能干,不接新活!
    • STOP:什么活都不能干了!
    • TIDYING:线程池准备倒闭!
    • TERMINATED:线程池倒闭了!
  • 第29位:工作线程的数量(工作线程,就是线程池中new的Thread然后start起来的那个线程)

7个参数

  • 核心线程数
  • 最大线程数(核心线程数 + 非核心线程数)
  • 最大空闲时间
  • 空间时间单位
  • 阻塞队列
  • 线程工厂
  • 拒绝策略

三、线程池提交任务流程

3.1 构建好的线程池内部,有多少个工作线程?

0个,工作线程是懒加载,随着任务的提交才会构建。

3.2 任务提交到线程池的几种处理方式?(线程池的执行流程/原理)

  • 工作线程数 < 核心线程数: 直接构建核心线程来处理当前任务。只要满足前面的条件,就直接构建核心线程,不存在其他工作线程是否空闲。
  • 工作线程数 ≥ 核心线程数: 将任务投递到阻塞队列,空闲的工作线程都会来当前的阻塞队列获取任务处理。
    • 当任务丢到阻塞队列之后,可能会出现没有工作线程的情况。 此时会构建一个非核心线程去处理阻塞队列中的任务,避免任务长时间在阻塞队列中不被处理!
  • 阻塞队列已满: 会创建非核心线程来处理刚刚投递过来的任务。等刚刚投递过来的任务处理完,才会去阻塞队列找之前排队的任务处理。
  • 工作线程数 ≥ 最大线程数 && 阻塞队列已满: 执行拒绝策略。

3.3 核心线程与非核心线程的区别?

没有区别!

线程池构建线程,也是基于Thread t = new Thread()去构建,虽然在构建前,有一些判断上的不同,但是构建出来的都是线程,不分核心还是非核心!

线程池最终只确保数量满足参数要求即可。无论线程构建之初,被认定为核心还是非核心,到后面都是工作线程,而且线程池再多维护一个工作线程的属性,很麻烦,而且对效率有影响。

四、线程池后续处理任务流程

线程池中的工作线程,在处理完手头的任务之后,都会去阻塞队列中尝试拿新任务。在尝试的过程中,工作线程处于阻塞状态。

尝试的时间略有不同。

  • 有的工作线程会死等,没任务,我就死等,等到有任务。 (核心线程。 工作线程没有超过核心线程数时,都是死等! ) WAITING,take方法。
  • 有的工作线程会等待最大空闲时间,这段时间没拿到任务,这个线程就销毁。(非核心线程。 工作线程超过核心线程数之后,都是等一会! )TIMED_WAITING,poll(最大空闲时间)方法。

有的工作线程没拿到任务就销毁?怎么销毁的?

无论是线程池的线程,还是自己new的Thread都是同一种线程。都是start起来的。

销毁方式很简单,run方法结束。

线程池也是一样的操作,让run方法结束即可,而结束的方式,是让runWorker方法里的while循环结束。(工作线程可以一直干活的原因,就是在一个while循环中不停的去阻塞队列获取任务)

  • 当没有从阻塞队列在指定的时间范围内,拿到数据,线程会销毁。(非核心线程)
  • 等待任务的线程状态,处于WAITING或者TIMED_WAITING,只要线程在这个状态下中断了,就会被立即唤醒,没有拿到任务,这种也会直接被销毁。

原理一样,都是在阻塞队列中尝试获取任务,但是最后没拿到,就被要干掉,干掉的方式,就是run方法结束,线程销毁。

问题:线程池中线程回收的问题?

当咱们在局部位置,方法内部构件一个线程池去处理任务后,即便方法弹栈了,线程池引用没了,工作线程依然无法被正常回收,严重化会导致OOM问题。

在方法内部使用完毕线程池后,一定要基于执行shutdown方法,先让线程池将工作线程回收,否则存活的工作线程会让线程池对象无法被GC回收。

线程池构建的线程,属于GCRoot的一种。

同时工作线程的run方法的参数里传入了Worker对象(Worker属于Thread线程的包装),Worker无法被回收。

Worker是线程池的内部类,内部类还在,外部类无法被回收。

工作线程执行的run方法,是栈桢压栈执行,所以run方法里涉及到的内容都是GCRoot,run方法里涉及到了Worker对象的引用 -------> Worker ---------> ThreadPoolExecutor

@GetMapping("/test")
public void test() throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1000,
            1000,
            10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10)
    );
    for (int i = 0; i < 1000; i++) {
        executor.execute(() -> {
            System.out.println(111);
        });
    }

    Thread.sleep(10000);
}

问题:线程池的核心参数如何设置?

1、如果你项目中有业务涉及到了线程池应用,贴合你的业务去聊。(4C8G)

2、项目中没涉及,以这种方式来聊:

  • 核心线程数
    • 任务的类型是CPU密集还是IO密集。
      • CPU密集:你的线程需要CPU尽可能的去调度他,核心线程数一般就要设置到CPU内核数±1。
      • IO密集:很多书上有一些IO密集时,核心线程数设置的公式,但是实际的情况和公式还有出入的,IO密集的时间不确定的。这里一定要基于压测的形式,找到一个合适的中间值,在短时间内可以按照任务的RT时间以及任务数来决定核心线程数的大小。
  • 阻塞队列长度
    • 考虑任务处理允许的延迟时间,基于每秒任务大致多少的数量,来预测队列最长时,任务会延迟多久。
    • JVM内存问题,如果大量的任务扔到阻塞队列中,占用的内存也不小,尽可能不要占用太多。
  • 最大线程数
    • 强烈推荐就跟核心线程数的大小设置为一样的。如果设置的超过核心线程数,反而会造成线程池处理任务的效率降低,因为CPU频繁的上下文切换,导致浪费CPU资源,这个是完全没有必要的。
  • 拒绝策略
    • 这个只能贴近业务去聊,最基本比如记录日志,丢就丢了,采用discard都可以接受。
    • 一些任务要求必须处理完,设置为CallerRuns都可以接收。
    • 或者Abort抛出异常也成,日志有记录的信息,会知道线程池处理的能力不足。
    • 任务允许延迟,但是又不能影响当前效率,记录信息扔到数据库,后面定时处理。
02-20 13:47