1.person表
2.score表
3.scala代码
package sparkSql
import org.apache.spark.sql.{DataFrame, DataFrameReader, SaveMode, SparkSession}
import java.util.Properties
object ReadMysqlDataToDF {

  /**
   * Demo: reads the MySQL `person` and `score` tables over JDBC using the
   * three read styles Spark offers (Properties-based `read.jdbc`, an options
   * map with `format("jdbc")`, and chained `.option(...)` calls), joins them
   * in Spark SQL, and appends the joined rows to the MySQL `result` table.
   */
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      // Tiny demo data set — avoid the default 200 shuffle partitions.
      .config("spark.sql.shuffle.partitions", 1)
      .master("local")
      .appName("ttt")
      .getOrCreate()
    session.sparkContext.setLogLevel("Error")

    try {
      // JDBC URL: GMT+8 server timezone, SSL disabled for the demo instance.
      val url = "jdbc:mysql://node1/spark?serverTimezone=GMT%2B8&useSSL=false"

      // Read style 1: java.util.Properties + session.read.jdbc(...).
      val p = new Properties()
      p.setProperty("user", "root")
      p.setProperty("password", "123456")
      // Set the driver explicitly, consistent with the other two read styles.
      // NOTE(review): "com.mysql.jdbc.Driver" is the legacy Connector/J 5.x
      // class; Connector/J 8.x uses "com.mysql.cj.jdbc.Driver".
      p.setProperty("driver", "com.mysql.jdbc.Driver")
      val people: DataFrame = session.read.jdbc(url, "person", p)
      people.show()
      // createOrReplaceTempView: safe to re-run inside the same session,
      // unlike createTempView which throws if the view already exists.
      people.createOrReplaceTempView("peopleT")

      // Read style 2: options map + format("jdbc").load().
      val map = Map[String, String](
        "user" -> "root",
        "password" -> "123456",
        "url" -> url,
        "driver" -> "com.mysql.jdbc.Driver",
        "dbtable" -> "score"
      )
      val score: DataFrame = session.read.format("jdbc").options(map).load()
      score.createOrReplaceTempView("scoreT")

      // Explicit JOIN ... ON instead of a comma cross-join filtered by WHERE.
      val frame: DataFrame = session.sql(
        """
          |select p.id, p.name, p.age, s.score
          |from peopleT p join scoreT s on s.id = p.id
          |""".stripMargin)
      frame.show()

      // Append the join result back into the MySQL `result` table.
      frame.write.mode(SaveMode.Append).jdbc(url, "result", p)

      // Read style 3: chained .option(...) calls on a DataFrameReader.
      val reader: DataFrameReader = session.read.format("jdbc")
        .option("user", "root")
        .option("password", "123456")
        .option("url", url)
        .option("driver", "com.mysql.jdbc.Driver")
        .option("dbtable", "person")
      val person: DataFrame = reader.load()
      person.show()
    } finally {
      // Release the SparkContext even if a JDBC call above fails.
      session.stop()
    }
  }
}
4.写入MySQL中的result表数据