IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink流处理引擎系统学习(十四) -> 正文阅读

[大数据]Flink流处理引擎系统学习(十四)

前言

???????最近都没有时间循序渐进的撸Flink的基础知识了跟大家分享了,今天就直接跟大家分享最近写的FlinkTask吧,我们在实践中强大。不废话,我最近也没有时间跟大家废话。


一、使用场景

???????场景其实挺简单,就是同步别人系统的数据,存储记录并做统计计算存储统计结果。我这里是同步客户系统的访客信息存储、并统计,统计结果存库。


二、方案选型

1.基于日志

???????首先因为要做计算,肯定就考虑Flink,而且领导也定了就用Flink。
???????那么基于日志又要求用Flink,同时要汇总统计,那肯定就是Flink CDC,但是因为是客户系统的数据库,客户的原支持方都已经退场了,不可能做支持修改数据库的相关配置。而我们自己也不敢处理,万一修改配置了导致数据库重启失败怎么办?所以肯定是放弃这个方案,尽管上次预研这个Flink CDC不是一般的香(感兴趣的可以见上次的分享十三)。

2.基于JDBC

???????于是就只能做数据的主动查询,做周期查询,使用Flink的批处理。
???????其实在写这个分享前,看到官网还有一个基于Table的JDBC用法,但是这个已经写完了额,就只能下次再撸了。flink官网关于Table的JDBC使用:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/jdbc/


三、方案设计

1.表结构设计

业务记录表:
表结构就与客户系统的业务记录表一致,我就增加几个字段:

  • collect_time:采集时间,这个字段确定周期同步时间点
  • storage_time:存储时间,这个记录数据转存存储时间
  • collect_start_time:采集数据开始时间,这个用于区分周期采集数据的批次开始时间
  • collect_end_time:采集数据结束时间,这个用于区分周期采集数据的批次结束时间
  • sourceId:原始id,源业务表的主键,方便后面对照验证
    其实这个采集时间、采集开始、结束时间也是为了方便后期验证flink的采集连贯性、数据统计计算的准确性。因为涉密就不给大家分享具体表结构了,大家有同样场景的加这几个字段就对了。

统计信息表:
那就根据自己的统计维度做设计了,后面的flink计算也是跟这个表有关。比如我这里是统计人流量,那么实际就是一个记录的数量。

2.思路梳理

  • flink使用mysql数据源source,周期性查询客户业务表数据,做为批处理source来源。
  • source做流处理,转换2个流出来,主流还是原始采集到的数据,旁路流做数据转换补充我们增加的几个业务字段值设置。
  • 一个旁路流去做存储,主流通过采集时间分组(实际上也不会有2组采集时间相同的流数据),这里其实应该也可以流先做数据存储,再做统计。
  • 分组后开窗,这里其实就是拆解计算到槽位(不开窗,不能用process),其实这种批处理应该用DataSet的,我这里用的DataStream
  • 最后计算的结果发到sink,mysql记录到表

    这里面唯一要说的一个机制就是因为我这是分享的历史数据同步并计算,所以不能与实时的重复统计了,所以引入了redis缓存开始时间、结束时间,再处理没批次查询数据的时间段时做结束时间比较。这个结束时间就是开始运行时的时间(也可以人为设置再代码里)
    另外因为jdbc连接可能遇到连接丢失、查询异常等,所以flink的这个Task重启了,开始时间是丢失的,所以也需要缓存起来。这样设置还原点、重启策略后也不用担心数据重复统计,每次都从开始时间统计的问题了。这个机制在下面的代码分享里面再细细感悟吧。
    实际上实时同步的task也需要这个机制,以防万一重启了,前面的数据实际再重启后依然会进行处理(flink的迷人之处之一)

四、核心代码

  • 这里因为涉密不分享实体,不分享数据库连接信息、不分享表结构。整个task的代码都尽量分享:

  • 枚举类设置了源、目标、redis的连接信息,这个就不分享了,大家按照使用的字段设置枚举对象的字段就行。

  • sql的组织是用反射 + 实体转json做的,给大家也看不出来字段。哈哈。下面上码:
    依赖引入:

<!--mysql-->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.47</version>
    </dependency>

    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>4.0.1</version>
    </dependency>
    <!--工具包开始-->
    <dependency>
      <groupId>cn.hutool</groupId>
      <artifactId>hutool-all</artifactId>
      <version>5.7.22</version>
    </dependency>

    <!--工具包结束-->

flink的其他包,大家自行引入。

RedisUtil:


import org.apache.commons.lang3.StringUtils;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;

/**
 * redis工具类
 *
 * @author zhengwen
 **/
public class RedisUtil {

  /**
   * 获取redis连接jedis
   *
   * @param url      reids服务url
   * @param port     端口
   * @param username 用户名
   * @param password 密码
   * @return Jedis对象
   */
  public static Jedis getJedis(String url, Integer port, String username, String password) {
    if (port == null) {
      port = 6379;
    }
    //组织建立连接参数
    HostAndPort hp = new HostAndPort(url, port);
    Jedis jedis = new Jedis(hp);
    //redis连接权限设置
    if (StringUtils.isNotBlank(password)) {
      jedis.auth(password);
    }
    if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
      jedis.auth(username, password);
    }
    System.out.println("获取redis的jedis成功");
    return jedis;
  }

  public static String getString(Jedis jedis, String key) throws Exception {
    if (jedis == null) {
      throw new Exception("redis连接已断开");
    }
    if (StringUtils.isNotBlank(key)) {
      return jedis.get(key);
    }
    return null;
  }


  /**
   * 设置字符串值
   *
   * @param jedis jedis对象
   * @param key   键
   * @param value 值
   * @throws Exception 异常
   */
  public static void setString(Jedis jedis, String key, String value) throws Exception {
    if (jedis == null) {
      throw new Exception("redis连接已断开");
    }
    jedis.set(key, value);
  }
}

RefUtil


import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.json.JSONObject;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;

/**
 * 自定义反射工具类
 *
 * @author zhengwen
 */
@Slf4j
public class RefUtil {

  private static Pattern UNDERLINE_PATTERN = Pattern.compile("_([a-z])");


  private static Pattern HUMP_PATTERN = Pattern.compile("[A-Z]");

  /**
   * 对象转map,(暂不支持迭代)
   *
   * @param obj
   * @return
   * @throws Exception
   */
  public static Map toMap(Object obj) throws Exception {
    Class clazz = obj.getClass();
    //获得属性
    Field[] fields = obj.getClass().getDeclaredFields();
    HashMap hashMap = new HashMap(fields.length);
    for (Field field : fields) {
      PropertyDescriptor pd = new PropertyDescriptor(field.getName(), clazz);
      //获得get方法
      Method getMethod = pd.getReadMethod();
      //执行get方法返回一个Object
      Object exeValue = getMethod.invoke(obj);
      String key = field.getName();
      //TODO 可以优化,加入迭代
      Class<?> fieldClass = field.getType();
      log.debug("----{}类型:{}", key, fieldClass);
      switch (fieldClass.getSimpleName()) {
        case "int":
          log.debug("--基础类型int");
          break;
        case "String":
          break;
        default:
          break;
      }
      hashMap.put(key, exeValue);
    }
    return hashMap;
  }


  public static Map<String, Object> objectToMap(Object obj) throws Exception {
    if (obj == null) {
      return null;
    }

    Map<String, Object> map = new HashMap<String, Object>();

    Field[] declaredFields = obj.getClass().getDeclaredFields();
    for (Field field : declaredFields) {
      field.setAccessible(true);
      map.put(field.getName(), field.get(obj));
    }

    return map;
  }


  /**
   * 根据传入的带下划线的字符串转化为驼峰格式
   *
   * @param str
   * @return
   */
  public static String underlineToHump(String str) {

    //正则匹配下划线及后一个字符,删除下划线并将匹配的字符转成大写
    Matcher matcher = UNDERLINE_PATTERN.matcher(str);
    StringBuffer sb = new StringBuffer(str);
    if (matcher.find()) {
      sb = new StringBuffer();
      //将当前匹配的子串替换成指定字符串,并且将替换后的子串及之前到上次匹配的子串之后的字符串添加到StringBuffer对象中
      //正则之前的字符和被替换的字符
      matcher.appendReplacement(sb, matcher.group(1).toUpperCase());
      //把之后的字符串也添加到StringBuffer对象中
      matcher.appendTail(sb);
    } else {
      //去除除字母之外的前面带的下划线
      return sb.toString().replaceAll("_", "");
    }
    return underlineToHump(sb.toString());
  }

  /**
   * 驼峰转下划线,最后转为大写
   *
   * @param str
   * @return
   */
  public static String humpToLine(String str) {
    Matcher matcher = HUMP_PATTERN.matcher(str);
    StringBuffer sb = new StringBuffer();
    while (matcher.find()) {
      matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase());
    }
    matcher.appendTail(sb);
    return sb.toString();
  }

  public static JSONObject getJsonObjData(ResultSet resultSet, Class clazz) {
    JSONObject jsonObj = new JSONObject();
    Field[] fields = ReflectUtil.getFields(clazz);
    if (ArrayUtil.isNotEmpty(fields)) {

      Arrays.stream(fields).forEach(c -> {
        String name = c.getName();
        String lineName = RefUtil.humpToLine(name);
        try {
          jsonObj.putOpt(name, resultSet.getObject(lineName));
        } catch (Exception e) {
          log.error("查询结果没有字段{}或转换异常,原因:{}", lineName, e.getMessage());
        }

      });
    }
    return jsonObj;
  }


  /**
   * 利用反射组织插入语句
   *
   * @param insertStartSql 插入语句前半部分
   * @param valueSql       插入语句值部分
   * @param dataJson       数据
   * @param clazz          数据对象class
   * @return 插入语句
   */
  public static String initInsertSql(String insertStartSql, String valueSql, JSONObject dataJson,
      Class clazz) {
    String sql = null;
    Field[] fields = ReflectUtil.getFields(clazz);
    if (ArrayUtil.isNotEmpty(fields)) {
      StringBuffer insertBeforeSbf = new StringBuffer(insertStartSql);

      StringBuffer valueSbf = new StringBuffer(valueSql);

      int len = fields.length;
      for (int i = 0; i < len; i++) {
        Field field = fields[i];
        String name = field.getName();

        if (name.equals("serialVersionUID") || name.equals("entityClass")) {
          break;
        }

        String lineName = RefUtil.humpToLine(name);

        Object valObj = dataJson.get(name);

        if (valObj != null) {
          insertBeforeSbf.append(lineName).append(",");
          Class type = field.getType();
          String typeName = type.getSimpleName();
          switch (typeName) {
            case "String":
              valueSbf.append("'").append(valObj).append("'");
              break;
            case "Byte":
              valueSbf.append("'").append(valObj).append("'");
              break;
            case "Integer":
              valueSbf.append(valObj);
              break;
            case "Date":
              if (valObj.toString().contains("-") && valObj.toString().contains(":")) {
                valueSbf.append("'").append(valObj.toString()).append("'");
              } else {
                Date date = null;
                if (NumberUtil.isNumber(valObj.toString())) {
                  date = new Date(Long.parseLong(valObj.toString()));
                } else {
                  try {
                    date = new Date(Long.parseLong(valObj.toString()));
                  } catch (Exception e) {
                    date = DateUtil.parseCST(valObj.toString());
                  }
                }
                valueSbf.append("'").append(DateUtil.format(date, "yyyy-MM-dd HH:mm:ss"))
                    .append("'");
              }
              break;
            default:
              valueSbf.append(valObj);
              break;
          }
          valueSbf.append(",");
        }
      }
      String finValueSql = valueSbf.toString();
      if (finValueSql.endsWith(",")) {
        finValueSql = finValueSql.substring(0, finValueSql.length() - 1);
      }
      String insertSql = insertBeforeSbf.toString();
      if (insertSql.endsWith(",")) {
        insertSql = insertSql.substring(0, insertSql.length() - 1);
      }
      sql = insertSql + finValueSql + ");";
    }
    return sql;
  }
}

flinkTask


import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * 历史记录同步计算task任务
 *
 * @author zhengwen
 **/
public class HisRecordSyncTask {

  public static void main(String[] args) {

    //获取运行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    //开启 Checkpoint,每隔 5 秒钟做一次 CK
    env.enableCheckpointing(5000L);
    //指定 CK 的一致性语义
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    //设置任务关闭的时候保留最后一次 CK 数据
    env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    //重试策略设置
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10));

    //source
    DataStream<FkRegisterVisitor> source = env.addSource(
        new RegisterVisitorJdbcSourceHis(), "history source");

    //旁路输出流
    final OutputTag<JgFkRegisterVisitorRecord> outputTag = new OutputTag<JgFkRegisterVisitorRecord>(
        "side-output") {
    };
    SingleOutputStreamOperator<FkRegisterVisitor> mainDataStream = source
        .process(new ProcessFunction<FkRegisterVisitor, FkRegisterVisitor>() {
          @Override
          public void processElement(FkRegisterVisitor oldValue,
              Context context,
              Collector<FkRegisterVisitor> collector) throws Exception {
            // 发送数据到主要的输出
            collector.collect(oldValue);

            // 发送数据到旁路输出
            JgFkRegisterVisitorRecord jgFkRegisterVisitorRecord = new JgFkRegisterVisitorRecord();
            BeanUtil.copyProperties(oldValue, jgFkRegisterVisitorRecord,
                CopyOptions.create().setIgnoreNullValue(Boolean.TRUE));
            context.output(outputTag, jgFkRegisterVisitorRecord);
          }
        });
    //获取旁路输出流
    DataStream<JgFkRegisterVisitorRecord> sideOutputStream = mainDataStream.getSideOutput(
        outputTag);

    //数据转存
    sideOutputStream.addSink(new VisitorRecordStorageMysqlSink())
        .name("RecordStorageMysqlSink");

    //批处理计算
    DataStream<JgFkVisitorStatisticsInfo> sumStream =
        mainDataStream.map(new VisitorCountRichMapFunction()).keyBy(
                new KeySelector<Tuple2<String, JgFkVisitorStatisticsInfo>, String>() {
                  @Override
                  public String getKey(
                      Tuple2<String, JgFkVisitorStatisticsInfo> tuple2)
                      throws Exception {
                    return tuple2.f0;
                  }
                }).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
            .process(new TotalVisitorCountFunction());

    sumStream.addSink(new VisitorStatisticsMysqlSink()).name("StatisticsMysqlSink");

    try {
      env.execute("历史记录同步计算");
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }


}

RegisterVisitorJdbcSourceHis:


import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Date;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import redis.clients.jedis.Jedis;

/**
 * 记录mysql source
 *
 * @author zhengwen
 **/
@Slf4j
public class RegisterVisitorJdbcSourceHis extends RichSourceFunction<FkRegisterVisitor> {

  private PreparedStatement ps;
  private Connection connection;

  private volatile Boolean isRunning = true;

  private Date lastTime = null;

  private Date curTime = null;

  private String startTimeKey = "StartTimeHis";

  private String endTimeKey = "EndTimeHis";

  private final static String sql = "select frv.* from 源业务表 frv where  frv.create_time >= '#START_TIME#' and frv.create_time <= '#END_TIME#';";


  private int cycleMinute = 1;

  private int stepMinute = 15;
  private long sleepTime = cycleMinute * 60 * 1000;

  private String endTimeStr = "2019-12-31 23:59:59";

  private String startTimeStr = "2019-05-18 13:30:00";

  @Override
  public void run(SourceContext<FkRegisterVisitor> sourceContext) throws Exception {

    while (isRunning) {

      changeSqlParam();
      //查询数据
      ResultSet resultSet = ps.executeQuery();
      while (resultSet.next()) {
        //结果集转json
        JSONObject jsonObj = RefUtil.getJsonObjData(resultSet, FkRegisterVisitor.class);

        //json转实体
        FkRegisterVisitor fkRegisterVisitor = JSONUtil.toBean(jsonObj, FkRegisterVisitor.class);

        //数据补充与清理
        fkRegisterVisitor.setSourceId(fkRegisterVisitor.getId());
        fkRegisterVisitor.setCollectDate(curTime);
        fkRegisterVisitor.setCollectStartTime(lastTime);
        fkRegisterVisitor.setCollectEndTime(curTime);
        fkRegisterVisitor.setId(null);

        //2个sink,一个数据存储,一个计算

        sourceContext.collect(fkRegisterVisitor);
      }

      //休息
      Thread.sleep(sleepTime);
    }
  }

  @Override
  public void cancel() {
    this.isRunning = false;
  }


  /**
   * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接。
   *
   * @param parameters
   * @throws Exception
   */
  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    connection = getConnection();

    //changeSqlParam();

  }

  /**
   * 修改sql:过滤数据时间段
   *
   * @throws Exception
   */
  private void changeSqlParam() throws Exception {
    //时间格式
    String ymdHmsFormat = "yyyy-MM-dd HH:mm:ss";

    Jedis jedis = RedisUtil.getJedis(DbInfo.TARGET_REDIS.getJdbcUrl(),
        DbInfo.TARGET_REDIS.getPort(), DbInfo.TARGET_REDIS.getUsername(),
        DbInfo.TARGET_REDIS.getPassword());
    String startTimeCacheStr = RedisUtil.getString(jedis, startTimeKey);
    if (StringUtils.isBlank(startTimeCacheStr)) {
      startTimeCacheStr = startTimeStr;
    }

    Date startTime = DateUtil.parse(startTimeCacheStr,
        DatePattern.NORM_DATETIME_MINUTE_FORMAT.getPattern());
    Date endTime = DateUtil.offsetMinute(startTime, stepMinute);

    //历史时间同步结束时间点处理
    String endTimeStr = RedisUtil.getString(jedis, endTimeKey);
    if (StringUtils.isNotBlank(endTimeStr)) {
      Date planEndTime = DateUtil.parseDate(endTimeStr);
      if (planEndTime.before(endTime)) {
        endTime = planEndTime;
      }
    }

    if (lastTime == null) {
      lastTime = startTime;
      curTime = endTime;
    } else {
      lastTime = curTime;
      curTime = DateUtil.offsetMinute(lastTime, stepMinute);
    }
    //缓存下次开时间
    RedisUtil.setString(jedis, startTimeKey, DateUtil.formatDateTime(lastTime));

    //结束时间
    RedisUtil.setString(jedis, endTimeKey,
        DateUtil.formatDateTime(DateUtil.endOfSecond(new Date())));

    jedis.close();

    //组织查询条件
    String curSql = sql.replaceAll("#START_TIME#", DateUtil.format(lastTime, ymdHmsFormat))
        .replaceAll("#END_TIME#", DateUtil.format(curTime, ymdHmsFormat));
    System.out.println("---Sql:" + curSql);

    // 编写具体逻辑代码
    if (this.connection == null) {
      this.connection = getConnection();
    }
    ps = this.connection.prepareStatement(curSql);
  }

  /**
   * 程序执行完毕就可以进行,关闭连接和释放资源的动作了
   *
   * @throws Exception 异常
   */
  @Override
  public void close() throws Exception {
    super.close();
    if (connection != null) {
      //关闭连接和释放资源
      connection.close();
    }
    if (ps != null) {
      ps.close();
    }
  }

  /**
   * 获取数据库连接
   *
   * @return
   */
  private static Connection getConnection() {
    Connection con = null;
    try {
      Class.forName(DbInfo.SOURCE_JG_VISITOR_MJ.getClassName());
      con = DriverManager.getConnection(DbInfo.SOURCE_JG_VISITOR_MJ.getJdbcUrl(),
          DbInfo.SOURCE_JG_VISITOR_MJ.getUsername(), DbInfo.SOURCE_JG_VISITOR_MJ.getPassword());
    } catch (Exception e) {
      System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
    }
    return con;
  }


}

VisitorRecordStorageMysqlSink


import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * 记录mysql sink
 *
 * @author zhengwen
 **/
public class VisitorRecordStorageMysqlSink extends RichSinkFunction<JgFkRegisterVisitorRecord> {

  private PreparedStatement ps;
  private Connection connection;


  private final static String insertStartSql = "INSERT INTO 目标记录表 (";

  private final static String valueSql = ") VALUES( ";


  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    connection = getConnection();
    /*String sql = "insert into test_db.test_csv (col_1,col_2,col_3,col_4)" +
        "values (?,?,?,'what');";
    ps = connection.prepareStatement(sql);*/
  }

  @Override
  public void close() throws Exception {
    super.close();
    if (connection != null) {
      connection.close();
    }
    if (ps != null) {
      ps.close();
    }
  }

  @Override
  public void invoke(JgFkRegisterVisitorRecord record, Context context) throws Exception {
    super.invoke(record, context);

    //Long generatedKey = SqlExecutor.executeForGeneratedKey(connection, "insert " + TABLE_NAME + " set field1 = ? where id = ?", 0, 0);
    //log.info("主键:{}", generatedKey);

    //设置存储时间
    record.setStorageDate(DateUtil.date());

    //组织sql,利用反射
    JSONObject dataJson = JSONUtil.parseObj(record);
    String sql = RefUtil.initInsertSql(insertStartSql, valueSql, dataJson, record.getClass());

    if (connection == null) {
      connection = getConnection();
    }

    ps = connection.prepareStatement(sql);
    ps.executeUpdate();
  }


  /**
   * 获取mysql数据库连接
   *
   * @return
   */
  private static Connection getConnection() {
    Connection con = null;
    try {
      Class.forName(DbInfo.TARGET_JG_MYSQL.getClassName());
      con = DriverManager.getConnection(DbInfo.TARGET_JG_MYSQL.getJdbcUrl(),
          DbInfo.TARGET_JG_MYSQL.getUsername(), DbInfo.TARGET_JG_MYSQL.getPassword());
    } catch (Exception e) {
      System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
    }
    return con;
  }

}

VisitorCountRichMapFunction


import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

/**
 * @author zhengwen
 **/
public class VisitorCountRichMapFunction extends
    RichMapFunction<FkRegisterVisitor, Tuple2<String, JgFkVisitorStatisticsInfo>> {


  @Override
  public Tuple2<String, JgFkVisitorStatisticsInfo> map(FkRegisterVisitor fkRegisterVisitor)
      throws Exception {
    JgFkVisitorStatisticsInfo jgFkVisitorStatisticsInfo = new JgFkVisitorStatisticsInfo();
    //统计类型直接设置为整栋
    jgFkVisitorStatisticsInfo.setStatisticsType(1);
    jgFkVisitorStatisticsInfo.setCollectDate(fkRegisterVisitor.getCollectDate());
    jgFkVisitorStatisticsInfo.setCollectStartTime(fkRegisterVisitor.getCollectStartTime());
    jgFkVisitorStatisticsInfo.setCollectEndTime(fkRegisterVisitor.getCollectEndTime());
    jgFkVisitorStatisticsInfo.setStatisticsNum(1);

    String key = fkRegisterVisitor.getCollectDate().toString();

    return new Tuple2<>(key, jgFkVisitorStatisticsInfo);
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
  }

  @Override
  public void close() throws Exception {
    super.close();
  }
}

TotalVisitorCountFunction


import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import java.util.Iterator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * @author zhengwen
 **/
public class TotalVisitorCountFunction extends
    ProcessWindowFunction<Tuple2<String, JgFkVisitorStatisticsInfo>, JgFkVisitorStatisticsInfo, String, TimeWindow> {

  /**
   * 使用计数器
   */
  private IntCounter visitorNum = new IntCounter();

  /**
   *   定义一个状态,保存当前的总count值
   */
  private ValueState<Long> totalCountState;
  /**
   * 定义一个状态,保存当前的统计key
   */
  private ValueState<String> collectTimeStr;

  @Override
  public void process(String key,
      Context context,
      Iterable<Tuple2<String, JgFkVisitorStatisticsInfo>> iterable,
      Collector<JgFkVisitorStatisticsInfo> out) throws Exception {

    JgFkVisitorStatisticsInfo vsi = new JgFkVisitorStatisticsInfo();
    Iterator<Tuple2<String, JgFkVisitorStatisticsInfo>> it = iterable.iterator();
    while (it.hasNext()) {
      visitorNum.add(1);
      Tuple2<String, JgFkVisitorStatisticsInfo> tuple2 = it.next();
      String tpKey = tuple2.f0;
      if (tpKey.equals(key)) {
        JgFkVisitorStatisticsInfo tmpVsi = tuple2.f1;

        BeanUtil.copyProperties(tmpVsi, vsi,
            CopyOptions.create().setIgnoreNullValue(Boolean.TRUE));
        if (totalCountState.value() == null){
          totalCountState.update(0L);
        }
        Long tmpCount = totalCountState.value() + 1;
        totalCountState.update(tmpCount);

        vsi.setStatisticsNum(visitorNum.getLocalValue());
        //vsi.setStatisticsNum(visitorNum.getLocalValue());
      } else {
        totalCountState.clear();
        collectTimeStr.clear();
        //totalCountState.update(0L);
        //collectTimeStr.update(key);
      }

    }
    visitorNum.resetLocal();

    //返回流
    out.collect(vsi);

    //更新总数、当期统计key
    totalCountState.update(vsi.getStatisticsNum().longValue());
    collectTimeStr.update(key);
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    //注册计数器
    getRuntimeContext().addAccumulator("visitorNum", this.visitorNum);
    super.open(parameters);
    totalCountState = getRuntimeContext().getState(
        new ValueStateDescriptor<>("totalCountState", Long.class));
    collectTimeStr = getRuntimeContext().getState(
        new ValueStateDescriptor<>("collectTimeStr", String.class));
  }

  @Override
  public void close() throws Exception {
    super.close();

  }
}

这里要说下,其实这个用计数器也行,不过用Value会更好,重启也不会丢失。
VisitorStatisticsMysqlSink


import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Date;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * 统计to mysql sink
 *
 * @author zhengwen
 **/
public class VisitorStatisticsMysqlSink extends RichSinkFunction<JgFkVisitorStatisticsInfo> {


  private PreparedStatement ps;
  private Connection connection;


  private final static String insertStartSql = "INSERT INTO 目标统计表 (";

  private final static String valueSql = ") VALUES( ";


  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    connection = getConnection();

  }

  @Override
  public void close() throws Exception {
    super.close();
    if (connection != null) {
      connection.close();
    }
    if (ps != null) {
      ps.close();
    }
  }

  @Override
  public void invoke(JgFkVisitorStatisticsInfo statisticsInfo, Context context) throws Exception {
    super.invoke(statisticsInfo, context);

    //Long generatedKey = SqlExecutor.executeForGeneratedKey(connection, "insert " + TABLE_NAME + " set field1 = ? where id = ?", 0, 0);
    //log.info("主键:{}", generatedKey);

    //设置存储时间
    statisticsInfo.setStorageDate(DateUtil.date());

    //组织sql,利用反射
    JSONObject dataJson = JSONUtil.parseObj(statisticsInfo);
    String sql = RefUtil.initInsertSql(insertStartSql, valueSql, dataJson,
        statisticsInfo.getClass());

    if (connection == null) {
      connection = getConnection();
    }

    ps = connection.prepareStatement(sql);
    ps.executeUpdate();
  }


  /**
   * 获取mysql jdbc连接
   *
   * @return
   */
  private static Connection getConnection() {
    Connection con = null;
    try {
      Class.forName(DbInfo.TARGET_JG_MYSQL.getClassName());
      con = DriverManager.getConnection(DbInfo.TARGET_JG_MYSQL.getJdbcUrl(),
          DbInfo.TARGET_JG_MYSQL.getUsername(), DbInfo.TARGET_JG_MYSQL.getPassword());
    } catch (Exception e) {
      System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
    }
    return con;
  }

}

???????整流程的能分享的都分享了,主结构,source、function、sink,都分享了。里面的注释也写的还算多。几乎是保姆级别的一个Task梳理了,当然里面还有很多奇淫巧计还没有用,后期我有机会跟大家再分享。


五、经典报错

  • The last packet successfully received from the server was 301,032 milliseconds ago. The last packet sent successfully to the server was 0 milliseconds ago
    在这里插入图片描述
    这个问题跟数据库的wait_time有一定关系,这里有重启策略,有这个错误也不怕。如果有条件改数据库配置,可以改改试试。
  • 通过Flink管理端上传启动提示在这里插入图片描述
    注意在总览检查下有没有槽位、在TaskManagers看看有没有Task管理实例。

总结

???????没啥可总结的,感觉就是香,用了这个感觉用sql做统计都不香了。
???????过程中也遇到一点坑,就差一步就ok的,坑的时间有点长,怕影响进度,同事也没遇到过,最后还问了下部门领导。不得不说领导还是强啊,年龄比我大,感觉学习能力、接受新事物还是棒棒哒啊。
???????时间也不早了,最近都是下班搞会,提加班太麻烦,还总被人事打回来,还不如下班了总结下近日所得,写出来也是一次对运用的巩固与梳理,就分享到这里。Up!

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-21 19:02:54  更:2022-05-21 19:04:09 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 6:44:56-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码