IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flinksql自定义mysql sink -> 正文阅读

[大数据]flinksql自定义mysql sink

因为我接下去想尝试搭建基于docker的mysql集群,因此连接的docker版本的mysql,出了几个小问题,一个是关闭ssl认证,一个修改mavn版本提升到8.0.11 否则会报caching_sha2_password 的错误

 <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
            <version>8.0.11</version>
</dependency>

回归正题,自定义jdbcsink难度不大,核心代码如下

class MyJdbcSinkFunc() extends RichSinkFunction[item_count]{
  // 定义连接、预编译语句
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _
  var count = 0

  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3309/alibaba?characterEncoding=utf8&useSSL=false", "root", "root")
    insertStmt = conn.prepareStatement("insert into Item_detail (item_id,cate_id,price,sale_count,save_count) values (?, ?,?,?,?)")
    updateStmt = conn.prepareStatement("update Item_detail set sale_count =sale_count +1  where item_id = ?")
  }

  override def invoke(value: item_count): Unit = {
    // 先执行更新操作,查到就更新
    updateStmt.setLong(1, value.item_id)
    updateStmt.execute()


    // 如果更新没有查到数据,那么就插入
    if( updateStmt.getUpdateCount == 0 ){
      insertStmt.setLong(1, value.item_id)
      insertStmt.setLong(2, value.cate_id)
      insertStmt.setDouble(3, value.price)
      insertStmt.setInt(4, value.sale_count)
      insertStmt.setInt(5,value.save_count)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}

全部代码如下

package redo.source



import bag.day01.State.RichMapper
import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.util.Collector

import java.sql.{Connection, DriverManager, PreparedStatement, Statement}
import java.util.{Date, Properties}
import scala.util.Random


case class User(user_id:Long,item_id:Long,cate_id:Long,pv:Int,fav:Int,cart:Int,buy:Int,times:Date)
case class item_count(item_id:Long,cate_id:Long,price:Double,sale_count:Int,save_count:Int)
case class fl(item_id:Long,cate_id:Long)
object ClickHouseSources {
  def main(args: Array[String]): Unit = {
    val env =StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val sort4 =  env.addSource(new mysource_funtions())
    val flu:DataStream[item_count] = sort4.map(x=>{

      val price = scala.util.Random.nextDouble()

      item_count(x.cate_id,x.cate_id,price*1000,1,1000)
    })
    flu.print("avg")
    flu.addSink( new MyJdbcSinkFunc() )

    env.execute("connect clickhouse")







  }
}
class MyJdbcSinkFunc() extends RichSinkFunction[item_count]{
  // 定义连接、预编译语句
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _
  var count = 0

  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3309/alibaba?characterEncoding=utf8&useSSL=false", "root", "root")
    insertStmt = conn.prepareStatement("insert into Item_detail (item_id,cate_id,price,sale_count,save_count) values (?, ?,?,?,?)")
    updateStmt = conn.prepareStatement("update Item_detail set sale_count =sale_count +1  where item_id = ?")
  }

  override def invoke(value: item_count): Unit = {
    // 先执行更新操作,查到就更新
    updateStmt.setLong(1, value.item_id)
    updateStmt.execute()


    // 如果更新没有查到数据,那么就插入
    if( updateStmt.getUpdateCount == 0 ){
      insertStmt.setLong(1, value.item_id)
      insertStmt.setLong(2, value.cate_id)
      insertStmt.setDouble(3, value.price)
      insertStmt.setInt(4, value.sale_count)
      insertStmt.setInt(5,value.save_count)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}
class mysource_funtions extends  SourceFunction[fl]{
  var running = true


  override def cancel(): Unit = running=false
  override def run(ctx: SourceFunction.SourceContext[fl]): Unit = {

    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    //连接
    val connection: Connection = DriverManager.getConnection( "jdbc:clickhouse://hadoop102:8123/alibaba")
    val statement: Statement = connection.createStatement()
    //建立查询
    val sql = "select item_id,cate_id from user "
    var res1 = statement.executeQuery(sql)
    while(running){
      while (res1.next()) {

        val item_id = res1.getLong(1)
        val cate_id = res1.getLong(2)


         ctx.collect(fl(item_id,cate_id))
      }
      connection.close()
      cancel()


  }
  }
}


  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-06 12:17:51  更:2021-10-06 12:18:50 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/19 20:25:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码