IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink入门第十课:借助ProcessFunction api实现定时器小案例和侧输出流小案例 -> 正文阅读

[大数据]Flink入门第十课:借助ProcessFunction api实现定时器小案例和侧输出流小案例

?1、ProcessFunction api实现定时器

package com.atguigu.FProcessFunctionApi_api;

import com.atguigu.Zbeans.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * 本类借助于processfunction api的定时器功能完成一个"监控温度连续十秒上升就报警"的小案例。
 *
 * 补充ProcessFunction api的基础知识:
 *  ProessFunction api常用来分流、获取时间戳、watermark和注册定时任务。
 *  用一个DataStream对象调用process方法,方法内部传入对应的匿名ProcessFunction类即可。
 *  ProcessFunction类都继承了富函数,都可以访问上下文。
 *  ProcessFunction类可细分为八个子类:
 *          ProcessFunction、KeyedProcessFunction、CoProcessFunction、
 *          BroadcastProcessFunction、KeyedBroadcastProcessFunction、
 *          ProcessWindowFunciton、ProcessAllWindowFunciton。
 *  其中KeyedProcessFunction用的最多,以下介绍大概用法:
 *      datastream.process(new KeyedProcessFunction(){必须实现processElement方法})
 *      在processElement中可以获取到上下文、注册定时任务、开启侧道输出等。
 *      KeyedProcessFunction继承了富函数,所以也可以访问到上下文。
 *  关于定时器:
 *     注册:context.timerService().registerProcessingTimeTimer(ts);   在processElement中注册定时器,传入处理/事件时间戳。
 *     删除:context.timerService().deleteEventTimeTimer(timerTS);   timerTSState.clear();
 *          定时器等待触发过程中不再满足触发条件,应删除定时器并清空定时器状态。定时器触发完毕,应清空定时器状态。
 *    状态:定时器的状态保存的是设置定时器时的时间戳,使用ValueState保存下来,一可以避免后续删除定时器无法获取到时间戳的问题,
 *          二可以通过状态的更新来设置新的定时器。
 *    onTimer方法:可以设置定时器触发时输出的信息,并且在这个方法中应清楚定时器的状态,因为定时器已经触发。
 *    open方法:完成对于状态的初始化。
 *    close方法:对一些状态的关闭、资源的清理。
 */

public class AProcessFunctionTest {
    public static void main(String[] args) throws Exception {
        //创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //读取数据并包装成pojo
        DataStreamSource<String> inputStream = env. socketTextStream("localhost",7777);
        DataStream<SensorReading> mapStream = inputStream.map(line -> {
            String[] splits = line.split(",");
            return new SensorReading(new String(splits[0]), new Long(splits[1]), new Double(splits[2]));
        });
        //处理
        mapStream.keyBy(SensorReading::getId)
                .process(new MyKeyedProcessFunction(10)) //传入定时器触发时间
                .print();
        //执行
        env.execute("测试ProcessFunction api 操作");
    }

    /**
     * 泛型一:分组后key的类型   泛型二:输入类型  泛型三:输出类型
     * KeyedProcessFunction必须重写processElement方法
     *
     */
    public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, SensorReading, String> {
        private Integer delayTime; //定时器触发间隔
        private ValueState<Double> lastTempState;//上一次的温度
        private ValueState<Long> timerTSState;//定时器时间戳

        public MyKeyedProcessFunction(Integer delayTime) { //接收定时器触发时间
            this.delayTime = delayTime;
        }

        /**
         * 第一个执行的方法,富函数初始化
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            //给lastTemp赋值,第一条数据来临之前初值为null
            lastTempState =getRuntimeContext().getState(new ValueStateDescriptor<Double>("last_temp",Double.class));
            //给timerTimeStamp赋值,第一条数据来临之前初值为null
            timerTSState =getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer_timestamp",Long.class));

        }

        @Override
        public void processElement(SensorReading sen, Context context, Collector<String> collector) throws Exception {
            //取出状态的值
            Double lastTemp = lastTempState.value();
            Long timerTS = timerTSState.value();

            /*
                如果是第一条数据,注册定时器
                如果不是第一条数据,但
                    温度上升且没有定时器的时候,则设置10秒后的定时器,开始等待
             */
            if((lastTemp == null) || (sen.getTemperature() >= lastTemp && timerTS == null )){
                //注册定时器并更新定时器状态
                long ts = context.timerService().currentProcessingTime() + delayTime * 1000L;
                context.timerService().registerProcessingTimeTimer(ts);
                timerTSState.update(ts);
            }


            //如果温度下降并且有定时器的时候,删除定时器并且清空状态
            if((lastTemp != null) && (sen.getTemperature() < lastTemp) && (timerTS != null)){
                context.timerService().deleteEventTimeTimer(timerTS);
                timerTSState.clear();
            }

            //更新温度状态
            lastTempState.update(sen.getTemperature());

//            collector.collect(sen.getId());
//            //context可以获取到的信息
//            context.timestamp(); //当前的时间戳
//            context.getCurrentKey();//当前key
//            //context.output(new OutputTag<SensorReading>(sen.getId()));
//            context.timerService().currentProcessingTime();//获取处理时间
//            context.timerService().currentWatermark();//获取事件时间
//            context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime()+delayTime*1000L);//按处理时间设置定时器
//            context.timerService().registerEventTimeTimer((sen.getTimestamp()+10)*1000);//按事件时间注册定时器
//            context.timerService().deleteProcessingTimeTimer(设置定时器用的时间戳);//删除定时器
//            context.timerService().deleteEventTimeTimer((设置定时器用的时间戳));//删除定时器
        }

        /**
         *配置定时器触发时输出的信息或者规则,同时清空定时器状态
         * @param timestamp  定时器触发的时间
         * @param ctx 上下文
         * @param out 输出收集器
         * @throws Exception
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            out.collect("warning:传感器"+ ctx.getCurrentKey() + "温度值连续" + delayTime +"秒上升");
            timerTSState.clear();
        }

        /**
         * 清空温度
         * @throws Exception
         */
        @Override
        public void close() throws Exception {
            lastTempState.clear();
        }
    }
}

2、?ProcessFunction api实现侧输出流

package com.atguigu.FProcessFunctionApi_api;

import com.atguigu.Zbeans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * 本类借助ProcessFunction来实现一个侧输出流
 *      1、初始化OutputTag对象的时候应初始化它的子类,防止泛型擦除。
 *      2、process操作后直接返回类型,便于使用SingleOutputStreamOperator类型调用getSideOutput方法。
 *      3、OutputTag对象和分类临界值应传入ProcessFunction中。
 */

public class BSideOutputTest {
    public static void main(String[] args) throws Exception {
        //创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //读取数据并包装成pojo
        DataStreamSource<String> inputStream = env. socketTextStream("localhost",7777);
        DataStream<SensorReading> mapStream = inputStream.map(line -> {
            String[] splits = line.split(",");
            return new SensorReading(new String(splits[0]), new Long(splits[1]), new Double(splits[2]));
        });
        /**
         * 定义outputtag,将低温流作为侧输出流,侧输出流必须传入ProcessFunction函数
         * outputTag定义为子类时为了防止泛型擦除
         */
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("low_temp") {};
        SingleOutputStreamOperator<SensorReading> highTempStream = mapStream.process(new MySideOutput<SensorReading, SensorReading>(outputTag, 40.0));

        //输出并执行
        highTempStream.print("不低于40.0的都是高温流");
        highTempStream.getSideOutput(outputTag).print("低于40.0的都是低温流");
        env.execute("ProcessFunction api实现侧道输出机制");
    }

    public static class MySideOutput<S, S1> extends ProcessFunction<SensorReading,SensorReading>{
        private OutputTag<SensorReading> outputTag; //侧输出流
        private Double temp;//temp为高温流低温流分界线

        public MySideOutput(OutputTag<SensorReading> outputTag, Double temp) {
            this.outputTag = outputTag;
            this.temp = temp;
        }

        @Override
        public void processElement(SensorReading sen, Context con, Collector<SensorReading> col) throws Exception {
            if(sen.getTemperature()>=temp)
                col.collect(sen);  //大于指定温度,正常输出
            else
                con.output(outputTag,sen); //小于指定温度,加入侧输出流输出
        }
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-20 15:11:21  更:2021-08-20 15:13:34 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 18:08:29-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码