
[Big Data] Learning the Flink Stream Processing Engine Systematically (Part 15)

Preface

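The previous example was a bit complicated, and wiring up two sinks at once may have been hard to follow. This time we'll do something simpler: take the source data, split and transform it, and store it in the database, simulating the synchronization of devices and the data they report.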


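I. Scenario

  • Synchronize device information
  • Simulate device data collection (the source holds only the latest values; there is no notion of a collection timestamp)
  • Transform and store the data by category (a device must not be stored twice; each device report has a master record; the collected details are stored as key-value pairs)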

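II. Design

1. Table design

My approach is simple: the synchronized data is split across three tables in the target database:

  • Device table: stores device information
  • Device report master table: records the point in time at which each device report was synchronized
  • Device report detail table: records the value of each property in a device report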
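To make the one-to-many relationship between the three tables concrete, here is a minimal sketch using Java records. The type and field names (Device, DeviceReport, DeviceReportDetail, reportId, and so on) are hypothetical; the article does not show its entity classes or DDL, only that the details are linked to the master record through a record id.

```java
import java.time.Instant;
import java.util.List;

public class TableModelSketch {
    // Hypothetical device table row: one row per device, never duplicated
    record Device(String deviceId, String deviceName) {}

    // Hypothetical report master row: one row per synchronization of one device
    record DeviceReport(long reportId, String deviceId, Instant reportTime) {}

    // Hypothetical detail row: one key-value pair per collected property, linked by reportId
    record DeviceReportDetail(long reportId, String propId, String value, String unit) {}

    /** Returns the detail rows that belong to the given report master row. */
    static List<DeviceReportDetail> detailsOf(DeviceReport report, List<DeviceReportDetail> all) {
        return all.stream().filter(d -> d.reportId() == report.reportId()).toList();
    }

    public static void main(String[] args) {
        Device meter = new Device("D001", "power meter");
        DeviceReport report = new DeviceReport(1L, meter.deviceId(), Instant.parse("2022-05-26T07:00:00Z"));
        List<DeviceReportDetail> details = List.of(
            new DeviceReportDetail(1L, "Ua", "220.1", "V"),
            new DeviceReportDetail(1L, "Ia", "3.2", "A"),
            new DeviceReportDetail(2L, "Ua", "219.8", "V"));
        System.out.println(detailsOf(report, details).size()); // 2
    }
}
```

Only the master record carries the report time; each detail row just points back to it, which is what lets one poll of one device be stored as one master row plus N key-value rows.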

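2. Flink coding approach

  • The source runs a join query against the source database (collected property points joined with the device table)
  • The DataStream is grouped by device ID (keyBy)
  • A windowing strategy is set
  • process transforms and splits the data
  • The sink handles cascaded storage (device information is deduplicated; the report master record and the collected property values are linked)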
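Stripped of Flink, the keyBy, window and process steps boil down to "group the joined rows of one poll by device id, then turn each group into one device plus all of its property values". The plain-Java sketch below illustrates only that grouping and splitting logic; Row and DeviceBatch are hypothetical types, and it ignores time entirely by treating one poll as one window.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupAndSplitSketch {
    // Hypothetical flat row produced by the join query: device columns plus one property
    record Row(String deviceId, String deviceName, String propId, String value) {}

    // Hypothetical per-device result: the device once, plus all of its property values
    record DeviceBatch(String deviceId, String deviceName, Map<String, String> properties) {}

    /** Emulates keyBy(deviceId) + one window + process over the rows of a single poll. */
    static List<DeviceBatch> groupAndSplit(List<Row> rows) {
        // keyBy: bucket rows by device id, keeping first-seen order
        Map<String, List<Row>> byDevice = new LinkedHashMap<>();
        for (Row row : rows) {
            byDevice.computeIfAbsent(row.deviceId(), k -> new ArrayList<>()).add(row);
        }
        // process: per key, take the device from the first row and collect the key-value pairs
        List<DeviceBatch> out = new ArrayList<>();
        for (Map.Entry<String, List<Row>> e : byDevice.entrySet()) {
            Row first = e.getValue().get(0);
            Map<String, String> props = new LinkedHashMap<>();
            for (Row r : e.getValue()) {
                props.put(r.propId(), r.value());
            }
            out.add(new DeviceBatch(e.getKey(), first.deviceName(), props));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
            new Row("D001", "meter-1", "Ua", "220.1"),
            new Row("D002", "meter-2", "Ua", "219.5"),
            new Row("D001", "meter-1", "Ia", "3.2"));
        groupAndSplit(rows).forEach(System.out::println);
    }
}
```

In the real job Flink does the bucketing (keyBy) and decides when a bucket is complete (the window); the process function only has to implement the per-group loop.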

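III. Core code

I already shared the relevant utility classes (DataBaseUtil, RefUtil, DbInfo and so on) in the previous Flink article, so I won't repeat them here.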
RoomEngineDeviceTask


import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Flink job for the Jiangong (JG) machine-room power and environment monitoring system
 *
 * @author zhengwen
 **/
public class RoomEngineDeviceTask {

  public static void main(String[] args) {

    // Get the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // Enable checkpointing: take a checkpoint every 5 seconds
    env.enableCheckpointing(5000L);
    // Use exactly-once checkpointing semantics
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // Keep the last checkpoint when the job is cancelled
    env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // Restart strategy: up to 3 attempts, 10 seconds apart
    // (this overload takes the delay in milliseconds, so a bare 10 would mean 10 ms)
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10_000L));

    // Source: polls the source database
    DataStreamSource<JgRoomEngineDeviceRecordResult> source = env.addSource(
        new RoomEngineDeviceAndRecordResultSource(), "roomEngineDeviceAndRecord Source");

    // Key by device ID, collect each device's rows in a 10-second tumbling window, then split them
    DataStream<JgRoomEngineDeviceRecordVo> dataStream = source.keyBy(
            JgRoomEngineDeviceRecordResult::getDeviceid)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .process(new RoomEngineDeviceAndRecordFunction());

    // Sink: cascaded writes to MySQL
    dataStream.addSink(new RoomEngineDeviceRecordMysqlSink())
        .name("RoomEngineDeviceRecordMysqlSink");

    try {
      env.execute("Machine-room device and report data sync");
    } catch (Exception e) {
      System.out.println("Machine-room device and report data sync failed, cause: " + e.getMessage());
      throw new RuntimeException(e);
    }
  }

}
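The job uses a 10-second tumbling processing-time window. With a zero offset, a tumbling window assigner puts an element with timestamp t into the window starting at t - (t mod size). The arithmetic below is a plain-Java sketch of that rule (not Flink's own source), which helps when reasoning about which rows of one poll end up in the same window.

```java
public class TumblingWindowSketch {
    /** Start (ms) of the zero-offset tumbling window that contains the given timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        long remainder = timestampMs % sizeMs;
        // Java's % keeps the sign of the dividend; shift negative remainders into [0, size)
        if (remainder < 0) {
            remainder += sizeMs;
        }
        return timestampMs - remainder;
    }

    public static void main(String[] args) {
        long size = 10_000L; // corresponds to Time.seconds(10)
        // Two rows only 2 s apart still fall into different windows if a boundary lies between them
        System.out.println(windowStart(19_000L, size)); // 10000
        System.out.println(windowStart(21_000L, size)); // 20000
    }
}
```

Because the source emits a whole poll in a burst and then sleeps for 15 minutes, a device's rows normally land in one window. If a poll happened to straddle a 10-second boundary, that device would be split across two windows and, under this design, produce two report records.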

RoomEngineDeviceAndRecordResultSource


import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

/**
 * @author zhengwen
 **/
public class RoomEngineDeviceAndRecordResultSource extends
    RichSourceFunction<JgRoomEngineDeviceRecordResult> {


  private PreparedStatement ps;
  private Connection connection;

  private volatile boolean isRunning = true;

  // Join the real-time property table with the device table. The Chinese table names are
  // placeholders for the real tables (属性采集实时信息表 = real-time property collection table,
  // 设备信息表 = device table). The LIKE patterns are data values: device names starting with
  // "二楼" (second floor) or ending with "电量仪" (power meter).
  private String sql = "SELECT p.devpropid ,p.propname ,p.deviceid ,p.curvalue ,p.unit ,p.valuedescript,d.* FROM 属性采集实时信息表 p left join 设备信息表 d on p.deviceid = d.deviceid WHERE d.devicename LIKE '二楼%' or d.devicename LIKE '%电量仪';";

  private String startTimeKey = "roomEngineDeviceRecordStartTime";
  // Polling interval in minutes
  private int cycleMinute = 15;
  private long sleepTime = cycleMinute * 60 * 1000L;

  private Date curTime;

  @Override
  public void run(SourceContext<JgRoomEngineDeviceRecordResult> sourceContext) throws Exception {
    Map<String, Object> param = new HashMap<>();
    param.put("startTimeKey", startTimeKey);
    param.put("dbInfo", DbInfo.SOURCE_ROOM_ENGINE);
    param.put("dbConn", connection);
    param.put("dbConnPs", ps);
    param.put("sql", sql);

    while (isRunning) {
      // DataBaseUtil comes from the previous article; here it supplies the PreparedStatement
      // to run and the current collection time
      Map<String, Object> changeSqlMap = DataBaseUtil.changeSqlParam(param, "roomEngine");
      ps = (PreparedStatement) changeSqlMap.get("dbConnPs");
      curTime = (Date) changeSqlMap.get("curTime");
      // Query the data; try-with-resources closes the ResultSet after every poll
      try (ResultSet resultSet = ps.executeQuery()) {
        while (resultSet.next()) {
          // Convert the current row to JSON
          JSONObject resultJson = RefUtil.getJsonObjData(resultSet,
              JgRoomEngineDeviceRecordResult.class);

          // Convert the JSON to the entity
          JgRoomEngineDeviceRecordResult deviceRecordResult = JSONUtil.toBean(resultJson,
              JgRoomEngineDeviceRecordResult.class);
          // The source holds only the latest values, so stamp each row with the poll time
          deviceRecordResult.setCollectDate(curTime);

          sourceContext.collect(deviceRecordResult);
        }
      }

      // Wait for the next polling cycle
      Thread.sleep(sleepTime);
    }
  }

  @Override
  public void cancel() {
    this.isRunning = false;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    connection = DataBaseUtil.getDbConnection(DbInfo.SOURCE_ROOM_ENGINE, connection);
  }

  @Override
  public void close() throws Exception {
    super.close();
    // Release resources: close the statement before the connection it belongs to
    if (ps != null) {
      ps.close();
    }
    if (connection != null) {
      connection.close();
    }
  }

}
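The source follows a common polling pattern: a volatile running flag, a loop that queries, emits each row and sleeps, and a cancel() that flips the flag. The sketch below reproduces only that control flow in plain Java so it runs without Flink or a database; PollingSource and the Supplier-based query are hypothetical stand-ins for RichSourceFunction and the JDBC query.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class PollingSourceSketch {
    /** Hypothetical stand-in for a RichSourceFunction that polls a table on a fixed interval. */
    static class PollingSource<T> {
        // volatile so that cancel() called from another thread is seen by the run() loop
        private volatile boolean running = true;
        private final Supplier<List<T>> query;
        private final long sleepMillis;

        PollingSource(Supplier<List<T>> query, long sleepMillis) {
            this.query = query;
            this.sleepMillis = sleepMillis;
        }

        void run(Consumer<T> collector) throws InterruptedException {
            while (running) {
                for (T row : query.get()) {
                    collector.accept(row); // plays the role of sourceContext.collect(...)
                }
                Thread.sleep(sleepMillis); // wait for the next polling cycle
            }
        }

        void cancel() {
            running = false;
        }
    }

    /** Runs three polls of two rows each, cancelling from inside the third poll. */
    static List<String> demo() {
        AtomicInteger polls = new AtomicInteger();
        AtomicReference<PollingSource<String>> self = new AtomicReference<>();
        PollingSource<String> source = new PollingSource<>(() -> {
            if (polls.incrementAndGet() == 3) {
                self.get().cancel();
            }
            return List.of("row-a", "row-b");
        }, 1L);
        self.set(source);
        List<String> emitted = new ArrayList<>();
        try {
            source.run(emitted::add);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo().size()); // 6
    }
}
```

Note that the flag is only checked between polls: the poll in progress still emits all of its rows, which is why the demo ends with 6 rows rather than 4.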

RoomEngineDeviceAndRecordFunction


import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * @author zhengwen
 **/
public class RoomEngineDeviceAndRecordFunction extends
    ProcessWindowFunction<JgRoomEngineDeviceRecordResult, JgRoomEngineDeviceRecordVo, String, TimeWindow> {

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
  }

  @Override
  public void close() throws Exception {
    super.close();
  }

  @Override
  public void process(String deviceId,
      ProcessWindowFunction<JgRoomEngineDeviceRecordResult, JgRoomEngineDeviceRecordVo, String, TimeWindow>.Context context,
      Iterable<JgRoomEngineDeviceRecordResult> iterable,
      Collector<JgRoomEngineDeviceRecordVo> collector) throws Exception {
    // Split and reorganize the data. All rows in this window share the same key (device id),
    // so the device is built from the first row and every row becomes one property detail.
    JgRoomEngineDevice device = null;
    List<JgRoomEngineDeviceRecordInfo> infoList = new ArrayList<>();
    Iterator<JgRoomEngineDeviceRecordResult> it = iterable.iterator();
    while (it.hasNext()) {
      JgRoomEngineDeviceRecordResult recordResult = it.next();
      JSONObject recordResultJson = JSONUtil.parseObj(recordResult);
      if (device == null) {
        device = JSONUtil.toBean(recordResultJson, JgRoomEngineDevice.class);
      }
      JgRoomEngineDeviceRecordInfo recordInfo = JSONUtil.toBean(recordResultJson,
          JgRoomEngineDeviceRecordInfo.class);
      infoList.add(recordInfo);
    }
    if (device == null) {
      // Defensive only: Flink does not fire a window that received no elements
      return;
    }

    // Combine device, report master record and details into one VO for the sink
    JgRoomEngineDeviceRecordVo vo = convertDeviceRecordVo(device, infoList);
    collector.collect(vo);
  }

  private JgRoomEngineDeviceRecordVo convertDeviceRecordVo(JgRoomEngineDevice device,
      List<JgRoomEngineDeviceRecordInfo> infoList) {
    JgRoomEngineDeviceRecordVo vo = new JgRoomEngineDeviceRecordVo();
    vo.setDevice(device);

    if (CollectionUtil.isNotEmpty(infoList)) {
      // One report master record per device per window
      JgRoomEngineDeviceRecord record = convertDeviceRecord(device);
      vo.setDeviceRecord(record);

      String deviceId = device.getDeviceid();
      infoList.forEach(c -> {
        c.setDeviceId(deviceId);
        c.setDevicename(device.getDevicename());
        // devpropid is assumed to be the device id followed by the property id,
        // so the property id is whatever remains after the device id prefix
        c.setPropId(c.getDevpropid().substring(deviceId.length()));
      });

      vo.setRecordList(infoList);
    }

    return vo;
  }

  private JgRoomEngineDeviceRecord convertDeviceRecord(JgRoomEngineDevice device) {
    JgRoomEngineDeviceRecord jgRoomEngineDeviceRecord = new JgRoomEngineDeviceRecord();
    jgRoomEngineDeviceRecord.setDeviceid(device.getDeviceid());
    jgRoomEngineDeviceRecord.setDevicename(device.getDevicename());
    // Report time = the poll time the source stored as collectDate
    jgRoomEngineDeviceRecord.setRecordTime(device.getCollectDate());
    jgRoomEngineDeviceRecord.setCreateBy(-1);
    // createTime is set by the sink just before the record is inserted
    jgRoomEngineDeviceRecord.setIsDeleted(0);
    return jgRoomEngineDeviceRecord;
  }
}
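The process function derives propId by cutting the device id off the front of devpropid with substring. That silently yields a wrong value, or throws, if an id does not actually start with the device id. A defensive variant could check the prefix first; the helper below is a hypothetical sketch, not part of the article's code, and the example ids are made up.

```java
public class PropIdSketch {
    /**
     * Hypothetical helper: returns devpropid without the leading device id,
     * or devpropid unchanged when it does not start with that device id.
     */
    static String propIdOf(String devpropid, String deviceId) {
        if (devpropid == null || deviceId == null || !devpropid.startsWith(deviceId)) {
            return devpropid; // keep the full id instead of cutting at a wrong position
        }
        return devpropid.substring(deviceId.length());
    }

    public static void main(String[] args) {
        System.out.println(propIdOf("D001Ua", "D001")); // Ua
        System.out.println(propIdOf("X9Ua", "D001"));   // X9Ua
    }
}
```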

RoomEngineDeviceRecordMysqlSink


import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author zhengwen
 **/
public class RoomEngineDeviceRecordMysqlSink extends RichSinkFunction<JgRoomEngineDeviceRecordVo> {

  private Connection connection;

  // Parameterized device lookup: the device id is bound instead of being spliced into the SQL
  private final static String deviceSelectSql = "SELECT d.* FROM jg_room_engine_device d WHERE d.deviceid = ?";

  private final static String deviceInsertStartSql = "INSERT INTO jg_room_engine_device (";
  private final static String recordInsertStartSql = "INSERT INTO jg_room_engine_device_record (";
  private final static String infoInsertStartSql = "INSERT INTO jg_room_engine_device_record_info (";

  // VALUES fragment passed to RefUtil.initInsertSql (identical for all three tables)
  private final static String valuesSql = ") VALUES( ";

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    // Wait 2 s before connecting; the author needed this pause but has not yet found out why (see the summary)
    Thread.sleep(2000);
    connection = DataBaseUtil.getDbConnection(DbInfo.TARGET_JG_MYSQL, connection);
  }

  @Override
  public void close() throws Exception {
    super.close();
    if (connection != null) {
      connection.close();
    }
  }

  @Override
  public void invoke(JgRoomEngineDeviceRecordVo vo, Context context) throws Exception {
    JgRoomEngineDevice device = vo.getDevice();

    // Deduplicate devices: insert the device only if it is not in the database yet
    if (selectDevice(device) == null) {
      insertDeviceInfo(device);
    }

    // Insert the device report master record
    JgRoomEngineDeviceRecord record = vo.getDeviceRecord();
    if (record == null) {
      return;
    }
    record.setCreateTime(DateUtil.date());
    Long recordId = insertDeviceRecord(record);
    if (recordId == null) {
      throw new SQLException("No generated key for the report record of device " + device.getDeviceid());
    }

    // Insert the property details, each linked to the master record through recordId
    List<JgRoomEngineDeviceRecordInfo> recordInfoList = vo.getRecordList();
    if (CollectionUtil.isNotEmpty(recordInfoList)) {
      // A plain loop lets SQLException propagate without wrapping it inside a lambda
      for (JgRoomEngineDeviceRecordInfo info : recordInfoList) {
        info.setRecordId(recordId);
        insertRecordInfo(info);
      }
    }
  }

  /**
   * Inserts a device report detail row.
   *
   * @param recordInfo device report detail
   * @return generated primary key
   */
  private Long insertRecordInfo(JgRoomEngineDeviceRecordInfo recordInfo) throws SQLException {
    JSONObject dataJson = JSONUtil.parseObj(recordInfo);
    return executeInsert(RefUtil.initInsertSql(infoInsertStartSql, valuesSql, dataJson,
        recordInfo.getClass()));
  }

  /**
   * Inserts a device report master record.
   *
   * @param record report master record
   * @return generated primary key of the master record
   * @throws SQLException on database errors
   */
  private Long insertDeviceRecord(JgRoomEngineDeviceRecord record) throws SQLException {
    JSONObject dataJson = JSONUtil.parseObj(record);
    return executeInsert(RefUtil.initInsertSql(recordInsertStartSql, valuesSql, dataJson,
        record.getClass()));
  }

  /**
   * Inserts device information.
   *
   * @param device device information
   * @return generated primary key of the device
   * @throws SQLException on database errors
   */
  private Long insertDeviceInfo(JgRoomEngineDevice device) throws SQLException {
    JSONObject dataJson = JSONUtil.parseObj(device);
    return executeInsert(RefUtil.initInsertSql(deviceInsertStartSql, valuesSql, dataJson,
        device.getClass()));
  }

  /**
   * Executes a generated INSERT statement and returns the auto-generated key.
   *
   * @param insertSql complete INSERT statement
   * @return generated key, or null if the driver returned none
   * @throws SQLException on database errors
   */
  private Long executeInsert(String insertSql) throws SQLException {
    // try-with-resources closes both the statement and the generated-keys result set
    try (PreparedStatement pstmt = connection.prepareStatement(insertSql,
        Statement.RETURN_GENERATED_KEYS)) {
      pstmt.executeUpdate();
      try (ResultSet rs = pstmt.getGeneratedKeys()) {
        if (rs.next()) {
          return rs.getLong(1);
        }
        return null;
      }
    }
  }

  /**
   * Looks up a device in the target database.
   *
   * @param device device information
   * @return the stored device, or null if it does not exist yet
   * @throws SQLException on database errors
   */
  private JgRoomEngineDevice selectDevice(JgRoomEngineDevice device) throws SQLException {
    JgRoomEngineDevice dbDevice = null;
    try (PreparedStatement pstmt = connection.prepareStatement(deviceSelectSql)) {
      pstmt.setString(1, device.getDeviceid());
      try (ResultSet resultSet = pstmt.executeQuery()) {
        if (resultSet.next()) {
          JSONObject dbDeviceJson = RefUtil.getJsonObjData(resultSet, JgRoomEngineDevice.class);
          dbDevice = JSONUtil.toBean(dbDeviceJson, JgRoomEngineDevice.class);
        }
      }
    }
    return dbDevice;
  }
}
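The sink's open() waits a fixed 2 seconds before connecting, and the summary admits the reason is not yet understood. If the underlying problem turns out to be that the target database is not reachable right at startup (an assumption, not something the article establishes), a bounded retry with backoff is a more explicit alternative to an unconditional sleep. The sketch is generic: the connection factory is a hypothetical Callable, not the article's DataBaseUtil.

```java
import java.util.concurrent.Callable;

public class RetrySketch {
    /**
     * Calls the factory up to maxAttempts times, doubling the wait after each failure.
     * Rethrows the last failure when every attempt fails.
     */
    static <T> T withRetry(Callable<T> factory, int maxAttempts, long initialDelayMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return factory.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay); // back off before the next attempt
                    delay *= 2;
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Hypothetical factory that fails twice before "connecting"
        String conn = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("not ready");
            }
            return "connected";
        }, 5, 10L);
        System.out.println(conn + " after " + calls[0] + " attempts"); // connected after 3 attempts
    }
}
```

Unlike a fixed sleep, this waits only as long as needed and fails loudly, with the real exception, if the database never becomes available.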

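PS:

  • JgRoomEngineDeviceRecordResult maps the columns returned by the source SQL query.
  • device, deviceRecord and deviceRecordInfo are, respectively, the device information, the device report master record and the device report details (collected property values).
  • JgRoomEngineDeviceRecordVo is the combined object; it holds device, deviceRecord and a List<JgRoomEngineDeviceRecordInfo>.
  • For everything else, see the comments. The database operations are plain JDBC; the only twist is that the SQL is generated from JSON plus reflection.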
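Since each generated INSERT is executed without any bound parameters, the values must be embedded in the SQL text by RefUtil.initInsertSql (from the previous article, not shown), so a quote inside a value can break the statement. A hedged alternative is to generate only the column list and ? placeholders, then bind the values. The sketch below shows just the SQL-generation half; InsertSqlSketch, InsertSql and build are hypothetical names, and the "remark" column is made up for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

public class InsertSqlSketch {
    /** Hypothetical result: SQL with ? placeholders plus the values to bind, in column order. */
    record InsertSql(String sql, List<Object> params) {}

    /** Builds "INSERT INTO table (c1, c2) VALUES (?, ?)", skipping columns whose value is null. */
    static InsertSql build(String table, Map<String, Object> columns) {
        StringJoiner names = new StringJoiner(", ");
        StringJoiner marks = new StringJoiner(", ");
        List<Object> params = new ArrayList<>();
        for (Map.Entry<String, Object> e : columns.entrySet()) {
            if (e.getValue() == null) {
                continue; // let the column default apply
            }
            names.add(e.getKey());
            marks.add("?");
            params.add(e.getValue());
        }
        String sql = "INSERT INTO " + table + " (" + names + ") VALUES (" + marks + ")";
        return new InsertSql(sql, params);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("deviceid", "D001");
        row.put("devicename", "2F meter 'A'"); // a quote that would break inline SQL
        row.put("remark", null);
        InsertSql insert = build("jg_room_engine_device", row);
        System.out.println(insert.sql());
        // INSERT INTO jg_room_engine_device (deviceid, devicename) VALUES (?, ?)
    }
}
```

Each params.get(i) would then be bound with pstmt.setObject(i + 1, value) before executeUpdate(). Table and column names cannot be placeholders, so they must keep coming from trusted sources such as the entity class, never from input data.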

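Summary

  • Keep in mind that a transformed DataStream is still a DataStream; only the structure of its elements changes, through map, process and other user-defined functions.
  • One spot is unusual: the sink sleeps before obtaining its database connection. Try removing the sleep and see what happens. (Honestly, I don't fully understand this oddity myself yet; the schedule is tight, so I'm leaving it like this for now and will dig into it later.)

That's all for this morning. Time to get to work!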
Added: 2022-05-26 15:19:52  Updated: 2022-05-26 15:20:45