Preface
Lately I haven't had time to walk through Flink fundamentals step by step and share them, so today I'll go straight to a Flink task I wrote recently; we get stronger through practice. No rambling: I honestly don't have time for it these days.
I. Use Case
The scenario is simple: synchronize data from another system, store the records, run statistics over them, and persist the statistical results. In my case, I synchronize visitor information from a customer's system, store it, compute the statistics, and write the results to the database.
II. Solution Selection
1. Log-based (CDC)
Since computation is required, Flink is the obvious choice, and the boss had already decided on Flink anyway. A log-based approach with Flink plus aggregation naturally points to Flink CDC. However, the database belongs to the customer's system, the original vendor has already left the project, and there is no one to support changes to the database configuration. We didn't dare touch it ourselves either: what if a config change left the database unable to restart? So that option was dropped, even though Flink CDC turned out to be remarkably pleasant in our last evaluation (see share No. 13 if you're interested).
2. JDBC-based
That leaves actively querying the data on a schedule, i.e. periodic queries treated as Flink batch-style processing. Before writing this post I noticed the official docs also describe a Table-based JDBC usage, but this task was already finished, so that will have to wait for another write-up. Flink docs on the JDBC Table connector: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/jdbc/
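For reference, here is a minimal sketch of what that Table-based JDBC usage could look like on Flink 1.15; the table name, connection URL, and credentials are placeholders rather than the ones from this task, and it additionally needs the flink-connector-jdbc and Table API bridge dependencies.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class JdbcTableSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Register the source MySQL table through the JDBC connector (placeholder connection info)
        tableEnv.executeSql(
            "CREATE TABLE visitor_source (" +
            "  id BIGINT," +
            "  create_time TIMESTAMP(3)," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'jdbc'," +
            "  'url' = 'jdbc:mysql://localhost:3306/source_db'," +
            "  'table-name' = 'source_business_table'," +
            "  'username' = 'user'," +
            "  'password' = 'pass'" +
            ")");
        // A simple bounded query against the registered table
        tableEnv.executeSql("SELECT COUNT(*) AS total FROM visitor_source").print();
    }
}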
III. Solution Design
1. Table Design
Business record table: the structure mirrors the customer's business record table; I just add a few columns:
- collect_time: collection time; pins down the point in time of each periodic sync
- storage_time: storage time; when the record was persisted on our side
- collect_start_time: start of the collection window, marking which batch a periodic collection belongs to
- collect_end_time: end of the collection window, marking the batch boundary
- sourceId: original ID, the primary key in the source business table, for later cross-checking
The collection time and the window start/end times also make it easy to verify later that Flink's collection has no gaps and that the statistics are computed correctly. The concrete table structure is confidential and not shared here; if you have the same scenario, just add these columns.
Statistics table: design it around your own statistical dimensions; the Flink computation later revolves around this table. In my case the statistic is visitor traffic, which in practice is simply a count of records. A hedged sketch of both entities follows.
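Since the real entities can't be shared, the following is only a hedged sketch of the added audit fields and the statistics entity; the field names are the ones the code in section IV actually sets (camelCase maps to snake_case columns via the humpToLine helper shown later), and everything else is omitted.
import java.util.Date;
import lombok.Data;
// Sketch only: in the real project these are separate files containing the full business fields
@Data
class JgFkRegisterVisitorRecord {      // target record entity (mirrors the source table plus extras)
    private Long sourceId;             // primary key of the source row, for cross-checking
    private Date collectDate;          // collection time of this sync cycle
    private Date collectStartTime;     // start of the collection window (batch marker)
    private Date collectEndTime;       // end of the collection window (batch marker)
    private Date storageDate;          // when the record was persisted on our side
    // ... business fields copied 1:1 from the customer's record table
}
@Data
class JgFkVisitorStatisticsInfo {      // statistics entity, one row per window and dimension
    private Integer statisticsType;    // statistic dimension, e.g. 1 = visitor count
    private Integer statisticsNum;     // the counted value
    private Date collectDate;
    private Date collectStartTime;
    private Date collectEndTime;
    private Date storageDate;
}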
2. Approach
- Flink uses a MySQL source that periodically queries the customer's business table; this serves as the batch-style source.
- The source is processed as a stream and split in two: the main stream keeps the raw collected data, while a side-output stream converts the records and fills in the extra business fields we added.
- The side-output stream goes to storage; the main stream is keyed by collection time (in practice no two batches ever share the same collection time). You could equally store first and then compute the statistics.
- After keying, open a window; this is what spreads the computation across slots (without the window this process step could not be used). Strictly speaking this kind of batch work belongs in the DataSet API, but I use DataStream here.
- Finally, the computed result is sent to a sink and written to a MySQL table.
The one mechanism worth spelling out: this post covers synchronizing and computing over historical data, so it must not double-count what the real-time task handles. Redis therefore caches a start time and an end time, and each batch's query window is compared against that cached end time, which is simply the moment the job started running (it can also be hard-coded). In addition, since the JDBC connection can drop or a query can fail and this Flink task may restart, the in-memory start time would otherwise be lost, so it is cached as well. With a restore point and a restart strategy configured, there is no need to worry about re-counting everything from the original start time after a restart. A distilled sketch of this window-advance idea follows; the full version lives in the source function in section IV. The real-time sync task actually needs the same mechanism too, because after a restart Flink will still reprocess the earlier data (one of Flink's charms).
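The sketch below distills that window-advance idea, assuming the RedisUtil helper and the hutool/commons-lang3 utilities shown in section IV (the Redis keys and step size are the ones used there); the real logic lives in RegisterVisitorJdbcSourceHis.changeSqlParam().
// Hypothetical helper distilling the window-advance logic; not the exact method from the task
static Date[] nextWindow(Jedis jedis, int stepMinute, String defaultStart) throws Exception {
    String cached = RedisUtil.getString(jedis, "StartTimeHis");
    Date start = DateUtil.parse(StringUtils.isNotBlank(cached) ? cached : defaultStart);
    Date end = DateUtil.offsetMinute(start, stepMinute);        // advance by one step
    String cap = RedisUtil.getString(jedis, "EndTimeHis");      // end cap = moment the job started
    if (StringUtils.isNotBlank(cap) && DateUtil.parse(cap).before(end)) {
        end = DateUtil.parse(cap);                              // never query past the cap
    }
    // persist the start so a restarted task resumes here instead of re-counting from the beginning
    RedisUtil.setString(jedis, "StartTimeHis", DateUtil.formatDateTime(start));
    return new Date[]{start, end};
}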
IV. Core Code
- For confidentiality, the entities, database connection details, and table structures are not shared; everything else in the task is shared as fully as possible.
- An enum class holds the connection info for the source, the target, and Redis; it isn't shown here. Just give the enum object whatever fields your setup needs.
- The SQL is assembled via reflection plus entity-to-JSON conversion, so no field names are exposed anyway. Ha.
Now for the code. Dependencies:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.22</version>
</dependency>
Add the other Flink dependencies yourself; a hedged example follows.
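For completeness, the Flink and Lombok dependencies this task would typically need look roughly like this (assuming Flink 1.15; adjust the versions to your cluster):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.15.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.15.2</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.24</version>
</dependency>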
RedisUtil:
import org.apache.commons.lang3.StringUtils;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
public class RedisUtil {
public static Jedis getJedis(String url, Integer port, String username, String password) {
if (port == null) {
port = 6379;
}
HostAndPort hp = new HostAndPort(url, port);
Jedis jedis = new Jedis(hp);
        // Authenticate once: username + password (ACL) if both are set, otherwise password only
        if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
            jedis.auth(username, password);
        } else if (StringUtils.isNotBlank(password)) {
            jedis.auth(password);
        }
        System.out.println("Obtained Redis (Jedis) connection");
return jedis;
}
public static String getString(Jedis jedis, String key) throws Exception {
if (jedis == null) {
throw new Exception("redis连接已断开");
}
if (StringUtils.isNotBlank(key)) {
return jedis.get(key);
}
return null;
}
public static void setString(Jedis jedis, String key, String value) throws Exception {
if (jedis == null) {
throw new Exception("redis连接已断开");
}
jedis.set(key, value);
}
}
RefUtil
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.json.JSONObject;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RefUtil {
private static Pattern UNDERLINE_PATTERN = Pattern.compile("_([a-z])");
private static Pattern HUMP_PATTERN = Pattern.compile("[A-Z]");
public static Map toMap(Object obj) throws Exception {
Class clazz = obj.getClass();
Field[] fields = obj.getClass().getDeclaredFields();
HashMap hashMap = new HashMap(fields.length);
for (Field field : fields) {
PropertyDescriptor pd = new PropertyDescriptor(field.getName(), clazz);
Method getMethod = pd.getReadMethod();
Object exeValue = getMethod.invoke(obj);
String key = field.getName();
Class<?> fieldClass = field.getType();
log.debug("----{}类型:{}", key, fieldClass);
switch (fieldClass.getSimpleName()) {
case "int":
log.debug("--基础类型int");
break;
case "String":
break;
default:
break;
}
hashMap.put(key, exeValue);
}
return hashMap;
}
public static Map<String, Object> objectToMap(Object obj) throws Exception {
if (obj == null) {
return null;
}
Map<String, Object> map = new HashMap<String, Object>();
Field[] declaredFields = obj.getClass().getDeclaredFields();
for (Field field : declaredFields) {
field.setAccessible(true);
map.put(field.getName(), field.get(obj));
}
return map;
}
public static String underlineToHump(String str) {
Matcher matcher = UNDERLINE_PATTERN.matcher(str);
StringBuffer sb = new StringBuffer(str);
if (matcher.find()) {
sb = new StringBuffer();
matcher.appendReplacement(sb, matcher.group(1).toUpperCase());
matcher.appendTail(sb);
} else {
return sb.toString().replaceAll("_", "");
}
return underlineToHump(sb.toString());
}
public static String humpToLine(String str) {
Matcher matcher = HUMP_PATTERN.matcher(str);
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase());
}
matcher.appendTail(sb);
return sb.toString();
}
public static JSONObject getJsonObjData(ResultSet resultSet, Class clazz) {
JSONObject jsonObj = new JSONObject();
Field[] fields = ReflectUtil.getFields(clazz);
if (ArrayUtil.isNotEmpty(fields)) {
Arrays.stream(fields).forEach(c -> {
String name = c.getName();
String lineName = RefUtil.humpToLine(name);
try {
jsonObj.putOpt(name, resultSet.getObject(lineName));
} catch (Exception e) {
log.error("查询结果没有字段{}或转换异常,原因:{}", lineName, e.getMessage());
}
});
}
return jsonObj;
}
public static String initInsertSql(String insertStartSql, String valueSql, JSONObject dataJson,
Class clazz) {
String sql = null;
Field[] fields = ReflectUtil.getFields(clazz);
if (ArrayUtil.isNotEmpty(fields)) {
StringBuffer insertBeforeSbf = new StringBuffer(insertStartSql);
StringBuffer valueSbf = new StringBuffer(valueSql);
int len = fields.length;
for (int i = 0; i < len; i++) {
Field field = fields[i];
String name = field.getName();
if (name.equals("serialVersionUID") || name.equals("entityClass")) {
break;
}
String lineName = RefUtil.humpToLine(name);
Object valObj = dataJson.get(name);
if (valObj != null) {
insertBeforeSbf.append(lineName).append(",");
Class type = field.getType();
String typeName = type.getSimpleName();
switch (typeName) {
case "String":
valueSbf.append("'").append(valObj).append("'");
break;
case "Byte":
valueSbf.append("'").append(valObj).append("'");
break;
case "Integer":
valueSbf.append(valObj);
break;
case "Date":
if (valObj.toString().contains("-") && valObj.toString().contains(":")) {
valueSbf.append("'").append(valObj.toString()).append("'");
} else {
                            Date date;
                            if (NumberUtil.isNumber(valObj.toString())) {
                                // epoch milliseconds
                                date = new Date(Long.parseLong(valObj.toString()));
                            } else {
                                // e.g. "Wed May 18 13:30:00 CST 2022"
                                date = DateUtil.parseCST(valObj.toString());
                            }
valueSbf.append("'").append(DateUtil.format(date, "yyyy-MM-dd HH:mm:ss"))
.append("'");
}
break;
default:
valueSbf.append(valObj);
break;
}
valueSbf.append(",");
}
}
String finValueSql = valueSbf.toString();
if (finValueSql.endsWith(",")) {
finValueSql = finValueSql.substring(0, finValueSql.length() - 1);
}
String insertSql = insertBeforeSbf.toString();
if (insertSql.endsWith(",")) {
insertSql = insertSql.substring(0, insertSql.length() - 1);
}
sql = insertSql + finValueSql + ");";
}
return sql;
}
}
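A quick, purely illustrative use of the two helpers the sinks rely on (the table name and values here are made up; JSONObject and JSONUtil are the hutool classes imported above):
// Hypothetical illustration of how the sinks below use RefUtil
JgFkVisitorStatisticsInfo info = new JgFkVisitorStatisticsInfo();
info.setStatisticsType(1);
info.setStatisticsNum(42);
System.out.println(RefUtil.humpToLine("collectStartTime"));   // prints: collect_start_time
JSONObject dataJson = JSONUtil.parseObj(info);                 // null fields are dropped by default
String sql = RefUtil.initInsertSql("INSERT INTO visitor_statistics (", ") VALUES( ",
        dataJson, JgFkVisitorStatisticsInfo.class);
System.out.println(sql);
// prints roughly: INSERT INTO visitor_statistics (statistics_type,statistics_num) VALUES( 1,42);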
HisRecordSyncTask (the Flink job entry point):
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class HisRecordSyncTask {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 5 s, exactly-once, keep externalized checkpoints on cancellation
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Up to 3 restarts; note the second argument is the delay between attempts in milliseconds
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10));
DataStream<FkRegisterVisitor> source = env.addSource(
new RegisterVisitorJdbcSourceHis(), "history source");
final OutputTag<JgFkRegisterVisitorRecord> outputTag = new OutputTag<JgFkRegisterVisitorRecord>(
"side-output") {
};
SingleOutputStreamOperator<FkRegisterVisitor> mainDataStream = source
.process(new ProcessFunction<FkRegisterVisitor, FkRegisterVisitor>() {
@Override
public void processElement(FkRegisterVisitor oldValue,
Context context,
Collector<FkRegisterVisitor> collector) throws Exception {
collector.collect(oldValue);
JgFkRegisterVisitorRecord jgFkRegisterVisitorRecord = new JgFkRegisterVisitorRecord();
BeanUtil.copyProperties(oldValue, jgFkRegisterVisitorRecord,
CopyOptions.create().setIgnoreNullValue(Boolean.TRUE));
context.output(outputTag, jgFkRegisterVisitorRecord);
}
});
DataStream<JgFkRegisterVisitorRecord> sideOutputStream = mainDataStream.getSideOutput(
outputTag);
sideOutputStream.addSink(new VisitorRecordStorageMysqlSink())
.name("RecordStorageMysqlSink");
DataStream<JgFkVisitorStatisticsInfo> sumStream =
mainDataStream.map(new VisitorCountRichMapFunction()).keyBy(
new KeySelector<Tuple2<String, JgFkVisitorStatisticsInfo>, String>() {
@Override
public String getKey(
Tuple2<String, JgFkVisitorStatisticsInfo> tuple2)
throws Exception {
return tuple2.f0;
}
}).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.process(new TotalVisitorCountFunction());
sumStream.addSink(new VisitorStatisticsMysqlSink()).name("StatisticsMysqlSink");
try {
env.execute("历史记录同步计算");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
RegisterVisitorJdbcSourceHis:
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Date;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import redis.clients.jedis.Jedis;
@Slf4j
public class RegisterVisitorJdbcSourceHis extends RichSourceFunction<FkRegisterVisitor> {
private PreparedStatement ps;
private Connection connection;
private volatile Boolean isRunning = true;
private Date lastTime = null;
private Date curTime = null;
private String startTimeKey = "StartTimeHis";
private String endTimeKey = "EndTimeHis";
private final static String sql = "select frv.* from 源业务表 frv where frv.create_time >= '#START_TIME#' and frv.create_time <= '#END_TIME#';";
private int cycleMinute = 1;
private int stepMinute = 15;
private long sleepTime = cycleMinute * 60 * 1000;
private String endTimeStr = "2019-12-31 23:59:59";
private String startTimeStr = "2019-05-18 13:30:00";
@Override
public void run(SourceContext<FkRegisterVisitor> sourceContext) throws Exception {
while (isRunning) {
changeSqlParam();
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()) {
JSONObject jsonObj = RefUtil.getJsonObjData(resultSet, FkRegisterVisitor.class);
FkRegisterVisitor fkRegisterVisitor = JSONUtil.toBean(jsonObj, FkRegisterVisitor.class);
fkRegisterVisitor.setSourceId(fkRegisterVisitor.getId());
fkRegisterVisitor.setCollectDate(curTime);
fkRegisterVisitor.setCollectStartTime(lastTime);
fkRegisterVisitor.setCollectEndTime(curTime);
fkRegisterVisitor.setId(null);
sourceContext.collect(fkRegisterVisitor);
            }
            // release per-cycle JDBC resources before sleeping until the next cycle
            resultSet.close();
            ps.close();
            Thread.sleep(sleepTime);
}
}
@Override
public void cancel() {
this.isRunning = false;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
}
private void changeSqlParam() throws Exception {
String ymdHmsFormat = "yyyy-MM-dd HH:mm:ss";
Jedis jedis = RedisUtil.getJedis(DbInfo.TARGET_REDIS.getJdbcUrl(),
DbInfo.TARGET_REDIS.getPort(), DbInfo.TARGET_REDIS.getUsername(),
DbInfo.TARGET_REDIS.getPassword());
String startTimeCacheStr = RedisUtil.getString(jedis, startTimeKey);
if (StringUtils.isBlank(startTimeCacheStr)) {
startTimeCacheStr = startTimeStr;
}
        // parse with the full "yyyy-MM-dd HH:mm:ss" pattern so seconds are not dropped
        Date startTime = DateUtil.parse(startTimeCacheStr, DatePattern.NORM_DATETIME_PATTERN);
Date endTime = DateUtil.offsetMinute(startTime, stepMinute);
        // cap the window at the cached end time (the moment the job started running)
        String endTimeCacheStr = RedisUtil.getString(jedis, endTimeKey);
        if (StringUtils.isNotBlank(endTimeCacheStr)) {
            Date planEndTime = DateUtil.parse(endTimeCacheStr);
            if (planEndTime.before(endTime)) {
                endTime = planEndTime;
            }
}
if (lastTime == null) {
lastTime = startTime;
curTime = endTime;
} else {
lastTime = curTime;
curTime = DateUtil.offsetMinute(lastTime, stepMinute);
}
        // persist the current window start so a restart resumes from here; refresh the end cap to now
        RedisUtil.setString(jedis, startTimeKey, DateUtil.formatDateTime(lastTime));
        RedisUtil.setString(jedis, endTimeKey,
                DateUtil.formatDateTime(DateUtil.endOfSecond(new Date())));
jedis.close();
String curSql = sql.replaceAll("#START_TIME#", DateUtil.format(lastTime, ymdHmsFormat))
.replaceAll("#END_TIME#", DateUtil.format(curTime, ymdHmsFormat));
System.out.println("---Sql:" + curSql);
if (this.connection == null) {
this.connection = getConnection();
}
ps = this.connection.prepareStatement(curSql);
}
@Override
public void close() throws Exception {
super.close();
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
}
private static Connection getConnection() {
Connection con = null;
try {
Class.forName(DbInfo.SOURCE_JG_VISITOR_MJ.getClassName());
con = DriverManager.getConnection(DbInfo.SOURCE_JG_VISITOR_MJ.getJdbcUrl(),
DbInfo.SOURCE_JG_VISITOR_MJ.getUsername(), DbInfo.SOURCE_JG_VISITOR_MJ.getPassword());
} catch (Exception e) {
System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
}
return con;
}
}
VisitorRecordStorageMysqlSink
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
public class VisitorRecordStorageMysqlSink extends RichSinkFunction<JgFkRegisterVisitorRecord> {
private PreparedStatement ps;
private Connection connection;
private final static String insertStartSql = "INSERT INTO 目标记录表 (";
private final static String valueSql = ") VALUES( ";
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
}
@Override
public void close() throws Exception {
super.close();
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
}
@Override
public void invoke(JgFkRegisterVisitorRecord record, Context context) throws Exception {
super.invoke(record, context);
record.setStorageDate(DateUtil.date());
JSONObject dataJson = JSONUtil.parseObj(record);
String sql = RefUtil.initInsertSql(insertStartSql, valueSql, dataJson, record.getClass());
if (connection == null) {
connection = getConnection();
}
        ps = connection.prepareStatement(sql);
        ps.executeUpdate();
        // close the statement right away; a new one is built per record
        ps.close();
}
private static Connection getConnection() {
Connection con = null;
try {
Class.forName(DbInfo.TARGET_JG_MYSQL.getClassName());
con = DriverManager.getConnection(DbInfo.TARGET_JG_MYSQL.getJdbcUrl(),
DbInfo.TARGET_JG_MYSQL.getUsername(), DbInfo.TARGET_JG_MYSQL.getPassword());
} catch (Exception e) {
System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
}
return con;
}
}
VisitorCountRichMapFunction
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
public class VisitorCountRichMapFunction extends
RichMapFunction<FkRegisterVisitor, Tuple2<String, JgFkVisitorStatisticsInfo>> {
@Override
public Tuple2<String, JgFkVisitorStatisticsInfo> map(FkRegisterVisitor fkRegisterVisitor)
throws Exception {
JgFkVisitorStatisticsInfo jgFkVisitorStatisticsInfo = new JgFkVisitorStatisticsInfo();
jgFkVisitorStatisticsInfo.setStatisticsType(1);
jgFkVisitorStatisticsInfo.setCollectDate(fkRegisterVisitor.getCollectDate());
jgFkVisitorStatisticsInfo.setCollectStartTime(fkRegisterVisitor.getCollectStartTime());
jgFkVisitorStatisticsInfo.setCollectEndTime(fkRegisterVisitor.getCollectEndTime());
jgFkVisitorStatisticsInfo.setStatisticsNum(1);
String key = fkRegisterVisitor.getCollectDate().toString();
return new Tuple2<>(key, jgFkVisitorStatisticsInfo);
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public void close() throws Exception {
super.close();
}
}
TotalVisitorCountFunction
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import java.util.Iterator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class TotalVisitorCountFunction extends
ProcessWindowFunction<Tuple2<String, JgFkVisitorStatisticsInfo>, JgFkVisitorStatisticsInfo, String, TimeWindow> {
private IntCounter visitorNum = new IntCounter();
private ValueState<Long> totalCountState;
private ValueState<String> collectTimeStr;
@Override
public void process(String key,
Context context,
Iterable<Tuple2<String, JgFkVisitorStatisticsInfo>> iterable,
Collector<JgFkVisitorStatisticsInfo> out) throws Exception {
JgFkVisitorStatisticsInfo vsi = new JgFkVisitorStatisticsInfo();
Iterator<Tuple2<String, JgFkVisitorStatisticsInfo>> it = iterable.iterator();
        while (it.hasNext()) {
            // one element in the window = one visitor record
            visitorNum.add(1);
Tuple2<String, JgFkVisitorStatisticsInfo> tuple2 = it.next();
String tpKey = tuple2.f0;
if (tpKey.equals(key)) {
JgFkVisitorStatisticsInfo tmpVsi = tuple2.f1;
BeanUtil.copyProperties(tmpVsi, vsi,
CopyOptions.create().setIgnoreNullValue(Boolean.TRUE));
if (totalCountState.value() == null){
totalCountState.update(0L);
}
Long tmpCount = totalCountState.value() + 1;
totalCountState.update(tmpCount);
vsi.setStatisticsNum(visitorNum.getLocalValue());
} else {
totalCountState.clear();
collectTimeStr.clear();
}
}
        // reset the per-window accumulator, emit the aggregate, and remember count/key in keyed state
        visitorNum.resetLocal();
        out.collect(vsi);
        totalCountState.update(vsi.getStatisticsNum().longValue());
        collectTimeStr.update(key);
}
@Override
public void open(Configuration parameters) throws Exception {
getRuntimeContext().addAccumulator("visitorNum", this.visitorNum);
super.open(parameters);
totalCountState = getRuntimeContext().getState(
new ValueStateDescriptor<>("totalCountState", Long.class));
collectTimeStr = getRuntimeContext().getState(
new ValueStateDescriptor<>("collectTimeStr", String.class));
}
@Override
public void close() throws Exception {
super.close();
}
}
Worth noting: a plain accumulator would also work here, but ValueState is the better choice because it survives restarts.
VisitorStatisticsMysqlSink:
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Date;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
public class VisitorStatisticsMysqlSink extends RichSinkFunction<JgFkVisitorStatisticsInfo> {
private PreparedStatement ps;
private Connection connection;
private final static String insertStartSql = "INSERT INTO 目标统计表 (";
private final static String valueSql = ") VALUES( ";
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
}
@Override
public void close() throws Exception {
super.close();
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
}
@Override
public void invoke(JgFkVisitorStatisticsInfo statisticsInfo, Context context) throws Exception {
super.invoke(statisticsInfo, context);
statisticsInfo.setStorageDate(DateUtil.date());
JSONObject dataJson = JSONUtil.parseObj(statisticsInfo);
String sql = RefUtil.initInsertSql(insertStartSql, valueSql, dataJson,
statisticsInfo.getClass());
if (connection == null) {
connection = getConnection();
}
        ps = connection.prepareStatement(sql);
        ps.executeUpdate();
        // close the statement right away; a new one is built per statistics record
        ps.close();
}
private static Connection getConnection() {
Connection con = null;
try {
Class.forName(DbInfo.TARGET_JG_MYSQL.getClassName());
con = DriverManager.getConnection(DbInfo.TARGET_JG_MYSQL.getJdbcUrl(),
DbInfo.TARGET_JG_MYSQL.getUsername(), DbInfo.TARGET_JG_MYSQL.getPassword());
} catch (Exception e) {
System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
}
return con;
}
}
That is everything about the flow that can be shared: the main structure, the source, the functions, and the sinks. The comments in the code are reasonably plentiful; this is close to a hand-holding walkthrough of a task. Of course there are plenty of clever tricks not used here yet; I'll share those when I get the chance.
V. Classic Errors
- The last packet successfully received from the server was 301,032 milliseconds ago. The last packet sent successfully to the server was 0 milliseconds ago. This is related to the database's wait_timeout. With the restart strategy configured, the error is not fatal; if you are allowed to change the database configuration, you can try tuning it. Validating the connection before each use also helps, as sketched below.
- Errors when uploading or starting the job through the Flink web UI: check the Overview page for free task slots, and check the TaskManagers page to confirm a task manager instance is actually registered.
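One hedged mitigation for the wait_timeout style disconnects, besides the restart strategy: validate the JDBC connection before each use and recreate it if needed. A minimal sketch that could be dropped into the source/sinks above (it reuses their connection field and getConnection() helper):
// Minimal sketch: make sure the connection is still alive before preparing a statement
private Connection ensureConnection() throws Exception {
    // isValid() pings the server; the argument is the timeout in seconds
    if (connection == null || connection.isClosed() || !connection.isValid(5)) {
        if (connection != null) {
            try { connection.close(); } catch (Exception ignore) { }
        }
        connection = getConnection();
    }
    return connection;
}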
Summary
Not much to summarize; it just feels great. After using this, doing the statistics in plain SQL no longer seems appealing. I did hit a pit along the way, just one step short of done, and it cost me quite a bit of time; worried about the schedule, and with colleagues who had not seen it before either, I ended up asking the department lead. Credit where due: the lead is strong, older than me, yet impressive in the ability to learn and absorb new things. It is getting late. These days I do this after work; filing for overtime is too much hassle and HR keeps bouncing it back, so I might as well spend the evening writing down what I have learned. Writing it out is also a way to consolidate and organize what I have been using. That is it for this share. Up!