前言
上次那个稍微有点复杂,一下子搞两个 sink 可能不好理解。这次来个简单的:仅对 source 源数据做拆解转换并入库存储,模拟设备信息和设备上报信息的同步。
一、场景
- 同步设备信息
- 模拟设备采集数据(源是最新数据,没有采集时间点概念)
- 分类转换存储(设备不能重复存储;设备上报记录分为主记录和采集详情,采集详情以键值对形式存储)
二、设计思路
1.表结构设计
我的思路挺简单,同步到目标库时把数据拆成以下几张表:
- 设备信息表:记录设备信息
- 设备上报主记录:记录设备上报同步时间点信息
- 设备上报属性详情信息:记录设备上报属性值详情
2.flink编码思路
- source 通过源数据库进行表连接查询(属性采集实时信息表与设备信息表)
- dataStream根据设备编号做分组
- 设置开窗策略
- process做数据转换拆解
- sink负责级联存储(设备信息做验重,上报信息主数据与设备属性采集值做级联)
三、核心代码
之前已经分享过相关工具类,这里我就不再重复分享了,可以见上一篇flink分享。 RoomEngineDeviceTask
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * Flink job that synchronizes machine-room environment devices and their reported property
 * values from the source database into the target MySQL database.
 *
 * <p>Pipeline: polling JDBC source -> keyBy device id -> 10 s tumbling processing-time window ->
 * {@link RoomEngineDeviceAndRecordFunction} -> {@link RoomEngineDeviceRecordMysqlSink}.
 */
public class RoomEngineDeviceTask {

    /** Number of restart attempts before the job is declared failed. */
    private static final int RESTART_ATTEMPTS = 3;

    /**
     * Pause between restart attempts, in milliseconds. {@code fixedDelayRestart(int, long)}
     * interprets its second argument as milliseconds, so a bare {@code 10} would allow only
     * 10 ms between attempts and exhaust all retries before a database can recover.
     */
    private static final long RESTART_DELAY_MS = 10_000L;

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Checkpoint every 5 s in exactly-once mode; keep the externalized checkpoint on cancel.
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(RESTART_ATTEMPTS, RESTART_DELAY_MS));

        DataStreamSource<JgRoomEngineDeviceRecordResult> source = env.addSource(
                new RoomEngineDeviceAndRecordResultSource(), "roomEngineDeviceAndRecord Source");

        // Group rows per device, gather each device's rows for 10 s, then convert them to one VO.
        DataStream<JgRoomEngineDeviceRecordVo> dataStream = source
                .keyBy(JgRoomEngineDeviceRecordResult::getDeviceid)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .process(new RoomEngineDeviceAndRecordFunction());

        dataStream.addSink(new RoomEngineDeviceRecordMysqlSink())
                .name("RoomEngineDeviceRecordMysqlSink");

        try {
            env.execute("机房动环设备和上报数据同步");
        } catch (Exception e) {
            System.err.println("机房动环设备和上报数据同步失败,原因:" + e.getMessage());
            throw new RuntimeException(e);
        }
    }
}
RoomEngineDeviceAndRecordResultSource
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
/**
 * Polling JDBC source: every {@code cycleMinute} minutes it runs a join of the real-time property
 * table with the device table on the machine-room source database and emits one
 * {@link JgRoomEngineDeviceRecordResult} per result row.
 *
 * <p>Each emitted row is stamped via {@code setCollectDate} with the {@code "curTime"} value
 * returned by {@code DataBaseUtil.changeSqlParam}, because the source table only holds the latest
 * values and carries no collection timestamp of its own.
 */
public class RoomEngineDeviceAndRecordResultSource extends
        RichSourceFunction<JgRoomEngineDeviceRecordResult> {
    private PreparedStatement ps;
    private Connection connection;
    /** Cleared by {@link #cancel()} to stop the polling loop. */
    private volatile boolean isRunning = true;
    private String sql = "SELECT p.devpropid ,p.propname ,p.deviceid ,p.curvalue ,p.unit ,p.valuedescript,d.* FROM 属性采集实时信息表 p left join 设备信息表 d on p.deviceid = d.deviceid WHERE d.devicename LIKE '二楼%' or d.devicename LIKE '%电量仪';";
    private String startTimeKey = "roomEngineDeviceRecordStartTime";
    /** Poll interval in minutes. */
    private int cycleMinute = 15;
    /** Poll interval in milliseconds, derived from {@link #cycleMinute}. */
    private long sleepTime = cycleMinute * 60 * 1000;
    private Date curTime;

    @Override
    public void run(SourceContext<JgRoomEngineDeviceRecordResult> sourceContext) throws Exception {
        Map<String, Object> param = new HashMap<>();
        param.put("startTimeKey", startTimeKey);
        param.put("dbInfo", DbInfo.SOURCE_ROOM_ENGINE);
        param.put("dbConn", connection);
        param.put("dbConnPs", ps);
        param.put("sql", sql);
        while (isRunning) {
            // NOTE(review): DataBaseUtil.changeSqlParam is not shown; it is read here for the
            // statement to execute ("dbConnPs") and the poll timestamp ("curTime").
            Map<String, Object> changeSqlMap = DataBaseUtil.changeSqlParam(param, "roomEngine");
            ps = (PreparedStatement) changeSqlMap.get("dbConnPs");
            curTime = (Date) changeSqlMap.get("curTime");
            // Close each poll's ResultSet once it is consumed so cursors do not pile up.
            try (ResultSet resultSet = ps.executeQuery()) {
                while (resultSet.next()) {
                    JSONObject resultJson = RefUtil.getJsonObjData(resultSet,
                            JgRoomEngineDeviceRecordResult.class);
                    JgRoomEngineDeviceRecordResult deviceRecordResult = JSONUtil.toBean(resultJson,
                            JgRoomEngineDeviceRecordResult.class);
                    deviceRecordResult.setCollectDate(curTime);
                    sourceContext.collect(deviceRecordResult);
                }
            }
            Thread.sleep(sleepTime);
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = DataBaseUtil.getDbConnection(DbInfo.SOURCE_ROOM_ENGINE, connection);
    }

    /**
     * Releases JDBC resources: the statement first, then the connection that owns it. The
     * connection is closed even if closing the statement fails.
     */
    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}
RoomEngineDeviceAndRecordFunction
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
 * Window function that folds all rows of one device id within a window into a single
 * {@link JgRoomEngineDeviceRecordVo}. The VO holds the device, one record header, and one detail
 * entry per collected property row.
 */
public class RoomEngineDeviceAndRecordFunction extends
        ProcessWindowFunction<JgRoomEngineDeviceRecordResult, JgRoomEngineDeviceRecordVo, String, TimeWindow> {

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Converts every row to a detail bean and builds the device bean from the first row only.
     * All rows in a call share the same key (the device id passed to keyBy).
     */
    @Override
    public void process(String s,
            ProcessWindowFunction<JgRoomEngineDeviceRecordResult, JgRoomEngineDeviceRecordVo, String, TimeWindow>.Context context,
            Iterable<JgRoomEngineDeviceRecordResult> iterable,
            Collector<JgRoomEngineDeviceRecordVo> collector) throws Exception {
        JgRoomEngineDevice firstDevice = null;
        List<JgRoomEngineDeviceRecordInfo> details = new ArrayList<>();
        for (JgRoomEngineDeviceRecordResult row : iterable) {
            JSONObject rowJson = JSONUtil.parseObj(row);
            if (firstDevice == null) {
                firstDevice = JSONUtil.toBean(rowJson, JgRoomEngineDevice.class);
            }
            details.add(JSONUtil.toBean(rowJson, JgRoomEngineDeviceRecordInfo.class));
        }
        collector.collect(converDeviceRecordVo(firstDevice, details));
    }

    /**
     * Assembles the VO. With no details, only the device is set. Otherwise the VO also gets a
     * record header, and each detail is stamped with the device id and name. Each detail's
     * propId is its devpropid with the first deviceid.length() characters removed.
     */
    private JgRoomEngineDeviceRecordVo converDeviceRecordVo(JgRoomEngineDevice device,
            List<JgRoomEngineDeviceRecordInfo> infoList) {
        JgRoomEngineDeviceRecordVo result = new JgRoomEngineDeviceRecordVo();
        result.setDevice(device);
        if (CollectionUtil.isEmpty(infoList)) {
            return result;
        }
        result.setDeviceRecord(converDeviceRecord(device, infoList.get(0)));
        for (JgRoomEngineDeviceRecordInfo detail : infoList) {
            String deviceId = device.getDeviceid();
            detail.setDeviceId(deviceId);
            detail.setDevicename(device.getDevicename());
            // NOTE(review): assumes devpropid is prefixed by deviceid — confirm against source data.
            detail.setPropId(detail.getDevpropid().substring(deviceId.length()));
        }
        result.setRecordList(infoList);
        return result;
    }

    /**
     * Builds the record header for one device: id, name, and record time (read from
     * device.getCollectDate()). createBy is set to -1 and isDeleted to 0. The detail argument
     * is currently unused.
     */
    private JgRoomEngineDeviceRecord converDeviceRecord(JgRoomEngineDevice device,
            JgRoomEngineDeviceRecordInfo jgRoomEngineDeviceRecordInfo) {
        JgRoomEngineDeviceRecord header = new JgRoomEngineDeviceRecord();
        header.setDeviceid(device.getDeviceid());
        header.setDevicename(device.getDevicename());
        header.setRecordTime(device.getCollectDate());
        header.setCreateBy(-1);
        header.setIsDeleted(0);
        return header;
    }
}
RoomEngineDeviceRecordMysqlSink
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
/**
 * MySQL sink that persists one {@link JgRoomEngineDeviceRecordVo} per invocation:
 * <ol>
 *   <li>inserts the device into {@code jg_room_engine_device} unless a row with the same
 *       deviceid already exists;</li>
 *   <li>inserts the record header into {@code jg_room_engine_device_record};</li>
 *   <li>inserts each detail into {@code jg_room_engine_device_record_info}, linked to the
 *       header via its generated key.</li>
 * </ol>
 *
 * <p>NOTE(review): the three inserts are not wrapped in a transaction, so a failure midway can
 * leave a header without all of its details. INSERT text is produced by
 * {@code RefUtil.initInsertSql} (not shown) — confirm it escapes values.
 */
public class RoomEngineDeviceRecordMysqlSink extends RichSinkFunction<JgRoomEngineDeviceRecordVo> {
    private Connection connection;

    /** Device lookup; the device id is bound as a parameter, never concatenated into the SQL. */
    private static final String DEVICE_SELECT_SQL =
            "SELECT d.* FROM jg_room_engine_device d where d.deviceid = ?";
    private static final String DEVICE_INSERT_START_SQL = "INSERT INTO jg_room_engine_device (";
    private static final String RECORD_INSERT_START_SQL = "INSERT INTO jg_room_engine_device_record (";
    private static final String INFO_INSERT_START_SQL = "INSERT INTO jg_room_engine_device_record_info (";
    private static final String DEVICE_VALUE_SQL = ") VALUES( ";
    private static final String RECORD_VALUE_SQL = ") VALUES( ";
    private static final String INFO_VALUE_SQL = ") VALUES( ";

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // NOTE(review): empirical workaround kept as-is; per the accompanying article the cause of
        // the odd behaviour without this delay is not understood yet. TODO investigate/remove.
        Thread.sleep(2000);
        connection = DataBaseUtil.getDbConnection(DbInfo.TARGET_JG_MYSQL, connection);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(JgRoomEngineDeviceRecordVo vo, Context context) throws Exception {
        super.invoke(vo, context);
        JgRoomEngineDevice device = vo.getDevice();
        if (selectDevice(device) == null) {
            insertDeviceInfo(device);
        }
        JgRoomEngineDeviceRecord record = vo.getDeviceRecord();
        if (record == null) {
            return;
        }
        record.setCreateTime(DateUtil.date());
        Integer recordId = insertDeviceRecord(record);
        if (recordId == null) {
            // Details cannot be linked without the header key; fail loudly instead of an NPE.
            throw new SQLException(
                    "No generated key returned for device record insert, deviceid="
                            + record.getDeviceid());
        }
        List<JgRoomEngineDeviceRecordInfo> recordInfoList = vo.getRecordList();
        if (CollectionUtil.isNotEmpty(recordInfoList)) {
            for (JgRoomEngineDeviceRecordInfo info : recordInfoList) {
                info.setRecordId(recordId.longValue());
                try {
                    insertRecordInfo(info);
                } catch (SQLException e) {
                    System.out.println("--插入设备上报记录详情报错,原因:" + e.getMessage());
                    throw e;
                }
            }
        }
    }

    /** Inserts one detail row and returns its generated key, or {@code null} if none. */
    private Long insertRecordInfo(JgRoomEngineDeviceRecordInfo recordInfo) throws SQLException {
        Long id = executeInsert(INFO_INSERT_START_SQL, INFO_VALUE_SQL, recordInfo);
        if (id != null) {
            System.out.println("上报记录详情数据主键:" + id);
        }
        return id;
    }

    /** Inserts the record header and returns its generated key, or {@code null} if none. */
    private Integer insertDeviceRecord(JgRoomEngineDeviceRecord record) throws SQLException {
        Long id = executeInsert(RECORD_INSERT_START_SQL, RECORD_VALUE_SQL, record);
        if (id == null) {
            return null;
        }
        System.out.println("设备上报记录数据主键:" + id);
        return id.intValue();
    }

    /** Inserts the device row and returns its generated key, or {@code null} if none. */
    private Integer insertDeviceInfo(JgRoomEngineDevice device) throws SQLException {
        Long id = executeInsert(DEVICE_INSERT_START_SQL, DEVICE_VALUE_SQL, device);
        if (id == null) {
            return null;
        }
        System.out.println("设备数据主键:" + id);
        return id.intValue();
    }

    /**
     * Builds an INSERT for {@code bean} via {@code RefUtil.initInsertSql} from the bean's JSON
     * form and executes it. The statement and generated-keys cursor are always closed.
     *
     * @return the first generated key, or {@code null} if the driver returned none
     */
    private Long executeInsert(String startSql, String valueSql, Object bean) throws SQLException {
        JSONObject dataJson = JSONUtil.parseObj(bean);
        String insertSql = RefUtil.initInsertSql(startSql, valueSql, dataJson, bean.getClass());
        try (PreparedStatement pstmt =
                connection.prepareStatement(insertSql, Statement.RETURN_GENERATED_KEYS)) {
            pstmt.executeUpdate();
            try (ResultSet rs = pstmt.getGeneratedKeys()) {
                if (rs.next()) {
                    return rs.getLong(1);
                }
                return null;
            }
        }
    }

    /**
     * Looks up an existing device by deviceid.
     *
     * @return the last matching row as a bean, or {@code null} if none exists
     */
    private JgRoomEngineDevice selectDevice(JgRoomEngineDevice device) throws SQLException {
        JgRoomEngineDevice dbDevice = null;
        try (PreparedStatement ps = connection.prepareStatement(DEVICE_SELECT_SQL)) {
            ps.setString(1, device.getDeviceid());
            try (ResultSet resultSet = ps.executeQuery()) {
                while (resultSet.next()) {
                    JSONObject dbDeviceJson = RefUtil.getJsonObjData(resultSet,
                            JgRoomEngineDevice.class);
                    dbDevice = JSONUtil.toBean(dbDeviceJson, JgRoomEngineDevice.class);
                }
            }
        }
        return dbDevice;
    }
}
PS:
- JgRoomEngineDeviceRecordResult 对应 source 源数据库 SQL 查询出的字段
- device、deviceRecord、deviceRecordInfo 分别对应设备信息、设备上报信息主表、设备上报信息详情(属性采集值)
- JgRoomEngineDeviceRecordVo 是合并后的信息,里面包含 device、deviceRecord 和 recordList(List<JgRoomEngineDeviceRecordInfo>)
- 其他细节直接看代码即可;数据库操作用的是原始 JDBC,只不过利用 JSON + 反射来生成 SQL 而已
总结
- 一定要理解:dataStream 经过操作后得到的其实还是一个 dataStream,只是其中元素的数据结构可以改变,而改变就是通过 map、各类 Function、process 等算子完成的
- 里面有一个地方很特殊:在 sink 里获取数据库连接之前加了一个 sleep,大家可以去掉试试(其实我对这个诡异现象也还没完全理解,进度比较紧急,先这样处理,后面再研究)
早上就写到这里,开始搬砖,up!
|