IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 任务处理基类 -> 正文阅读

[大数据]任务处理基类

/**
?* 定义所有task作业的父类,在父类中实现公共的代码
?* 加载配置文件内容到ParameterTool对象中
?* 1)flink流处理环境的初始化
?* 2)flink接入kafka数据源消费数据
?*/
public abstract class BaseTask {
????//定义parameterTool工具类
????public static ParameterTool parameterTool;
????public static String appName;

????/**
?????* 定义静态代码块,加载配置文件数据到ParameterTool对象中
?????*/
????static {
????????try {
????????????parameterTool = ParameterTool.fromPropertiesFile(BaseTask.class.getClassLoader().getResourceAsStream("conf.properties"));
????????} catch (IOException e) {
????????????e.printStackTrace();
????????}
????}

????//todo 1)初始化flink流式处理的开发环境
????private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

????/**
?????* TODO 1)flink任务的初始化方法
?????* @return
?????*/
????public static StreamExecutionEnvironment getEnv(String className){
????????System.setProperty("HADOOP_USER_NAME", "root");
????????//设置全局的参数(使用的时候可以直接用法:getRuntimeContext())
????????env.getConfig().setGlobalJobParameters(parameterTool);
????????//todo ?2)按照事件时间处理数据(terminalTimeStamp)进行窗口的划分和水印的添加
????????env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
????????//为了后续进行测试方便,将并行度设置为1,在生产环境一定不要设置代码级别的并行度,可以设置client级别的并行度
????????env.setParallelism(1);

????????//todo 3)开启checkpoint
????????// ?3.1:设置每隔30s周期性开启checkpoint
????????env.enableCheckpointing(30*1000);
????????// ?3.2:设置检查点的model、exactly-once、保证数据一次性语义
????????env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
????????// ?3.3:设置两次checkpoint的时间间隔,避免两次间隔太近导致频繁的checkpoint,而出现业务处理能力下降
????????env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20 * 1000);
????????// ?3.4:设置checkpoint的超时时间
????????env.getCheckpointConfig().setCheckpointTimeout(20*1000);
????????// ?3.5:设置checkpoint的最大尝试次数,同一个时间有几个checkpoint在运行
????????env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
????????// ?3.6:设置checkpoint取消的时候,是否保留checkpoint,checkpoint默认会在job取消的时候删除checkpoint
????????env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
????????// ?3.7:设置执行job过程中,保存检查点错误时,job不失败
????????env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
????????// ?3.8:设置检查点的存储位置,使用rocketDBStateBackend,存储本地+hdfs分布式文件系统,可以进行增量检查点
????????String hdfsBasePath = parameterTool.getRequired("hdfsUri");
????????try {
????????????env.setStateBackend(new RocksDBStateBackend(hdfsBasePath+"/flink/checkpoint/"+ className));
????????} catch (IOException e) {
????????????e.printStackTrace();
????????}
????????//todo ?4)设置任务的重启策略(固定延迟重启策略、失败率重启策略、无重启策略)
????????//todo ?4.1:如果开启了checkpoint,默认不停的重启,没有开启checkpoint,无重启策略
????????env.setRestartStrategy(RestartStrategies.fallBackRestart());

????????appName = className;
????????//返回env对象
????????return env;
????}

????/**
?????* TODO 2)flink接入kafka数据源消费数据
?????* @param clazz
?????* @param <T>
?????* @return
?????*/
????public static <T> DataStream<T> createKafkaStream(Class<? extends DeserializationSchema> clazz) throws IllegalAccessException, InstantiationException {
????????//todo ?5)创建flink消费kafka数据的对象,指定kafka的参数信息
????????Properties props = new Properties();
????????// 5.1:设置kafka的集群地址
????????props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, parameterTool.getRequired("bootstrap.servers"));
????????// 5.2:设置消费者组id
????????props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, parameterTool.getRequired("kafka.group.id")+appName);
????????// 5.3:设置kafka的分区感知(动态感知)
????????props.setProperty("flink.partition-discovery.interval-millis", "30000");
????????// 5.4:设置key和value的反序列化(可选)
????????// 5.5:设置是否自动递交offset
????????props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, parameterTool.get("enable.auto.reset", "earliest"));
????????// 5.6:创建kafka的消费者实例
????????FlinkKafkaConsumer011<T> kafkaConsumer011= new FlinkKafkaConsumer011<T>(
????????????????parameterTool.getRequired("kafka.topic"),
????????????????clazz.newInstance(),
????????????????props
????????);
????????// 5.7:设置自动递交offset保存到检查点
????????kafkaConsumer011.setCommitOffsetsOnCheckpoints(true);
????????//todo ?6)将kafka消费者对象添加到环境中
????????DataStreamSource streamSource = env.addSource(kafkaConsumer011);
????????//返回消费到的数据
????????return streamSource;
????}
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-22 11:00:15  更:2021-10-22 11:00:44 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 5:01:56-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码