1、项目架构
写到这里我将不进行项目介绍了。只要明白架构以及现在该文章主要进行的是什么操作就可以了。
2、加载mysql规则
package com.func
import java.sql.{Connection, DriverManager, PreparedStatement}
import com.conf.BaseConf
import com.utils.StringUtils
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.slf4j.LoggerFactory
import scala.collection.immutable._
class MysqlSourceFunc(conf: BaseConf) extends RichSourceFunction[HashMap[String, Tuple4[String,String,String,String]]] {
private val logger = LoggerFactory.getLogger(classOf[MysqlSourceFunc])
private val jdbcUrl = conf.jdbcUrl
private val username = conf.mysqlUserName
private val password = conf.mysqlPassword
private val driverName = "com.mysql.jdbc.Driver"
private val duration = 1 * 60 * 1000L
var conn: Connection = _
var ps: PreparedStatement = _
var running = true
val querySqlTimeLabel = "select label_id,event_name,execute_sql,period_value from time_label where label_status = 3 and delete_status=0"
val updateSqlTimeLabel = "update time_label set label_status=3 where label_status=2 and label_id in "
override def open(parameters: Configuration): Unit = {
super.open(parameters)
logger.info("mysql init...")
}
override def run(ctx: SourceFunction.SourceContext[HashMap[String, Tuple4[String,String,String,String]]]): Unit = {
try {
while (running) {
var output: HashMap[String, Tuple4[String, String, String, String]] = new HashMap()
logger.info("Mysql Connecting...")
Class.forName(driverName)
conn = DriverManager.getConnection(jdbcUrl, username, password)
ps = conn.prepareStatement(querySqlTimeLabel)
val rs = ps.executeQuery()
var labelStr = ""
while (rs.next()) {
val labelId = rs.getString("label_id")
val eventName = rs.getString("event_name")
val executeSql = rs.getString("execute_sql")
val failureDuration = rs.getString("period_value")
labelStr +=",'"+labelId+"'"
output += (labelId -> Tuple4(labelId, eventName, executeSql, failureDuration))
}
ctx.collect(output)
logger.info("mysql close...")
close()
Thread.sleep(duration)
}
}catch {
case e:Exception=>
logger.error("source run:",e)
}
}
override def cancel(): Unit = {
running = false
}
override def close(): Unit = {
try {
super.close()
if (conn != null) conn.close()
if (ps != null) ps.close()
} catch {
case e: Exception =>
logger.error("source close:",e)
}
}
}
3、主类介绍
主要介绍流程以及该类中的一些注释以及后续还可以优化的点。
1、自定义Source Function
需要继承extends RichSourceFunction 之所以没有使用SourceFunction还是考虑多线程的问题。
2、hashMap
HashMap[String, Tuple4[String,String,String,String]] 主要是为了下游的使用, key采用的是标签的id, Tuple[] 就是mysql的一些规则SQL和失效时间,当然如果用bean对象会更好。之所以写成tuple主要是为了方便获取。
3、mysql连接信息
之所以连接信息没有写在open方法里面主要是为了循环调用,以及关闭,因为规则一分钟加载一次。对于优化点还是有很多,可以写一个JDBCUtil类
4、传送门
整体来说该来还是很简单,只需要将mysql 的SQL查询出来,解析然后下发即可。 后续也将继续更新后面的代码
|