前言


 文章

一 CompletionService(完成服务)接口源码及机制详解


 类

    任务的“生产”与“消费”默认是同步的。在正式讲述CompletionService(完成服务)接口的作用之前我们需要先了解所谓任务的“生产”与“消费”是什么概念。在Executor(执行器)框架中任务的“生产”指任务向执行器递交的行为,而“消费”则指将任务从执行器中移除的行为,即执行器不再持有任务的引用,因此“消费”只能在任务执行结束后(完成/异常/取消)后发生。任务的“生产”与“消费”通常被认为是同步的,这是一个会让初学者非常疑惑的点,因为基于大多数执行器接口实现类都采用异步执行方案的原因,并无法保证递交方法会在任务“消费”后返回,因此同步就显得毫无道理。那为什么还会有同步的说法呢?这是因为递交方法同步返回的Future(未来)拥有追踪任务执行状态的能力。因此即使任务被执行器异步执行,也可在需要时通过调用未来的get()方法达到同步“消费”(并获取执行结果/异常)的目的,其效果与任务在递交时即被同步“消费”是等价的,因此任务的“生产”与“消费”本质是一种间接/变相的同步。

    同步“消费”的前提是确定具体需要被“消费”的任务。想要进行同步“消费”就必须先确定具体需要被“消费”的任务,这其中原因是因为我们需要调用该任务关联未来的get()方法。这并没有想象中简单,因为递交返回的未来未必是我们想要的目标未来,即递交的任务未必是我们想“消费”的任务。典型的例子是:如何获取一组递交任务中最早执行结束(完成/异常/取消)任务的结果呢?事实上开发者应该很清楚该问题在使用同步“消费”的情况下是无法/很难解决的,因为无法得知哪个同步未来的代表任务会最早执行结束(完成/异常/取消)。因此在包含上述举例在内的某些场景中我们也希望“消费”可以是异步的,即不通过调用同步未来get()方法的方式来“消费”任务,以达到对任务“消费”的顺序/时间/条件等多项维度进行自定义的目的。

    完成服务接口在定义上提供将任务的“生产”与“消费”从概念上分离的能力。完成服务接口在递交方法的基础上额外定义了移除方法对递交至完成服务的任务进行“消费”(并返回相关的未来),使得任务的“生产”与“消费”之间失去了交集而由原本的同步关系转变为了异步关系。开发者可以通过实现移除方法的方式自定义异步“消费”的逻辑,例如可以按任务执行结束(完成/异常/取消)的顺序“消费”任务以实现上述举例中的需求,事实上完成服务接口唯一的实现类ExecutorCompletionService(执行器完成服务)也确实是如此实现的,该知识点会在相关文章中详述。为了实现获取一组递交任务中最早执行结束(完成/异常/取消)任务结果的需求,开发者需要在任务异步“消费”后调用异步未来的get()方法。此时的get()方法已不再具有同步“消费”的意义,只单纯的被用来获取异步“消费”任务的执行结果。

/**
 * A service that decouples the production of new asynchronous tasks from the consumption of the results of completed
 * tasks. Producers {@code submit} tasks for execution. Consumers {@code take} completed tasks and process their
 * results in the order they complete.  A {@code CompletionService} can for example be used to manage asynchronous I/O,
 * in which tasks that perform reads are submitted in one part of a program or system, and then acted upon in a different
 * part of the program when the reads complete, possibly in a different order than they were requested.
 * 一个用于将新异步任务的产生从已完成任务结果的消费(获取)中分离(解耦)的服务(即将任务执行与结果获取分离,不
 * 再通过等待任务执行结束再获取结果的方式获取结果)。生产者递交任务用于执行。消费者获取已完成的任务并按它们完成
 * 的顺序处理它们的结果。例如,完成服务可以用于管理异步I/O,在这种情况下,执行读取的任务在程序或系统的一部分中提
 * 交,然后在读取完成时在程序的另一部分中执行,可能以与请求顺序不同的顺序执行。
 * <p>
 * Typically, a {@code CompletionService} relies on a separate {@link Executor} to actually execute the tasks, in which
 * case the {@code CompletionService} only manages an internal completion queue. The {@link ExecutorCompletionService}
 * class provides an implementation of this approach.
 * 通常完成服务依赖一个单独的执行去实际执行任务,在任何情况下完成服务只能管理一个内部的已完成队列。执行器已完成
 * 服务类提供这种方法的实现。
 * <p>
 * Memory consistency effects: Actions in a thread prior to submitting a task to a {@code CompletionService}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> actions taken by that task, which in turn
 * <i>happen-before</i> actions following a successful return from the corresponding {@code take()}.
 * 内存一致性影响:递交任务至完成服务之前在线程中的动作先行发生于被该任务采取的动作(即任务的执行),反过来又先
 * 行发生于相关的take()的成功返回之后的行为。
 *
 * @Description: 完成服务接口
 */
public interface CompletionService<V> {
    ...
}

 方法

  • Future<V> submit(Callable<V> task) —— 递交 —— 向当前完成服务递交指定可回调/任务,并返回可追踪/获取指定可回调/任务执行状态/结果/异常的未来,但不推荐使用该未来等待指定可回调/任务执行结束。
    /**
     * Submits a value-returning task for execution and returns a Future representing the pending results of the task. Upon
     * completion, this task may be taken or polled.
     * 递交一个返回值任务用于执行并返回一个代表任务待定结果的未来。当计算完成之后,这个任务可以接受或轮询。
     *
     * @param task the task to submit 用于递交的任务
     * @return a Future representing pending completion of the task 一个代表任务待定结果的未来
     * @throws NullPointerException       if the task is null
     *                                    空指针异常:如果任务为null
     * @throws RejectedExecutionException if the task cannot be scheduled for execution
     *                                    拒绝执行异常:如果不能按计划执行任务
     * @Description: ------------------------------------------------------------- 名称 -------------------------------------------------------------
     * @Description: 递交
     * @Description: ------------------------------------------------------------- 作用 -------------------------------------------------------------
     * @Description: 向当前完成服务递交指定可回调/任务,并返回可追踪/获取可回调/任务执行状态/结果/异常的未来,但不
     * @Description: 推荐直接使用该未来。
     * @Description: ------------------------------------------------------------- 逻辑 -------------------------------------------------------------
     * @Description: ~
     * @Description: ------------------------------------------------------------- 注意 -------------------------------------------------------------
     * @Description: ~
     * @Description: ------------------------------------------------------------- 疑问 -------------------------------------------------------------
     * @Description: ~
     */
    Future<V> submit(Callable<V> task);
  • Future<V> submit(Runnable task, V result) —— 递交 —— 向当前完成服务递交指定可运行/任务,并返回可追踪/获取指定可运行/任务执行状态/结果/异常的未来,但不推荐使用该未来等待指定可运行/任务执行结束。方法会同步传入用于承载指定可运行/任务执行结果/异常的变量,承载后指定可运行/任务执行结果/异常即可通过变量直接获取,也会向未来传递,因此该方法返回未来的get()方法可获取指定可运行/任务的执行结果/异常。
    /**
     * Submits a Runnable task for execution and returns a Future representing that task.  Upon completion, this task may
     * be taken or polled.
     * 递交一个可运行任务用于执行并返回一个代表任务的Future。当计算完成之后,这个任务可以获取或轮询。
     *
     * @param task   the task to submit 用于递交的任务
     * @param result the result to return upon successful completion 成功计算之后返回的结果
     * @return a Future representing pending completion of the task, and whose {@code get()} method will return the given
     * result value upon completion
     * 一个代表任务待定完成的Future,它的get()方法将在完成时返回指定的结果值。
     * @throws RejectedExecutionException if the task cannot be scheduled for execution
     *                                    拒绝执行任务:如果不能按计划执行任务
     * @throws NullPointerException       if the task is null
     *                                    空指针异常:如果任务为null
     * @Description: ------------------------------------------------------------- 名称 -------------------------------------------------------------
     * @Description: 递交
     * @Description: ------------------------------------------------------------- 作用 -------------------------------------------------------------
     * @Description: 向当前完成服务递交指定可运行/任务,并返回可追踪/获取可回调/任务执行状态/结果/异常的未来,但不
     * @Description: 推荐直接使用该未来。方法会同步传入用于承载可运行/任务执行结果/异常的变量,承载后可运行/任务执
     * @Description: 行结果/异常即可通过变量直接获取,也会向未来传递,因此该方法返回未来的get()方法可获取可运行/任
     * @Description: 务执行结果/异常。
     * @Description: ------------------------------------------------------------- 逻辑 -------------------------------------------------------------
     * @Description: ~
     * @Description: ------------------------------------------------------------- 注意 -------------------------------------------------------------
     * @Description: ~
     * @Description: ------------------------------------------------------------- 疑问 -------------------------------------------------------------
     * @Description: ~
     */
    Future<V> submit(Runnable task, V result);
  • Future<V> poll() —— 轮询 —— 从当前完成服务中移除并获取下个结束(完成/异常/取消)任务。该方法是移除方法“特殊值”形式的实现,当当前完成服务存在结束(完成/异常/取消)任务时移除并返回下个结束(完成/异常/取消)任务;否则返回null。
    /**
     * Retrieves and removes the Future representing the next completed task, waiting if none are yet present.
     * 检索并移除代表下个已完成任务的Future,如果现在还没有则等待(即阻塞挂起)。
     *
     * @return the Future representing the next completed task 代表下个已完成任务的Future
     * @throws InterruptedException if interrupted while waiting
     *                              中断异常:如果等待时中断
     * @Description: ------------------------------------------------------------- 名称 -------------------------------------------------------------
     * @Description: 获取
     * @Description: ------------------------------------------------------------- 作用 -------------------------------------------------------------
     * @Description: 从当前完成服务中获取下个已结束(完成/异常/取消)任务的未来,获取的未来会从当前完成服务中移除。
     * @Description: 当没有已结束(完成/异常/取消)任务时方法会无限等待至存在已结束(完成/异常/取消)任务为止。
     * @Description: ------------------------------------------------------------- 逻辑 -------------------------------------------------------------
     * @Description: ~
     * @Description: ------------------------------------------------------------- 注意 -------------------------------------------------------------
     * @Description: ~
     * @Description: ------------------------------------------------------------- 疑问 -------------------------------------------------------------
     * @Description: ~
     */
    Future<V> take() throws InterruptedException;
  • Future<V> take() throws InterruptedException —— 拿取 —— 从当前完成服务中移除并获取下个结束(完成/异常/取消)任务。该方法是移除方法“阻塞”形式的实现,当当前完成服务存在结束(完成/异常/取消)任务时移除并返回下个结束(完成/异常/取消)任务;否则等待至存在结束(完成/异常/取消)任务为止。
    /**
     * Retrieves and removes the Future representing the next completed task, or {@code null} if none are present.
     * 返回并移除代表下个已完成任务的Future,如果现在还没有则返回null。
     *
     * @return the Future representing the next completed task, or {@code null} if none are present
     * 代表下个已完成任务的Future,如果现在还没有则返回null
     * @Description: ------------------------------------------------------------- 名称 -------------------------------------------------------------
     * @Description: 轮询
     * @Description: ------------------------------------------------------------- 作用 -------------------------------------------------------------
     * @Description: 从当前完成服务中获取下个已结束(完成/异常/取消)任务的未来,获取的未来会从当前完成服务中移除。
     * @Description: 当没有已结束(完成/异常/取消)任务时方法会返回null。
     * @Description: ------------------------------------------------------------- 逻辑 -------------------------------------------------------------
     * @Description: ~
     * @Description: ------------------------------------------------------------- 注意 -------------------------------------------------------------
     * @Description: ~
     * @Description: ------------------------------------------------------------- 疑问 -------------------------------------------------------------
     * @Description: ~
     */
    Future<V> poll();
  • Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException —— 轮询 —— 从当前完成服务中移除并获取下个结束(完成/异常/取消)任务。该方法是移除方法“超时”形式的实现,当当前完成服务存在结束(完成/异常/取消)任务时移除并返回下个结束(完成/异常/取消)任务;否则在指定等待时间内等待至存在结束(完成/异常/取消)任务为止,超出指定等待时间则返回null。
    /**
     * Retrieves and removes the Future representing the next completed task, waiting if necessary up to the specified wait
     * time if none are yet present.
     * 返回并移除代表下个已完成任务的Future,如果需要则等待指定的等待时间(如果现在还没有)。
     *
     * @param timeout how long to wait before giving up, in units of {@code unit} 在放弃之前要等待多久
     * @param unit    a {@code TimeUnit} determining how to interpret the {@code timeout} parameter  超时单位
     * @return the Future representing the next completed task or {@code null} if the specified waiting time elapses before
     * one is present
     * 代表下个已完成任务的Future,如果指定的等待时间过去则为null
     * @throws InterruptedException if interrupted while waiting 如果等待时中断
     * @Description: ------------------------------------------------------------- 名称 -------------------------------------------------------------
     * @Description: 轮询
     * @Description: ------------------------------------------------------------- 作用 -------------------------------------------------------------
     * @Description: 从当前完成服务中获取下个已结束(完成/异常/取消)任务的未来,获取的未来会从当前完成服务中移除。
     * @Description: 当没有已结束(完成/异常/取消)任务时方法会在指定等待时间内有限等待至存在已结束(完成/异常/取
     * @Description: 消)任务为止,超时则返回null。
     * @Description: ------------------------------------------------------------- 逻辑 -------------------------------------------------------------
     * @Description: ~
     * @Description: ------------------------------------------------------------- 注意 -------------------------------------------------------------
     * @Description: ~
     * @Description: ------------------------------------------------------------- 疑问 -------------------------------------------------------------
     * @Description: ~
     */
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
07-16 15:49