引言

在这之前,我一直都没有讲过 Scheduler 的作用,那么本章就开始讲解 Scheduler 的设计思路和基本结构。RxJS 的存在是为了处理异步 IO,而异步 IO 所包含的一系列 API 肯定也是要经过进一步的封装才能让 RxJS 中的异步操作使用。

可以看到,它主要还是根据 JS 的所能够提供的异步能力来设计这些基本结构。

  • AsyncScheduler:异步调度器,使用 setInterval 实现。
  • QueueScheduler:队列异步调度器,继承了 AsyncScheduler,但是 QueueAction 是一种链式结构,使得调度以迭代器的形式进行。
  • AnimationFrameScheduler:使用 reqeustAnimationFrame 实现了帧调度器。
  • AsapScheduler:使用 Promise.resolve().then() 实现的微任务调度器。

SchedulerLike 、 Scheduler & Action

首先,SchedulerLike 提供了以下两个接口。

export interface SchedulerLike {
  // 标记当前时间
  now(): number;


  // 开启调度的基础接口
  schedule<T>(
    work: (this: SchedulerAction<T>, state?: T) => void,
    delay?: number,
    state?: T
  ): Subscription;
}

Scheduler 则实现了这些接口。

export class Scheduler implements SchedulerLike {


  // 获取当前时间戳
  public static now: () => number = () => Date.now();


  constructor(
    private SchedulerAction: typeof Action,
    now: () => number = Scheduler.now
) {
    this.now = now;
  }


  public now: () => number;
  // 直接调用 action 的 schedule
  public schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay: number = 0, state?: T): Subscription {
    return new this.SchedulerAction<T>(this, work).schedule(state, delay);
  }
}

Scheduler 为后续的继承它的调度器定义了创建方式,通过传入一个 Action 工厂,使得内部可以构造特定的 Action 。而 Action 继承了 Subscription,意味着 Action 实际上是一种的订阅器。

export class Action<T> extends Subscription {
  constructor(scheduler: Scheduler, work: (this: SchedulerAction<T>, state?: T) => void) {
    super();
  }
  // Action 开始调度
  public schedule(state?: T, delay: number = 0): Subscription {
    return this;
  }
}

上面的设计是一种名为 Template Method 的设计模式,这种方法有效地约束了后续的不同的 Scheduler 的实现。

异步调度器

先来了解一下 Scheduler 的子类 AsyncScheduler,余下所有的 Scheduler 都会继承它。在这里,先不急着进行源码分析,我们需要先为了弄清楚调度器的运行原理,了解调度器是如何对异步 API 进行封装的。

首先,调度器本身也是基于观察者模式来进行设计,但是它又独立于 Rxjs 的 Observable。一般来说, AsyncScheduler 是这样调用的。

const scheduler = AsyncScheduler(AsyncAction);
const subscription = async.schedule(function (counter) {
    console.log(counter);
    // this 绑定了 AsyncAction
    this.schedule(counter + 1, 1000);
}, 1000, 1);


// subscription.unsubscribe();

它的调用栈是这样的。

AsyncScheduler.schedule
AsyncAction.schedule
AsyncAction.requestAsyncId
listOnTimeout // 原生事件
processTimers // 原生事件
AsyncScheduler.flush
AsyncAction.execute
AsyncAction.\_execute
AsyncAction.work

AsyncAction.schedule

跟着调用栈分析源码来溯源,在 AsyncScheduler 的 schedule 方法中,它先构造了 AsyncAction ,然后调用它的 schedule 。在这个方法中,实际上是对 Action 的内部状态进行更新,所以此处关注的地方就是在于 schedule 如何触发异步 API。

class AsyncAction<T> extends Action<T> {
  constructor(
    protected scheduler: AsyncScheduler,
    protected work: (this: SchedulerAction<T>, state?: T) => void
  ) {
    super(scheduler, work);
  }


  public schedule(state?: T, delay: number = 0): Subscription {
    if (this.closed) {
      return this;
    }
    this.state = state;
    const id = this.id;
    const scheduler = this.scheduler;
    // 需要对相应的异步 API 进行取消操作
    if (id != null) {
      this.id = this.recycleAsyncId(scheduler, id, delay);
    }
    this.pending = true;
    this.delay = delay;
    // 重新配置异步 API
    this.id = this.id || this.requestAsyncId(scheduler, this.id, delay);


    return this;
  }
}

可以看到,从 scheduler 传入的回调函数最终会被 Action 持有,所以调用栈最终执行的 work 实际上就是回调函数。

AsyncAction.requestAsyncId

requestAsyncId 是调用异步 API 的方法,这个方法在 AsyncAction 最终触发了 setInterval 这一异步 API。那么实际上,根据 Template Method 的设计,所有继承 AsyncAction 的 Action 都会通过这个方法实现相对应的异步 API 。

至于 AsyncAction 为什么会使用 setInterval 而不是 setTimeout,源代码里是这样说明的。

class AsyncAction<T> extends Action<T> {
  protected requestAsyncId(
    scheduler: AsyncScheduler,
    id?: any,
    delay: number = 0
  ): any {
    // 绑定 scheduler,并且把当前的 AsyncAction 当作参数传入。
    return setInterval(scheduler.flush.bind(scheduler, this), delay);
  }
}

AsyncScheduler.flush

所以,在 AsyncScheduler 中,新增的 flush 方法实际上是为 setInterval 服务的,它作为异步 API 的回调函数,主要步骤如下。

  • 如果存在运行中的 Action ,它会保存所用调用它的 Action。
  • 如果不存在运行中的 Action,它会执行所有调用队列中的 Action.execute
  • 处理 Action.execute 的运行错误。
export class AsyncScheduler extends Scheduler {


  public flush(action: AsyncAction<any>): void {


    const {actions} = this;


    if (this.active) {
      // 使用了一个队列保存所有输入的 Actions
      actions.push(action);
      return;
    }


    let error: any;
    this.active = true;
    // 默认 action 也是队列中的一员
    // 将所有队列中的 Action 进行调用。
    do {
      if (error = action.execute(action.state, action.delay)) {
        break;
      }
    } while (action = actions.shift());


    this.active = false;


    // 出现错误时,取消所有未运行 action 的订阅
    if (error) {
      // 注意,此处不会重复取消订阅,因为执行错误的Action会先退出队列,再执行循环。
      while (action = actions.shift()) {
        action.unsubscribe();
      }
      throw error;
    }
  }
}

AsyncAction.execute

上述的 flush 调用了 action 的 execute 方法。该方法也是通过处理 action 的内部状态来获得执行结果,其中会调用 _execute 这一内部方法,这个内部方法主要作用是调用 AsyncAction.work ,并处理它出现的异常。

class AsyncAction<T> extends Action<T> {
  public execute(state: T, delay: number): any {
    if (this.closed) {
      return new Error('executing a cancelled action');
    }
    this.pending = false;
    // 获取异常错误
    const error = this.\_execute(state, delay);
    if (error) {
      return error;
    } else if (this.pending === false && this.id != null) {
      this.id = this.recycleAsyncId(this.scheduler, this.id, null);
    }
  }


  protected \_execute(state: T, delay: number): any {
    let errored: boolean = false;
    let errorValue: any = undefined;
    try {
      // work
      this.work(state);
    } catch (e) {
      errored = true;
      errorValue = !!e && e || new Error(e);
    }
    if (errored) {
      this.unsubscribe();
      return errorValue;
    }
  }
}

AsyncAction.recycleAsyncId

在分析到 Action.schedule 的时候,引用了源码内部的注释,其中有一句话很重要,那就是 “如果 action 以相同的时延调度本身,那么当前定时器不会被取消”,所以 recycleAsyncId 这个方法是需要处理这种情况。

class AsyncAction<T> extends Action<T> {
  protected recycleAsyncId(scheduler: AsyncScheduler, id: any, delay: number = 0): any {
    // this.delay === delay 处理了这种情况。
    if (delay !== null && this.delay === delay && this.pending === false) {
      return id;
    }
    // 取消当前的定时器
    clearInterval(id);
    return undefined;
  }
}

运用 Template Method

AsyncScheduler 可以说已经把所有的地基都打好了,它可以直接拿来用,也可以继承并重写一些相关的接口把相应的异步 API 进行替换。

队列调度器

队列调度器根据调用者传入的时延来决定使用同步方式的调度还是 setInterval 方式的调度。

QueueScheduler 单纯继承了 AsyncScheduler,其主要实现在 QueueAction 中,通过重写 scheduleexecute 以及 requestAsyncId 等方法来实现这种功能。

export class QueueAction<T> extends AsyncAction<T> {
  public schedule(state?: T, delay: number = 0): Subscription {
    // delay > 0 ,执行异步调度
    if (delay > 0) {
      return super.schedule(state, delay);
    }
    this.delay = delay;
    this.state = state;
    // 否则直接执行同步调度
    this.scheduler.flush(this);
    return this;
  }


  public execute(state: T, delay: number): any {
    // 根据传入的 delay 判断是否直接执行 work (同步执行)
    return (delay > 0 || this.closed) ?
      super.execute(state, delay) :
      this.\_execute(state, delay) ;
  }


  protected requestAsyncId(scheduler: QueueScheduler, id?: any, delay: number = 0): any {
    // 根据传入的 delay 以及本身的 delay 来决定是否使用异步
    if ((delay !== null && delay > 0) || (delay === null && this.delay > 0)) {
      return super.requestAsyncId(scheduler, id, delay);
    }
    // delay 为 0,直接同步调度
    return scheduler.flush(this);
  }
}

帧调度器 与 微任务调度器

帧调度器根据调用者传入的时延来决定使用 requestAnimationFrame 还是 setInterval ,微任务调度器则是根据时延来决定使用 Promise.reslove().then() 还是 setInterval

两者的调用类似,以至于可以结合起来分析。

Action

它们的 action 方法均重写了requestAsyncIdrecycleAsyncId, 主要还是为了处理不同异步 API 。

protected requestAsyncId(scheduler: AnimationFrameScheduler, id?: any, delay: number = 0): any {
  if (delay !== null && delay > 0) {
    return super.requestAsyncId(scheduler, id, delay);
  }
  // 把当前action 加入到 actions 队列末端
  scheduler.actions.push(this);


  if (!scheduler.scheduled) {
      // AsapAction 的情况
      const scheduled = Immediate.setImmediate(scheduler.flush.bind(scheduler, null));


      // AnimationFrameAction 的情况
      const scheduled = requestAnimationFrame(scheduler.flush.bind(scheduler, null));


      scheduler.scheduled = scheduled;
  }
  return scheduler.scheduled;
}


protected recycleAsyncId(scheduler: AnimationFrameScheduler, id?: any, delay: number = 0): any {
  if ((delay !== null && delay > 0) || (delay === null && this.delay > 0)) {
    return super.recycleAsyncId(scheduler, id, delay);
  }
  if (scheduler.actions.length === 0) {
    // AsapAction
    Immediate.clearImmediate(id);
    // AnimationFrameAction
    cancelAnimationFrame(id);


    scheduler.scheduled = undefined;
  }
  return undefined;
}

Scheduler

它们的 flush,跟 AsyncScheduler 的 flush 实现思路差不多,依旧是轮询 actions 队列调用 action.execute ,只是它们的 flush 需要去处理额外的以下细节。

  • action 传入可能为空。
  • 处理 actions 的状态。
  • 清空 scheduled,使得 scheduler 能够进行下一次调度。
// export class AnimationFrameScheduler extends AsyncScheduler {
export class AsapScheduler extends AsyncScheduler {
  public flush(action?: AsyncAction<any>): void {
    this.active = true;
    this.scheduled = undefined;


    const {actions} = this;
    let error: any;
    let index: number = -1;
    // 此处顺序不能打乱,因为这样
    action = action || actions.shift()!;
    let count: number = actions.length;


    do {
      if (error = action.execute(action.state, action.delay)) {
        break;
      }
    } while (++index < count && (action = actions.shift()));


    this.active = false;


    if (error) {
      while (++index < count && (action = actions.shift())) {
        action.unsubscribe();
      }
      throw error;
    }
  }
}

Immediate

这里很有意思的一点, AsapScheduler 并没有直接通过 Promise.reslove().then() 来实现。而是把它封装成 Immediate,形成 setImmediateclearImmediate 两个 API ,这样就使得微任务的调用其他的定时 API 无异。

内部实现是通过一个 Map 保存标记当前的是第几个微任务,这里并不直接保存 Promise,因为 Promise 执行完毕后就自行释放了,所以它需要的只是一个标记。

let nextHandle = 1;
const RESOLVED = (() => Promise.resolve())();
const activeHandles: { \[key: number\]: any } = {};


function findAndClearHandle(handle: number): boolean {
  if (handle in activeHandles) {
    delete activeHandles\[handle\];
    return true;
  }
  return false;
}


export const Immediate = {
  setImmediate(cb: () => void): number {
    const handle = nextHandle++;
    activeHandles\[handle\] = true;
    RESOLVED.then(() => findAndClearHandle(handle) && cb());
    return handle;
  },


  clearImmediate(handle: number): void {
    findAndClearHandle(handle);
  },
};

总结

本篇分析了 RxJS 的调度器相关的一系列内容,通过封装 JS 异步 API ,调度器实现相对应的异步功能,增强了 RxJS 对异步 IO 的掌控。

加入我们

我们是DevUI团队,欢迎来这里和我们一起打造优雅高效的人机设计/研发体系。招聘邮箱:muyang2@huawei.com

作者:zcx(公众号:Coder写字的地方)

原文链接:https://mp.weixin.qq.com/s/vG0aaQmDy7Cqfv0CwJ_d0Q

往期文章推荐

《RxJS 源码解析(五)—— Operator III》

《Web界面深色模式和主题化开发》

《手把手教你搭建一个灰度发布环境》

03-05 21:23