StandaloneAppClient是什么?这个很容易搞混淆。其实StandaloneAppClient不是SparkApplication,它主要是用在ScheduleBackend中的。

独立集群环境中,ScheduleBackend是用的StandaloneScheduleBackend,它继承了CoarseGrainedSchedulerBackend类。

StandaloneScheduleBackend里面用了一个叫StandaloneAppClient的类,这个StandaloneAppClient很具有迷惑性,其实它的主要功能是替换CoarseGrainedSchedulerBackend的资源申请的方法,改为向Master申请资源,我们看看相关代码片段就行了。

先看他启动的时候:

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
          }
        })
      }
    }

向Master发送RegisterApplication消息,将本appDesc注册给Master,这个和DriverDescription注册到Master是有点区别的。

再比如资源申请的代码:

 def requestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
    if (endpoint.get != null && appId.get != null) {
      endpoint.get.ask[Boolean](RequestExecutors(appId.get, requestedTotal))
    } else {
      logWarning("Attempted to request executors before driver fully initialized.")
      Future.successful(false)
    }
  }

就是向Master发送RequestExecutor消息申请Executor资源。

这里为啥要注册Application到Master呢?主要是当Master失效或者Master更改时,能通知到Application,这样就能重新连接新的Master了,重新运行spark程序。否则就很脆弱,很容易崩溃,这是我的理解哦,不一定正确,^~^

09-25 00:21