Three Ways to Connect to Spark
1. Spark Core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("name")
val sparkContext = new SparkContext(sparkConf)
// Build an RDD from a file on disk ...
val fileRDD: RDD[String] = sparkContext.textFile("src/main/resources/pvuvdata")
// ... or from an in-memory collection (arr is any local Seq[String], not defined in the original note)
val arrRDD: RDD[String] = sparkContext.parallelize(arr)
fileRDD.foreach(println)
sparkContext.stop()
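As a quick illustration of the RDD API, here is a minimal word-count sketch over the fileRDD created above (the whitespace split and the idea of counting words are assumptions for illustration, not part of the original note); run it before sparkContext.stop():

// split each line on whitespace, pair every word with 1, then sum the counts per word
val wordCounts: RDD[(String, Int)] = fileRDD
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _)
wordCounts.foreach(println)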
2. Spark SQL
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}

val sparkSession: SparkSession = SparkSession.builder().master("local").appName("hello01SparkSql").getOrCreate()
// The underlying SparkContext is still available for plain RDDs
val value: RDD[String] = sparkSession.sparkContext.textFile("src/main/resources/emp.txt")
// Generic reader: specify the format explicitly, then load the path
val dFt1: DataFrame = sparkSession.read.format("<format>").load("<path>")
// Shorthand reader: one method per format, optionally with an alias (or .as[CaseClass] for a typed Dataset)
val dFt2: Dataset[Row] = sparkSession.read.<format>("<path>").as("<alias>")
dFt1.show()
// Generic writer and per-format shorthand writer
dFt1.write.mode(SaveMode.Overwrite).format("<format>").save("<path>")
dFt1.write.mode(SaveMode.Overwrite).<format>("<path>")
sparkSession.stop()
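The text file loaded above stays a plain RDD[String]. A minimal sketch of turning it into a DataFrame and querying it with SQL, assuming emp.txt is comma-separated with id, name and salary columns (the Emp case class and that column layout are assumptions for illustration):

// define Emp at the top level (outside main) so Spark can derive an encoder for it
case class Emp(id: Int, name: String, sal: Double)

import sparkSession.implicits._
val empDF: DataFrame = value
  .map(_.split(","))
  .map(a => Emp(a(0).toInt, a(1), a(2).toDouble))
  .toDF()
empDF.createOrReplaceTempView("emp")
sparkSession.sql("select name, sal from emp").show()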
2.1. Plain text
// Read a plain text file as an RDD[String]
sparkSession.sparkContext.textFile("src/main/resources/emp.txt")
// Write an RDD (empSal, see the sketch below) back out as text
empSal.saveAsTextFile("src/main/resources/t_emp_sal.txt")
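empSal is not defined in the note above; one way it could be produced, assuming emp.txt is comma-separated with the name in the second column and the salary in the third (that layout is an assumption):

val lines: RDD[String] = sparkSession.sparkContext.textFile("src/main/resources/emp.txt")
// keep (name, salary) pairs, assuming columns: id,name,sal
val empSal: RDD[(String, Double)] = lines
  .map(_.split(","))
  .map(a => (a(1), a(2).toDouble))
empSal.saveAsTextFile("src/main/resources/t_emp_sal.txt")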
2.2. JSON
def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder().master("local").appName("hello01_sql").getOrCreate()
  val dFt1: DataFrame = sparkSession.read.json("src/main/resources/t_emp1.txt").as("Emp")
  dFt1.write.mode(SaveMode.Overwrite).json("src/main/resources/t_emp.txt")
  dFt1.show()
}
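By default Spark's JSON reader expects one JSON object per line (JSON Lines), not a single pretty-printed document (that needs option("multiLine", true)). A hypothetical t_emp1.txt could therefore look like this; the field names are assumptions for illustration:

{"empno":7369,"ename":"SMITH","sal":800.0}
{"empno":7499,"ename":"ALLEN","sal":1600.0}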
2.3. Parquet
def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder().master("local").appName("sql04_parquet").getOrCreate()
  val dFt2: Dataset[Row] = sparkSession.read.parquet("src/main/resources/t_emp.txt").as("Emp")
  dFt2.write.mode(SaveMode.Overwrite).parquet("src/main/resources/t_emp1.txt")
  dFt2.show()
}
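Parquet files carry their own schema, so after loading you can inspect it directly; for example, inside the main above:

dFt2.printSchema()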
2.4. JDBC
def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder().master("local").appName("sql05_jdbc").getOrCreate()
  // Option 1: pass the connection settings as an option map
  val map = new mutable.HashMap[String, String]()
  map.put("url", "jdbc:mysql://localhost:3306/java46?useSSL=false&serverTimezone=UTC&characterEncoding=utf8")
  map.put("driver", "com.mysql.cj.jdbc.Driver")
  map.put("user", "root")
  map.put("password", "root")
  map.put("dbtable", "emp")
  val dFt1: DataFrame = sparkSession.read.format("jdbc").options(map).load()
  // Option 2: pass the connection settings as java.util.Properties
  val properties = new Properties()
  properties.setProperty("driver", "com.mysql.cj.jdbc.Driver")
  properties.setProperty("user", "root")
  properties.setProperty("password", "root")
  val dFt2: DataFrame = sparkSession.read
    .jdbc("jdbc:mysql://localhost:3306/java46?useSSL=false&serverTimezone=UTC&characterEncoding=utf8", "emp", properties)
  // Write back to MySQL with either API
  dFt2.write.mode(SaveMode.Overwrite)
    .jdbc("jdbc:mysql://localhost:3306/java46?serverTimezone=UTC&characterEncoding=utf8&useSSL=false", "t_emp", properties)
  map.put("dbtable", "t_emp3") // switch the target table for the option-map writer
  dFt1.write.mode(SaveMode.Overwrite).format("jdbc")
    .options(map).save() // the url is already in the options, so save() takes no path
  dFt1.show()
  dFt2.show()
}
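For the JDBC examples to run, the MySQL driver named in the options (com.mysql.cj.jdbc.Driver) has to be on the classpath; in an sbt project that is one extra dependency (the version below is only an example and should match your MySQL server):

libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.33"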
2.5. Hive
- Copy the configuration files: take the Hadoop and Hive configs from the Linux cluster into the project (see the dependency note after this list)
- hdfs-site.xml
- core-site.xml
- hive-site.xml
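These files usually go into src/main/resources so Spark finds them on the classpath. In addition, enableHiveSupport() needs the spark-hive module; in sbt that is roughly the following, where the version is an assumption that must match your Spark build:

libraryDependencies += "org.apache.spark" %% "spark-hive" % "3.1.2"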
package com.yjxxt

import org.apache.spark.sql.SparkSession
object HelloSourceHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("HelloSourceHive").enableHiveSupport().getOrCreate()
    spark.sql("use yjx")
    val dataFrame = spark.sql("select * from t_user")
    dataFrame.show()
    spark.stop()
  }
}
3. Spark Streaming
3.1. Socket text stream
def main(args: Array[String]): Unit = {
  val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("App")
  val streamingContext = new StreamingContext(sparkConf, streaming.Seconds(5))
  // Listen on a TCP socket and word-count each 5-second batch
  val value: ReceiverInputDStream[String] = streamingContext.socketTextStream("localhost", 9999)
  val dStream: DStream[(String, Int)] = value.flatMap(_.split("\\s")).map((_, 1)).reduceByKey((x, y) => x + y)
  dStream.print()
  streamingContext.start()
  streamingContext.awaitTermination() // blocks until the streaming job is stopped
  streamingContext.stop(false)
}
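To try this locally, start a text source on port 9999 before launching the job, for example with netcat (nc -lk 9999) in another terminal, and type a few lines; each 5-second batch prints its word counts.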
3.2. Streaming + Kafka
def main(args: Array[String]): Unit = {
  val sparkConf: SparkConf = new SparkConf().setMaster("local[*]")
    .setAppName("stream05_kafka")
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
  val streamingContext = new StreamingContext(sparkConf, streaming.Seconds(3))
  // Kafka consumer configuration
  val kafkaPar: Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "yjx_bigdata",
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (true: lang.Boolean)
  )
  val topics: Array[String] = Array("userlog")
  // Direct stream: Kafka partitions map one-to-one onto RDD partitions
  val kfDStream: InputDStream[ConsumerRecord[String, String]] =
    KafkaUtils.createDirectStream(streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaPar))
  // Word count over the record values of every 3-second batch
  val result: DStream[(String, Int)] = kfDStream.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  result.print()
  streamingContext.start()
  streamingContext.awaitTermination()
  streamingContext.stop(false)
}
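Assuming the userlog topic already exists on the brokers listed above, test data can be pushed into it with the console producer that ships with Kafka, e.g. kafka-console-producer.sh --broker-list node01:9092 --topic userlog (newer Kafka versions use --bootstrap-server instead of --broker-list); every line typed shows up in the 3-second batch output.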