IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink大数据实时标签实时ETL --03加载规则类 (source Mysql) -> 正文阅读

[大数据]Flink大数据实时标签实时ETL --03加载规则类 (source Mysql)

1、项目架构

在这里插入图片描述
写到这里我将不进行项目介绍了。只要明白架构以及现在该文章主要进行的是什么操作就可以了。

2、加载mysql规则

package com.func

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.conf.BaseConf
import com.utils.StringUtils
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.slf4j.LoggerFactory

import scala.collection.immutable._

/** 连接mysql加载数据源
 *
 */
class MysqlSourceFunc(conf: BaseConf) extends RichSourceFunction[HashMap[String, Tuple4[String,String,String,String]]] {

    private val logger = LoggerFactory.getLogger(classOf[MysqlSourceFunc])
    private val jdbcUrl = conf.jdbcUrl
    private val username = conf.mysqlUserName
    private val password = conf.mysqlPassword
    private val driverName = "com.mysql.jdbc.Driver"
    private val duration = 1 * 60 * 1000L
    var conn: Connection = _
    var ps: PreparedStatement = _
    var running = true
    val querySqlTimeLabel = "select label_id,event_name,execute_sql,period_value from time_label where label_status = 3 and delete_status=0"
    val updateSqlTimeLabel = "update time_label set label_status=3 where label_status=2 and label_id in "

    override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        logger.info("mysql init...")
    }

    //   HashMap  [event_id ,  bean]
    override def run(ctx: SourceFunction.SourceContext[HashMap[String, Tuple4[String,String,String,String]]]): Unit = {
        try {
            while (running) {
                var output: HashMap[String, Tuple4[String, String, String, String]] = new HashMap()
                logger.info("Mysql Connecting...")
                Class.forName(driverName)
                conn = DriverManager.getConnection(jdbcUrl, username, password)
                ps = conn.prepareStatement(querySqlTimeLabel)
                val rs = ps.executeQuery()
                var labelStr = ""
                while (rs.next()) {
                    val labelId = rs.getString("label_id")  //标签ID
                    val eventName = rs.getString("event_name")  //埋点ID
                    val executeSql = rs.getString("execute_sql") //执行SQL
                    val failureDuration = rs.getString("period_value")  //失效时长
                    labelStr +=",'"+labelId+"'"
                    output += (labelId -> Tuple4(labelId, eventName, executeSql, failureDuration))
                }
//                //组装label_id.是否有需要更新的新规则,
//                if (StringUtils.isNotEmpty(labelStr)) {
//                    val upSql = updateSqlTimeLabel  + "(" +labelStr.substring(1, labelStr.length()) + ")"
//                    logger.info(upSql)
//                    val upInt: Int = conn.prepareStatement(upSql).executeUpdate()
//                    logger.info("new add rule: "+upInt+" pcs")
//                }
                ctx.collect(output)
                logger.info("mysql close...")
                //释放资源
                close()
                //停止一分钟
                Thread.sleep(duration)
            }
        }catch {
            case e:Exception=>
                logger.error("source run:",e)
        }
    }

    override def cancel(): Unit = {
        running = false
    }

    override def close(): Unit = {
        try {
            super.close()
            if (conn != null) conn.close()
            if (ps != null) ps.close()
        } catch {
            case e: Exception =>
                logger.error("source close:",e)
        }
    }
}

3、主类介绍

主要介绍流程以及该类中的一些注释以及后续还可以优化的点。

1、自定义Source Function

需要继承extends RichSourceFunction
之所以没有使用SourceFunction还是考虑多线程的问题。

2、hashMap

HashMap[String, Tuple4[String,String,String,String]] 主要是为了下游的使用, key采用的是标签的id, Tuple[] 就是mysql的一些规则SQL和失效时间,当然如果用bean对象会更好。之所以写成tuple主要是为了方便获取。

3、mysql连接信息

之所以连接信息没有写在open方法里面主要是为了循环调用,以及关闭,因为规则一分钟加载一次。对于优化点还是有很多,可以写一个JDBCUtil类

4、传送门

整体来说该来还是很简单,只需要将mysql 的SQL查询出来,解析然后下发即可。
后续也将继续更新后面的代码

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-25 11:38:09  更:2022-05-25 11:38:17 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 3:33:28-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码