因为我接下来想尝试搭建基于 Docker 的 MySQL 集群，所以这里连接的是 Docker 版的 MySQL。过程中遇到了两个小问题：一是需要关闭 SSL 认证（在连接串中加上 `useSSL=false`）；二是需要把 Maven 中 mysql-connector-java 的版本提升到 8.0.11，否则会报 `caching_sha2_password` 相关的错误。
<!-- MySQL Connector/J. Version 8.x is needed to authenticate against MySQL 8's
     caching_sha2_password plugin; older drivers fail with a caching_sha2_password error.
     runtime scope: the code only uses java.sql interfaces and loads the driver class by name. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
<version>8.0.11</version>
</dependency>
回到正题：自定义 JDBC Sink 的难度不大，核心代码如下：
/**
 * Flink sink that upserts [[item_count]] records into the MySQL table `Item_detail`.
 *
 * For each record it first increments `sale_count` of the row with the same `item_id`;
 * if no row was updated, it inserts a new row built from the record's fields.
 * The update-then-insert pair is not atomic: with sink parallelism > 1, two subtasks could
 * both insert the same `item_id`. NOTE(review): if `item_id` is a unique key, consider
 * `INSERT ... ON DUPLICATE KEY UPDATE` instead.
 *
 * @param url      JDBC URL of the MySQL database
 * @param user     database user name
 * @param password database password
 */
class MyJdbcSinkFunc(
    url: String = "jdbc:mysql://localhost:3309/alibaba?characterEncoding=utf8&useSSL=false",
    user: String = "root",
    password: String = "root"
) extends RichSinkFunction[item_count] {
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  /** Opens the JDBC connection and prepares the insert/update statements once per subtask. */
  override def open(parameters: Configuration): Unit = {
    // Connector/J 8.x driver class; the legacy name com.mysql.jdbc.Driver is deprecated.
    Class.forName("com.mysql.cj.jdbc.Driver")
    conn = DriverManager.getConnection(url, user, password)
    insertStmt = conn.prepareStatement("insert into Item_detail (item_id,cate_id,price,sale_count,save_count) values (?, ?,?,?,?)")
    updateStmt = conn.prepareStatement("update Item_detail set sale_count =sale_count +1 where item_id = ?")
  }

  /**
   * Upserts one record.
   *
   * NOTE(review): the update path always adds 1 to `sale_count`, regardless of
   * `value.sale_count`. Only the insert path uses the record's own count.
   */
  override def invoke(value: item_count): Unit = {
    updateStmt.setLong(1, value.item_id)
    // executeUpdate returns the affected row count; 0 means this item_id is not stored yet.
    if (updateStmt.executeUpdate() == 0) {
      insertStmt.setLong(1, value.item_id)
      insertStmt.setLong(2, value.cate_id)
      insertStmt.setDouble(3, value.price)
      insertStmt.setInt(4, value.sale_count)
      insertStmt.setInt(5, value.save_count)
      insertStmt.executeUpdate()
    }
  }

  /**
   * Releases the statements and the connection.
   *
   * open() may have failed part-way, so any handle can still be null. Each close runs in its
   * own finally, so one failing close cannot leak the connection.
   */
  override def close(): Unit =
    try {
      if (insertStmt != null) insertStmt.close()
    } finally {
      try {
        if (updateStmt != null) updateStmt.close()
      } finally {
        if (conn != null) conn.close()
      }
    }
}
全部代码如下
package redo.source
import bag.day01.State.RichMapper
import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.util.Collector
import java.sql.{Connection, DriverManager, PreparedStatement, Statement}
import java.util.{Date, Properties}
import scala.util.Random
/**
 * One user-behaviour row. Not referenced elsewhere in this program.
 * NOTE(review): pv/fav/cart/buy are presumably per-event behaviour flags or counters
 * (page view, favourite, cart, buy); confirm against the data source.
 */
final case class User(user_id: Long, item_id: Long, cate_id: Long, pv: Int, fav: Int, cart: Int, buy: Int, times: Date)

/** A row written to the MySQL table `Item_detail` by the JDBC sink. */
final case class item_count(item_id: Long, cate_id: Long, price: Double, sale_count: Int, save_count: Int)

/** An `(item_id, cate_id)` pair read from ClickHouse by the custom source. */
final case class fl(item_id: Long, cate_id: Long)
/** Job entry point: reads item/category pairs from ClickHouse and upserts them into MySQL. */
object ClickHouseSources {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // With parallelism 1 the sink's update-then-insert runs in a single subtask,
    // so two inserts for the same item_id cannot race.
    env.setParallelism(1)

    val pairs = env.addSource(new mysource_funtions())
    val flu: DataStream[item_count] = pairs.map { x =>
      // Random price in [0, 1000); every record counts as one sale, save_count fixed at 1000.
      val price = scala.util.Random.nextDouble() * 1000
      // Key by item_id: the sink upserts on item_id, so it must not be filled with cate_id.
      item_count(x.item_id, x.cate_id, price, 1, 1000)
    }

    flu.print("avg")
    flu.addSink(new MyJdbcSinkFunc())
    env.execute("connect clickhouse")
  }
}
/**
 * Flink sink that upserts [[item_count]] records into the MySQL table `Item_detail`.
 *
 * For each record it first increments `sale_count` of the row with the same `item_id`;
 * if no row was updated, it inserts a new row built from the record's fields.
 * The update-then-insert pair is not atomic: with sink parallelism > 1, two subtasks could
 * both insert the same `item_id`. NOTE(review): if `item_id` is a unique key, consider
 * `INSERT ... ON DUPLICATE KEY UPDATE` instead.
 *
 * @param url      JDBC URL of the MySQL database
 * @param user     database user name
 * @param password database password
 */
class MyJdbcSinkFunc(
    url: String = "jdbc:mysql://localhost:3309/alibaba?characterEncoding=utf8&useSSL=false",
    user: String = "root",
    password: String = "root"
) extends RichSinkFunction[item_count] {
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  /** Opens the JDBC connection and prepares the insert/update statements once per subtask. */
  override def open(parameters: Configuration): Unit = {
    // Connector/J 8.x driver class; the legacy name com.mysql.jdbc.Driver is deprecated.
    Class.forName("com.mysql.cj.jdbc.Driver")
    conn = DriverManager.getConnection(url, user, password)
    insertStmt = conn.prepareStatement("insert into Item_detail (item_id,cate_id,price,sale_count,save_count) values (?, ?,?,?,?)")
    updateStmt = conn.prepareStatement("update Item_detail set sale_count =sale_count +1 where item_id = ?")
  }

  /**
   * Upserts one record.
   *
   * NOTE(review): the update path always adds 1 to `sale_count`, regardless of
   * `value.sale_count`. Only the insert path uses the record's own count.
   */
  override def invoke(value: item_count): Unit = {
    updateStmt.setLong(1, value.item_id)
    // executeUpdate returns the affected row count; 0 means this item_id is not stored yet.
    if (updateStmt.executeUpdate() == 0) {
      insertStmt.setLong(1, value.item_id)
      insertStmt.setLong(2, value.cate_id)
      insertStmt.setDouble(3, value.price)
      insertStmt.setInt(4, value.sale_count)
      insertStmt.setInt(5, value.save_count)
      insertStmt.executeUpdate()
    }
  }

  /**
   * Releases the statements and the connection.
   *
   * open() may have failed part-way, so any handle can still be null. Each close runs in its
   * own finally, so one failing close cannot leak the connection.
   */
  override def close(): Unit =
    try {
      if (insertStmt != null) insertStmt.close()
    } finally {
      try {
        if (updateStmt != null) updateStmt.close()
      } finally {
        if (conn != null) conn.close()
      }
    }
}
/**
 * Bounded Flink source: runs one query against ClickHouse and emits every row as an [[fl]].
 * It then returns, which finishes the source.
 *
 * @param url   ClickHouse JDBC URL
 * @param query SQL query whose first two columns are read (as Long) into `item_id` and `cate_id`
 */
class mysource_funtions(
    url: String = "jdbc:clickhouse://hadoop102:8123/alibaba",
    query: String = "select item_id,cate_id from user "
) extends SourceFunction[fl] {
  // Flink calls cancel() from a different thread than run(); @volatile makes the
  // flag change visible to the emitting loop.
  @volatile var running = true

  override def cancel(): Unit = running = false

  override def run(ctx: SourceFunction.SourceContext[fl]): Unit = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    val connection: Connection = DriverManager.getConnection(url)
    // Nested try/finally closes the ResultSet, Statement and Connection even if
    // the query or an emit throws.
    try {
      val statement: Statement = connection.createStatement()
      try {
        val rows = statement.executeQuery(query)
        try {
          // Checking `running` per row lets a cancel stop emission mid-result-set.
          while (running && rows.next()) {
            ctx.collect(fl(rows.getLong(1), rows.getLong(2)))
          }
        } finally rows.close()
      } finally statement.close()
    } finally connection.close()
  }
}
|