Spark创建ApplicationMaster源码解析

原文链接:https://www.leahy.club/archives/spark-core-applicationmaster

源文件:SparkSubmit.scala
SparkSubmit是一个伴生对象,可以静态地访问其属性和方法。SparkSubmit是Spark程序运行起来之后或者打开Spark Shell之后启动的第一个进程。可以通过jps查看,后台中是存在SparkSubmit进程的。

针对Yarn集群部署的cluster模式

-- SparkSubmit.scala
//启动进程
-- main(args:Array[String]) //args参数就是我们提交应用程序时提交的参数
	-- new SparkSubmitArguments() //对args进行解析、封装
	-- submit() //提交
		-- prepareSubmitEnvironment //准备提交环境
			-- childMainClass = "org.apache.spark.deploy.yarn.Client" //Yarn集群部署cluster模式
			-- childMainClass = orgs.mainClass //Yarn集群部署client模式,那么这个class实际就是我们在提交spark任务时配置的主类
		-- doRunMain(runMain()) //运行主程序
			-- mainClass = Utils.classForName(childMainClass) //反射加载类,Yarn client的主类或者提交的任务的主类
			-- mainclass.getMethod("main", new Array[String](0).getClass) //反射获取main方法
			-- mainMethod.invoke() //反射执行main方法
			
-- Client.scala //运行mainMethod.invoke()之后
-- main()
	-- new ClientArguments(argStrings) //封装运行参数
	-- new Client() //创建client对象
		-- yarnClient = YarnClient.createYarnClient()
	-- client.run()
		-- submitApplication() //运行client并提交应用
			// 封装指令 command = bin/java org.apache.spark.deploy.yarn.ApplicationMaster (cluster模式)
			// 或者	 command = bin/java org.apache.spark.deploy.yarn.ExecutorLauncher (client模式)
			-- createContainerLaunchContext
			-- createApplicationSubmissionContext
			-- yarnClient.submitApplication(appContext) //Yarn客户端向Yarn提交应用(实质上是提交给RM了)
			


结合源码详解:

Spark应用程序的入口是SparkSubmit.scala,其main方法如下所示:

override def main(args: Array[String]): Unit = {
    // 对提交的应用程序的参数进行封装
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      //打印参数信息
      printStream.println(appArgs)
      // scalastyle:on println
    }
    appArgs.action match {
        //对appArgs中的action参数进行模式匹配
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }

首先来看看main方法中的SparkSubmitArguments.scala,如下代码所示。在参数配置类中可以看到master:设置集群管理方式;executorCores:设置Executor的核心数等参数。

private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
  extends SparkSubmitArgumentsParser {
  var master: String = null
  var deployMode: String = null
  var executorMemory: String = null
  var executorCores: String = null
  var totalExecutorCores: String = null
....

接下来看看SparkSubmitAction.SUBMIT模式对应的submit方法具体内容。如下代码所示。其中关键的看runMain方法,因为代码无论怎么执行都会进入这个方法。

private def submit(args: SparkSubmitArguments): Unit = {
    //准备submit的参数
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
            }
          })
        } catch {
          case e: Exception =>
            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
            // makes the message printed to the output by the JVM not very helpful. Instead,
            // detect exceptions with empty stack traces here, and treat them differently.
            if (e.getStackTrace().length == 0) {
              // scalastyle:off println
              printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
              // scalastyle:on println
              exitFn(1)
            } else {
              throw e
            }
        }
      } else {
        runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
      }
    }

     // In standalone cluster mode, there are two submission gateways:
     //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
     //   (2) The new REST-based gateway introduced in Spark 1.3
     // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
     // to use the legacy gateway if the master endpoint turns out to be not a REST server.
    if (args.isStandaloneCluster && args.useRest) {
      try {
        // scalastyle:off println
        printStream.println("Running Spark using the REST application submission protocol.")
        // scalastyle:on println
        doRunMain()
      } catch {
        // Fail over to use the legacy submission gateway
        case e: SubmitRestConnectionException =>
          printWarning(s"Master endpoint ${args.master} was not a REST server. " +
            "Falling back to legacy submission gateway instead.")
          args.useRest = false
          submit(args)
      }
    // In all other modes, just run the main class as prepared
    } else {
      doRunMain()
    }
  }

runMain方法中最核心的代码是利用反射来获取args参数中获取得到的childMainClass的具体类型,并获取其类加载器和main方法。所以需要进入到prepareSubmitEnvironment中再次查看childMainClass类具体是什么类。

private def runMain(
      childArgs: Seq[String],
      childClasspath: Seq[String],
      sysProps: Map[String, String],
      childMainClass: String,
      verbose: Boolean): Unit = {

    //获取类加载器
    val loader =
      if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
        new ChildFirstURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      } else {
        new MutableURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      }
    Thread.currentThread.setContextClassLoader(loader)

    try {
        //获取childMainClass的实体
      mainClass = Utils.classForName(childMainClass)

    //获取main方法
    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)

    //运行main方法
    try {
      mainMethod.invoke(null, childArgs.toArray)
    

进入到prepareSubmitEnvironment中可以看到,关于childMainClass有一系列的判断。

如果采用Yarn的Client模式那么对应的childMainClass就是args.mainClass,在Cluster模式下就是"org.apache.spark.deploy.yarn.Client"。

private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
      : (Seq[String], Seq[String], Map[String, String], String) = {
    // Return values
    val childArgs = new ArrayBuffer[String]()
    val childClasspath = new ArrayBuffer[String]()
    val sysProps = new HashMap[String, String]()
    var childMainClass = ""
    ...
    //判断部署模式,如果是client模式就是直接赋值为args.mainClass
    if (deployMode == CLIENT || isYarnCluster) {
      childMainClass = args.mainClass
      if (isUserJar(args.primaryResource)) {
        childClasspath += args.primaryResource
      }
      if (args.jars != null) { childClasspath ++= args.jars.split(",") }
    }
    ...
    //
    if (isYarnCluster) {
      childMainClass = "org.apache.spark.deploy.yarn.Client"
          

由于我们讨论的是Yarn的Cluster部署模式,在反射获取到Client类之后,就会被invoke运行。那么我们下一步进入到org.apache.spark.deploy.yarn.Client类中。从下代码中可以看出有一个ClientArgument方法进行参数封装,类似于SparkSubmitArgument。之后会运行CLientrun方法。下面我们再进入到run方法中。

private object Client extends Logging {

  def main(argStrings: Array[String]) {
    if (!sys.props.contains("SPARK_SUBMIT")) {
      logWarning("WARNING: This client is deprecated and will be removed in a " +
        "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
    }

    // Set an env variable indicating we are running in YARN mode.
    // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
    System.setProperty("SPARK_YARN_MODE", "true")
    val sparkConf = new SparkConf
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    sparkConf.remove("spark.jars")
    sparkConf.remove("spark.files")
      //参数封装
    val args = new ClientArguments(argStrings)
      //运行client
    new Client(args, sparkConf).run()
  }

run方法中的第一步是submitApplication方法,我们再进入。

def run(): Unit = {
    this.appId = submitApplication()

下面的这段代码非常关键:

createContainerLaunchContext中会新建一个Container并在Container中新建一个AM进程。

def submitApplication(): ApplicationId = {
    var appId: ApplicationId = null
    
	...
    
      // Set up the appropriate contexts to launch our AM
      //启动AM
      val containerContext = createContainerLaunchContext(newAppResponse)
      val appContext = createApplicationSubmissionContext(newApp, containerContext)

        //提交任务到RM中
      yarnClient.submitApplication(appContext)
     

如下是createContainerLaunchContext中的代码。

可以看到在Cluster模式下,AM就是ApplicationMaster类,Client模式下就是Executor类。

private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
    : ContainerLaunchContext = {
    
    val appId = newAppResponse.getApplicationId
   
    //一个Container运行一个JVM进程,设置JVM运行参数,可以看到采用了CMS GC
    if (useConcurrentAndIncrementalGC) {
      // In our expts, using (default) throughput collector has severe perf ramifications in
      // multi-tenant machines
      javaOpts += "-XX:+UseConcMarkSweepGC"
      javaOpts += "-XX:MaxTenuringThreshold=31"
      javaOpts += "-XX:SurvivorRatio=8"
      javaOpts += "-XX:+CMSIncrementalMode"
      javaOpts += "-XX:+CMSIncrementalPacing"
      javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
      javaOpts += "-XX:CMSIncrementalDutyCycle=10"
    }
    ...
    //反射获取AM的类
    val amClass =
      if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }

AM是一个进程,启动AM之后的代码下一节分析。