This article walks through how a Spark program is submitted in standalone client mode, following the relevant classes in the Spark source.
In standalone client mode, ClientApp is the class used to submit the Spark program.
It is defined in deploy/Client.scala.
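For context, ClientApp is one implementation of the SparkApplication trait, which SparkSubmit instantiates and starts once it has resolved which class should run the application. A minimal sketch of that contract (simplified from org.apache.spark.deploy.SparkApplication; details can differ across Spark versions):

import org.apache.spark.SparkConf

// Simplified sketch of the entry-point contract that ClientApp fulfils: SparkSubmit
// constructs the concrete SparkApplication and calls start() with the remaining
// command-line arguments and the assembled SparkConf.
private[spark] trait SparkApplication {
  def start(args: Array[String], conf: SparkConf): Unit
}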
private[spark] class ClientApp extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val driverArgs = new ClientArguments(args)

    // A local RpcEnv just for this client process.
    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

    // One endpoint reference per configured Master (there may be several in an HA setup).
    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

    rpcEnv.awaitTermination()
  }
}
The code is straightforward: start parses the arguments into ClientArguments, creates an RpcEnv, resolves an endpoint reference for each Master, and registers a ClientEndpoint that does the actual talking to the Master.
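As a hypothetical illustration of the argument layout (normally spark-submit builds these arguments for you): the master URL, jar path, main class, and the wrapper object SubmitDriverSketch below are all made up. Note that ClientApp is private[spark], so this only compiles inside the Spark source tree, and start() blocks in rpcEnv.awaitTermination():

import org.apache.spark.SparkConf

// Hypothetical direct invocation of ClientApp; master URL and jar location are made up.
// "launch" expects <active-master> <jar-url> <main-class> [driver options];
// "kill" expects <active-master> <driver-id>.
object SubmitDriverSketch {
  def main(cliArgs: Array[String]): Unit = {
    new ClientApp().start(
      Array("launch",
        "spark://master:7077",
        "hdfs://namenode:9000/jars/my-app.jar",
        "com.example.MyApp"),
      new SparkConf())  // blocks until the client RpcEnv shuts down
  }
}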
The main functionality of ClientEndpoint is in its onStart method:
override def onStart(): Unit = {
  driverArgs.cmd match {
    case "launch" =>
      // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
      //       truncate filesystem paths similar to what YARN does. For now, we just require
      //       people call `addJar` assuming the jar is in the same directory.
      val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

      val classPathConf = config.DRIVER_CLASS_PATH.key
      val classPathEntries = getProperty(classPathConf, conf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

      val libraryPathConf = config.DRIVER_LIBRARY_PATH.key
      val libraryPathEntries = getProperty(libraryPathConf, conf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

      val extraJavaOptsConf = config.DRIVER_JAVA_OPTIONS.key
      val extraJavaOpts = getProperty(extraJavaOptsConf, conf)
        .map(Utils.splitCommandString).getOrElse(Seq.empty)

      val sparkJavaOpts = Utils.sparkJavaOpts(conf)
      val javaOpts = sparkJavaOpts ++ extraJavaOpts

      // The command the Worker will run: DriverWrapper, which in turn runs the user's main class.
      val command = new Command(mainClass,
        Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
        sys.env, classPathEntries, libraryPathEntries, javaOpts)

      val driverResourceReqs = ResourceUtils.parseResourceRequirements(conf,
        config.SPARK_DRIVER_PREFIX)

      val driverDescription = new DriverDescription(
        driverArgs.jarUrl,
        driverArgs.memory,
        driverArgs.cores,
        driverArgs.supervise,
        command,
        driverResourceReqs)
      asyncSendToMasterAndForwardReply[SubmitDriverResponse](
        RequestSubmitDriver(driverDescription))

    case "kill" =>
      val driverId = driverArgs.driverId
      asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
  }
}
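Both branches hand their request to asyncSendToMasterAndForwardReply, which asks every configured Master and forwards whatever reply comes back to the ClientEndpoint itself, where receive then handles SubmitDriverResponse or KillDriverResponse. Roughly, as a simplified sketch of that helper inside ClientEndpoint (the real code in Client.scala also manages its own ExecutionContext for the forwarding):

// Simplified sketch: ask every Master endpoint and forward the reply to this endpoint.
private def asyncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
  for (masterEndpoint <- masterEndpoints) {
    masterEndpoint.ask[T](message).onComplete {
      case Success(v) => self.send(v)  // the reply is handled in receive()
      case Failure(e) => logWarning(s"Error sending messages to master $masterEndpoint", e)
    }(forwardMessageExecutionContext)
  }
}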
The launch branch builds a Command around org.apache.spark.deploy.worker.DriverWrapper, wraps it in a DriverDescription, and sends it to the Master in a RequestSubmitDriver message; the Master then schedules that DriverWrapper onto a Worker. DriverWrapper itself is simple and not worth going through in detail: its job is to invoke the main method of our Spark program inside the driver JVM.
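As a rough sketch of what DriverWrapper boils down to (the real org.apache.spark.deploy.worker.DriverWrapper also starts an RpcEnv with a WorkerWatcher so the driver dies with its Worker, sets up the user-jar classloader, and shuts the RpcEnv down afterwards; the object name DriverWrapperSketch is made up):

// Minimal, self-contained sketch of DriverWrapper's core: load the user's driver
// class and invoke its main() via reflection. The argument layout mirrors the
// Command built above: {{WORKER_URL}}, {{USER_JAR}}, mainClass, driver options.
object DriverWrapperSketch {
  def main(args: Array[String]): Unit = args.toList match {
    case workerUrl :: userJar :: mainClass :: extraArgs =>
      // workerUrl and userJar are ignored here; the real class uses them to watch
      // the Worker and to put the user jar on the classpath.
      val clazz = Class.forName(mainClass)
      val mainMethod = clazz.getMethod("main", classOf[Array[String]])
      mainMethod.invoke(null, extraArgs.toArray)  // run the user's main() in this JVM
    case _ =>
      System.err.println("Usage: DriverWrapperSketch <workerUrl> <userJar> <driverMainClass> [options]")
      System.exit(-1)
  }
}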
That is the whole path a Spark program takes when it is submitted in standalone client mode.