使用idea操作,需要导入依赖包mysql-connector-java-5.1.25.jar
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object SparkToMysql {
def main(args: Array[String]): Unit = {
//创建SparkSession对象
val spark = SparkSession.builder().appName("sparktohive")
.master("local[*]").getOrCreate()
//获取数据库连接,LinuxIp指的是你的虚拟机ip,3306是端口号,并指定某个库
val url="jdbc:mysql://LinuxIP:3306/mybatisdb"
//获取mysql驱动
val driver="com.mysql.jdbc.Driver"
//用户名
val user="****"
//密码
val pwd="****"
//编写配置文件
val properties=new Properties()
properties.setProperty("user",user)
properties.setProperty("password",pwd)
properties.setProperty("driver",driver)
//获得一个dateframe,通过操作dateframe来操作表
val tblsDF: DataFrame = spark.read.jdbc(url,"student",properties)
tblsDF.show()
import org.apache.spark.sql.functions._
//利用算子进行计算
val frame: DataFrame = tblsDF.agg(
max("age").as("maxage"),
min("age").as("minage"),
avg("age").as("avgage")
)
frame.show()
//可以将计算得来的新表再次写入到你的mysql数据库中,其中mode方法中的SavaMode.Overwrite可以覆盖原来的表
// frame.write.mode(SaveMode.Overwrite).jdbc(url,"maxage",properties)
|