依赖类: org.apache.spark.launcher.SparkLauncher
启动类:
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
/**
 * Starts a Spark application as a child process through `SparkLauncher` and blocks
 * until that application reaches a terminal state, echoing the state while waiting.
 *
 * The Spark home, application jar and main class are hard-coded for a local
 * Windows test setup.
 */
object SparkLuncherTest {
  import java.util.concurrent.{CountDownLatch, TimeUnit}

  /** Interval between two "still waiting" state echoes, in milliseconds. */
  private val StatusIntervalMs: Long = 1000 * 200L

  /**
   * Launches the application and waits for it to end.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Released by the listener as soon as the application enters a final state,
    // so the wait below ends promptly instead of after a full sleep interval.
    val finished = new CountDownLatch(1)

    val handle: SparkAppHandle = new SparkLauncher()
      .setAppName("myTest")
      .setSparkHome("C:\\spark-2.4.7-bin-3.0.0\\")
      .setMaster("local")
      .setAppResource("D:\\workspace\\data_analysis\\offlineJob\\target\\offlineJob.jar")
      .setMainClass("com.test.ValidationSet.SparkTest")
      // .setDeployMode("cluster")
      .startApplication(new SparkAppHandle.Listener {
        override def stateChanged(sparkAppHandle: SparkAppHandle): Unit = {
          println("state change")
          if (sparkAppHandle.getState.isFinal) finished.countDown()
        }

        override def infoChanged(sparkAppHandle: SparkAppHandle): Unit = {
          println("info change")
        }
      })

    // isFinal covers every terminal state (FINISHED, FAILED, KILLED, LOST); checking
    // only FINISHED/FAILED would loop forever when the application is killed or lost.
    while (!handle.getState.isFinal) {
      println(handle.getState.toString)
      finished.await(StatusIntervalMs, TimeUnit.MILLISECONDS)
    }
    println(handle.getState.toString)
  }
}
逻辑执行代码:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Minimal Spark job: builds a two-element RDD, collects it on the driver and
 * prints the result prefixed with "mt:".
 */
object SparkTest {

  /**
   * Runs the job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // App name and master are intentionally not set in code; they are expected to be
    // supplied from outside (e.g. by whatever submits this job).
    val conf = new SparkConf()
    // .setAppName("aaa")
    // .setMaster("local")
    val sc = new SparkContext(conf)
    try {
      sc.setLogLevel("WARN")
      val value: RDD[String] = sc.parallelize(Seq("aaa", "bbb"))
      println(s"mt:${value.collect().toBuffer}")
    } finally {
      // Always release the context, even when the job above throws.
      sc.stop()
    }
  }
}
开始执行,执行命令如下:
java -cp D:\workspace\data_analysis\offlineJob\target\offlineJob.jar com.sobot.ValidationSet.SparkLuncherTest
打印日志:
如下所示,日志中打印了 SparkTest 类的 println 输出: