在开发spark过程中,遇到需要连接postgres库的场景,可以通过原生的jdbc去连接,也可以使用spark直连。
1.scala 原生jdbc连接
下面是 通过DriverManager连接pg
// Connect to PostgreSQL with plain JDBC via DriverManager, run a query and an update,
// and always release the connection.
// FIX: the connection must be declared OUTSIDE the try block — in the original it was a
// val inside try, so `finally { conn.close() }` referenced a name that (a) was out of
// scope and (b) was misspelled (`conn` vs `con`).
var con: Connection = null
try {
  val conn_prop = new Properties()
  conn_prop.put("user", "xxxx")
  conn_prop.put("password", "xxxx")
  val conn_url = "jdbc:postgresql://127.0.0.1:5432/xxxx"
  con = DriverManager.getConnection(conn_url, conn_prop)

  // --- Query path ---
  val query_stm: Statement = con.createStatement()
  // FIX: JDBC method is executeQuery (original had the typo `excuteQuery`)
  val rs1 = query_stm.executeQuery("select id,name,money from test_zero where id<1")
  while (rs1.next()) {
    val r_id = rs1.getInt("id")
    val r_name = rs1.getString("name")
    // FIX: original bound this as `r_sex` but then printed an undefined `r_money`
    val r_money = rs1.getDouble("money")
    println(r_id + "," + r_name + "," + r_money)
  }

  // --- Update path ---
  val update_stm: Statement = con.createStatement()
  // FIX: JDBC method is executeUpdate (original had the typo `excuteUpdate`).
  // rs2 is the affected-row count (Int), not a ResultSet.
  val rs2 = update_stm.executeUpdate("update test_zero set name='zero_update',money=100000.00 where name='zero'")
} finally {
  // Guard against the case where getConnection itself threw
  if (con != null) con.close()
}
下面是 通过Spark直接连接pg,有两种方式
// Connect to PostgreSQL directly from Spark — two equivalent ways.
val conf = new SparkConf().setAppName("conncetPG").setMaster("local[*]")
val spark = SparkSession.builder().config(conf).getOrCreate()

// Way 1: spark.read.jdbc(url, table, properties) with credentials in a Properties object.
val conn_prop = new Properties()
conn_prop.put("user", "xxxx")
conn_prop.put("password", "xxxx")
conn_prop.put("driver", "org.postgresql.Driver")
val df1 = spark.read.jdbc(url = "jdbc:postgresql://127.0.0.1:5432/xxxx", table = "test_zero", conn_prop)
// FIX: original called df.select(...) but the DataFrame is named df1 — `df` was undefined.
df1.select("id", "name", "money").show()

// Way 2: format("jdbc") with individual options.
// NOTE(review): no "driver" option is set here — Spark usually infers the PostgreSQL
// driver from the jdbc:postgresql URL, but add .option("driver", "org.postgresql.Driver")
// if driver resolution fails in your environment.
val df2 = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://127.0.0.1:5432/xxxx")
  .option("dbtable", "test_zero")
  .option("user", "xxxx")
  .option("password", "xxxx")
  .load()
// Filter/project on the executor side, then expose as a temp view for Spark SQL.
val n_df2 = df2.select("id", "name", "money").filter("name='zero'")
n_df2.createOrReplaceTempView("tmp_test_zero")
spark.sql("select * from tmp_test_zero where name like '%zero%'").show()