一、J.U.C-FutureTask-1

FutureTask组件,该组件是JUC中的。但该组件不是 AQS 的子类。

创建一个线程通常有两种方式,一种是直接继承Thread类,另一红就是实现Runnable接口,这俩种方式有一个共同的缺陷,那就是在完成任务之后无法获取执行结果。从Java1.5开始提供了CallableFutureTask,可以在任务完成之后得到任务执行的结果。

1、Callable 与 Runnable 接口对比:

Runnable的代码非常简单,它是一个接口,而且只有一个方法,那就是run(),实现它把一些业务的操作写在里面,然后使用某个线程去执行该Runnable实现类就可以实现多线程;

Callable的代码也非常简单,不同的是它是一个泛型的接口,有一个call 函数,call函数的类型就是我们传进去的类的类型。

Callable和Runnable的功能大致相似,Callable的功能更强大一些。主要是Callable在执行完后可以有返回值,并且抛出异常。

2、Future接口:

Future也是一个接口,对于我们定义的Callable或者Runnable定义的具体的任务,它可以取消。查询的任务是否被取消、查询的过程是否完成以及获取结果等等。

通常程序的线程都是异步执行的,所以通常需要可以直接从其他的线程中得到返回值。这个时候,Future就出场了。Future可以监视目标线程调用call()的情况,当你调用Future的get()方法时,就可以获得它的结果。通常这个时候,线程可能不会直接完成,当前线程就开始阻塞,知道call()方法结束返回结果,线程才继续执行。

总结: Future可以得到其他线程的任务方法的返回值。

3.FutureTask类:

FutureTask的父类是RunnableFuture,,而RunnableFuture实现了Runnable和Future两个接口;由此我们可以知道,FutureTask它最终执行的是callable类型的任务。

如果构造函数参数的类型是callable的话,它会转换成callable类型。

RunnableFuture实现了Runnable和Future两个接口,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

该组合FutureTask的使用的好处是?

假设有一个很费时的逻辑需要计算,并且需要返回这个值,同时这个值又不是马上需要,那么就可以使用这个组合。用另外一个线程去计算返回值,而当前线程在使用这个返回值之前做其他的操作,等到需要用到这个返回值的时候再通过Future得到。

二、J.U.C-FutureTask-2

1、代码演示Future的使用

@Slf4j
public class FutureExample {
    static class MyCallable implements Callable<String> {

        @Override
        public String call() throws Exception {
            log.info("do something in callable");
            Thread.sleep(5000);
            return "Done";
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> future = executorService.submit(new MyCallable());
        log.info("do something in main");
        Thread.sleep(1000);
        String result = future.get();
        log.info("resultï¼{}", result);
    }
}

代码执行结果:

17:28:32.168 [main] INFO com.mmall.concurrency.example.aqs.FutureExample - do something in main
17:28:32.168 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.FutureExample - do something in callable
17:28:37.171 [main] INFO com.mmall.concurrency.example.aqs.FutureExample - resultï¼Done

由执行结果的时间可知,call()方法中打印do something in callable 即线程初始化执行的时间和main方法中执行打印do something in main的时间一直,可说明两者是异步同时执行的。

再由call()线程返回值的打印时间可知,它是在线程初始化以及main初始化5秒后才打印结果result,可以说明future一直被阻塞,直到call()方法中的业务执行完成并返回结果后才接着继续执行get()方法。


2、代码演示FutureTask的使用

@Slf4j
public class FutureTaskExample {

    public static void main(String[] args) throws Exception {
       FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
           @Override
           public String call() throws Exception {
               log.info("do something in callable");
               Thread.sleep(5000);
               return "Done";
           }
       });
       new Thread(futureTask).start();
       log.info("do something in main");
        Thread.sleep(1000);
        String result = futureTask.get();
        log.info("result:{}",result);
    }
}

代码执行结果:

17:37:57.378 [Thread-0] INFO com.mmall.concurrency.example.aqs.FutureTaskExample - do something in callable
17:37:57.378 [main] INFO com.mmall.concurrency.example.aqs.FutureTaskExample - do something in main
17:38:02.384 [main] INFO com.mmall.concurrency.example.aqs.FutureTaskExample - result:Done

Process finished with exit code 0

分析执行结果:

3、对比Future和FutureTask执行声明与执行结果:

可以知道,FutureTask使用起来要更加方便,定义好任务之后,直接启动任务,什么时候想用,什么时候就可以用它。

4、部分源码分析:

由FutureTask的构造方法可以知道,它支持多种类型的构造函数的。

同时,当传入的是Runnable时,还可以指定返回值的类型

三、J.U.C-ForkJoin

JUC 里面的ForkJoin框架是JAVA7提供的一个用于实现并行任务的框架,它是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务的结果的框架

它的思想和Mapreduce的思想比较类似。

从字面上看,Fork就是把一个大任务切割成若干个子任务来并行执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的执行结果。它主要采用的是工作窃取算法。

工作窃取算法的意思是指某个线程从其他队列里窃取任务来执行。

1、工作窃取算法的工作流程图及原理:

从图中分析,该算法从线程1开始执行,然后到达任务4时,会在尾部进入到线程2去窃取属于线程2 的任务来执行。窃取线程的执行顺序自下而上,被窃取线程在执行时是自上而下

这里为什么要使用工作窃取算法呢?

假如我们需要做一个比较大的任务,我们可以把这个任务分割成若干个互不依赖的子任务。为了减少线程间的竞争,于是把这些子任务放到不同的队列里。为每个队列创建一个单独的线程来执行队列里的任务。线程和队列一一对应,比如A线程负责A队列里的任务。

但是,有的线程会先把自己线程里的任务干完。其他线程对应的队列里还有任务等待处理。干完活的线程与其光等着,还不如去帮助其他的线程干活。于是,它就去其他线程的队列里去窃取一个线程来执行,而在这时,它们会访问同一个队列,所以,为了减少窃取任务线程和被窃取任务线程之间的竞争,我们会使用双端队列。被窃取任务的线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部那任务执行。

优点:充分利用了线程进行并行计算,并减少线程之间的竞争。

缺点:某些情况下,还是存在竞争,比如,双端队列里只有一个任务时,同时执行会消耗更多的系统资源。比如,创建了多个线程和多个双端队列。

对于ForkJoin框架而言,当一个任务正在等待它使用join()操作创建的子任务结束时。执行这个任务的工作线程,它的其他未被执行的任务会开始它的执行。通过这种方式,线程充分利用他们的运行时间来提高应用程序的性能。

为了达到这个目标,ForkJoin框架有一些局限性。具体有:

任务始终只能够使用Fork()或Join()操作来作为同步机制。如果使用了为他同步机制,当那个它们在同步工作时,工作线程就不能执行任务了。比如:在ForkJoin框架中,你使任务进入了睡眠,这个睡眠期间内执行其他任务的工作线程将不能执行其他任务了。

第二个局限性,我们所拆分的任务不应该去执行IO操作,如读或写数据文件;

第三个局限性,任务不能抛出或检查异常,它必须通过bug的代码来处理它

ForkJoin框架的核心是两个类,ForkJoinPool和ForkJoinTask.,Pool负责来做实现,包括工作窃取算法,它管理工作线程和提供任务工作的状态和它们的执行信息。而ForkJoinTask主要负责在任务中执行Fork()或者Join()操作的机制。

2、代码演示ForkJoin的使用:

@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
    public static final int threshold = 2;
    private int start;
    private int end;

    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;

        //如果任务足够小就计算任务
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任务大于阈值,就分裂成两个子任务计算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            // 执行子任务
            leftTask.fork();
            rightTask.fork();

            // 等待任务执行结束合并其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        //生成一个计算任务,计算1+2+3+4
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

        //执行一个任务
        Future<Integer> result = forkjoinPool.submit(task);

        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}

代码执行结果:

18:26:35.223 [main] INFO com.mmall.concurrency.example.aqs.ForkJoinTaskExample - result:5050

Process finished with exit code 0

四、J.U.C-BlockingQueue

1、BlockingQueue 的实现流程图及原理

BlockingQueue,在某些情况下,对该队列的访问,可能会造成阻塞。

被阻塞的情况主要有如下两种:

       第一种:当队列满的时候,进行入队列操作;

       第二种:当队列空的时候,进行出队列操作

当一个线程对一个满了的队列进行入队列操作的时候,它就会阻塞,除非有另一个线程做了出队列的操作。同样的,当一个线程对一个空的队列做出队列操作时,它也将被阻塞,除非有一个线程做了如队列操作。

阻塞队列是线程安全的,阻塞队列主要用作生产者和消费者的场景。由图进行分析:负责生产的线程T1不断制造新对象并插入到队列中,知道达到队列的上限值,此时,生产线程将被阻塞,知道消费者线程T2开始对这个队列进行消费。

同理,负责消费的线程不断从队列中消费对象,知道队列为空。当队列为空时,消费线程将会被阻塞,知道队列中有新的线程被插入进来。

2、BlockingQueue主要方法示意图:

3、BlockingQueue的主要实现类:

  •    ArrayBlockingQueue
  •   DelayQueue
  •   LinkedBlockingQueue
  •   PriorityBlockingQueue
  •   SynchronousQueue
12-17 00:45