IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> Flink_06_ProcessAPI(个人总结) -> 正文阅读

[网络协议]Flink_06_ProcessAPI(个人总结)

????声明: 1. 本文为我的个人复习总结, 并那种从零基础开始普及知识?内容详细全面, 言辞官方的文章
??????????????2. 由于是个人总结, 所以用最精简的话语来写文章
??????????????3. 若有错误不当之处, 请指出

侧输出流(SideOutput)

即分支流, 可以用来接收迟到数据, 也可以用来将数据分类成多个支流

对于滑动窗口, 有很多窗口重叠, 当迟到数据被所有窗口都不接收时, 它才会进入侧输出流

只有Process这种最底层的API, 才能通过环境上下文去使用侧输出流

案例: 将温度值低于30度的数据输出到 SideOutput

// 定义侧输出流标签, 注意得是其匿名实现类
// 侧输出流标签必须是其子类才行, 不能直接使用OutputTagb
final OutputTag<SensorReading> lowTempTag = new OutputTag<SensorReading>("lowTemp") { };

SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>( ) {
    @Override
    public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) {
        if (value.getTemperature( ) < 30) {
            ctx.output(lowTempTag, value);
        } else {
            out.collect(value);
        }
    }
});
DataStream<SensorReading> lowTempStream = highTempStream.getSideOutput(lowTempTag);
highTempStream.print("high");
lowTempStream.print("low");

8种ProcessAPI:

  1. ProcessFunction

  2. KeyedProcessFunction

    得先keyBy,

    会处理流的每一个元素, 以out.collect(xxx)的方式输出任意多个元素

    • ·processElement(I value, Context ctx, Collector<O> out)

      ctx 可以

      1. 访问元素的时间戳

      2. 访问元素的key

      3. 访问TimerService(ctx.timerService( ))

        TimerService:

        方法:

        1. EventTime相关
          • long currentWatermark( ) 返回当前数据的事件时间
          • void registerEventTimeTimer(long timestamp) 注册当前key的定时器
          • void deleteEventTimeTimer(long timestamp) 删除定时器, 如果没有则不执行
        2. ProcessingTime相关
          • long currentProcessingTime( ) 返回当前数据的处理时间
          • void registerProcessingTimeTimer(long timestamp) 注册当前key的定时器
          • void deleteProcessingTimeTimer(long timestamp) 删除定时器, 如果没有则不执行
        • 当定时器Timer触发后, 会执行回调函数onTimer( )

        • 若注册窗口关闭时启动的定时器, 最好在WindowEndTime的基础上延迟1s;

          因为到了临界点, 既要触发窗口计算, 又要触发定时器;

          定时器任务又依赖于先窗口计算完毕, 所以给个1s的延迟较好

        案例需求: 如果温度值在10秒钟之内(ProcessingTime)连续上升, 则报警

        public class TempIncreaseWarning extends KeyedProcessFunction<String, SensorReading, String> {
            private Integer interval;
        
            public TempIncreaseWarning(Integer interval) {
                this.interval = interval;
            }
        
            // 记录 上一次温度
            private ValueState<Double> lastTempState;
            // 记录 定时器触发时间
            private ValueState<Long> timerTsState;
        
            @Override
            public void open(Configuration parameters) throws Exception {
                lastTempState = getRuntimeContext( ).getState(new ValueStateDescriptor<Double>("last-temp", Double.class, Double.MIN_VALUE));
                timerTsState = getRuntimeContext( ).getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));
            }
        
        
            @Override
            public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
                // 取出状态
                Double lastTemp = lastTempState.value( );
                Long timerTs = timerTsState.value( );
        
                // 更新温度状态
                lastTempState.update(value.getTemperature( ));
                // 每当温度上升时 && 暂无定时器
                if (value.getTemperature( ) > lastTemp && timerTs == null) {
                    long ts = ctx.timerService( ).currentProcessingTime( ) + interval * 1000L;
                    // 注册定时器
                    ctx.timerService( ).registerProcessingTimeTimer(ts);
                    // 为了后续删除定时器能找到注册时间戳
                    timerTsState.update(ts);
                }
                // 每当温度上升时 && 定时器为空
                else if (value.getTemperature( ) <= lastTemp && timerTs != null) {
                    // 清除定时器,注意不能用ts,我们要找的是注册定时器的那个时间戳才对
                    ctx.timerService( ).deleteProcessingTimeTimer(timerTs);
                    timerTsState.clear( );
                }
            }
        
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
                out.collect("传感器" + ctx.getCurrentKey( ) + "的温度连续" + interval + "秒上升");
                timerTsState.clear( );
            }
        }
        
      4. 将数据输出到侧输出流

    • .onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) 是一个回调函数,当之前注册的定时器触发时调用。

    ? timestamp 是定时器所设定的触发运行的时间戳

    ? 如果注册一个已经过期的时间, 那么当再次输入数据时, 它才会触发定时器

  3. CoProcessFunction

    connect后的流再.process

    有processElement1( ) 和 processElement2( )

  4. ProcessJoinFunction

  5. BroadcastProcessFunction

    A流有1个分区, B流有4个分区, B流要用到A流的数据, 所以需要将A流1个分区的数据广播到B流的4个分区

    广播后再进行process处理

  6. KeyedBroadcastProcessFunction

  7. ProcessWindowFunction

    如 .aggregate(AggregateFunction<IN, ACC, OUT>aggFunction,ProcessWindowFunction<IN, OUT, KEY, W> windowFunction)

  8. ProcessAllWindowFunction

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-02-26 12:07:35  更:2022-02-26 12:08:33 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/6 19:59:22-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码