roman_日积跬步-终至千里



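Now that we understand the underlying design and implementation of Flink's RPC communication framework, let's walk through concrete examples of how components in the cluster runtime build their call relationships with one another on top of that framework.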

1. Overview of communication inside the cluster

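Components register connections with one another and communicate (including maintaining heartbeats) through RegisteredRpcConnection.

As shown in the figure below, the registered connections between components in the cluster runtime are mainly built on three RegisteredRpcConnection implementations: TaskExecutorToResourceManagerConnection, JobManagerRegisteredRpcConnection (in JobLeaderService), and ResourceManagerConnection (in JobMaster).

The figure also shows the registration flow:

  1. Call rpcService.connect(targetAddress, targetType), which returns a proxy object for the RpcGateway; through this RpcGateway the component connects to the target RpcEndpoint.
  2. RetryingRegistration provides the abstract method invokeRegistration(), which subclasses implement to perform the actual RPC registration.
  3. If the registration succeeds, onRegistrationSuccess() is called to continue with follow-up operations; for example, JobManagerRegisteredRpcConnection passes the current jobId and related information to jobLeaderListener.
  4. If the component fails to register with the target component, the abstract method onRegistrationFailure() is called to handle the failure, which may mean reconnecting or stopping the service of the whole RpcEndpoint.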

[Figure: [Flink Cluster RPC Communication Mechanism (4)] RPC communication between cluster components (tm, jm, and rm)]
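The lifecycle described above follows a template-method shape: the base connection resolves the gateway, delegates the actual registration call to a subclass hook, and then dispatches to a success or failure callback. The sketch below illustrates only that shape and is not Flink's code: SketchGateway, SketchConnection, and LoggingConnection are hypothetical, the generic fencing-token and success types are collapsed into String, and retries and main-thread execution are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the RpcGateway proxy of the target endpoint.
interface SketchGateway {
    CompletableFuture<String> register(String who);
}

// Hypothetical simplification of RegisteredRpcConnection: the base class drives
// connect -> invokeRegistration -> onRegistrationSuccess / onRegistrationFailure.
abstract class SketchConnection {
    // Stands in for the future returned by rpcService.connect(targetAddress, targetType).
    private final CompletableFuture<SketchGateway> gatewayFuture;

    SketchConnection(CompletableFuture<SketchGateway> gatewayFuture) {
        this.gatewayFuture = gatewayFuture;
    }

    // Subclass hook, analogous to RetryingRegistration.invokeRegistration().
    protected abstract CompletableFuture<String> invokeRegistration(SketchGateway gateway);

    protected abstract void onRegistrationSuccess(String response);

    protected abstract void onRegistrationFailure(Throwable failure);

    final void start() {
        gatewayFuture
            .thenCompose(this::invokeRegistration)
            .whenComplete((response, failure) -> {
                if (failure != null) {
                    onRegistrationFailure(failure);
                } else {
                    onRegistrationSuccess(response);
                }
            });
    }
}

// Hypothetical concrete connection that just records what happened.
class LoggingConnection extends SketchConnection {
    final List<String> events = new ArrayList<>();

    LoggingConnection(CompletableFuture<SketchGateway> gatewayFuture) {
        super(gatewayFuture);
    }

    @Override
    protected CompletableFuture<String> invokeRegistration(SketchGateway gateway) {
        return gateway.register("taskExecutor-1");
    }

    @Override
    protected void onRegistrationSuccess(String response) {
        events.add("success:" + response);
    }

    @Override
    protected void onRegistrationFailure(Throwable failure) {
        events.add("failure");
    }
}
```

With an already-completed gateway future, start() records "success:registered taskExecutor-1"; with a failed gateway future it records "failure". In Flink the failure hook is also where a component decides whether to reconnect or shut down.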

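Next, taking the TaskManager's registration of its RPC service with the ResourceManager as an example, we walk through the entire RPC connection registration process.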
 

2. The TaskManager registers its RPC service with the ResourceManager

The process by which the TaskManager registers its RPC service with the ResourceManager is shown in the figure.
[Figure: [Flink Cluster RPC Communication Mechanism (4)] RPC communication between cluster components (tm, jm, and rm)]

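  1. Create the RPC network connection to the ResourceManager component (TaskExecutor.connectToResourceManager()).
  2. Create the TaskExecutorRegistration object.
  3. Formally establish the network connection (resourceManagerConnection.start()).
  4. Create a new Registration for the RPC connection to the other component (createNewRegistration()).
  5. Call RetryingRegistration.startRegistration() to register the concrete RPC connection; under the hood this calls AkkaRpcService.connect() to create and obtain the RpcGateway interface of the ResourceManager.
  6. Call ResourceManagerGateway.registerTaskExecutor() to complete the registration of the TaskManager in the ResourceManager. The TaskExecutorRegistration created earlier is passed to the ResourceManager along with the call.
  7. After the ResourceManager receives the TaskManager's registration information, it maintains that information locally and establishes a heartbeat connection with the TaskManager. This completes the whole flow by which a TaskManager, after startup, registers its RPC network connection with the ResourceManager.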
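The last step, where the ResourceManager keeps the TaskManager's registration and watches its heartbeats, can be pictured as a small registry. This is a hypothetical, single-threaded simplification: Flink keeps registered TaskExecutors in its own data structures and uses a HeartbeatManager for heartbeats, whereas TaskManagerRegistrySketch below just stores the last heartbeat time per resource ID and expires entries that exceed a timeout. Time is passed in explicitly so the behavior is deterministic.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of the ResourceManager-side bookkeeping: one entry per
// registered TaskManager, plus the time of its last heartbeat.
class TaskManagerRegistrySketch {
    static final class Entry {
        final String address;
        long lastHeartbeatMillis;

        Entry(String address, long nowMillis) {
            this.address = address;
            this.lastHeartbeatMillis = nowMillis;
        }
    }

    private final Map<String, Entry> byResourceId = new HashMap<>();

    // Called when a registration request arrives; a re-registration from the
    // same resource ID replaces the old entry.
    String register(String resourceId, String address, long nowMillis) {
        byResourceId.put(resourceId, new Entry(address, nowMillis));
        return "accepted:" + resourceId;
    }

    // Called for every heartbeat; returns false for unknown TaskManagers.
    boolean heartbeat(String resourceId, long nowMillis) {
        Entry entry = byResourceId.get(resourceId);
        if (entry == null) {
            return false;
        }
        entry.lastHeartbeatMillis = nowMillis;
        return true;
    }

    // Removes TaskManagers whose last heartbeat is older than timeoutMillis
    // and returns how many were removed.
    int expire(long nowMillis, long timeoutMillis) {
        int removed = 0;
        Iterator<Entry> it = byResourceId.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().lastHeartbeatMillis > timeoutMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean isRegistered(String resourceId) {
        return byResourceId.containsKey(resourceId);
    }
}
```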

The code of TaskExecutor.connectToResourceManager() is as follows:

private void connectToResourceManager() {
    assert(resourceManagerAddress != null);
    assert(establishedResourceManagerConnection == null);
    assert(resourceManagerConnection == null);
    log.info("Connecting to ResourceManager {}.", resourceManagerAddress);
    // Registration information of this TaskExecutor
    final TaskExecutorRegistration taskExecutorRegistration =
        new TaskExecutorRegistration(
            getAddress(),
            getResourceID(),
            taskManagerLocation.dataPort(),
            hardwareDescription,
            taskManagerConfiguration.getDefaultSlotResourceProfile(),
            taskManagerConfiguration.getTotalResourceProfile()
        );
    // Connection object that drives the registration with the ResourceManager
    resourceManagerConnection =
        new TaskExecutorToResourceManagerConnection(
            log,
            getRpcService(),
            taskManagerConfiguration.getRetryingRegistrationConfiguration(),
            resourceManagerAddress.getAddress(),
            resourceManagerAddress.getResourceManagerId(),
            getMainThreadExecutor(),
            new ResourceManagerRegistrationListener(),
            taskExecutorRegistration);
    // Start the connection, which creates and starts a RetryingRegistration
    resourceManagerConnection.start();
}

Next, look at the logic of RegisteredRpcConnection.start():

public void start() {
    checkState(!closed, "The RPC connection is already closed");
    checkState(!isConnected() && pendingRegistration == null,
               "The RPC connection is already started");
    // Create the RetryingRegistration
    final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
    // Start the RetryingRegistration only if no other registration was installed concurrently
    if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
        newRegistration.startRegistration();
    } else {
        // A concurrent start() already installed a registration, so cancel this one
        newRegistration.cancel();
    }
}
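The compareAndSet() call is what makes start() safe against concurrent invocations: the checkState() calls alone could pass in two threads at once, but only one thread can swap the pending registration from null to its new instance. The excerpt does not show how REGISTRATION_UPDATER is declared; in the Flink source I am aware of, it is an AtomicReferenceFieldUpdater over a volatile pendingRegistration field, and the sketch below assumes that. CountingRegistration and GuardedConnectionSketch are hypothetical stand-ins that only count how many registrations were started or canceled, and the checkState() calls are omitted.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical stand-in for RetryingRegistration that only counts calls.
final class CountingRegistration {
    private final AtomicInteger started;
    private final AtomicInteger canceled;

    CountingRegistration(AtomicInteger started, AtomicInteger canceled) {
        this.started = started;
        this.canceled = canceled;
    }

    void startRegistration() { started.incrementAndGet(); }

    void cancel() { canceled.incrementAndGet(); }
}

final class GuardedConnectionSketch {
    // Field updater over the volatile instance field below (assumed to mirror Flink).
    private static final AtomicReferenceFieldUpdater<GuardedConnectionSketch, CountingRegistration>
        REGISTRATION_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
            GuardedConnectionSketch.class, CountingRegistration.class, "pendingRegistration");

    private volatile CountingRegistration pendingRegistration;

    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger canceled = new AtomicInteger();

    void start() {
        final CountingRegistration newRegistration = new CountingRegistration(started, canceled);
        // Only the first caller swaps null -> newRegistration; every other caller
        // loses the CAS and cancels the registration it just created.
        if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
            newRegistration.startRegistration();
        } else {
            newRegistration.cancel();
        }
    }

    // Calls start() from several threads at once; returns how many registrations started.
    static int startFromThreads(GuardedConnectionSketch connection, int threadCount) {
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(connection::start);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return connection.started.get();
    }
}
```

Calling startFromThreads(connection, 8) should report exactly one started registration, with the other seven canceled, no matter how the threads interleave.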

Now focus on the logic of RetryingRegistration.startRegistration():

public void startRegistration() {
    if (canceled) {
        // The registration has been canceled; nothing to do
        return;
    }
    try {
        final CompletableFuture<G> rpcGatewayFuture;
        // Depending on targetType, connect via a FencedRpcGateway (with fencing token)
        // or via a plain RpcGateway
        if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
            rpcGatewayFuture = (CompletableFuture<G>) rpcService.connect(
                targetAddress,
                fencingToken,
                targetType.asSubclass(FencedRpcGateway.class));
        } else {
            rpcGatewayFuture = rpcService.connect(targetAddress, targetType);
        }
        // Once the gateway is resolved, start the first registration attempt
        // (attempt 1, initial registration timeout)
        CompletableFuture<Void> rpcGatewayAcceptFuture =
            rpcGatewayFuture.thenAcceptAsync(
                (G rpcGateway) -> {
                    log.info("Resolved {} address, beginning registration", targetName);
                    register(rpcGateway, 1,
                        retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis());
                },
                rpcService.getExecutor());
        // If the gateway could not be resolved and the registration was not canceled,
        // retry; failed registration attempts themselves are handled inside register()
        rpcGatewayAcceptFuture.whenCompleteAsync(
            (Void v, Throwable failure) -> {
                if (failure != null && !canceled) {
                    final Throwable strippedFailure =
                        ExceptionUtils.stripCompletionException(failure);
                    // (The full source logs strippedFailure here; omitted in this excerpt.)
                    // Start the registration again after the configured error delay
                    startRegistrationLater(
                        retryingRegistrationConfiguration.getErrorDelayMillis());
                }
            },
            rpcService.getExecutor());
    }
    catch (Throwable t) {
        completionFuture.completeExceptionally(t);
        cancel();
    }
}
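The essential behavior of startRegistration() is: resolve the gateway asynchronously, and if that fails (and the registration has not been canceled), try again after errorDelayMillis. Below is a minimal sketch of that loop, assuming a Supplier stands in for rpcService.connect(...) and a ScheduledExecutorService stands in for Flink's executor and startRegistrationLater(). ResolveWithRetrySketch is a hypothetical name, and it simply completes with the resolved gateway instead of calling register().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of the "resolve gateway, retry after error delay" loop.
final class ResolveWithRetrySketch<G> {
    private final Supplier<CompletableFuture<G>> connector; // stands in for rpcService.connect(...)
    private final long errorDelayMillis;
    private final ScheduledExecutorService executor;
    private final CompletableFuture<G> completionFuture = new CompletableFuture<>();
    private volatile boolean canceled;

    ResolveWithRetrySketch(Supplier<CompletableFuture<G>> connector,
                           long errorDelayMillis,
                           ScheduledExecutorService executor) {
        this.connector = connector;
        this.errorDelayMillis = errorDelayMillis;
        this.executor = executor;
    }

    CompletableFuture<G> getFuture() {
        return completionFuture;
    }

    void cancel() {
        canceled = true;
        completionFuture.cancel(false);
    }

    void startRegistration() {
        if (canceled) {
            return;
        }
        try {
            connector.get().whenCompleteAsync(
                (gateway, failure) -> {
                    if (failure == null) {
                        // Flink would call register(gateway, 1, initialTimeout) here;
                        // the sketch just completes with the resolved gateway.
                        completionFuture.complete(gateway);
                    } else if (!canceled) {
                        // Resolution failed: try the whole thing again after the error delay.
                        executor.schedule(this::startRegistration, errorDelayMillis, TimeUnit.MILLISECONDS);
                    }
                },
                executor);
        } catch (Throwable t) {
            completionFuture.completeExceptionally(t);
            cancel();
        }
    }
}
```

The retry re-enters startRegistration() rather than looping in place, so a cancel() issued between attempts is observed by the canceled check at the top.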

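Next, look at the concrete implementation of ResourceManagerRegistration.invokeRegistration() (ResourceManagerRegistration is an inner class of TaskExecutorToResourceManagerConnection):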

protected CompletableFuture<RegistrationResponse> invokeRegistration(
      ResourceManagerGateway resourceManager, ResourceManagerId fencingToken, 
    long timeoutMillis) throws Exception {
   Time timeout = Time.milliseconds(timeoutMillis);
   return resourceManager.registerTaskExecutor(
      taskExecutorRegistration,
      timeout);
}
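invokeRegistration() receives timeoutMillis from RetryingRegistration.register(), which, as the startRegistration() excerpt shows, is first called with attempt 1 and the initial registration timeout. As far as I know, when an attempt times out, Flink retries with the timeout doubled and capped at the configured maximum registration timeout; treat this as an assumption to verify against your Flink version. The hypothetical helper below only computes that progression.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the assumed timeout growth between attempts.
final class RegistrationTimeoutSketch {
    // Timeout for the next attempt after the current one timed out:
    // double it, but never exceed the maximum.
    static long nextTimeoutMillis(long currentTimeoutMillis, long maxTimeoutMillis) {
        return Math.min(2 * currentTimeoutMillis, maxTimeoutMillis);
    }

    // Timeouts used by attempts 1..attempts if every attempt times out.
    static List<Long> schedule(long initialTimeoutMillis, long maxTimeoutMillis, int attempts) {
        List<Long> result = new ArrayList<>();
        long timeout = initialTimeoutMillis;
        for (int attempt = 1; attempt <= attempts; attempt++) {
            result.add(timeout);
            timeout = nextTimeoutMillis(timeout, maxTimeoutMillis);
        }
        return result;
    }

    private RegistrationTimeoutSketch() {}
}
```

For example, with an initial timeout of 100 ms and a maximum of 30000 ms, ten consecutive timeouts give 100, 200, 400, 800, 1600, 3200, 6400, 12800, 25600, and 30000 ms. Starting small lets the first registration succeed quickly when the target is already up, while the cap bounds how long a single attempt waits when the target is unreachable.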

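Registration of the RpcConnections between other components, such as the RPC connection between the TaskManager and the JobMaster, works largely the same way as ResourceManagerRegistration, so it is not covered here.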

Next, let's see how the JobMaster requests Slot resources from the ResourceManager.

 

3. The JobMaster requests Slot resources from the ResourceManager

The code is as follows:

// SlotPoolImpl.connectToResourceManager() iterates over the PendingRequests in the
// waitingForResourceManager collection and, for each PendingRequest, calls
// requestSlotFromResourceManager() to request the Slot resources specified in
// that PendingRequest from the ResourceManager.
public void connectToResourceManager(
        @Nonnull ResourceManagerGateway resourceManagerGateway) {
    this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
    // Send off every request that was waiting for the ResourceManager connection
    for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
        requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
    }
    // All waiting requests have been sent
    waitingForResourceManager.clear();
}
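connectToResourceManager() is the second half of a stash-until-connected pattern: slot requests that arrive while no ResourceManager is known are parked in waitingForResourceManager, and they are sent as soon as a gateway becomes available. Below is a hypothetical single-threaded sketch of both halves (SlotPoolImpl is meant to be used from the JobMaster's main thread, so the sketch uses no locking); RmGateway and the string-based resource profile are placeholders, not Flink types.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of "stash slot requests until the ResourceManager is connected".
final class SlotRequestBufferSketch {
    // Hypothetical stand-in for ResourceManagerGateway.requestSlot(...).
    interface RmGateway {
        void requestSlot(String slotRequestId, String resourceProfile);
    }

    // Requests parked while no ResourceManager is connected, keyed by slot
    // request ID and kept in arrival order.
    private final Map<String, String> waitingForResourceManager = new LinkedHashMap<>();
    private RmGateway resourceManagerGateway;

    void requestNewSlot(String slotRequestId, String resourceProfile) {
        if (resourceManagerGateway == null) {
            // No ResourceManager yet: stash the request
            waitingForResourceManager.put(slotRequestId, resourceProfile);
        } else {
            // Connected: send the request right away
            resourceManagerGateway.requestSlot(slotRequestId, resourceProfile);
        }
    }

    void connectToResourceManager(RmGateway gateway) {
        this.resourceManagerGateway = gateway;
        // Flush everything that was waiting, in arrival order
        for (Map.Entry<String, String> pending : waitingForResourceManager.entrySet()) {
            gateway.requestSlot(pending.getKey(), pending.getValue());
        }
        waitingForResourceManager.clear();
    }

    int waitingCount() {
        return waitingForResourceManager.size();
    }
}
```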

Next, look at the implementation of SlotPoolImpl.requestSlotFromResourceManager():

private void requestSlotFromResourceManager(
        final ResourceManagerGateway resourceManagerGateway,
        final PendingRequest pendingRequest) {
    checkNotNull(resourceManagerGateway);
    checkNotNull(pendingRequest);
    log.info("Requesting new slot [{}] and profile {} from resource manager.",
             pendingRequest.getSlotRequestId(), pendingRequest.getResourceProfile());
    // Every request sent to the ResourceManager gets a fresh AllocationID
    final AllocationID allocationId = new AllocationID();
    // Track the pending request by both its SlotRequestId and its AllocationID
    pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);
    pendingRequest.getAllocatedSlotFuture().whenComplete(
        (AllocatedSlot allocatedSlot, Throwable throwable) -> {
            // Cancel the request at the ResourceManager if the pending request failed
            // or was fulfilled with a slot from a different allocation
            if (throwable != null
                    || !allocationId.equals(allocatedSlot.getAllocationId())) {
                resourceManagerGateway.cancelSlotRequest(allocationId);
            }
        });
    // RPC call through the ResourceManagerGateway proxy; jobMasterId identifies the requesting JobMaster
    CompletableFuture<Acknowledge> rmResponse =
        resourceManagerGateway.requestSlot(
            jobMasterId,
            new SlotRequest(jobId, allocationId,
                            pendingRequest.getResourceProfile(), jobManagerAddress),
            rpcTimeout);
    // If the RPC fails, fail the pending slot request on the component's main thread
    FutureUtils.whenCompleteAsyncIfNotDone(
        rmResponse,
        componentMainThreadExecutor,
        (Acknowledge ignored, Throwable failure) -> {
            if (failure != null) {
                slotRequestToResourceManagerFailed(
                    pendingRequest.getSlotRequestId(), failure);
            }
        });
}
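The whenComplete() callback in requestSlotFromResourceManager() cancels the ResourceManager-side request whenever the pending request fails or ends up fulfilled by a slot whose AllocationID differs from the one sent to the ResourceManager (for example, because another slot in the pool was matched to it first), so the ResourceManager does not keep working on a slot nobody is waiting for. The sketch isolates that rule; SlotRequestCancelSketch and its Slot class are hypothetical, a random UUID string stands in for AllocationID, and a list stands in for cancelSlotRequest() calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the "cancel on failure or allocation mismatch" rule.
final class SlotRequestCancelSketch {
    // Hypothetical fulfilled slot; only its allocation ID matters here.
    static final class Slot {
        final String allocationId;

        Slot(String allocationId) {
            this.allocationId = allocationId;
        }
    }

    // Records what would have been sent as cancelSlotRequest(allocationId).
    final List<String> canceledAllocations = new ArrayList<>();

    // Issues a request with a fresh allocation ID and returns that ID.
    String request(CompletableFuture<Slot> allocatedSlotFuture) {
        final String allocationId = UUID.randomUUID().toString(); // stands in for new AllocationID()
        allocatedSlotFuture.whenComplete((slot, throwable) -> {
            // Short-circuit keeps slot from being dereferenced when the future failed
            if (throwable != null || !allocationId.equals(slot.allocationId)) {
                canceledAllocations.add(allocationId);
            }
        });
        return allocationId;
    }
}
```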

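These examples show that components in the cluster runtime all access one another through the RPC framework. An RpcEndpoint creates RegisteredRpcConnections to other RpcEndpoints and communicates with other components through dynamic proxy classes of the RpcGateway interfaces.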
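The RpcGateway proxies mentioned here can be illustrated with the JDK's java.lang.reflect.Proxy: every call on the gateway interface is intercepted by an InvocationHandler, packaged as a message, and handed to a transport that returns a future. In Flink's Akka-based implementation this role is played by AkkaInvocationHandler, which turns calls into RpcInvocation messages for the target actor; the sketch below does not reproduce that class. DemoGateway, DemoInvocation, GatewayProxySketch, and the transport function are hypothetical, and all gateway methods are assumed to return CompletableFuture.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical gateway interface, in the spirit of ResourceManagerGateway.
interface DemoGateway {
    CompletableFuture<String> registerTaskExecutor(String resourceId);
}

// Hypothetical message the proxy produces for each call.
final class DemoInvocation {
    final String methodName;
    final Object[] args;

    DemoInvocation(String methodName, Object[] args) {
        this.methodName = methodName;
        this.args = args;
    }
}

final class GatewayProxySketch {
    // Creates a proxy whose gateway calls become DemoInvocation messages for the transport.
    static <G> G createProxy(Class<G> gatewayClass,
                             Function<DemoInvocation, CompletableFuture<?>> transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // Keep equals/hashCode/toString local instead of sending them as RPCs
                switch (method.getName()) {
                    case "equals":
                        return proxy == args[0];
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default:
                        return gatewayClass.getSimpleName() + "Proxy";
                }
            }
            Object[] safeArgs = args == null ? new Object[0] : args;
            return transport.apply(new DemoInvocation(method.getName(), safeArgs));
        };
        return gatewayClass.cast(Proxy.newProxyInstance(
            gatewayClass.getClassLoader(), new Class<?>[] {gatewayClass}, handler));
    }

    private GatewayProxySketch() {}
}
```

Here the transport can answer locally; a real RPC service would serialize the invocation, send it to the remote endpoint, and complete the future when the reply arrives.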

 
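Reference: 《Flink设计与实现:核心原理与源码解析》 (Flink Design and Implementation: Core Principles and Source Code Analysis), Zhang Libing (张利兵)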

02-23 10:17